Skip to content

Commit

Permalink
fix: show logs for workflows with more than 100 tasks (aws#114)
Browse files Browse the repository at this point in the history
  • Loading branch information
IllyaYalovyy authored and tneely committed Nov 11, 2021
1 parent 99491e7 commit e50bfe0
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 26 deletions.
2 changes: 1 addition & 1 deletion packages/cli/internal/pkg/aws/cwl/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type LogPaginator interface {

type Interface interface {
GetLogsPaginated(input GetLogsInput) LogPaginator
StreamLogs(ctx context.Context, logGroupName string, streams ...string) chan StreamEvent
StreamLogs(ctx context.Context, logGroupName string, streams ...string) <-chan StreamEvent
}

type cwlInterface interface {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/internal/pkg/aws/cwl/stream_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type StreamEvent struct {
Err error
}

func (c Client) StreamLogs(ctx context.Context, logGroupName string, streams ...string) chan StreamEvent {
func (c Client) StreamLogs(ctx context.Context, logGroupName string, streams ...string) <-chan StreamEvent {
stream := make(chan StreamEvent)
go func() {
defer func() { close(stream) }()
Expand Down
81 changes: 77 additions & 4 deletions packages/cli/internal/pkg/cli/logs_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cli
import (
ctx "context"
"fmt"
"sync"
"time"

"github.com/araddon/dateparse"
Expand Down Expand Up @@ -42,7 +43,7 @@ Use a question mark for OR, such as "?ERROR ?WARN". Filter out terms with a minu
)

var printLn = func(args ...interface{}) {
_, _ = fmt.Println(args)
_, _ = fmt.Println(args...)
}

type logsSharedVars struct {
Expand Down Expand Up @@ -117,9 +118,13 @@ func (o *logsSharedOpts) parseTime(vars logsSharedVars) error {
return nil
}

func (o *logsSharedOpts) followLogGroup(logGroupName string, streams ...string) error {
stream := o.cwlClient.StreamLogs(ctx.Background(), logGroupName, streams...)
for event := range stream {
// followLogGroup opens a stream over the whole log group (no stream names are
// passed to StreamLogs) and hands the resulting event channel to
// displayEventFromChannel, returning whatever that returns.
func (o *logsSharedOpts) followLogGroup(logGroupName string) error {
	return o.displayEventFromChannel(
		o.cwlClient.StreamLogs(ctx.Background(), logGroupName),
	)
}

func (o *logsSharedOpts) displayEventFromChannel(channel <-chan cwl.StreamEvent) error {
for event := range channel {
if event.Err != nil {
return event.Err
}
Expand All @@ -134,6 +139,64 @@ func (o *logsSharedOpts) followLogGroup(logGroupName string, streams ...string)
return nil
}

// followLogStreams follows the named streams of logGroupName.
//
// The stream names are split into batches of at most maxLogStreams, one
// StreamLogs call is issued per batch, and the per-batch channels are merged
// into a single channel that is passed to displayEventFromChannel. All
// StreamLogs calls and the fan-in share one cancellable context, which is
// cancelled when this method returns.
func (o *logsSharedOpts) followLogStreams(logGroupName string, streams ...string) error {
	// NOTE(review): 100 is presumably the per-request stream limit of the
	// underlying log API — confirm against the cwl client.
	const maxLogStreams = 100

	streamingCtx, cancelFunc := ctx.WithCancel(ctx.Background())
	defer cancelFunc()

	batches := splitToBatchesBy(maxLogStreams, streams)
	sources := make([]<-chan cwl.StreamEvent, 0, len(batches))
	for i := range batches {
		sources = append(sources, o.cwlClient.StreamLogs(streamingCtx, logGroupName, batches[i]...))
	}

	merged := fanInChannels(streamingCtx, sources...)
	return o.displayEventFromChannel(merged)
}

// fanInChannels merges the given event channels into a single unbuffered
// channel.
//
// One forwarding goroutine is started per input channel. A forwarder stops
// when its input is closed, or when commonCtx is done while it is waiting to
// hand an event to the merged channel. The merged channel is closed once all
// forwarders have stopped; with no inputs it is closed right away.
func fanInChannels(commonCtx ctx.Context, channels ...<-chan cwl.StreamEvent) <-chan cwl.StreamEvent {
	merged := make(chan cwl.StreamEvent)

	var forwarders sync.WaitGroup
	forwarders.Add(len(channels))
	for _, source := range channels {
		go func(in <-chan cwl.StreamEvent) {
			defer forwarders.Done()
			for event := range in {
				select {
				case merged <- event:
				case <-commonCtx.Done():
					// The consumer is gone; do not block on the send.
					return
				}
			}
		}(source)
	}

	// Closing happens in its own goroutine so the caller gets the channel
	// immediately and can start draining it.
	go func() {
		forwarders.Wait()
		close(merged)
	}()

	return merged
}

// splitToBatchesBy partitions strs into consecutive batches of at most
// batchSize elements, preserving order. Only the last batch may be shorter.
//
// It returns nil when strs is empty. A non-positive batchSize is treated as
// "no limit": all of strs is returned as a single batch, rather than looping
// forever (batchSize == 0) or panicking on an invalid slice bound
// (batchSize < 0).
//
// The batches are views into strs, not copies. Each one has its capacity
// capped at its length, so appending to a batch reallocates instead of
// overwriting the first element of the following batch.
func splitToBatchesBy(batchSize int, strs []string) [][]string {
	total := len(strs)
	if total == 0 {
		return nil
	}
	// Clamping also keeps start+batchSize below from overflowing.
	if batchSize <= 0 || batchSize > total {
		batchSize = total
	}

	batches := make([][]string, 0, (total+batchSize-1)/batchSize)
	for start := 0; start < total; start += batchSize {
		end := start + batchSize
		if end > total {
			end = total
		}
		batches = append(batches, strs[start:end:end])
	}
	return batches
}

func (o *logsSharedOpts) displayLogGroup(logGroupName string, startTime, endTime *time.Time, filter string, streams ...string) error {
output := o.cwlClient.GetLogsPaginated(cwl.GetLogsInput{
LogGroupName: logGroupName,
Expand All @@ -153,3 +216,13 @@ func (o *logsSharedOpts) displayLogGroup(logGroupName string, startTime, endTime
}
return nil
}

// displayLogStreams calls displayLogGroup once per entry of streams, in the
// order given, passing the same log group, time bounds and filter each time.
// It stops at the first error and returns it; with no streams it does nothing
// and returns nil.
func (o *logsSharedOpts) displayLogStreams(logGroupName string, startTime, endTime *time.Time, filter string, streams ...string) error {
	for i := range streams {
		if err := o.displayLogGroup(logGroupName, startTime, endTime, filter, streams[i]); err != nil {
			return err
		}
	}
	return nil
}
76 changes: 76 additions & 0 deletions packages/cli/internal/pkg/cli/logs_core_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package cli

import (
"context"
"testing"
"time"

"github.com/aws/amazon-genomics-cli/internal/pkg/aws/cwl"
"github.com/stretchr/testify/assert"
)

Expand All @@ -19,3 +21,77 @@ func Test_logsSharedOpts_setDefaultEndTimeIfEmpty_NoFlags_DefaultsToOneHourBack(
assert.Nil(t, opts.endTime)
}
}

// Test_splitToBatchesBy runs splitToBatchesBy against a table of inputs:
// nil and empty slices, a single partial batch, exactly-full batches, and a
// full batch followed by a partial one.
func Test_splitToBatchesBy(t *testing.T) {
	type testCase struct {
		size  int
		input []string
		want  [][]string
	}

	cases := map[string]testCase{
		"nil":                               {size: 123, input: nil, want: nil},
		"empty":                             {size: 123, input: []string{}, want: nil},
		"one not complete batch":            {size: 123, input: []string{"foo"}, want: [][]string{{"foo"}}},
		"one complete batch":                {size: 2, input: []string{"foo", "bar"}, want: [][]string{{"foo", "bar"}}},
		"two complete batches":              {size: 1, input: []string{"foo", "bar"}, want: [][]string{{"foo"}, {"bar"}}},
		"one complete and one not complete": {size: 2, input: []string{"foo1", "bar1", "foo2"}, want: [][]string{{"foo1", "bar1"}, {"foo2"}}},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			got := splitToBatchesBy(tc.size, tc.input)
			assert.Equal(t, tc.want, got)
		})
	}
}

// Test_fanInChannels_nominal feeds four sources (one of them empty) through
// fanInChannels and checks that every log line comes out exactly once. The
// order in which events from different sources arrive is not fixed, so the
// comparison ignores ordering.
func Test_fanInChannels_nominal(t *testing.T) {
	// emit returns a channel that delivers one event per line and is then closed.
	emit := func(_ context.Context, lines ...string) <-chan cwl.StreamEvent {
		out := make(chan cwl.StreamEvent)
		go func() {
			defer close(out)
			for i := range lines {
				out <- cwl.StreamEvent{Logs: []string{lines[i]}}
			}
		}()
		return out
	}

	background := context.Background()
	sources := []<-chan cwl.StreamEvent{
		emit(background, "foo1", "bar1"),
		emit(background, "foo2", "bar2", "something else"),
		emit(background, "singleton3"),
		emit(background),
	}

	var actual []string
	for event := range fanInChannels(background, sources...) {
		actual = append(actual, event.Logs[0])
	}

	expected := []string{"foo1", "bar1", "foo2", "bar2", "something else", "singleton3"}
	assert.ElementsMatch(t, actual, expected)
}
34 changes: 18 additions & 16 deletions packages/cli/internal/pkg/cli/logs_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,17 @@ func (o *logsWorkflowOpts) Execute() error {
return nil
}

streams, err := o.getStreamsForJobs(notCachedJobIds)
streamNames, err := o.getStreamsForJobs(notCachedJobIds)
if err != nil {
return err
}

logGroupName := "/aws/batch/job"
if o.tail {
_ = o.followLogGroup(logGroupName, streams...)
return o.followLogStreams(logGroupName, streamNames...)
} else {
return o.displayLogGroup(logGroupName, o.startTime, o.endTime, o.filter, streams...)
return o.displayLogStreams(logGroupName, o.startTime, o.endTime, o.filter, streamNames...)
}

return nil
}

func filterCachedJobIds(ids []string) []string {
Expand Down Expand Up @@ -153,19 +151,23 @@ func (o *logsWorkflowOpts) getJobIds() ([]string, error) {
}

func (o *logsWorkflowOpts) getStreamsForJobs(jobIds []string) ([]string, error) {
jobs, err := o.batchClient.GetJobs(jobIds)
if err != nil {
return nil, err
}
streams := make([]string, len(jobs))
for i, job := range jobs {
if job.LogStreamName == "" {
log.Debug().Msgf("No log stream found for job '%s' ('%s')", job.JobName, job.JobId)
continue
const maxBatchJobs = 100
idsBatches := splitToBatchesBy(maxBatchJobs, jobIds)
var streams []string
for _, idsBatch := range idsBatches {
jobs, err := o.batchClient.GetJobs(idsBatch)
if err != nil {
return nil, err
}
for _, job := range jobs {
if job.LogStreamName == "" {
log.Debug().Msgf("No log stream found for job '%s' ('%s')", job.JobName, job.JobId)
continue
}
streams = append(streams, job.LogStreamName)
}
streams[i] = job.LogStreamName
}
return streams, err
return streams, nil
}

// BuildLogsWorkflowCommand builds the command to output the content of Cloudwatch log streams
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/internal/pkg/mocks/aws/mock_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions packages/cli/internal/pkg/mocks/manager/mock_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit e50bfe0

Please sign in to comment.