Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace ETA with delay #28

Merged
merged 2 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions integration-tests/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,9 @@ func testPanic(server Server, t *testing.T) {
}

func testDelay(server Server, t *testing.T) {
now := time.Now().UTC()
eta := now.Add(100 * time.Millisecond)
task := newDelayTask(eta)
delay := 100 * time.Millisecond
eta := time.Now().UTC().Add(delay)
task := newDelayTask(delay)
asyncResult, err := server.SendTask(task)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -480,9 +480,9 @@ func newMultipleReturnTask(arg1, arg2 string, fail bool) *tasks.Signature {
}
}

func newDelayTask(eta time.Time) *tasks.Signature {
func newDelayTask(delay time.Duration) *tasks.Signature {
return &tasks.Signature{
Name: "delay_test",
ETA: &eta,
Name: "delay_test",
Delay: delay,
}
}
12 changes: 3 additions & 9 deletions v1/brokers/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,10 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error
return fmt.Errorf("JSON marshal error: %s", err)
}

// Check the ETA signature field, if it is set and it is in the future,
// Check the delay signature field, if it is set and it is in the future,
// delay the task
if signature.ETA != nil {
now := time.Now().UTC()

if signature.ETA.After(now) {
delayMs := int64(signature.ETA.Sub(now) / time.Millisecond)

return b.delay(signature, delayMs)
}
if signature.Delay > 0 {
return b.delay(signature, signature.Delay.Milliseconds())
}

queue := b.GetConfig().DefaultQueue
Expand Down
22 changes: 8 additions & 14 deletions v1/brokers/azure/storage_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,15 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error
messageBody := string(msg)
enqueueOptions := &azqueue.EnqueueMessageOptions{}

// Check the ETA signature field, if it is set and it is in the future,
// Check the delay signature field, if it is set and it is in the future,
// and is not a fifo queue, set a delay in seconds for the task.
if signature.ETA != nil {
now := time.Now().UTC()
delay := signature.ETA.Sub(now)
if delay > 0 {
if delay > maxDelay {
log.ERROR.Printf("max visibility timeout exceeded sending %s. defaulting to max.", signature.Name)
delay = maxDelay
}
delaysS := int32(delay.Seconds())
enqueueOptions.VisibilityTimeout = &delaysS
if signature.Delay > 0 {
if signature.Delay > maxDelay {
log.ERROR.Printf("max visibility timeout exceeded sending %s. defaulting to max.", signature.Name)
signature.Delay = maxDelay
}
delaysS := int32(signature.Delay.Seconds())
enqueueOptions.VisibilityTimeout = &delaysS
}

ttlSeconds := int32(b.cfg.TTL.Seconds())
Expand Down Expand Up @@ -173,9 +169,7 @@ func (b *Broker) extend(by time.Duration, signature *tasks.Signature) error {
func (b *Broker) RetryMessage(signature *tasks.Signature) {
b.AdjustRoutingKey(signature)

delay := signature.ETA.Sub(time.Now().UTC())

delayS := int32(delay.Seconds())
delayS := int32(signature.Delay.Seconds())

_, err := b.cfg.Client.NewQueueClient(signature.RoutingKey).UpdateMessage(
context.Background(),
Expand Down
14 changes: 5 additions & 9 deletions v1/brokers/redis/goredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,12 @@ func (b *BrokerGR) Publish(ctx context.Context, signature *tasks.Signature) erro
return fmt.Errorf("JSON marshal error: %s", err)
}

// Check the ETA signature field, if it is set and it is in the future,
// Check the delay signature field, if it is set and it is in the future,
// delay the task
if signature.ETA != nil {
now := time.Now().UTC()

if signature.ETA.After(now) {
score := signature.ETA.UnixNano()
err = b.rclient.ZAdd(context.Background(), b.redisDelayedTasksKey, redis.Z{Score: float64(score), Member: msg}).Err()
return err
}
if signature.Delay > 0 {
score := time.Now().Add(signature.Delay).UnixNano()
err = b.rclient.ZAdd(context.Background(), b.redisDelayedTasksKey, redis.Z{Score: float64(score), Member: msg}).Err()
return err
}

err = b.rclient.RPush(context.Background(), signature.RoutingKey, msg).Err()
Expand Down
14 changes: 5 additions & 9 deletions v1/brokers/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,12 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error
conn := b.open()
defer conn.Close()

// Check the ETA signature field, if it is set and it is in the future,
// Check the delay signature field, if it is set and it is in the future,
// delay the task
if signature.ETA != nil {
now := time.Now().UTC()

if signature.ETA.After(now) {
score := signature.ETA.UnixNano()
_, err = conn.Do("ZADD", b.redisDelayedTasksKey, score, msg)
return err
}
if signature.Delay > 0 {
score := time.Now().Add(signature.Delay).UnixNano()
_, err = conn.Do("ZADD", b.redisDelayedTasksKey, score, msg)
return err
}

_, err = conn.Do("RPUSH", signature.RoutingKey, msg)
Expand Down
21 changes: 8 additions & 13 deletions v1/brokers/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,14 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error
MsgInput.MessageGroupId = aws.String(MsgGroupID)
}

// Check the ETA signature field, if it is set and it is in the future,
// Check the delay signature field, if it is set and it is in the future,
// and is not a fifo queue, set a delay in seconds for the task.
if signature.ETA != nil && !strings.HasSuffix(signature.RoutingKey, ".fifo") {
now := time.Now().UTC()
delay := signature.ETA.Sub(now)
if delay > 0 {
if delay > maxAWSSQSDelay {
log.ERROR.Printf("max AWS SQS delay exceeded sending %s. defaulting to max.", signature.Name)
delay = maxAWSSQSDelay
}
MsgInput.DelaySeconds = aws.Int64(int64(delay.Seconds()))
if signature.Delay > 0 && !strings.HasSuffix(signature.RoutingKey, ".fifo") {
if signature.Delay > maxAWSSQSDelay {
log.ERROR.Printf("max AWS SQS delay exceeded sending %s. defaulting to max.", signature.Name)
signature.Delay = maxAWSSQSDelay
}
MsgInput.DelaySeconds = aws.Int64(int64(signature.Delay.Seconds()))
}

result, err := b.service.SendMessageWithContext(ctx, MsgInput)
Expand Down Expand Up @@ -199,13 +195,12 @@ func (b *Broker) extend(by time.Duration, signature *tasks.Signature) error {
func (b *Broker) RetryMessage(signature *tasks.Signature) {
b.AdjustRoutingKey(signature)

delay := signature.ETA.Sub(time.Now().UTC())
delay = restrictVisibilityTimeoutDelay(delay, signature.ReceivedAt)
signature.Delay = restrictVisibilityTimeoutDelay(signature.Delay, signature.ReceivedAt)

visibilityInput := &awssqs.ChangeMessageVisibilityInput{
QueueUrl: aws.String(b.GetConfig().Broker + "/" + signature.RoutingKey),
ReceiptHandle: &signature.SQSReceiptHandle,
VisibilityTimeout: aws.Int64(int64(delay.Seconds())),
VisibilityTimeout: aws.Int64(int64(signature.Delay.Seconds())),
}

_, err := b.service.ChangeMessageVisibility(visibilityInput)
Expand Down
4 changes: 2 additions & 2 deletions v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ func (server *Server) SendTaskWithContext(ctx context.Context, signature *tasks.
signature.Headers = tracing.HeadersWithSpan(signature.Headers, span)
}

if signature.ETA != nil {
span.SetTag("signature.eta", signature.ETA.String())
if signature.Delay > 0 {
span.SetTag("signature.eta", signature.Delay.String())
}

// Make sure result backend is defined
Expand Down
25 changes: 2 additions & 23 deletions v1/tasks/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,8 @@ import (
"time"
)

// ErrRetryTaskLater ...
type ErrRetryTaskLater struct {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer use ErrRetryTaskLater -- we always use ErrKeepAndRetryTaskLater

name, msg string
retryIn time.Duration
}

// RetryIn returns time.Duration from now when task should be retried
func (e ErrRetryTaskLater) RetryIn() time.Duration {
return e.retryIn
}

// Error implements the error interface
func (e ErrRetryTaskLater) Error() string {
return fmt.Sprintf("Task error: %s Will retry in: %s", e.msg, e.retryIn)
}

// NewErrRetryTaskLater returns new ErrRetryTaskLater instance
func NewErrRetryTaskLater(msg string, retryIn time.Duration) ErrRetryTaskLater {
return ErrRetryTaskLater{msg: msg, retryIn: retryIn}
}

// Retriable is interface that retriable errors should implement

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for fixing this cursed word most of all

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that I'd invented this spelling and was sitting here wondering what was wrong with me, but I just checked and it actually wasn't me and I feel a whole lot better about myself now.

type Retriable interface {
// Retryable is interface that retryable errors should implement
type Retryable interface {
RetryIn() time.Duration
error
}
Expand Down
2 changes: 1 addition & 1 deletion v1/tasks/signature.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Signature struct {
UUID string
Name string
RoutingKey string
ETA *time.Time
Delay time.Duration
IngestionTime *time.Time
FirstReceived *time.Time
ReceivedAt time.Time
Expand Down
12 changes: 6 additions & 6 deletions v1/tasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ func New(taskFunc interface{}, args []Arg) (*Task, error) {
// Call attempts to call the task with the supplied arguments.
//
// `err` is set in the return value in two cases:
// 1. The reflected function invocation panics (e.g. due to a mismatched
// argument list).
// 2. The task func itself returns a non-nil error.
// 1. The reflected function invocation panics (e.g. due to a mismatched
// argument list).
// 2. The task func itself returns a non-nil error.
func (t *Task) Call() (taskResults []*TaskResult, err error) {
// retrieve the span from the task's context and finish it as soon as this function returns
span := opentracing.SpanFromContext(t.Context)
Expand Down Expand Up @@ -202,16 +202,16 @@ func (t *Task) Call() (taskResults []*TaskResult, err error) {
return nil, ErrLastReturnValueMustBeError
}

_, isRetriable := asError.(Retriable)
_, isRetryable := asError.(Retryable)

if span != nil {
if !isRetriable {
if !isRetryable {
span.LogFields(opentracing_log.Error(asError))
} else {
span.SetTag("warning", asError)
}

span.SetTag("can_retry", isRetriable)
span.SetTag("can_retry", isRetryable)
span.SetTag("did_fail", true)
}

Expand Down
23 changes: 5 additions & 18 deletions v1/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,6 @@ func (worker *Worker) Process(signature *tasks.Signature, extendFunc tasks.Exten
// Call the task
results, err := task.Call()
if err != nil {
// If a tasks.ErrRetryTaskLater was returned from the task,
// retry the task after specified duration
retryTaskLaterErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
if ok {
if worker.taskRetryErrorHandler != nil {
worker.taskRetryErrorHandler(err, signature)
}
return worker.retryTaskIn(signature, retryTaskLaterErr.RetryIn())
}

keepAndRetryErr, ok := interface{}(err).(tasks.ErrKeepAndRetryTaskLater)
if ok {
if worker.taskRetryErrorHandler != nil {
Expand Down Expand Up @@ -266,8 +256,7 @@ func (worker *Worker) taskRetry(signature *tasks.Signature) error {
signature.RetryTimeout = retry.FibonacciNext(signature.RetryTimeout)

// Delay task by signature.RetryTimeout seconds
eta := time.Now().UTC().Add(time.Second * time.Duration(signature.RetryTimeout))
signature.ETA = &eta
signature.Delay = time.Second * time.Duration(signature.RetryTimeout)

log.INFO.Printf("Task %s failed. Going to retry in %d seconds.", signature.UUID, signature.RetryTimeout)

Expand All @@ -276,16 +265,15 @@ func (worker *Worker) taskRetry(signature *tasks.Signature) error {
return err
}

// taskRetryIn republishes the task to the queue with ETA of now + retryIn.Seconds()
// taskRetryIn republishes the task to the queue retryIn
func (worker *Worker) retryTaskIn(signature *tasks.Signature, retryIn time.Duration) error {
// Update task state to RETRY
if err := worker.server.GetBackend().SetStateRetry(signature); err != nil {
return fmt.Errorf("Set state to 'retry' for task %s returned error: %s", signature.UUID, err)
}

// Delay task by retryIn duration
eta := time.Now().UTC().Add(retryIn)
signature.ETA = &eta
signature.Delay = retryIn

// Increase the attempt count, but leave RetryCount alone because it's for the default retry behavior.
signature.AttemptCount++
Expand All @@ -297,16 +285,15 @@ func (worker *Worker) retryTaskIn(signature *tasks.Signature, retryIn time.Durat
return err
}

// keepAndRetryTaskIn attempts to keep the message on the queue but with a new ETA of now + retryIn.Seconds()
// keepAndRetryTaskIn attempts to keep the message on the queue but with a delay of retryIn
func (worker *Worker) keepAndRetryTaskIn(signature *tasks.Signature, retryIn time.Duration) error {
// Update task state to RETRY
if err := worker.server.GetBackend().SetStateRetry(signature); err != nil {
return fmt.Errorf("Set state to 'retry' for task %s returned error: %w", signature.UUID, err)
}

// Delay task by retryIn duration
eta := time.Now().UTC().Add(retryIn)
signature.ETA = &eta
signature.Delay = retryIn

// Increase the attempt count, but leave RetryCount alone because it's for the default retry behavior. This will
// only matter if the broker does not support RetryMessage and falls back to sending a new task.
Expand Down