Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

fix: show logs for workflows with more than 100 tasks #114

Merged
merged 1 commit into from
Oct 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cli/internal/pkg/aws/cwl/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type LogPaginator interface {

type Interface interface {
GetLogsPaginated(input GetLogsInput) LogPaginator
StreamLogs(ctx context.Context, logGroupName string, streams ...string) chan StreamEvent
StreamLogs(ctx context.Context, logGroupName string, streams ...string) <-chan StreamEvent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between returning chan vs <-chan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those are two different types. <-chan is a read-only channel. Which is important in this case. We do not want to allow any one other then owner to be able to write into it.

}

type cwlInterface interface {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/internal/pkg/aws/cwl/stream_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type StreamEvent struct {
Err error
}

func (c Client) StreamLogs(ctx context.Context, logGroupName string, streams ...string) chan StreamEvent {
func (c Client) StreamLogs(ctx context.Context, logGroupName string, streams ...string) <-chan StreamEvent {
stream := make(chan StreamEvent)
go func() {
defer func() { close(stream) }()
Expand Down
81 changes: 77 additions & 4 deletions packages/cli/internal/pkg/cli/logs_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cli
import (
ctx "context"
"fmt"
"sync"
"time"

"github.com/araddon/dateparse"
Expand Down Expand Up @@ -42,7 +43,7 @@ Use a question mark for OR, such as "?ERROR ?WARN". Filter out terms with a minu
)

var printLn = func(args ...interface{}) {
_, _ = fmt.Println(args)
_, _ = fmt.Println(args...)
}

type logsSharedVars struct {
Expand Down Expand Up @@ -117,9 +118,13 @@ func (o *logsSharedOpts) parseTime(vars logsSharedVars) error {
return nil
}

func (o *logsSharedOpts) followLogGroup(logGroupName string, streams ...string) error {
stream := o.cwlClient.StreamLogs(ctx.Background(), logGroupName, streams...)
for event := range stream {
func (o *logsSharedOpts) followLogGroup(logGroupName string) error {
channel := o.cwlClient.StreamLogs(ctx.Background(), logGroupName)
return o.displayEventFromChannel(channel)
}

func (o *logsSharedOpts) displayEventFromChannel(channel <-chan cwl.StreamEvent) error {
for event := range channel {
if event.Err != nil {
return event.Err
}
Expand All @@ -134,6 +139,64 @@ func (o *logsSharedOpts) followLogGroup(logGroupName string, streams ...string)
return nil
}

func (o *logsSharedOpts) followLogStreams(logGroupName string, streams ...string) error {
const maxLogStreams = 100
streamingCtx, cancelFunc := ctx.WithCancel(ctx.Background())
defer cancelFunc()
streamBatches := splitToBatchesBy(maxLogStreams, streams)
var eventChannels []<-chan cwl.StreamEvent

for _, batch := range streamBatches {
eventChannel := o.cwlClient.StreamLogs(streamingCtx, logGroupName, batch...)
eventChannels = append(eventChannels, eventChannel)
}

return o.displayEventFromChannel(fanInChannels(streamingCtx, eventChannels...))
}

func fanInChannels(commonCtx ctx.Context, channels ...<-chan cwl.StreamEvent) <-chan cwl.StreamEvent {
var waitGroup sync.WaitGroup
multiplexedChannel := make(chan cwl.StreamEvent)

multiplexFunc := func(events <-chan cwl.StreamEvent) {
defer waitGroup.Done()
for event := range events {
select {
case <-commonCtx.Done():
return
case multiplexedChannel <- event:
}
}
}

waitGroup.Add(len(channels))
for _, c := range channels {
go multiplexFunc(c)
}

go func() {
waitGroup.Wait()
close(multiplexedChannel)
}()

return multiplexedChannel
}

func splitToBatchesBy(batchSize int, strs []string) [][]string {
var batches [][]string
totalStrings := len(strs)
batchStart := 0
for batchStart < totalStrings {
batchEnd := batchStart + batchSize
if batchEnd > totalStrings {
batchEnd = totalStrings
}
batches = append(batches, strs[batchStart:batchEnd])
batchStart = batchEnd
}
return batches
}

func (o *logsSharedOpts) displayLogGroup(logGroupName string, startTime, endTime *time.Time, filter string, streams ...string) error {
output := o.cwlClient.GetLogsPaginated(cwl.GetLogsInput{
LogGroupName: logGroupName,
Expand All @@ -153,3 +216,13 @@ func (o *logsSharedOpts) displayLogGroup(logGroupName string, startTime, endTime
}
return nil
}

func (o *logsSharedOpts) displayLogStreams(logGroupName string, startTime, endTime *time.Time, filter string, streams ...string) error {
for _, stream := range streams {
err := o.displayLogGroup(logGroupName, startTime, endTime, filter, stream)
if err != nil {
return err
}
}
return nil
}
76 changes: 76 additions & 0 deletions packages/cli/internal/pkg/cli/logs_core_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package cli

import (
"context"
"testing"
"time"

"github.com/aws/amazon-genomics-cli/internal/pkg/aws/cwl"
"github.com/stretchr/testify/assert"
)

Expand All @@ -19,3 +21,77 @@ func Test_logsSharedOpts_setDefaultEndTimeIfEmpty_NoFlags_DefaultsToOneHourBack(
assert.Nil(t, opts.endTime)
}
}

func Test_splitToBatchesBy(t *testing.T) {
tests := map[string]struct {
batchSize int
strings []string
expected [][]string
}{
"nil": {
batchSize: 123,
strings: nil,
expected: nil,
},
"empty": {
batchSize: 123,
strings: []string{},
expected: nil,
},
"one not complete batch": {
batchSize: 123,
strings: []string{"foo"},
expected: [][]string{{"foo"}},
},
"one complete batch": {
batchSize: 2,
strings: []string{"foo", "bar"},
expected: [][]string{{"foo", "bar"}},
},
"two complete batches": {
batchSize: 1,
strings: []string{"foo", "bar"},
expected: [][]string{{"foo"}, {"bar"}},
},
"one complete and one not complete": {
batchSize: 2,
strings: []string{"foo1", "bar1", "foo2"},
expected: [][]string{{"foo1", "bar1"}, {"foo2"}},
},
}

for name, tt := range tests {
t.Run(name, func(t *testing.T) {
actual := splitToBatchesBy(tt.batchSize, tt.strings)
assert.Equal(t, tt.expected, actual)
})
}
}

func Test_fanInChannels_nominal(t *testing.T) {
sourceFunc := func(ctx context.Context, strings ...string) <-chan cwl.StreamEvent {
channel := make(chan cwl.StreamEvent)
go func() {
defer close(channel)
for _, s := range strings {
channel <- cwl.StreamEvent{Logs: []string{s}}
}
}()
return channel
}

ctx := context.Background()
src1 := sourceFunc(ctx, "foo1", "bar1")
src2 := sourceFunc(ctx, "foo2", "bar2", "something else")
src3 := sourceFunc(ctx, "singleton3")
src4 := sourceFunc(ctx)

combined := fanInChannels(ctx, src1, src2, src3, src4)

var actual []string
for event := range combined {
actual = append(actual, event.Logs[0])
}
expected := []string{"foo1", "bar1", "foo2", "bar2", "something else", "singleton3"}
assert.ElementsMatch(t, actual, expected)
}
34 changes: 18 additions & 16 deletions packages/cli/internal/pkg/cli/logs_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,17 @@ func (o *logsWorkflowOpts) Execute() error {
return nil
}

streams, err := o.getStreamsForJobs(notCachedJobIds)
streamNames, err := o.getStreamsForJobs(notCachedJobIds)
if err != nil {
return err
}

logGroupName := "/aws/batch/job"
if o.tail {
_ = o.followLogGroup(logGroupName, streams...)
return o.followLogStreams(logGroupName, streamNames...)
} else {
return o.displayLogGroup(logGroupName, o.startTime, o.endTime, o.filter, streams...)
return o.displayLogStreams(logGroupName, o.startTime, o.endTime, o.filter, streamNames...)
}

return nil
}

func filterCachedJobIds(ids []string) []string {
Expand Down Expand Up @@ -153,19 +151,23 @@ func (o *logsWorkflowOpts) getJobIds() ([]string, error) {
}

func (o *logsWorkflowOpts) getStreamsForJobs(jobIds []string) ([]string, error) {
jobs, err := o.batchClient.GetJobs(jobIds)
if err != nil {
return nil, err
}
streams := make([]string, len(jobs))
for i, job := range jobs {
if job.LogStreamName == "" {
log.Debug().Msgf("No log stream found for job '%s' ('%s')", job.JobName, job.JobId)
continue
const maxBatchJobs = 100
idsBatches := splitToBatchesBy(maxBatchJobs, jobIds)
var streams []string
for _, idsBatch := range idsBatches {
jobs, err := o.batchClient.GetJobs(idsBatch)
if err != nil {
return nil, err
}
for _, job := range jobs {
if job.LogStreamName == "" {
log.Debug().Msgf("No log stream found for job '%s' ('%s')", job.JobName, job.JobId)
continue
}
streams = append(streams, job.LogStreamName)
}
streams[i] = job.LogStreamName
}
return streams, err
return streams, nil
}

// BuildLogsWorkflowCommand builds the command to output the content of Cloudwatch log streams
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/internal/pkg/mocks/aws/mock_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions packages/cli/internal/pkg/mocks/manager/mock_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.