diff --git a/bucket.go b/bucket.go index 9d39cd4..0ed1edc 100644 --- a/bucket.go +++ b/bucket.go @@ -42,7 +42,6 @@ type Bucket struct { serial uint32 // Serial number for logging inMemory bool // True if it's an in-memory database closed bool // represents state when it is closed - hlc *hybridLogicalClock } type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection @@ -196,7 +195,7 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err return nil, err } - bucket.hlc = NewHybridLogicalClock(bucket.getLastTimestamp()) + hlc.updateLatestTime(bucket.getLastTimestamp()) exists, bucketCopy := registerBucket(bucket) // someone else beat registered the bucket in the registry, that's OK we'll close ours @@ -388,7 +387,6 @@ func (b *Bucket) copy() *Bucket { expManager: b.expManager, serial: b.serial, inMemory: b.inMemory, - hlc: b.hlc, } return r } diff --git a/bucket_test.go b/bucket_test.go index f7844be..28bec01 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -41,10 +41,14 @@ func testBucketPath(t *testing.T) string { } func makeTestBucket(t *testing.T) *Bucket { + return makeTestBucketWithName(t, strings.ToLower(t.Name())) +} + +func makeTestBucketWithName(t *testing.T, name string) *Bucket { LoggingCallback = func(level LogLevel, fmt string, args ...any) { t.Logf(logLevelNamesPrint[level]+fmt, args...) } - bucket, err := OpenBucket(uriFromPath(testBucketPath(t)), strings.ToLower(t.Name()), CreateNew) + bucket, err := OpenBucket(uriFromPath(testBucketPath(t)), name, CreateNew) require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, bucket.CloseAndDelete(testCtx(t))) diff --git a/collection.go b/collection.go index 49e1366..ce81c42 100644 --- a/collection.go +++ b/collection.go @@ -591,7 +591,7 @@ func (c *Collection) setLastCas(txn *sql.Tx, cas CAS) (err error) { func (c *Collection) withNewCas(fn func(txn *sql.Tx, newCas CAS) (*event, error)) error { var e *event err := c.bucket.inTransaction(func(txn *sql.Tx) error { - newCas := uint64(c.bucket.hlc.Now()) + newCas := uint64(hlc.Now()) var err error e, err = fn(txn, newCas) if err != nil { diff --git a/hlc.go b/hlc.go index e716dbc..d3d2d5d 100644 --- a/hlc.go +++ b/hlc.go @@ -13,6 +13,12 @@ import ( "time" ) +var hlc *hybridLogicalClock + +func init() { + hlc = NewHybridLogicalClock(0) +} + type timestamp uint64 // hybridLogicalClock is a hybrid logical clock implementation for rosmar that produces timestamps that will always be increasing regardless of clock changes. @@ -43,6 +49,14 @@ func NewHybridLogicalClock(lastTime timestamp) *hybridLogicalClock { } } +func (c *hybridLogicalClock) updateLatestTime(lastTime timestamp) { + c.mutex.Lock() + defer c.mutex.Unlock() + if uint64(lastTime) > c.highestTime { + c.highestTime = uint64(lastTime) + } +} + // Now returns the next time represented in nanoseconds. This can be the current timestamp, or if multiple occur in the same nanosecond, an increasing timestamp. func (c *hybridLogicalClock) Now() timestamp { c.mutex.Lock() diff --git a/hlc_test.go b/hlc_test.go index 16369de..b75ff30 100644 --- a/hlc_test.go +++ b/hlc_test.go @@ -9,9 +9,11 @@ package rosmar import ( + "fmt" "sync" "testing" + sgbucket "github.com/couchbase/sg-bucket" "github.com/stretchr/testify/require" ) @@ -105,3 +107,56 @@ func TestHLCReverseTime(t *testing.T) { require.Equal(t, timestamp(0x3d0000), hlc.Now()) } + +func TestHLCCrossBucket(t *testing.T) { + goroutines := 10 + documentCount := 10 + + collection1 := makeTestBucketWithName(t, "bucket1").DefaultDataStore() + collection2 := makeTestBucketWithName(t, "bucket2").DefaultDataStore() + + wg := sync.WaitGroup{} + results := make(chan []uint64) + + createDocuments := func(goroutineIdx int, collection sgbucket.DataStore) { + + defer wg.Done() + casValues := make([]uint64, documentCount) + for i := 0; i < documentCount; i++ { + cas, err := collection.WriteCas(fmt.Sprintf("key_%d_%d", goroutineIdx, i), 0, 0, []byte(" World"), sgbucket.AddOnly) + require.NoError(t, err) + casValues[i] = cas + } + results <- casValues + } + for i := 0; i < goroutines; i++ { + for _, collection := range []sgbucket.DataStore{collection1, collection2} { + wg.Add(1) + go createDocuments(i, collection) + } + } + + doneChan := make(chan struct{}) + go func() { + wg.Wait() + doneChan <- struct{}{} + }() + allCas := make([]uint64, 0, goroutines*documentCount) +loop: + for { + select { + case casValues := <-results: + allCas = append(allCas, casValues...) + case <-doneChan: + break loop + } + } + uniqueCas := make(map[uint64]struct{}) + for _, cas := range allCas { + if _, ok := uniqueCas[cas]; ok { + t.Errorf("cas %d is not unique", cas) + } + uniqueCas[cas] = struct{}{} + } + +}