Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GO-3354 subscription publish undetermined order #1171

Merged
merged 6 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 72 additions & 5 deletions pkg/lib/database/subscription.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
package database

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/cheggaaa/mb/v3"
"github.com/gogo/protobuf/types"
"golang.org/x/exp/slices"
)

type subscription struct {
ids []string
quit chan struct{}
closed bool
ch chan *types.Struct
wg sync.WaitGroup
ids []string
quit chan struct{}
closed bool
ch chan *types.Struct
wg sync.WaitGroup
publishQueue mb.MB[*types.Struct]
processQueueOnce sync.Once
sync.RWMutex
}

Expand All @@ -23,7 +28,12 @@
RecordChan() chan *types.Struct
Subscribe(ids []string) (added []string)
Subscriptions() []string
// Publish is blocking
// returns false if the subscription is closed or the id is not subscribed
Publish(id string, msg *types.Struct) bool
// PublishAsync is non-blocking and guarantees the order of messages
// returns false if the subscription is closed or the id is not subscribed
PublishAsync(id string, msg *types.Struct) bool
}

func (sub *subscription) RecordChan() chan *types.Struct {
Expand All @@ -40,6 +50,7 @@
sub.Unlock()
close(sub.quit)

sub.publishQueue.Close()
sub.wg.Wait()
close(sub.ch)
}
Expand All @@ -60,6 +71,62 @@
return
}

// should be called via sub.processQueueOnce
func (sub *subscription) processQueue() {
go func() {
<-sub.quit
err := sub.publishQueue.Close()
if err != nil && !errors.Is(err, mb.ErrClosed) {
log.Errorf("subscription %p failed to close async queue: %s", sub, err)
}
unprocessed := sub.publishQueue.Len()
if unprocessed > 0 {
log.Warnf("subscription %p has %d unprocessed messages in the async queue", sub, unprocessed)
}
}()

var (
msg *types.Struct
err error
)
for {
// no need for cancellation here, because we close the queue itself on quit and it will return
msg, err = sub.publishQueue.WaitOne(context.Background())
if err != nil {
if !errors.Is(err, mb.ErrClosed) {
log.Errorf("subscription %p failed to get message from async queue: %s", sub, err)
}
return
}
select {

Check warning on line 101 in pkg/lib/database/subscription.go

View workflow job for this annotation

GitHub Actions / lint

S1000: should use a simple channel send/receive instead of `select` with a single case (gosimple)
case sub.ch <- msg:
continue
}
}
}

// PublishAsync is non-blocking and guarantees the order of messages
// returns false if the subscription is closed or the id is not subscribed
func (sub *subscription) PublishAsync(id string, msg *types.Struct) bool {
sub.RLock()
if sub.closed {
sub.RUnlock()
return false
}
if !slices.Contains(sub.ids, id) {
sub.RUnlock()
return false
}
sub.RUnlock()
sub.processQueueOnce.Do(func() {
go sub.processQueue()
})
log.Debugf("objStore subscription sendasync %s %p", id, sub)
// we have unlimited buffer, so it should never block, no need for context cancellation
err := sub.publishQueue.Add(context.Background(), msg)
return err == nil
}

func (sub *subscription) Publish(id string, msg *types.Struct) bool {
sub.RLock()
if sub.closed {
Expand Down
29 changes: 21 additions & 8 deletions pkg/lib/localstore/objectstore/queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package objectstore
import (
"fmt"
"sort"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -647,15 +648,27 @@ func TestQueryByIdAndSubscribeForChanges(t *testing.T) {
}
})

t.Run("update details", func(t *testing.T) {
err = s.UpdateObjectDetails("id1", makeDetails(makeObjectWithName("id1", "name1 updated")))
require.NoError(t, err)
t.Run("update details order", func(t *testing.T) {
for i := 1; i <= 1000; i++ {
err = s.UpdateObjectDetails("id1", makeDetails(makeObjectWithName("id1", fmt.Sprintf("%d", i))))
require.NoError(t, err)
}

select {
case rec := <-recordsCh:
assert.Equal(t, "name1 updated", pbtypes.GetString(rec, bundle.RelationKeyName.String()))
case <-time.After(10 * time.Millisecond):
require.Fail(t, "update has not been received")
prev := 0
for {
select {
case rec := <-recordsCh:
name := pbtypes.GetString(rec, bundle.RelationKeyName.String())
num, err := strconv.Atoi(name)
require.NoError(t, err)
require.Equal(t, prev+1, num)
if num == 1000 {
return
}
prev = num
case <-time.After(10 * time.Millisecond):
require.Fail(t, "update has not been received")
}
}
})
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/lib/localstore/objectstore/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,7 @@ func (s *dsObjectStore) sendUpdatesToSubscriptions(id string, details *types.Str
Details: detCopy,
})
}
for i := range s.subscriptions {
go func(sub database.Subscription) {
_ = sub.Publish(id, detCopy)
}(s.subscriptions[i])
for _, sub := range s.subscriptions {
_ = sub.PublishAsync(id, detCopy)
}
}
Loading