Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GO-3354 subscription publish undetermined order #1171

Merged
merged 6 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 73 additions & 5 deletions pkg/lib/database/subscription.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
package database

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/cheggaaa/mb/v3"
"github.com/gogo/protobuf/types"
"golang.org/x/exp/slices"
)

type subscription struct {
ids []string
quit chan struct{}
closed bool
ch chan *types.Struct
wg sync.WaitGroup
ids []string
quit chan struct{}
closed bool
ch chan *types.Struct
wg sync.WaitGroup
publishQueue mb.MB[*types.Struct]
processQueueOnce sync.Once
sync.RWMutex
}

Expand All @@ -23,7 +28,12 @@
RecordChan() chan *types.Struct
Subscribe(ids []string) (added []string)
Subscriptions() []string
// Publish is blocking
// returns false if the subscription is closed or the id is not subscribed
Publish(id string, msg *types.Struct) bool
// PublishAsync is non-blocking and guarantees the order of messages
// returns false if the subscription is closed or the id is not subscribed
PublishAsync(id string, msg *types.Struct) bool
}

func (sub *subscription) RecordChan() chan *types.Struct {
Expand All @@ -40,6 +50,7 @@
sub.Unlock()
close(sub.quit)

sub.publishQueue.Close()
sub.wg.Wait()
close(sub.ch)
}
Expand All @@ -60,6 +71,63 @@
return
}

// should be called via sub.processQueueOnce
func (sub *subscription) processQueue() {
go func() {
select {

Check warning on line 77 in pkg/lib/database/subscription.go

View workflow job for this annotation

GitHub Actions / lint

S1000: should use a simple channel send/receive instead of `select` with a single case (gosimple)
case <-sub.quit:
err := sub.publishQueue.Close()
if err != nil && !errors.Is(err, mb.ErrClosed) {
log.Errorf("subscription %p failed to close async queue: %s", sub, err)
}
unprocessed := sub.publishQueue.Len()
if unprocessed > 0 {
log.Errorf("subscription %p has %d unprocessed messages in the async queue", sub, unprocessed)
}
}
}()

var (
msg *types.Struct
err error
)
for {
// no need for cancellation here, because we close the queue itself on quit and it will return
msg, err = sub.publishQueue.WaitOne(context.Background())
if err != nil {
if !errors.Is(err, mb.ErrClosed) {
log.Errorf("subscription %p failed to get message from async queue: %s", sub, err)
}
return
}
select {

Check warning on line 103 in pkg/lib/database/subscription.go

View workflow job for this annotation

GitHub Actions / lint

S1000: should use a simple channel send/receive instead of `select` with a single case (gosimple)
case sub.ch <- msg:
continue
}
}
}
Copy link

@coderabbitai coderabbitai bot Apr 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The processQueue method handles the asynchronous processing of messages. However, the use of select with a single case in lines 77 and 103 can be simplified to a direct send/receive operation.

- select {
- case <-sub.quit:
+ <-sub.quit
- select {
- case sub.ch <- msg:
+ sub.ch <- msg

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks dear little ear, I've fixed this. Here is your carrot 🥕

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@requilence, thank you for the update and for the delicious carrot! 🥕 I'm glad to hear that the changes have been made. If there's anything else you need, just let me know!


If you found this review helpful, would you consider giving us a shout-out on X?
Thank you for using CodeRabbit!


// PublishAsync is non-blocking and guarantees the order of messages
// returns false if the subscription is closed or the id is not subscribed
func (sub *subscription) PublishAsync(id string, msg *types.Struct) bool {
sub.RLock()
if sub.closed {
sub.RUnlock()
return false
}
if !slices.Contains(sub.ids, id) {
sub.RUnlock()
return false
}
sub.RUnlock()
sub.processQueueOnce.Do(func() {
go sub.processQueue()
})
log.Debugf("objStore subscription sendasync %s %p", id, sub)
err := sub.publishQueue.Add(context.Background(), msg)
return err == nil
}

func (sub *subscription) Publish(id string, msg *types.Struct) bool {
sub.RLock()
if sub.closed {
Expand Down
29 changes: 21 additions & 8 deletions pkg/lib/localstore/objectstore/queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package objectstore
import (
"fmt"
"sort"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -647,15 +648,27 @@ func TestQueryByIdAndSubscribeForChanges(t *testing.T) {
}
})

t.Run("update details", func(t *testing.T) {
err = s.UpdateObjectDetails("id1", makeDetails(makeObjectWithName("id1", "name1 updated")))
require.NoError(t, err)
t.Run("update details order", func(t *testing.T) {
for i := 1; i <= 1000; i++ {
err = s.UpdateObjectDetails("id1", makeDetails(makeObjectWithName("id1", fmt.Sprintf("%d", i))))
require.NoError(t, err)
}

select {
case rec := <-recordsCh:
assert.Equal(t, "name1 updated", pbtypes.GetString(rec, bundle.RelationKeyName.String()))
case <-time.After(10 * time.Millisecond):
require.Fail(t, "update has not been received")
prev := 0
for {
select {
case rec := <-recordsCh:
name := pbtypes.GetString(rec, bundle.RelationKeyName.String())
num, err := strconv.Atoi(name)
require.NoError(t, err)
require.Equal(t, prev+1, num)
if num == 1000 {
return
}
prev = num
case <-time.After(10 * time.Millisecond):
require.Fail(t, "update has not been received")
}
}
})
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/lib/localstore/objectstore/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,7 @@ func (s *dsObjectStore) sendUpdatesToSubscriptions(id string, details *types.Str
Details: detCopy,
})
}
for i := range s.subscriptions {
go func(sub database.Subscription) {
_ = sub.Publish(id, detCopy)
}(s.subscriptions[i])
for _, sub := range s.subscriptions {
_ = sub.PublishAsync(id, detCopy)
}
}
Loading