Skip to content

Commit

Permalink
Merge pull request #343 from sljeff/asynctask-health-check
Browse files Browse the repository at this point in the history
feat(health): support asynctask health check
  • Loading branch information
fenngwd authored Aug 18, 2021
2 parents 5e527d3 + 738b056 commit 40bae7d
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 52 deletions.
6 changes: 6 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 0.16.0 (2021-08-16)

- 改用 `shanbay/amqp`
- 增加 asynctask ext 的健康检查
- 使用方法:`curl '127.0.0.1:5000/health?timeout=5&queue=gobay.task_sub'`

# 0.14.0 (2020-11-19)

- sentryext 收集当前栈信息,让 sentry web 界面上可以展开
Expand Down
135 changes: 129 additions & 6 deletions extensions/asynctaskext/asynctaskext.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,44 @@
/*
How can I check that a running asynctask worker can process a new message within 5 seconds (health check)?
- `curl '127.0.0.1:5000/health?timeout=5&queue=gobay.task_sub'`
- `curl '127.0.0.1:5000/health?timeout=5'` **default queue**
*/
package asynctaskext

import (
"context"
"errors"
"fmt"
"net/http"
"os"
"strconv"
"sync"
"time"

"github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/backends/result"
machineryConfig "github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/log"
"github.com/RichardKnop/machinery/v1/tasks"
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/shanbay/gobay"
)

const (
healthCheckTaskName = "gobay-asynctask-health-check"
)

// AsyncTaskExt is the asynctask extension: it holds a machinery server, the
// workers started from it, and the state used by the HTTP health check.
type AsyncTaskExt struct {
// NS is the prefix passed to gobay.GetConfigByPrefix in Init to select this
// extension's configuration.
NS string
// app is the application handed to Init.
app *gobay.Application
// config is the machinery configuration allocated in Init; its DefaultQueue
// is used whenever a caller passes an empty queue name.
config *machineryConfig.Config
// server is the machinery server stored by Init.
server *machinery.Server
// workers collects every worker created by StartWorker.
workers []*machinery.Worker

// lock is held by StartWorker while it appends to workers and reads/sets
// healthHandlerRegistered.
lock sync.Mutex
// healthCheckCompleteChan carries the UUID of a finished health check task
// from the registered task function to checkHealth. Init creates it with a
// buffer of one.
healthCheckCompleteChan chan string
// healthHandlerRegistered records that StartWorker has already registered
// the /health handler and started the health check HTTP server.
healthHandlerRegistered bool
}

func (t *AsyncTaskExt) Object() interface{} {
Expand All @@ -36,6 +54,7 @@ func (t *AsyncTaskExt) Init(app *gobay.Application) error {
return errors.New("lack of NS")
}
t.app = app
t.healthCheckCompleteChan = make(chan string, 1)
config := app.Config()
config = gobay.GetConfigByPrefix(config, t.NS, true)
t.config = &machineryConfig.Config{}
Expand All @@ -51,7 +70,7 @@ func (t *AsyncTaskExt) Init(app *gobay.Application) error {
return err
}
t.server = server
return nil
return t.registerHealthCheck()
}

func (t *AsyncTaskExt) Close() error {
Expand All @@ -73,17 +92,29 @@ func (t *AsyncTaskExt) RegisterWorkerHandlers(handlers map[string]interface{}) e

// StartWorker creates a worker that consumes task messages from queue and
// returns the result of launching it.
//
// An empty queue selects the configured default queue. concurrency is passed
// through to the machinery worker. The first call on this extension also
// registers the /health handler and starts the health check HTTP server on
// :5000 in a background goroutine.
func (t *AsyncTaskExt) StartWorker(queue string, concurrency int) error {
	t.lock.Lock()

	if queue == "" {
		queue = t.config.DefaultQueue
	}
	// The consumer tag must be built by genConsumerTag so that the tag the
	// health check publishes to matches the tag this worker consumes with.
	tag := t.genConsumerTag(queue)
	worker := t.server.NewWorker(tag, concurrency)
	worker.Queue = queue
	t.workers = append(t.workers, worker)

	// Run the health check HTTP server once per extension instance.
	if !t.healthHandlerRegistered {
		t.healthHandlerRegistered = true
		healthSrv := &http.Server{Addr: ":5000"}
		// NOTE(review): http.Handle registers on http.DefaultServeMux and the
		// flag above is per instance, so a second AsyncTaskExt in the same
		// process would hit the duplicate-pattern panic here. Confirm whether
		// multiple instances per process are expected.
		http.Handle("/health", http.HandlerFunc(t.healthHttpHandler))
		go func() {
			if err := healthSrv.ListenAndServe(); err != nil {
				log.FATAL.Printf("error when start health check server: %v\n", err)
			}
		}()
	}

	// Release the lock before launching: presumably Launch blocks for the
	// lifetime of the worker (the tests run StartWorker in a goroutine), and
	// other StartWorker calls must not wait on it.
	t.lock.Unlock()
	return worker.Launch()
}

Expand All @@ -106,3 +137,95 @@ func (t *AsyncTaskExt) SendTaskWithContext(ctx context.Context, sign *tasks.Sign
}
return asyncResult, nil
}

// genConsumerTag builds the consumer tag "<queue>@<hostname>" for queue.
// A hostname lookup failure is logged and the tag is still returned, using
// whatever hostname value os.Hostname handed back.
func (t *AsyncTaskExt) genConsumerTag(queue string) string {
	host, lookupErr := os.Hostname()
	if lookupErr != nil {
		log.ERROR.Printf("get host name failed: %v", lookupErr)
	}
	return queue + "@" + host
}

// registerHealthCheck registers the health check task on the machinery
// server and returns the registration error, if any.
//
// When the task runs it hands its UUID argument to healthCheckCompleteChan.
// If nothing accepts the value within 5 seconds the task fails instead of
// blocking forever.
func (t *AsyncTaskExt) registerHealthCheck() error {
	reportDone := func(healthCheckUUID string) error {
		giveUp := time.NewTimer(5 * time.Second)
		defer giveUp.Stop()

		select {
		case t.healthCheckCompleteChan <- healthCheckUUID:
			// Delivered: the task counts as successful.
			return nil
		case <-giveUp.C:
			return fmt.Errorf("send health check result error: %v", healthCheckUUID)
		}
	}
	return t.server.RegisterTask(healthCheckTaskName, reportDone)
}

// checkHealth publishes a health check task to the consumer identified by
// consumerTag and waits for that task to report completion.
//
// It returns nil when the task's UUID arrives on healthCheckCompleteChan
// within taskExecutionTimeout. It returns an error when the UUID cannot be
// generated, when publishing fails, or when the timeout elapses first.
func (t *AsyncTaskExt) checkHealth(consumerTag string, taskExecutionTimeout time.Duration) error {
	// Drop a result that an earlier check left in the buffer.
	select {
	case <-t.healthCheckCompleteChan:
	default:
	}

	broker := t.server.GetBroker()
	healthCheckUUID, err := uuid.NewUUID()
	if err != nil {
		return err
	}
	id := healthCheckUUID.String()
	if err := broker.PublishToLocal(consumerTag, &tasks.Signature{
		UUID: id,
		Name: healthCheckTaskName,
		Args: []tasks.Arg{
			{Type: "string", Value: id},
		},
	}, 5*time.Second); err != nil {
		return err
	}

	// Wait for this task to report success. A UUID that is not ours belongs
	// to an earlier check whose task finished late; it must not fail this
	// check, so keep waiting until our own UUID arrives or time runs out.
	deadline := time.NewTimer(taskExecutionTimeout)
	defer deadline.Stop()
	for {
		select {
		case successUUID := <-t.healthCheckCompleteChan:
			if successUUID == id {
				return nil
			}
		case <-deadline.C:
			return fmt.Errorf("health check execution fail: %v", id)
		}
	}
}

// healthHttpHandler serves the health check endpoint.
//
// Query parameters:
//   - timeout: required, exactly one integer value, in seconds.
//   - queue:   optional; when absent, empty or repeated, the configured
//     default queue is used.
//
// It answers 200 "OK" when checkHealth succeeds, and 400 with a message when
// a parameter is invalid or the health check fails.
func (t *AsyncTaskExt) healthHttpHandler(w http.ResponseWriter, r *http.Request) {
	// respond sends the status and body; a failed write panics, exactly as
	// every response path of this handler does.
	respond := func(status int, body string) {
		w.WriteHeader(status)
		if _, writeErr := w.Write([]byte(body)); writeErr != nil {
			panic(writeErr)
		}
	}

	query := r.URL.Query()

	timeouts := query["timeout"]
	if len(timeouts) != 1 {
		respond(http.StatusBadRequest, "no timeout")
		return
	}
	seconds, err := strconv.Atoi(timeouts[0])
	if err != nil {
		respond(http.StatusBadRequest, err.Error())
		return
	}

	queue := t.config.DefaultQueue
	if queues := query["queue"]; len(queues) == 1 && queues[0] != "" {
		queue = queues[0]
	}

	// Run the health check against the consumer for the chosen queue.
	wait := time.Duration(seconds) * time.Second
	if err := t.checkHealth(t.genConsumerTag(queue), wait); err != nil {
		respond(http.StatusBadRequest, err.Error())
		return
	}
	respond(http.StatusOK, "OK")
}
21 changes: 21 additions & 0 deletions extensions/asynctaskext/asynctaskext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package asynctaskext
import (
"context"
"log"
"net/http"
"testing"
"time"

Expand Down Expand Up @@ -43,11 +44,31 @@ func TestPushConsume(t *testing.T) {
t.Error(err)
}
}()
time.Sleep(500 * time.Millisecond) // Make sure the worker is started
go func() {
if err := task.StartWorker("gobay.task_sub", 1); err != nil {
t.Error(err)
}
}()
time.Sleep(500 * time.Millisecond) // Make sure the workers is started

// health check
resp, err := http.Get("http://127.0.0.1:5000/health?timeout=5&queue=gobay.task_sub")
if err != nil || resp.StatusCode != http.StatusOK {
t.Errorf("%v %s", resp, err)
}
resp, err = http.Get("http://127.0.0.1:5000/health?timeout=5&queue=")
if err != nil || resp.StatusCode != http.StatusOK {
t.Errorf("%v %s", resp, err)
}
resp, err = http.Get("http://127.0.0.1:5000/health?timeout=5")
if err != nil || resp.StatusCode != http.StatusOK {
t.Errorf("%v %s", resp, err)
}
resp, err = http.Get("http://127.0.0.1:5000/health?timeout=5&queue=nosuchqueue")
if err != nil || resp.StatusCode != http.StatusBadRequest {
t.Errorf("%v %s", resp, err)
}

signs := []*tasks.Signature{
{
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.5.1-0.20200311113236-681ffa848bae
github.com/golang/protobuf v1.4.3
github.com/google/uuid v1.2.0
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2
github.com/hashicorp/go-multierror v1.1.0
github.com/iancoleman/strcase v0.1.3
Expand Down Expand Up @@ -42,6 +43,7 @@ require (
)

replace (
github.com/RichardKnop/machinery => github.com/RichardKnop/machinery v1.9.7
github.com/RichardKnop/machinery => github.com/shanbay/machinery v1.10.7-0.20210806005400-45afbf2ee694
github.com/facebook/ent => github.com/shanbay/ent v0.4.0
github.com/streadway/amqp => github.com/shanbay/amqp v1.0.1-0.20210728052407-b63250c049f2
)
Loading

0 comments on commit 40bae7d

Please sign in to comment.