From d84c38971549dda6eb2803d83756e340fd40c297 Mon Sep 17 00:00:00 2001 From: "fengyun.rui" Date: Fri, 22 Mar 2024 14:23:54 +0800 Subject: [PATCH] fix: goroutine leak for tasker (#38) Signed-off-by: rfyiamcool --- pkg/tasker/tasker.go | 39 +++++++++++++++++++++++++-------------- pkg/tasker/tasker_test.go | 24 ++++++++++++++++++++++-- 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/pkg/tasker/tasker.go b/pkg/tasker/tasker.go index 75bdd4c..ed0e5dc 100644 --- a/pkg/tasker/tasker.go +++ b/pkg/tasker/tasker.go @@ -95,7 +95,17 @@ func New(opt Option) *Tasker { logger = log.New(file, "", log.LstdFlags) } - return &Tasker{Log: logger, loc: loc, gron: gron, exprs: exprs, tasks: tasks, verbose: opt.Verbose} + ctx, cancel := context.WithCancel(context.Background()) + return &Tasker{ + Log: logger, + loc: loc, + gron: gron, + exprs: exprs, + tasks: tasks, + verbose: opt.Verbose, + ctx: ctx, + ctxCancel: cancel, + } } // WithContext adds a parent context to the Tasker struct @@ -105,14 +115,6 @@ func (t *Tasker) WithContext(ctx context.Context) *Tasker { return t } -func (t *Tasker) ctxDone() { - <-t.ctx.Done() - if t.verbose { - t.Log.Printf("[tasker] received signal on context.Done, aborting") - } - t.abort = true -} - // Taskify creates TaskFunc out of plain command wrt given options. func (t *Tasker) Taskify(cmd string, opt Option) TaskFunc { sh := Shell(opt.Shell) @@ -259,6 +261,11 @@ func (t *Tasker) Run() { // Stop the task manager. func (t *Tasker) Stop() { + t.stop() +} + +func (t *Tasker) stop() { + t.ctxCancel() t.abort = true } @@ -282,16 +289,20 @@ func (t *Tasker) doSetup() { break } } - if t.ctx != nil { - go t.ctxDone() - } sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGTERM) go func() { - <-sig - t.abort = true + select { + case <-sig: + case <-t.ctx.Done(): + if t.verbose { + t.Log.Printf("[tasker] received signal on context.Done, aborting") + } + } + + t.stop() }() } diff --git a/pkg/tasker/tasker_test.go b/pkg/tasker/tasker_test.go index 06bc753..a60c80d 100644 --- a/pkg/tasker/tasker_test.go +++ b/pkg/tasker/tasker_test.go @@ -202,7 +202,7 @@ func TestConcurrency(t *testing.T) { } func TestStopTasker(t *testing.T) { - t.Run("Run", func(t *testing.T) { + t.Run("call stop()", func(t *testing.T) { taskr := New(Option{Verbose: true, Out: "../../test/tasker.out"}) var incr int @@ -216,7 +216,27 @@ func TestStopTasker(t *testing.T) { taskr.Stop() }() taskr.Run() - fmt.Println(incr) + + if incr != 1 { + t.Errorf("the task should run 1x, not %dx", incr) + } + }) + + t.Run("cancel context", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + taskr := New(Option{Verbose: true, Out: "../../test/tasker.out"}).WithContext(ctx) + + var incr int + taskr.Task("* * * * * *", func(ctx context.Context) (int, error) { + incr++ + return 0, nil + }, false) + + go func() { + time.Sleep(2 * time.Second) + cancel() + }() + taskr.Run() if incr != 1 { t.Errorf("the task should run 1x, not %dx", incr)