Skip to content

Commit

Permalink
We should always Ack in dispatch to DLQ
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasjohansen committed Jul 17, 2023
1 parent cf22797 commit 8db07c6
Showing 1 changed file with 12 additions and 13 deletions.
25 changes: 12 additions & 13 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,15 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
}
}

// Defaulting to Ack as this always hits the DLQ.
func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) {
start := time.Now()
msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err)
err = msg.Nack(false, false)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
logging.FromContext(ctx).Warn("failed creating event from delivery, err: ", err)
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
return
}
Expand All @@ -290,35 +290,34 @@ func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClien
statusCode, isSuccess := getStatus(ctx, result)
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
if err := d.Reporter.ReportEventCount(args, statusCode); err != nil {
if err = d.Reporter.ReportEventCount(args, statusCode); err != nil {
logging.FromContext(ctx).Errorf("Something happened: %v", err)
}
}

if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL)
if err := msg.Nack(false, false); err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
return
} else if response != nil {
logging.FromContext(ctx).Infof("Sending an event: %+v", response)
ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL)
result := ceClient.Send(ctx, *response)
_, isSuccess := getStatus(ctx, result)
result = ceClient.Send(ctx, *response)
_, isSuccess = getStatus(ctx, result)
if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL)
err = msg.Nack(false, false) // not multiple
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
return
}
}

err = msg.Ack(false)
if err != nil {
logging.FromContext(ctx).Warn("failed to ACK event: ", err)
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
Expand Down

0 comments on commit 8db07c6

Please sign in to comment.