代码拉取完成,页面将自动刷新
同步操作将从 Gitee 极速下载/goreplay 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
package main
import (
"bufio"
"bytes"
"compress/gzip"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
// fileInputReader reads payloads one at a time from a single input stream.
// data and timestamp always describe the payload most recently stored by
// parseNext.
type fileInputReader struct {
	// reader is the buffered source that parseNext consumes line by line.
	// newFileInputReader layers it over a gzip reader for ".gz" paths.
	reader *bufio.Reader
	// data is the payload most recently parsed; ReadPayload returns it.
	data []byte
	// file is the underlying stream; Close closes it.
	file io.ReadCloser
	// timestamp is parsed from meta[2] of the current payload. FileInput.emit
	// uses the difference between consecutive timestamps directly as a
	// time.Duration.
	timestamp int64
	closed int32 // Value of 0 indicates that the file is still open; accessed with sync/atomic.
	// s3 is not read or written by any code visible in this part of the file.
	s3 bool
}
// parseNext reads lines from f.reader until it reaches a payload separator
// line, then stores the accumulated payload (without its final byte) in
// f.data and the payload's timestamp (meta[2]) in f.timestamp.
//
// It returns nil once a payload has been stored. Any read error, io.EOF
// included, closes the reader and is returned; f.data is left untouched in
// that case. Records that are empty or carry fewer than three meta fields are
// skipped instead of being indexed out of range.
func (f *fileInputReader) parseNext() error {
	// ReadBytes returns each line with its trailing '\n', so a separator line
	// equals the separator constant minus its first byte.
	separatorLine := []byte(payloadSeparator)[1:]

	var buffer bytes.Buffer
	for {
		line, err := f.reader.ReadBytes('\n')
		if err != nil {
			if err != io.EOF {
				Debug(1, err)
			}
			// Close on every error, not only EOF: a reader left open after a
			// failed read would keep handing out its previous payload.
			f.Close()
			return err
		}

		if !bytes.Equal(separatorLine, line) {
			buffer.Write(line)
			continue
		}

		asBytes := buffer.Bytes()
		if len(asBytes) == 0 {
			// Separator with nothing in front of it: no record to emit.
			continue
		}

		meta := payloadMeta(asBytes)
		if len(meta) < 3 {
			Debug(1, fmt.Sprintf("[INPUT-FILE] skipping malformed record (%d bytes)", len(asBytes)))
			buffer.Reset()
			continue
		}

		ts, err := strconv.ParseInt(string(meta[2]), 10, 64)
		if err != nil {
			// The payload is still emitted; ts holds whatever ParseInt
			// returned alongside the error.
			Debug(1, fmt.Sprintf("[INPUT-FILE] bad payload timestamp %q: %v", meta[2], err))
		}
		f.timestamp = ts

		// Drop the last byte: the newline that precedes the separator line.
		f.data = asBytes[:len(asBytes)-1]
		return nil
	}
}
// ReadPayload returns the payload that is currently loaded and then advances
// the reader to the following one. The result of that parseNext call is
// discarded.
func (f *fileInputReader) ReadPayload() []byte {
	current := f.data
	f.parseNext()
	return current
}
// Close marks the reader as closed and closes the underlying stream.
//
// Only the first call closes the stream; the closed flag is flipped with a
// single compare-and-swap so concurrent callers cannot both reach
// f.file.Close. Later calls, and calls on a nil receiver, return nil. The
// first call returns the error reported by the underlying Close.
func (f *fileInputReader) Close() error {
	if f == nil {
		return nil
	}
	if !atomic.CompareAndSwapInt32(&f.closed, 0, 1) {
		return nil
	}
	return f.file.Close()
}
// newFileInputReader opens path and returns a reader with its first payload
// already parsed.
//
// Paths starting with "s3://" are opened through NewS3ReadCloser, everything
// else through os.Open. Paths ending in ".gz" are decompressed with gzip.
// It returns nil when the file cannot be opened or is not valid gzip; the
// failure is logged through Debug.
func newFileInputReader(path string) *fileInputReader {
	var file io.ReadCloser
	if strings.HasPrefix(path, "s3://") {
		file = NewS3ReadCloser(path)
	} else {
		opened, err := os.Open(path)
		if err != nil {
			Debug(0, fmt.Sprintf("[INPUT-FILE] err: %q", err))
			return nil
		}
		file = opened
	}

	r := &fileInputReader{file: file}
	if strings.HasSuffix(path, ".gz") {
		gzReader, err := gzip.NewReader(file)
		if err != nil {
			Debug(0, fmt.Sprintf("[INPUT-FILE] err: %q", err))
			// Best-effort cleanup so the opened stream is not leaked.
			file.Close()
			return nil
		}
		r.reader = bufio.NewReader(gzReader)
	} else {
		r.reader = bufio.NewReader(file)
	}

	// Load the first payload. The error is intentionally not propagated: at
	// end of input parseNext closes the reader, and nextReader skips closed
	// readers.
	r.parseNext()
	return r
}
// FileInput can read requests generated by FileOutput
type FileInput struct {
	// mu is held by init and Close.
	mu sync.Mutex
	// data carries payloads from emit to PluginRead; NewFileInput gives it a
	// buffer of 1000.
	data chan []byte
	// exit is closed by Close; emit and PluginRead stop when it is closed.
	exit chan bool
	// path is the file pattern (or "s3://" prefix) passed to NewFileInput.
	path string
	// readers holds one entry per matched file, filled in by init. An entry
	// is nil when newFileInputReader failed for that file.
	readers []*fileInputReader
	// speedFactor divides the delay emit waits between payloads; NewFileInput
	// sets it to 1.
	speedFactor float64
	// loop makes emit re-run init once every reader is exhausted.
	loop bool
}
// NewFileInput builds a FileInput for the given path pattern. When loop is
// true the files are replayed again after the last payload.
//
// The emitting goroutine is started only if init succeeds; the FileInput is
// returned either way.
func NewFileInput(path string, loop bool) (i *FileInput) {
	i = &FileInput{
		data:        make(chan []byte, 1000),
		exit:        make(chan bool),
		path:        path,
		speedFactor: 1,
		loop:        loop,
	}

	if initErr := i.init(); initErr != nil {
		return i
	}

	go i.emit()
	return i
}
// init resolves i.path to a list of files and opens one reader per file,
// replacing i.readers. It holds i.mu for its whole duration.
//
// For "s3://" paths the bucket is listed with the parsed key as prefix; any
// other path is expanded with filepath.Glob. An error is returned when the
// listing or the pattern fails, or when nothing matches.
//
// NOTE(review): only the result of a single ListObjects call is used —
// confirm whether listings spanning several pages need to be supported.
func (i *FileInput) init() (err error) {
	i.mu.Lock()
	defer i.mu.Unlock()

	var matches []string

	switch {
	case strings.HasPrefix(i.path, "s3://"):
		client := s3.New(session.Must(session.NewSession(awsConfig())))
		bucket, key := parseS3Url(i.path)

		resp, listErr := client.ListObjects(&s3.ListObjectsInput{
			Bucket: aws.String(bucket),
			Prefix: aws.String(key),
		})
		if listErr != nil {
			Debug(0, "[INPUT-FILE] Error while retreiving list of files from S3", i.path, listErr)
			return listErr
		}

		for _, obj := range resp.Contents {
			matches = append(matches, "s3://"+bucket+"/"+*obj.Key)
		}
	default:
		matches, err = filepath.Glob(i.path)
		if err != nil {
			Debug(0, "[INPUT-FILE] Wrong file pattern", i.path, err)
			return err
		}
	}

	if len(matches) == 0 {
		Debug(0, "[INPUT-FILE] No files match pattern: ", i.path)
		return errors.New("No matching files")
	}

	// Entries stay nil for files that newFileInputReader could not open.
	i.readers = make([]*fileInputReader, len(matches))
	for idx := range matches {
		i.readers[idx] = newFileInputReader(matches[idx])
	}

	return nil
}
// PluginRead blocks until the next payload is available and returns it as a
// Message split into meta and body by payloadMetaWithBody. It returns
// ErrorStopped once the input has been closed.
func (i *FileInput) PluginRead() (*Message, error) {
	select {
	case buf := <-i.data:
		meta, body := payloadMetaWithBody(buf)
		return &Message{Meta: meta, Data: body}, nil
	case <-i.exit:
		return nil, ErrorStopped
	}
}
// String returns a short description of this input that includes its path.
func (i *FileInput) String() string {
	const label = "File input: "
	return label + i.path
}
// nextReader returns the open reader whose current payload has the smallest
// timestamp, i.e. the next payload in replay order. Nil and closed readers
// are skipped; on equal timestamps the earlier reader in i.readers wins. The
// result is nil when no open reader remains.
func (i *FileInput) nextReader() (next *fileInputReader) {
	for idx := range i.readers {
		candidate := i.readers[idx]
		if candidate == nil {
			continue
		}
		if atomic.LoadInt32(&candidate.closed) != 0 {
			continue
		}
		if next != nil && candidate.timestamp >= next.timestamp {
			continue
		}
		next = candidate
	}
	return next
}
// emit replays payloads from all readers in timestamp order, pushing them to
// i.data. Between payloads it waits for the difference between their
// timestamps (used directly as a time.Duration), divided by i.speedFactor.
//
// It returns as soon as i.exit is closed, including while it is waiting
// between payloads or blocked on a full i.data channel. Without loop it stops
// after the last payload. With loop it re-runs init and starts over, unless
// init fails or no reader has a payload, in which case it stops rather than
// spinning.
func (i *FileInput) emit() {
	var lastTime int64 = -1

	for {
		select {
		case <-i.exit:
			return
		default:
		}

		reader := i.nextReader()
		if reader == nil {
			if !i.loop {
				break
			}
			if err := i.init(); err != nil {
				Debug(0, "[INPUT-FILE] Unable to restart replay", i.path, err)
				break
			}
			lastTime = -1
			if i.nextReader() == nil {
				// Nothing readable after re-opening: looping again would
				// only re-open the same files in a busy loop.
				break
			}
			continue
		}

		if lastTime != -1 {
			diff := reader.timestamp - lastTime
			if i.speedFactor != 1 {
				diff = int64(float64(diff) / i.speedFactor)
			}
			if diff > 0 {
				// Wait on a timer rather than time.Sleep so that Close can
				// interrupt long gaps between payloads.
				timer := time.NewTimer(time.Duration(diff))
				select {
				case <-i.exit:
					timer.Stop()
					return
				case <-timer.C:
				}
			}
		}
		lastTime = reader.timestamp

		// Recheck if we have exited since last check.
		select {
		case <-i.exit:
			return
		default:
		}

		// Also watch exit while sending, so a full channel with no consumer
		// cannot keep this goroutine alive after Close.
		select {
		case i.data <- reader.ReadPayload():
		case <-i.exit:
			return
		}
	}

	Debug(0, fmt.Sprintf("[INPUT-FILE] FileInput: end of file '%s'\n", i.path))
}
// Close stops the input: it closes i.exit (which ends emit and makes
// PluginRead return ErrorStopped) and closes every reader. It holds i.mu
// while doing so and always returns nil.
//
// Close may be called more than once; i.exit is closed only on the first
// call. Entries of i.readers that are nil (files that failed to open) are
// skipped.
func (i *FileInput) Close() error {
	i.mu.Lock()
	defer i.mu.Unlock()

	select {
	case <-i.exit:
		// Already closed by an earlier call; closing again would panic.
	default:
		close(i.exit)
	}

	for _, r := range i.readers {
		if r == nil {
			continue
		}
		r.Close()
	}

	return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。