diff --git a/pkg/lib/database/subscription.go b/pkg/lib/database/subscription.go
index dad8879905..ebb83ae006 100644
--- a/pkg/lib/database/subscription.go
+++ b/pkg/lib/database/subscription.go
@@ -1,20 +1,23 @@
 package database
 
 import (
-	"fmt"
+	"context"
+	"errors"
 	"sync"
-	"time"
 
+	"github.com/cheggaaa/mb/v3"
 	"github.com/gogo/protobuf/types"
 	"golang.org/x/exp/slices"
 )
 
 type subscription struct {
-	ids    []string
-	quit   chan struct{}
-	closed bool
-	ch     chan *types.Struct
-	wg     sync.WaitGroup
+	ids              []string
+	quit             chan struct{}
+	closed           bool
+	ch               chan *types.Struct
+	wg               sync.WaitGroup
+	publishQueue     mb.MB[*types.Struct]
+	processQueueOnce sync.Once
 
 	sync.RWMutex
 }
@@ -23,7 +26,9 @@ type Subscription interface {
 	RecordChan() chan *types.Struct
 	Subscribe(ids []string) (added []string)
 	Subscriptions() []string
-	Publish(id string, msg *types.Struct) bool
+	// PublishAsync is non-blocking and guarantees the order of messages
+	// returns false if the subscription is closed or the id is not subscribed
+	PublishAsync(id string, msg *types.Struct) bool
 }
 
 func (sub *subscription) RecordChan() chan *types.Struct {
@@ -40,6 +45,7 @@ func (sub *subscription) Close() {
 	sub.Unlock()
 
 	close(sub.quit)
+	sub.publishQueue.Close()
 	sub.wg.Wait()
 	close(sub.ch)
 }
@@ -60,7 +66,46 @@ loop:
 	return
 }
 
-func (sub *subscription) Publish(id string, msg *types.Struct) bool {
+// should be called via sub.processQueueOnce; the caller must do sub.wg.Add(1) before starting it
+func (sub *subscription) processQueue() {
+	// Close() waits on sub.wg before closing sub.ch, so the send below can never hit a closed channel
+	defer sub.wg.Done()
+	go func() {
+		<-sub.quit
+		err := sub.publishQueue.Close()
+		if err != nil && !errors.Is(err, mb.ErrClosed) {
+			log.Errorf("subscription %p failed to close async queue: %s", sub, err)
+		}
+		unprocessed := sub.publishQueue.Len()
+		if unprocessed > 0 {
+			log.Warnf("subscription %p has %d unprocessed messages in the async queue", sub, unprocessed)
+		}
+	}()
+
+	var (
+		msg *types.Struct
+		err error
+	)
+	for {
+		// no need for cancellation here, because we close the queue itself on quit and it will return
+		msg, err = sub.publishQueue.WaitOne(context.Background())
+		if err != nil {
+			if !errors.Is(err, mb.ErrClosed) {
+				log.Errorf("subscription %p failed to get message from async queue: %s", sub, err)
+			}
+			return
+		}
+		select {
+		case sub.ch <- msg:
+		case <-sub.quit:
+			log.Warnf("subscription %p is closed, dropping message", sub)
+		}
+	}
+}
+
+// PublishAsync is non-blocking and guarantees the order of messages
+// returns false if the subscription is closed or the id is not subscribed
+func (sub *subscription) PublishAsync(id string, msg *types.Struct) bool {
 	sub.RLock()
 	if sub.closed {
 		sub.RUnlock()
@@ -70,24 +115,17 @@ func (sub *subscription) Publish(id string, msg *types.Struct) bool {
 		sub.RUnlock()
 		return false
 	}
-	sub.wg.Add(1)
-	defer sub.wg.Done()
+	// start the consumer under the read lock: Close() can't mark the subscription closed
+	// in between, so wg.Add always happens before Close() reaches wg.Wait
+	sub.processQueueOnce.Do(func() {
+		sub.wg.Add(1)
+		go sub.processQueue()
+	})
 	sub.RUnlock()
-
-	log.Debugf("objStore subscription send %s %p", id, sub)
-	var total time.Duration
-	for {
-		select {
-		case <-sub.quit:
-			return false
-		case sub.ch <- msg:
-			return true
-		case <-time.After(time.Second * 3):
-			total += time.Second * 3
-			log.Errorf(fmt.Sprintf("subscription %p is blocked for %.0f seconds, failed to send %s", sub, total.Seconds(), id))
-			continue
-		}
-	}
+	log.Debugf("objStore subscription sendasync %s %p", id, sub)
+	// we have unlimited buffer, so it should never block, no need for context cancellation
+	err := sub.publishQueue.Add(context.Background(), msg)
+	return err == nil
 }
 
 func (sub *subscription) SubscribedForId(id string) bool {
diff --git a/pkg/lib/localstore/objectstore/queries_test.go b/pkg/lib/localstore/objectstore/queries_test.go
index 68efa3e9b9..c012d3dcd4 100644
--- a/pkg/lib/localstore/objectstore/queries_test.go
+++ b/pkg/lib/localstore/objectstore/queries_test.go
@@ -3,6 +3,7 @@ package objectstore
 import (
 	"fmt"
 	"sort"
+	"strconv"
 	"testing"
 	"time"
 
@@ -647,15 +648,27 @@ func TestQueryByIdAndSubscribeForChanges(t *testing.T) {
 		}
 	})
 
-	t.Run("update details", func(t *testing.T) {
-		err = s.UpdateObjectDetails("id1", makeDetails(makeObjectWithName("id1", "name1 updated")))
-		require.NoError(t, err)
+	t.Run("update details order", func(t *testing.T) {
+		for i := 1; i <= 1000; i++ {
+			err = s.UpdateObjectDetails("id1", makeDetails(makeObjectWithName("id1", strconv.Itoa(i))))
+			require.NoError(t, err)
+		}
 
-		select {
-		case rec := <-recordsCh:
-			assert.Equal(t, "name1 updated", pbtypes.GetString(rec, bundle.RelationKeyName.String()))
-		case <-time.After(10 * time.Millisecond):
-			require.Fail(t, "update has not been received")
+		prev := 0
+		for {
+			select {
+			case rec := <-recordsCh:
+				name := pbtypes.GetString(rec, bundle.RelationKeyName.String())
+				num, err := strconv.Atoi(name)
+				require.NoError(t, err)
+				require.Equal(t, prev+1, num)
+				if num == 1000 {
+					return
+				}
+				prev = num
+			case <-time.After(10 * time.Millisecond):
+				require.Fail(t, "update has not been received")
+			}
 		}
 	})
 }
diff --git a/pkg/lib/localstore/objectstore/update.go b/pkg/lib/localstore/objectstore/update.go
index 3260e11e13..e1e829af05 100644
--- a/pkg/lib/localstore/objectstore/update.go
+++ b/pkg/lib/localstore/objectstore/update.go
@@ -208,9 +208,7 @@ func (s *dsObjectStore) sendUpdatesToSubscriptions(id string, details *types.Str
 			Details: detCopy,
 		})
 	}
-	for i := range s.subscriptions {
-		go func(sub database.Subscription) {
-			_ = sub.Publish(id, detCopy)
-		}(s.subscriptions[i])
+	for _, sub := range s.subscriptions {
+		_ = sub.PublishAsync(id, detCopy)
 	}
 }