Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GO-3354 subscription publish undetermined order #1171

Merged
merged 6 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 59 additions & 26 deletions pkg/lib/database/subscription.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
package database

import (
"fmt"
"context"
"errors"
"sync"
"time"

"github.com/cheggaaa/mb/v3"
"github.com/gogo/protobuf/types"
"golang.org/x/exp/slices"
)

type subscription struct {
ids []string
quit chan struct{}
closed bool
ch chan *types.Struct
wg sync.WaitGroup
ids []string
quit chan struct{}
closed bool
ch chan *types.Struct
wg sync.WaitGroup
publishQueue mb.MB[*types.Struct]
processQueueOnce sync.Once
sync.RWMutex
}

Expand All @@ -23,7 +26,9 @@ type Subscription interface {
RecordChan() chan *types.Struct
Subscribe(ids []string) (added []string)
Subscriptions() []string
Publish(id string, msg *types.Struct) bool
// PublishAsync is non-blocking and guarantees the order of messages
// returns false if the subscription is closed or the id is not subscribed
PublishAsync(id string, msg *types.Struct) bool
}

func (sub *subscription) RecordChan() chan *types.Struct {
Expand All @@ -40,6 +45,7 @@ func (sub *subscription) Close() {
sub.Unlock()
close(sub.quit)

sub.publishQueue.Close()
sub.wg.Wait()
close(sub.ch)
}
Expand All @@ -60,7 +66,44 @@ loop:
return
}

func (sub *subscription) Publish(id string, msg *types.Struct) bool {
// should be called via sub.processQueueOnce
func (sub *subscription) processQueue() {
go func() {
<-sub.quit
err := sub.publishQueue.Close()
if err != nil && !errors.Is(err, mb.ErrClosed) {
log.Errorf("subscription %p failed to close async queue: %s", sub, err)
}
unprocessed := sub.publishQueue.Len()
if unprocessed > 0 {
log.Warnf("subscription %p has %d unprocessed messages in the async queue", sub, unprocessed)
}
}()

var (
msg *types.Struct
err error
)
for {
// no need for cancellation here, because we close the queue itself on quit and it will return
msg, err = sub.publishQueue.WaitOne(context.Background())
if err != nil {
if !errors.Is(err, mb.ErrClosed) {
log.Errorf("subscription %p failed to get message from async queue: %s", sub, err)
}
return
}
select {
case sub.ch <- msg:
case <-sub.quit:
log.Warnf("subscription %p is closed, dropping message", sub)
}
}
}

// PublishAsync is non-blocking and guarantees the order of messages
// returns false if the subscription is closed or the id is not subscribed
func (sub *subscription) PublishAsync(id string, msg *types.Struct) bool {
sub.RLock()
if sub.closed {
sub.RUnlock()
Expand All @@ -70,24 +113,14 @@ func (sub *subscription) Publish(id string, msg *types.Struct) bool {
sub.RUnlock()
return false
}
sub.wg.Add(1)
defer sub.wg.Done()
sub.RUnlock()

log.Debugf("objStore subscription send %s %p", id, sub)
var total time.Duration
for {
select {
case <-sub.quit:
return false
case sub.ch <- msg:
return true
case <-time.After(time.Second * 3):
total += time.Second * 3
log.Errorf(fmt.Sprintf("subscription %p is blocked for %.0f seconds, failed to send %s", sub, total.Seconds(), id))
continue
}
}
sub.processQueueOnce.Do(func() {
go sub.processQueue()
})
log.Debugf("objStore subscription sendasync %s %p", id, sub)
// we have unlimited buffer, so it should never block, no need for context cancellation
err := sub.publishQueue.Add(context.Background(), msg)
return err == nil
}

func (sub *subscription) SubscribedForId(id string) bool {
Expand Down
29 changes: 21 additions & 8 deletions pkg/lib/localstore/objectstore/queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package objectstore
import (
"fmt"
"sort"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -647,15 +648,27 @@ func TestQueryByIdAndSubscribeForChanges(t *testing.T) {
}
})

t.Run("update details", func(t *testing.T) {
err = s.UpdateObjectDetails("id1", makeDetails(makeObjectWithName("id1", "name1 updated")))
require.NoError(t, err)
t.Run("update details order", func(t *testing.T) {
for i := 1; i <= 1000; i++ {
err = s.UpdateObjectDetails("id1", makeDetails(makeObjectWithName("id1", fmt.Sprintf("%d", i))))
require.NoError(t, err)
}

select {
case rec := <-recordsCh:
assert.Equal(t, "name1 updated", pbtypes.GetString(rec, bundle.RelationKeyName.String()))
case <-time.After(10 * time.Millisecond):
require.Fail(t, "update has not been received")
prev := 0
for {
select {
case rec := <-recordsCh:
name := pbtypes.GetString(rec, bundle.RelationKeyName.String())
num, err := strconv.Atoi(name)
require.NoError(t, err)
require.Equal(t, prev+1, num)
if num == 1000 {
return
}
prev = num
case <-time.After(10 * time.Millisecond):
require.Fail(t, "update has not been received")
}
}
})
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/lib/localstore/objectstore/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,7 @@ func (s *dsObjectStore) sendUpdatesToSubscriptions(id string, details *types.Str
Details: detCopy,
})
}
for i := range s.subscriptions {
go func(sub database.Subscription) {
_ = sub.Publish(id, detCopy)
}(s.subscriptions[i])
for _, sub := range s.subscriptions {
_ = sub.PublishAsync(id, detCopy)
}
}
Loading