Skip to content

Commit

Permalink
Merge pull request #346 from hj24/master
Browse files Browse the repository at this point in the history
feat: asynctask startworker 时增加是否允许 healthcheck 的选项
  • Loading branch information
fenngwd authored Aug 30, 2021
2 parents 4e7aee2 + 053994d commit 25cd406
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 31 deletions.
4 changes: 2 additions & 2 deletions extensions/asynctaskext/asynctaskext.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (t *AsyncTaskExt) RegisterWorkerHandlers(handlers map[string]interface{}) e
}

//StartWorker start a worker that consume task messages for queue
func (t *AsyncTaskExt) StartWorker(queue string, concurrency int) error {
func (t *AsyncTaskExt) StartWorker(queue string, concurrency int, enableHealthCheck bool) error {
t.lock.Lock()

if queue == "" {
Expand All @@ -103,7 +103,7 @@ func (t *AsyncTaskExt) StartWorker(queue string, concurrency int) error {
t.workers = append(t.workers, worker)

// run health check http server
if !t.healthHandlerRegistered {
if enableHealthCheck && !t.healthHandlerRegistered {
t.healthHandlerRegistered = true
healthSrv := http.Server{Addr: ":5000"}
http.Handle("/health", http.HandlerFunc(t.healthHttpHandler))
Expand Down
76 changes: 53 additions & 23 deletions extensions/asynctaskext/asynctaskext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,67 @@ import (

"github.com/RichardKnop/machinery/v1/backends/result"
"github.com/RichardKnop/machinery/v1/tasks"
"github.com/stretchr/testify/assert"

"github.com/shanbay/gobay"
)

var (
task AsyncTaskExt
taskOne AsyncTaskExt
taskTwo AsyncTaskExt
)

func init() {
task = AsyncTaskExt{NS: "asynctask_"}
taskOne = AsyncTaskExt{NS: "one_asynctask_"}
taskTwo = AsyncTaskExt{NS: "two_asynctask_"}

app, _ := gobay.CreateApp(
"../../testdata",
"testing",
map[gobay.Key]gobay.Extension{
"asynctask": &task,
"oneasynctask": &taskOne,
"twoasynctask": &taskTwo,
},
)
if err := app.Init(); err != nil {
log.Panic(err)
}
}

func TaskAdd(args ...int64) (int64, error) {
sum := int64(0)
for _, arg := range args {
sum += arg
}
return sum, nil
}

func TaskSub(arg1, arg2 int64) (int64, error) {
return arg1 - arg2, nil
}

func TaskSubWithContext(ctx context.Context, arg1, arg2 int64) (int64, error) {
if err := ctx.Err(); err != nil {
return 0, err
}
return arg1 - arg2, nil
}

func TestPushConsume(t *testing.T) {
if err := task.RegisterWorkerHandlers(map[string]interface{}{
if err := taskOne.RegisterWorkerHandlers(map[string]interface{}{
"add": TaskAdd, "sub": TaskSub, "subCtx": TaskSubWithContext,
}); err != nil {
t.Error(err)
}
go func() {
// use default queue
if err := task.StartWorker("", 1); err != nil {
if err := taskOne.StartWorker("", 1, true); err != nil {
t.Error(err)
}
}()
time.Sleep(500 * time.Millisecond) // Make sure the worker is started
go func() {
if err := task.StartWorker("gobay.task_sub", 1); err != nil {
if err := taskOne.StartWorker("gobay.task_sub", 1, true); err != nil {
t.Error(err)
}
}()
Expand Down Expand Up @@ -123,9 +146,9 @@ func TestPushConsume(t *testing.T) {
err error
)
if sign.Name == "subCtx" {
asyncResult, err = task.SendTaskWithContext(context.Background(), sign)
asyncResult, err = taskOne.SendTaskWithContext(context.Background(), sign)
} else {
asyncResult, err = task.SendTask(sign)
asyncResult, err = taskOne.SendTask(sign)
}
if err != nil {
t.Error(err)
Expand All @@ -136,21 +159,28 @@ func TestPushConsume(t *testing.T) {
}
}
}
func TaskAdd(args ...int64) (int64, error) {
sum := int64(0)
for _, arg := range args {
sum += arg
}
return sum, nil
}

func TaskSub(arg1, arg2 int64) (int64, error) {
return arg1 - arg2, nil
}
func TestMultiTaskExtStartWorker(t *testing.T) {
t.Run("1: 第一个 task StartWorker, 允许 healthcheck, 正常", func(t *testing.T) {
go func() {
// use default queue
if err := taskOne.StartWorker("", 1, true); err != nil {
t.Error(err)
}
}()
})

func TaskSubWithContext(ctx context.Context, arg1, arg2 int64) (int64, error) {
if err := ctx.Err(); err != nil {
return 0, err
}
return arg1 - arg2, nil
t.Run("2: 第二个 task StartWorker, 不允许 healthcheck, 正常运行", func(t *testing.T) {
go func() {
if err := taskTwo.StartWorker("", 1, false); err != nil {
t.Error(err)
}
}()
})

t.Run("3: 第二个 task StartWorker, 允许 healthcheck, 会 panic", func(t *testing.T) {
assert.Panics(t, func() {
_ = taskTwo.StartWorker("", 1, true)
})
})
}
19 changes: 13 additions & 6 deletions testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,19 @@ defaults: &defaults
stub_health_retrybackoff: 50ms
stub_health_retrytimes: 3

asynctask_concurrency: 10
asynctask_broker: "redis://127.0.0.1:6379/8"
asynctask_default_queue: "gobay.task"
asynctask_result_backend: "redis://127.0.0.1:6379/8"
asynctask_results_expire_in: 1
asynctask_redis: {}
one_asynctask_concurrency: 10
one_asynctask_broker: "redis://127.0.0.1:6379/8"
one_asynctask_default_queue: "gobay.task.one"
one_asynctask_result_backend: "redis://127.0.0.1:6379/8"
one_asynctask_results_expire_in: 1
one_asynctask_redis: {}

two_asynctask_concurrency: 10
two_asynctask_broker: "redis://127.0.0.1:6379/8"
two_asynctask_default_queue: "gobay.task.two"
two_asynctask_result_backend: "redis://127.0.0.1:6379/8"
two_asynctask_results_expire_in: 1
two_asynctask_redis: {}

db_driver: sqlite3
db_url: ":memory:"
Expand Down

0 comments on commit 25cd406

Please sign in to comment.