-
Notifications
You must be signed in to change notification settings - Fork 54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GO-3354 subscription publish undetermined order #1171
GO-3354 subscription publish undetermined order #1171
Conversation
Warning Rate Limit Exceeded@requilence has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 29 minutes and 41 seconds before requesting another review. How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. WalkthroughThe recent updates focus on enhancing the asynchronous processing capabilities within the subscription and local store systems. New methods for non-blocking message publishing ensure ordered delivery, while tests are adjusted to verify the sequential update process. The overall aim is to improve efficiency and reliability in message handling and data updates across the system. Changes
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Out of diff range and nitpick comments (1)
pkg/lib/database/subscription.go (1)
75-108
: TheprocessQueue
method correctly handles the asynchronous processing of messages. However, consider handling the case wherecontext.Background()
might need cancellation support if the system requires graceful shutdowns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
pkg/lib/database/subscription.go
Outdated
// should be called via sub.processQueueOnce | ||
func (sub *subscription) processQueue() { | ||
go func() { | ||
select { | ||
case <-sub.quit: | ||
err := sub.publishQueue.Close() | ||
if err != nil && !errors.Is(err, mb.ErrClosed) { | ||
log.Errorf("subscription %p failed to close async queue: %s", sub, err) | ||
} | ||
unprocessed := sub.publishQueue.Len() | ||
if unprocessed > 0 { | ||
log.Errorf("subscription %p has %d unprocessed messages in the async queue", sub, unprocessed) | ||
} | ||
} | ||
}() | ||
|
||
var ( | ||
msg *types.Struct | ||
err error | ||
) | ||
for { | ||
// no need for cancellation here, because we close the queue itself on quit and it will return | ||
msg, err = sub.publishQueue.WaitOne(context.Background()) | ||
if err != nil { | ||
if !errors.Is(err, mb.ErrClosed) { | ||
log.Errorf("subscription %p failed to get message from async queue: %s", sub, err) | ||
} | ||
return | ||
} | ||
select { | ||
case sub.ch <- msg: | ||
continue | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The processQueue
method handles the asynchronous processing of messages. However, the use of select
with a single case in lines 77 and 103 can be simplified to a direct send/receive operation.
- select {
- case <-sub.quit:
+ <-sub.quit
- select {
- case sub.ch <- msg:
+ sub.ch <- msg
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks dear little ear, I've fixed this. Here is your carrot 🥕
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@requilence, thank you for the update and for the delicious carrot! 🥕 I'm glad to hear that the changes have been made. If there's anything else you need, just let me know!
If you found this review helpful, would you consider giving us a shout-out on X?
Thank you for using CodeRabbit!
Coverage provided by /~https://github.com/seriousben/go-patch-cover-action |
Summary by CodeRabbit
New Features
PublishAsync
to ensure non-blocking operations while maintaining the order of messages.Enhancements
Close
method to ensure proper closure of thepublishQueue
.Publish
method with a subscription status check prior to publishing.Bug Fixes
Refactor
sendUpdatesToSubscriptions
to enhance performance and readability by using range-based loops.