加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
input_file.go 4.96 KB
一键复制 编辑 原始数据 按行查看 历史
Erik Schweller 提交于 2021-06-08 15:43 . Go report corrections (#939)
package main
import (
"bufio"
"bytes"
"compress/gzip"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
// fileInputReader reads recorded payloads, one at a time, from a single
// input source. It always holds the "current" payload in data together with
// that payload's timestamp, so several readers can be merged in time order.
type fileInputReader struct {
// reader is the buffered line reader that parseNext consumes; it wraps
// either file directly or a gzip stream layered on top of file.
reader *bufio.Reader
// data is the payload most recently parsed by parseNext; ReadPayload
// returns it.
data []byte
// file is the underlying stream; Close closes it.
file io.ReadCloser
// timestamp is parsed from the third meta field of the current payload
// and is what FileInput.nextReader compares to pick the next reader.
timestamp int64
closed int32 // Value of 0 indicates that the file is still open.
// s3 is not assigned anywhere in the visible code; presumably it marks
// readers backed by S3 — TODO confirm or remove.
s3 bool
}
// parseNext reads lines from the underlying stream until it reaches a payload
// separator, then stores the accumulated payload in f.data and its timestamp
// in f.timestamp.
//
// It returns nil when a payload was stored. On any read error (including
// io.EOF) the reader is closed and the error is returned; f.data is left
// untouched in that case. Records that are empty or whose meta line has fewer
// than three fields are logged and skipped instead of causing a panic.
func (f *fileInputReader) parseNext() error {
	// The first byte of the separator is dropped before comparing against a
	// line returned by ReadBytes (presumably it is the newline that terminated
	// the previous line — confirm against the payloadSeparator definition).
	separator := []byte(payloadSeparator)[1:]

	var buffer bytes.Buffer
	for {
		line, err := f.reader.ReadBytes('\n')
		if err != nil {
			if err != io.EOF {
				Debug(1, err)
			}
			// Close on every error, not only EOF: a reader that stays open
			// after a failed read keeps its old data and timestamp, so the
			// emitter would pick it again and resend the same payload forever.
			f.Close()
			return err
		}

		if !bytes.Equal(separator, line) {
			buffer.Write(line)
			continue
		}

		asBytes := buffer.Bytes()
		if len(asBytes) == 0 {
			// Separator with nothing in front of it; there is no payload to
			// slice, so move on to the next record.
			continue
		}

		meta := payloadMeta(asBytes)
		if len(meta) < 3 {
			Debug(1, "[INPUT-FILE] skipping payload with malformed meta line")
			buffer.Reset()
			continue
		}

		ts, perr := strconv.ParseInt(string(meta[2]), 10, 64)
		if perr != nil {
			// Keep the original behavior of using whatever ParseInt returned,
			// but make the problem visible.
			Debug(1, "[INPUT-FILE] invalid payload timestamp:", perr)
		}
		f.timestamp = ts
		// Drop the final byte of the accumulated record (the newline that
		// ReadBytes kept on the last line).
		f.data = asBytes[:len(asBytes)-1]
		return nil
	}
}
// ReadPayload hands back the payload the reader is currently holding and then
// advances the reader to the following payload. The result of that advance is
// intentionally ignored here; parseNext closes the reader itself at EOF.
func (f *fileInputReader) ReadPayload() []byte {
	current := f.data
	_ = f.parseNext()
	return current
}
// Close closes the underlying stream exactly once and returns the error from
// that close. Subsequent calls are no-ops that return nil.
func (f *fileInputReader) Close() error {
	// A single compare-and-swap decides which caller performs the close; a
	// separate load followed by a store would let two goroutines both observe
	// 0 and close the file twice.
	if !atomic.CompareAndSwapInt32(&f.closed, 0, 1) {
		return nil
	}
	return f.file.Close()
}
// newFileInputReader opens path and returns a reader primed with its first
// payload.
//
// Paths starting with "s3://" are opened through NewS3ReadCloser; everything
// else is opened from the local filesystem. Paths ending in ".gz" are
// transparently decompressed. It returns nil when the file cannot be opened
// or is not valid gzip. When the first payload cannot be read (for example
// the file is empty) a non-nil but already closed reader is returned.
func newFileInputReader(path string) *fileInputReader {
	var file io.ReadCloser
	if strings.HasPrefix(path, "s3://") {
		file = NewS3ReadCloser(path)
	} else {
		local, err := os.Open(path)
		if err != nil {
			Debug(0, fmt.Sprintf("[INPUT-FILE] err: %q", err))
			return nil
		}
		file = local
	}

	var src io.Reader = file
	if strings.HasSuffix(path, ".gz") {
		gzReader, err := gzip.NewReader(file)
		if err != nil {
			Debug(0, fmt.Sprintf("[INPUT-FILE] err: %q", err))
			// Release the handle opened above; nobody else holds it.
			file.Close()
			return nil
		}
		src = gzReader
	}

	r := &fileInputReader{file: file, reader: bufio.NewReader(src)}
	if err := r.parseNext(); err != nil {
		// No first payload could be loaded, so the reader has nothing to
		// offer. Make sure it is marked closed so it is never selected;
		// Close is guarded by the closed flag, so this is harmless when
		// parseNext already closed it.
		r.Close()
	}
	return r
}
// FileInput can read requests generated by FileOutput
type FileInput struct {
// mu is held by init and Close while they touch readers.
mu sync.Mutex
// data carries payloads from the emit goroutine to PluginRead.
data chan []byte
// exit is closed by Close to tell emit and PluginRead to stop.
exit chan bool
// path is the input location: a filesystem glob pattern, or an "s3://"
// URL whose key part is used as a listing prefix.
path string
// readers holds one entry per matched file; an entry is nil when
// newFileInputReader could not open that file.
readers []*fileInputReader
// speedFactor divides the delay between consecutive payloads in emit;
// NewFileInput sets it to 1.
speedFactor float64
// loop makes emit re-run init once every reader is exhausted instead
// of stopping.
loop bool
}
// NewFileInput builds a FileInput for the given path (file, glob pattern or
// s3:// URL). When loop is true the input restarts from the beginning once all
// files have been replayed. The emitting goroutine is started only if the
// initial file discovery succeeds; the FileInput is returned either way.
func NewFileInput(path string, loop bool) (i *FileInput) {
	i = &FileInput{
		data:        make(chan []byte, 1000),
		exit:        make(chan bool),
		path:        path,
		speedFactor: 1,
		loop:        loop,
	}

	if err := i.init(); err == nil {
		go i.emit()
	}
	return i
}
// init resolves i.path to a list of files and opens one reader per file,
// replacing i.readers.
//
// For "s3://" paths every object under the bucket/prefix is listed; otherwise
// the path is expanded as a filesystem glob. It returns an error when the
// listing or glob fails, or when nothing matches. Entries in i.readers are nil
// for files that could not be opened.
func (i *FileInput) init() (err error) {
	i.mu.Lock()
	defer i.mu.Unlock()

	var matches []string

	if strings.HasPrefix(i.path, "s3://") {
		sess := session.Must(session.NewSession(awsConfig()))
		svc := s3.New(sess)

		bucket, key := parseS3Url(i.path)
		params := &s3.ListObjectsInput{
			Bucket: aws.String(bucket),
			Prefix: aws.String(key),
		}

		// A single ListObjects call returns only one page of results, so walk
		// every page to avoid silently dropping files under large prefixes.
		err = svc.ListObjectsPages(params, func(page *s3.ListObjectsOutput, _ bool) bool {
			for _, c := range page.Contents {
				matches = append(matches, "s3://"+bucket+"/"+aws.StringValue(c.Key))
			}
			return true
		})
		if err != nil {
			Debug(0, "[INPUT-FILE] Error while retreiving list of files from S3", i.path, err)
			return err
		}
	} else if matches, err = filepath.Glob(i.path); err != nil {
		Debug(0, "[INPUT-FILE] Wrong file pattern", i.path, err)
		return err
	}

	if len(matches) == 0 {
		Debug(0, "[INPUT-FILE] No files match pattern: ", i.path)
		return errors.New("No matching files")
	}

	i.readers = make([]*fileInputReader, len(matches))
	for idx, p := range matches {
		i.readers[idx] = newFileInputReader(p)
	}

	return nil
}
// PluginRead blocks until either a payload is available or the input has been
// closed. A payload is split into its meta and body parts and returned as a
// Message; after Close it returns ErrorStopped.
func (i *FileInput) PluginRead() (*Message, error) {
	select {
	case buf := <-i.data:
		meta, body := payloadMetaWithBody(buf)
		return &Message{Meta: meta, Data: body}, nil
	case <-i.exit:
		return nil, ErrorStopped
	}
}
// String describes this input by its configured path.
func (i *FileInput) String() string {
	const label = "File input: "
	return label + i.path
}
// nextReader picks the open reader whose current payload has the smallest
// timestamp, i.e. the payload that comes next in replay order. Missing (nil)
// and closed readers are ignored; on equal timestamps the earlier reader in
// i.readers wins. It returns nil when no reader is available.
func (i *FileInput) nextReader() (next *fileInputReader) {
	for idx := range i.readers {
		candidate := i.readers[idx]
		switch {
		case candidate == nil:
			// file could not be opened
		case atomic.LoadInt32(&candidate.closed) != 0:
			// already exhausted
		case next == nil || candidate.timestamp < next.timestamp:
			next = candidate
		}
	}
	return next
}
// emit replays payloads from all readers in timestamp order, pushing them to
// i.data. Between payloads it waits for the recorded time difference divided
// by i.speedFactor (timestamps are interpreted as nanoseconds).
//
// The goroutine stops when i.exit is closed, when all readers are exhausted
// and looping is disabled, or when looping is enabled but the files can no
// longer be re-initialized. Both the inter-payload wait and the channel send
// can be interrupted by Close, so the goroutine does not outlive the input.
func (i *FileInput) emit() {
	var lastTime int64 = -1

	for {
		select {
		case <-i.exit:
			return
		default:
		}

		reader := i.nextReader()
		if reader == nil {
			if !i.loop {
				break
			}
			if err := i.init(); err != nil {
				// Readers were not replaced, so retrying immediately would
				// spin at full speed; stop instead.
				Debug(0, "[INPUT-FILE] Unable to restart replay loop", i.path, err)
				break
			}
			lastTime = -1
			continue
		}

		if lastTime != -1 {
			diff := reader.timestamp - lastTime
			if i.speedFactor != 1 {
				diff = int64(float64(diff) / i.speedFactor)
			}
			// Non-positive delays need no waiting at all.
			if diff > 0 {
				timer := time.NewTimer(time.Duration(diff))
				select {
				case <-i.exit:
					timer.Stop()
					return
				case <-timer.C:
				}
			}
		}
		lastTime = reader.timestamp

		// Recheck if we have exited since last check, before touching the
		// reader (Close also closes the readers).
		select {
		case <-i.exit:
			return
		default:
		}

		payload := reader.ReadPayload()
		select {
		case <-i.exit:
			return
		case i.data <- payload:
		}
	}

	Debug(0, fmt.Sprintf("[INPUT-FILE] FileInput: end of file '%s'\n", i.path))
}
// Close stops the input: it signals the emit goroutine and PluginRead through
// i.exit and closes every open reader. It is safe to call more than once;
// later calls do nothing. It always returns nil.
func (i *FileInput) Close() error {
	i.mu.Lock()
	defer i.mu.Unlock()

	select {
	case <-i.exit:
		// Already closed; closing the channel again would panic.
		return nil
	default:
		close(i.exit)
	}

	for _, r := range i.readers {
		// Entries are nil for files that failed to open.
		if r != nil {
			r.Close()
		}
	}
	return nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化