diff --git a/input_file.go b/input_file.go
index 3fa6d116..8f45c993 100644
--- a/input_file.go
+++ b/input_file.go
@@ -6,8 +6,10 @@ import (
 	"compress/gzip"
 	"container/heap"
 	"errors"
+	"expvar"
 	"fmt"
 	"io"
+	"math"
 	"os"
 	"path/filepath"
 	"strconv"
@@ -51,9 +53,6 @@ func (h *payloadQueue) Pop() interface{} {
 }
 
 func (h payloadQueue) Idx(i int) *filePayload {
-	h.RLock()
-	defer h.RUnlock()
-
 	return h.s[i]
 }
 
@@ -196,10 +195,13 @@ type FileInput struct {
 	speedFactor float64
 	loop        bool
 	readDepth   int
+	dryRun      bool
+
+	stats *expvar.Map
 }
 
 // NewFileInput constructor for FileInput. Accepts file path as argument.
-func NewFileInput(path string, loop bool, readDepth int) (i *FileInput) {
+func NewFileInput(path string, loop bool, readDepth int, dryRun bool) (i *FileInput) {
 	i = new(FileInput)
 	i.data = make(chan []byte, 1000)
 	i.exit = make(chan bool)
@@ -207,6 +209,8 @@ func NewFileInput(path string, loop bool, readDepth int) (i *FileInput) {
 	i.speedFactor = 1
 	i.loop = loop
 	i.readDepth = readDepth
+	i.stats = expvar.NewMap("file-" + path)
+	i.dryRun = dryRun
 
 	if err := i.init(); err != nil {
 		return
@@ -246,7 +250,6 @@ func (i *FileInput) init() (err error) {
 	} else if matches, err = filepath.Glob(i.path); err != nil {
 		Debug(0, "[INPUT-FILE] Wrong file pattern", i.path, err)
 		return
-
 	}
 
 	if len(matches) == 0 {
@@ -260,6 +263,8 @@ func (i *FileInput) init() (err error) {
 		i.readers[idx] = newFileInputReader(p, i.readDepth)
 	}
 
+	i.stats.Add("reader_count", int64(len(matches)))
+
 	return nil
 }
 
@@ -270,6 +275,7 @@ func (i *FileInput) PluginRead() (*Message, error) {
 	case <-i.exit:
 		return nil, ErrorStopped
 	case buf := <-i.data:
+		i.stats.Add("read_from", 1)
 		msg.Meta, msg.Data = payloadMetaWithBody(buf)
 		return &msg, nil
 	}
@@ -292,7 +298,7 @@ func (i *FileInput) nextReader() (next *fileInputReader) {
 			continue
 		}
 
-		if next == nil || r.queue.Idx(0).timestamp > next.queue.Idx(0).timestamp {
+		if next == nil || r.queue.Idx(0).timestamp < next.queue.Idx(0).timestamp {
 			next = r
 			continue
 		}
@@ -304,6 +310,11 @@ func (i *FileInput) emit() {
 	var lastTime int64 = -1
 
+	var maxWait, firstWait, minWait int64
+	minWait = math.MaxInt64
+
+	i.stats.Add("negative_wait", 0)
+
 	for {
 		select {
 		case <-i.exit:
 			return
@@ -325,18 +336,39 @@
 		reader.queue.RLock()
 		payload := heap.Pop(&reader.queue).(*filePayload)
+		i.stats.Add("total_counter", 1)
+		i.stats.Add("total_bytes", int64(len(payload.data)))
 		reader.queue.RUnlock()
 
 		if lastTime != -1 {
 			diff := payload.timestamp - lastTime
 
+			if firstWait == 0 {
+				firstWait = diff
+			}
+
 			if i.speedFactor != 1 {
 				diff = int64(float64(diff) / i.speedFactor)
 			}
 
 			if diff >= 0 {
 				lastTime = payload.timestamp
-				time.Sleep(time.Duration(diff))
+
+				if !i.dryRun {
+					time.Sleep(time.Duration(diff))
+				}
+
+				i.stats.Add("total_wait", diff)
+
+				if diff > maxWait {
+					maxWait = diff
+				}
+
+				if diff < minWait {
+					minWait = diff
+				}
+			} else {
+				i.stats.Add("negative_wait", 1)
 			}
 		} else {
 			lastTime = payload.timestamp
@@ -347,12 +379,30 @@
 		case <-i.exit:
 			return
 		default:
-			i.data <- payload.data
+			if !i.dryRun {
+				i.data <- payload.data
+			}
 		}
 	}
 
+	i.stats.Set("first_wait", time.Duration(firstWait))
+	i.stats.Set("max_wait", time.Duration(maxWait))
+	i.stats.Set("min_wait", time.Duration(minWait))
+
 	Debug(0, fmt.Sprintf("[INPUT-FILE] FileInput: end of file '%s'\n", i.path))
+	if i.dryRun {
+		fmt.Printf("Records found: %v\nFiles processed: %v\nBytes processed: %v\nMax wait: %v\nMin wait: %v\nFirst wait: %v\nIt will take `%v` to replay at current speed.\nFound %v records with out of order timestamp\n",
+			i.stats.Get("total_counter"),
+			i.stats.Get("reader_count"),
+			i.stats.Get("total_bytes"),
+			i.stats.Get("max_wait"),
+			i.stats.Get("min_wait"),
+			i.stats.Get("first_wait"),
+			time.Duration(i.stats.Get("total_wait").(*expvar.Int).Value()),
+			i.stats.Get("negative_wait"),
+		)
+	}
 }
 
 // Close closes this plugin
diff --git a/input_file_test.go b/input_file_test.go
index c6dcd8d4..7fef0f82 100644
--- a/input_file_test.go
+++ b/input_file_test.go
@@ -104,7 +104,7 @@ func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) {
 	file2.Write([]byte(payloadSeparator))
 	file2.Close()
 
-	input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100)
+	input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, false)
 
 	for i := '1'; i <= '4'; i++ {
 		msg, _ := input.PluginRead()
@@ -130,7 +130,7 @@ func TestInputFileRequestsWithLatency(t *testing.T) {
 	file.Write([]byte("1 3 250000000\nrequest3"))
 	file.Write([]byte(payloadSeparator))
 
-	input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false, 100)
+	input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false, 100, false)
 
 	start := time.Now().UnixNano()
 	for i := 0; i < 3; i++ {
@@ -170,7 +170,7 @@ func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) {
 	file2.Write([]byte(payloadSeparator))
 	file2.Close()
 
-	input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100)
+	input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, false)
 
 	for i := '1'; i <= '4'; i++ {
 		msg, _ := input.PluginRead()
@@ -198,7 +198,7 @@ func TestInputFileLoop(t *testing.T) {
 	file.Write([]byte(payloadSeparator))
 	file.Close()
 
-	input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), true, 100)
+	input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), true, 100, false)
 
 	// Even if we have just 2 requests in file, it should indifinitly loop
 	for i := 0; i < 1000; i++ {
@@ -226,7 +226,7 @@ func TestInputFileCompressed(t *testing.T) {
 	name2 := output2.file.Name()
 	output2.Close()
 
-	input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100)
+	input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, false)
 	for i := 0; i < 2000; i++ {
 		input.PluginRead()
 	}
@@ -326,7 +326,7 @@ func CreateCaptureFile(requestGenerator *RequestGenerator) *CaptureFile {
 func ReadFromCaptureFile(captureFile *os.File, count int, callback writeCallback) (err error) {
 	wg := new(sync.WaitGroup)
 
-	input := NewFileInput(captureFile.Name(), false, 100)
+	input := NewFileInput(captureFile.Name(), false, 100, false)
 	output := NewTestOutput(func(msg *Message) {
 		callback(msg)
 		wg.Done()
diff --git a/input_raw_test.go b/input_raw_test.go
index d844f6a6..969e5a7d 100644
--- a/input_raw_test.go
+++ b/input_raw_test.go
@@ -233,10 +233,11 @@ func TestInputRAWChunkedEncoding(t *testing.T) {
 	originAddr := strings.Replace(origin.Listener.Addr().String(), "[::]", "127.0.0.1", -1)
 
 	conf := RAWInputConfig{
-		Engine:        capture.EnginePcap,
-		Expire:        time.Second,
-		Protocol:      ProtocolHTTP,
-		TrackResponse: true,
+		Engine:          capture.EnginePcap,
+		Expire:          time.Second,
+		Protocol:        ProtocolHTTP,
+		TrackResponse:   true,
+		AllowIncomplete: true,
 	}
 	input := NewRAWInput(originAddr, conf)
 
diff --git a/output_file_test.go b/output_file_test.go
index 53794fbc..4190df87 100644
--- a/output_file_test.go
+++ b/output_file_test.go
@@ -39,7 +39,7 @@ func TestFileOutput(t *testing.T) {
 	emitter.Close()
 
 	var counter int64
-	input2 := NewFileInput("/tmp/test_requests.gor", false, 100)
+	input2 := NewFileInput("/tmp/test_requests.gor", false, 100, false)
 	output2 := NewTestOutput(func(*Message) {
 		atomic.AddInt64(&counter, 1)
 		wg.Done()
diff --git a/output_http.go b/output_http.go
index 43c8240e..cb0c0deb 100644
--- a/output_http.go
+++ b/output_http.go
@@ -215,6 +215,7 @@ func (o *HTTPOutput) sendRequest(client *HTTPClient, msg *Message) {
 	if !isRequestPayload(msg.Meta) {
 		return
 	}
 
+	uuid := payloadID(msg.Meta)
 	start := time.Now()
 	resp, err := client.Send(msg.Data)
diff --git a/plugins.go b/plugins.go
index 1f1363c9..54d4e7c1 100644
--- a/plugins.go
+++ b/plugins.go
@@ -118,7 +118,7 @@ func NewPlugins() *InOutPlugins {
 	}
 
 	for _, options := range Settings.InputFile {
-		plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth)
+		plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth, Settings.InputFileDryRun)
 	}
 
 	for _, path := range Settings.OutputFile {
diff --git a/s3_test.go b/s3_test.go
index f1f85ff6..65453d35 100644
--- a/s3_test.go
+++ b/s3_test.go
@@ -127,7 +127,7 @@ func TestInputFileFromS3(t *testing.T) {
 		<-output.closeCh
 	}
 
-	input := NewFileInput(fmt.Sprintf("s3://test-gor-eu/%d", rnd, 100), false)
+	input := NewFileInput(fmt.Sprintf("s3://test-gor-eu/%d", rnd), false, 100, false)
 
 	buf := make([]byte, 1000)
 	for i := 0; i <= 19999; i++ {
diff --git a/settings.go b/settings.go
index 2ecdfc73..c64c7cb9 100644
--- a/settings.go
+++ b/settings.go
@@ -48,6 +48,7 @@ type AppSettings struct {
 	InputFile          MultiOption `json:"input-file"`
 	InputFileLoop      bool        `json:"input-file-loop"`
 	InputFileReadDepth int         `json:"input-file-read-depth"`
+	InputFileDryRun    bool        `json:"input-file-dry-run"`
 
 	OutputFile       MultiOption `json:"output-file"`
 	OutputFileConfig FileOutputConfig
@@ -115,6 +116,7 @@
 	flag.Var(&Settings.InputFile, "input-file", "Read requests from file: \n\tgor --input-file ./requests.gor --output-http staging.com")
 	flag.BoolVar(&Settings.InputFileLoop, "input-file-loop", false, "Loop input files, useful for performance testing.")
 	flag.IntVar(&Settings.InputFileReadDepth, "input-file-read-depth", 100, "GoReplay tries to read and cache multiple records, in advance. In parallel it also perform sorting of requests, if they came out of order. Since it needs hold this buffer in memory, bigger values can cause worse performance")
+	flag.BoolVar(&Settings.InputFileDryRun, "input-file-dry-run", false, "Simulate reading from the data source without replaying it. You will get information about expected replay time, number of found records etc.")
 
 	flag.Var(&Settings.OutputFile, "output-file", "Write incoming requests to file: \n\tgor --input-raw :80 --output-file ./requests.gor")
 	flag.DurationVar(&Settings.OutputFileConfig.FlushInterval, "output-file-flush-interval", time.Second, "Interval for forcing buffer flush to the file, default: 1s.")
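Usage note (not part of the patch): a minimal sketch, assuming it lives in the goreplay package like the tests above, of how the new dryRun argument is exercised. The helper name and file path are hypothetical; from the CLI the same mode is enabled with --input-file-dry-run alongside --input-file.

	// dryRunExample shows the extended constructor signature:
	// path, loop=false, readDepth=100, dryRun=true.
	func dryRunExample() {
		input := NewFileInput("/tmp/requests.gor", false, 100, true)
		defer input.Close()

		// With dryRun enabled, emit() skips the inter-request sleeps and never
		// pushes payloads onto i.data; it only accumulates the expvar counters
		// ("total_counter", "total_bytes", "max_wait", ...) and prints the
		// summary once it reaches the end of the file.
	}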