From 165bf5f715f916c72334f358b1480f86177e6fa2 Mon Sep 17 00:00:00 2001 From: Illya Yalovyy Date: Fri, 22 Oct 2021 12:07:59 -0700 Subject: [PATCH] fix: show logs for workflows with more than 100 tasks --- .../cli/internal/pkg/aws/cwl/interface.go | 2 +- .../cli/internal/pkg/aws/cwl/stream_logs.go | 2 +- packages/cli/internal/pkg/cli/logs_core.go | 81 ++++++++++++++++++- .../cli/internal/pkg/cli/logs_core_test.go | 76 +++++++++++++++++ .../cli/internal/pkg/cli/logs_workflow.go | 34 ++++---- .../internal/pkg/mocks/aws/mock_interfaces.go | 4 +- .../pkg/mocks/manager/mock_interfaces.go | 4 +- 7 files changed, 177 insertions(+), 26 deletions(-) diff --git a/packages/cli/internal/pkg/aws/cwl/interface.go b/packages/cli/internal/pkg/aws/cwl/interface.go index 9e48dff4..a5ba35a0 100644 --- a/packages/cli/internal/pkg/aws/cwl/interface.go +++ b/packages/cli/internal/pkg/aws/cwl/interface.go @@ -13,7 +13,7 @@ type LogPaginator interface { type Interface interface { GetLogsPaginated(input GetLogsInput) LogPaginator - StreamLogs(ctx context.Context, logGroupName string, streams ...string) chan StreamEvent + StreamLogs(ctx context.Context, logGroupName string, streams ...string) <-chan StreamEvent } type cwlInterface interface { diff --git a/packages/cli/internal/pkg/aws/cwl/stream_logs.go b/packages/cli/internal/pkg/aws/cwl/stream_logs.go index 6164ef49..a5fa888a 100644 --- a/packages/cli/internal/pkg/aws/cwl/stream_logs.go +++ b/packages/cli/internal/pkg/aws/cwl/stream_logs.go @@ -18,7 +18,7 @@ type StreamEvent struct { Err error } -func (c Client) StreamLogs(ctx context.Context, logGroupName string, streams ...string) chan StreamEvent { +func (c Client) StreamLogs(ctx context.Context, logGroupName string, streams ...string) <-chan StreamEvent { stream := make(chan StreamEvent) go func() { defer func() { close(stream) }() diff --git a/packages/cli/internal/pkg/cli/logs_core.go b/packages/cli/internal/pkg/cli/logs_core.go index 08eba2be..c074ccf2 100644 --- a/packages/cli/internal/pkg/cli/logs_core.go +++ b/packages/cli/internal/pkg/cli/logs_core.go @@ -3,6 +3,7 @@ package cli import ( ctx "context" "fmt" + "sync" "time" "github.com/araddon/dateparse" @@ -42,7 +43,7 @@ Use a question mark for OR, such as "?ERROR ?WARN". Filter out terms with a minu ) var printLn = func(args ...interface{}) { - _, _ = fmt.Println(args) + _, _ = fmt.Println(args...) } type logsSharedVars struct { @@ -117,9 +118,13 @@ func (o *logsSharedOpts) parseTime(vars logsSharedVars) error { return nil } -func (o *logsSharedOpts) followLogGroup(logGroupName string, streams ...string) error { - stream := o.cwlClient.StreamLogs(ctx.Background(), logGroupName, streams...) - for event := range stream { +func (o *logsSharedOpts) followLogGroup(logGroupName string) error { + channel := o.cwlClient.StreamLogs(ctx.Background(), logGroupName) + return o.displayEventFromChannel(channel) +} + +func (o *logsSharedOpts) displayEventFromChannel(channel <-chan cwl.StreamEvent) error { + for event := range channel { if event.Err != nil { return event.Err } @@ -134,6 +139,64 @@ func (o *logsSharedOpts) followLogGroup(logGroupName string, streams ...string) return nil } +func (o *logsSharedOpts) followLogStreams(logGroupName string, streams ...string) error { + const maxLogStreams = 100 + streamingCtx, cancelFunc := ctx.WithCancel(ctx.Background()) + defer cancelFunc() + streamBatches := splitToBatchesBy(maxLogStreams, streams) + var eventChannels []<-chan cwl.StreamEvent + + for _, batch := range streamBatches { + eventChannel := o.cwlClient.StreamLogs(streamingCtx, logGroupName, batch...) + eventChannels = append(eventChannels, eventChannel) + } + + return o.displayEventFromChannel(fanInChannels(streamingCtx, eventChannels...)) +} + +func fanInChannels(commonCtx ctx.Context, channels ...<-chan cwl.StreamEvent) <-chan cwl.StreamEvent { + var waitGroup sync.WaitGroup + multiplexedChannel := make(chan cwl.StreamEvent) + + multiplexFunc := func(events <-chan cwl.StreamEvent) { + defer waitGroup.Done() + for event := range events { + select { + case <-commonCtx.Done(): + return + case multiplexedChannel <- event: + } + } + } + + waitGroup.Add(len(channels)) + for _, c := range channels { + go multiplexFunc(c) + } + + go func() { + waitGroup.Wait() + close(multiplexedChannel) + }() + + return multiplexedChannel +} + +func splitToBatchesBy(batchSize int, strs []string) [][]string { + var batches [][]string + totalStrings := len(strs) + batchStart := 0 + for batchStart < totalStrings { + batchEnd := batchStart + batchSize + if batchEnd > totalStrings { + batchEnd = totalStrings + } + batches = append(batches, strs[batchStart:batchEnd]) + batchStart = batchEnd + } + return batches +} + func (o *logsSharedOpts) displayLogGroup(logGroupName string, startTime, endTime *time.Time, filter string, streams ...string) error { output := o.cwlClient.GetLogsPaginated(cwl.GetLogsInput{ LogGroupName: logGroupName, @@ -153,3 +216,13 @@ func (o *logsSharedOpts) displayLogGroup(logGroupName string, startTime, endTime } return nil } + +func (o *logsSharedOpts) displayLogStreams(logGroupName string, startTime, endTime *time.Time, filter string, streams ...string) error { + for _, stream := range streams { + err := o.displayLogGroup(logGroupName, startTime, endTime, filter, stream) + if err != nil { + return err + } + } + return nil +} diff --git a/packages/cli/internal/pkg/cli/logs_core_test.go b/packages/cli/internal/pkg/cli/logs_core_test.go index 5c8b0421..9b6d041a 100644 --- a/packages/cli/internal/pkg/cli/logs_core_test.go +++ b/packages/cli/internal/pkg/cli/logs_core_test.go @@ -1,9 +1,11 @@ package cli import ( + "context" "testing" "time" + "github.com/aws/amazon-genomics-cli/internal/pkg/aws/cwl" "github.com/stretchr/testify/assert" ) @@ -19,3 +21,77 @@ func Test_logsSharedOpts_setDefaultEndTimeIfEmpty_NoFlags_DefaultsToOneHourBack( assert.Nil(t, opts.endTime) } } + +func Test_splitToBatchesBy(t *testing.T) { + tests := map[string]struct { + batchSize int + strings []string + expected [][]string + }{ + "nil": { + batchSize: 123, + strings: nil, + expected: nil, + }, + "empty": { + batchSize: 123, + strings: []string{}, + expected: nil, + }, + "one not complete batch": { + batchSize: 123, + strings: []string{"foo"}, + expected: [][]string{{"foo"}}, + }, + "one complete batch": { + batchSize: 2, + strings: []string{"foo", "bar"}, + expected: [][]string{{"foo", "bar"}}, + }, + "two complete batches": { + batchSize: 1, + strings: []string{"foo", "bar"}, + expected: [][]string{{"foo"}, {"bar"}}, + }, + "one complete and one not complete": { + batchSize: 2, + strings: []string{"foo1", "bar1", "foo2"}, + expected: [][]string{{"foo1", "bar1"}, {"foo2"}}, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + actual := splitToBatchesBy(tt.batchSize, tt.strings) + assert.Equal(t, tt.expected, actual) + }) + } +} + +func Test_fanInChannels_nominal(t *testing.T) { + sourceFunc := func(ctx context.Context, strings ...string) <-chan cwl.StreamEvent { + channel := make(chan cwl.StreamEvent) + go func() { + defer close(channel) + for _, s := range strings { + channel <- cwl.StreamEvent{Logs: []string{s}} + } + }() + return channel + } + + ctx := context.Background() + src1 := sourceFunc(ctx, "foo1", "bar1") + src2 := sourceFunc(ctx, "foo2", "bar2", "something else") + src3 := sourceFunc(ctx, "singleton3") + src4 := sourceFunc(ctx) + + combined := fanInChannels(ctx, src1, src2, src3, src4) + + var actual []string + for event := range combined { + actual = append(actual, event.Logs[0]) + } + expected := []string{"foo1", "bar1", "foo2", "bar2", "something else", "singleton3"} + assert.ElementsMatch(t, actual, expected) +} diff --git a/packages/cli/internal/pkg/cli/logs_workflow.go b/packages/cli/internal/pkg/cli/logs_workflow.go index 3c0768f6..95bb0f9c 100644 --- a/packages/cli/internal/pkg/cli/logs_workflow.go +++ b/packages/cli/internal/pkg/cli/logs_workflow.go @@ -92,19 +92,17 @@ func (o *logsWorkflowOpts) Execute() error { return nil } - streams, err := o.getStreamsForJobs(notCachedJobIds) + streamNames, err := o.getStreamsForJobs(notCachedJobIds) if err != nil { return err } logGroupName := "/aws/batch/job" if o.tail { - _ = o.followLogGroup(logGroupName, streams...) + return o.followLogStreams(logGroupName, streamNames...) } else { - return o.displayLogGroup(logGroupName, o.startTime, o.endTime, o.filter, streams...) + return o.displayLogStreams(logGroupName, o.startTime, o.endTime, o.filter, streamNames...) } - - return nil } func filterCachedJobIds(ids []string) []string { @@ -153,19 +151,23 @@ func (o *logsWorkflowOpts) getJobIds() ([]string, error) { } func (o *logsWorkflowOpts) getStreamsForJobs(jobIds []string) ([]string, error) { - jobs, err := o.batchClient.GetJobs(jobIds) - if err != nil { - return nil, err - } - streams := make([]string, len(jobs)) - for i, job := range jobs { - if job.LogStreamName == "" { - log.Debug().Msgf("No log stream found for job '%s' ('%s')", job.JobName, job.JobId) - continue + const maxBatchJobs = 100 + idsBatches := splitToBatchesBy(maxBatchJobs, jobIds) + var streams []string + for _, idsBatch := range idsBatches { + jobs, err := o.batchClient.GetJobs(idsBatch) + if err != nil { + return nil, err + } + for _, job := range jobs { + if job.LogStreamName == "" { + log.Debug().Msgf("No log stream found for job '%s' ('%s')", job.JobName, job.JobId) + continue + } + streams = append(streams, job.LogStreamName) } - streams[i] = job.LogStreamName } - return streams, err + return streams, nil } // BuildLogsWorkflowCommand builds the command to output the content of Cloudwatch log streams diff --git a/packages/cli/internal/pkg/mocks/aws/mock_interfaces.go b/packages/cli/internal/pkg/mocks/aws/mock_interfaces.go index 1026bddc..8a5c1a02 100644 --- a/packages/cli/internal/pkg/mocks/aws/mock_interfaces.go +++ b/packages/cli/internal/pkg/mocks/aws/mock_interfaces.go @@ -432,14 +432,14 @@ func (mr *MockCwlClientMockRecorder) GetLogsPaginated(input interface{}) *gomock } // StreamLogs mocks base method. -func (m *MockCwlClient) StreamLogs(ctx context.Context, logGroupName string, streams ...string) chan cwl.StreamEvent { +func (m *MockCwlClient) StreamLogs(ctx context.Context, logGroupName string, streams ...string) <-chan cwl.StreamEvent { m.ctrl.T.Helper() varargs := []interface{}{ctx, logGroupName} for _, a := range streams { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "StreamLogs", varargs...) - ret0, _ := ret[0].(chan cwl.StreamEvent) + ret0, _ := ret[0].(<-chan cwl.StreamEvent) return ret0 } diff --git a/packages/cli/internal/pkg/mocks/manager/mock_interfaces.go b/packages/cli/internal/pkg/mocks/manager/mock_interfaces.go index 46c79360..0de12d2c 100644 --- a/packages/cli/internal/pkg/mocks/manager/mock_interfaces.go +++ b/packages/cli/internal/pkg/mocks/manager/mock_interfaces.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: interfaces.go +// Source: ./internal/pkg/mocks/manager/interfaces.go -// Package mock_managermocks is a generated GoMock package. +// Package managermocks is a generated GoMock package. package managermocks import (