Skip to content

Commit

Permalink
fix: reduce reserve cnt on same chunk and index collision (#4798)
Browse files Browse the repository at this point in the history
Co-authored-by: Ivan Vandot <ivan@vandot.rs>
  • Loading branch information
acha-bill and vandot authored Sep 2, 2024
1 parent 383bbcc commit f1ed79e
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 2 deletions.
1 change: 1 addition & 0 deletions .github/workflows/beekeeper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ jobs:
- uses: actions/upload-artifact@v4
with:
name: temp-artifacts
include-hidden-files: true
path: |
Dockerfile.goreleaser
Makefile
Expand Down
17 changes: 15 additions & 2 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
r.multx.Lock(strconv.Itoa(int(bin)))
defer r.multx.Unlock(strconv.Itoa(int(bin)))

return r.st.Run(ctx, func(s transaction.Store) error {
var shouldIncReserveSize, shouldDecrReserveSize bool

err = r.st.Run(ctx, func(s transaction.Store) error {

oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveNamespace, chunk)
if err != nil {
Expand Down Expand Up @@ -165,6 +167,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
if err != nil {
return fmt.Errorf("failed removing older chunk %s: %w", oldStampIndex.ChunkAddress, err)
}
shouldDecrReserveSize = true
}
}

Expand Down Expand Up @@ -279,11 +282,21 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
}

if !loadedStampIndex {
r.size.Add(1)
shouldIncReserveSize = true
}

return nil
})
if err != nil {
return err
}
if shouldIncReserveSize {
r.size.Add(1)
}
if shouldDecrReserveSize {
r.size.Add(-1)
}
return nil
}

func (r *Reserve) deleteWithStamp(s transaction.Store, oldBatchRadiusItem *BatchRadiusItem, sameAddressOldChunkStamp swarm.Stamp) error {
Expand Down
97 changes: 97 additions & 0 deletions pkg/storer/internal/reserve/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func TestSameChunkAddress(t *testing.T) {
binBinIDs := make(map[uint8]uint64)

t.Run("same stamp index and older timestamp", func(t *testing.T) {
size1 := r.Size()
signer := getSigner(t)
batch := postagetesting.MustNewBatch()
s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer)
Expand All @@ -174,9 +175,14 @@ func TestSameChunkAddress(t *testing.T) {
if !errors.Is(err, storage.ErrOverwriteNewerChunk) {
t.Fatal("expected error")
}
size2 := r.Size()
if size2-size1 != 1 {
t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1)
}
})

t.Run("different stamp index and older timestamp", func(t *testing.T) {
size1 := r.Size()
signer := getSigner(t)
batch := postagetesting.MustNewBatch()
s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer)
Expand All @@ -193,6 +199,10 @@ func TestSameChunkAddress(t *testing.T) {
if !errors.Is(err, storage.ErrOverwriteNewerChunk) {
t.Fatal("expected error")
}
size2 := r.Size()
if size2-size1 != 1 {
t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1)
}
})

replace := func(t *testing.T, ch1 swarm.Chunk, ch2 swarm.Chunk, ch1BinID, ch2BinID uint64) {
Expand Down Expand Up @@ -233,6 +243,7 @@ func TestSameChunkAddress(t *testing.T) {
}

t.Run("same stamp index and newer timestamp", func(t *testing.T) {
size1 := r.Size()
signer := getSigner(t)
batch := postagetesting.MustNewBatch()
s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer)
Expand All @@ -242,9 +253,14 @@ func TestSameChunkAddress(t *testing.T) {
bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes())
binBinIDs[bin] += 2
replace(t, ch1, ch2, binBinIDs[bin]-1, binBinIDs[bin])
size2 := r.Size()
if size2-size1 != 1 {
t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1)
}
})

t.Run("different stamp index and newer timestamp", func(t *testing.T) {
size1 := r.Size()
signer := getSigner(t)
batch := postagetesting.MustNewBatch()
s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer)
Expand All @@ -254,9 +270,14 @@ func TestSameChunkAddress(t *testing.T) {
bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes())
binBinIDs[bin] += 2
replace(t, ch1, ch2, binBinIDs[bin]-1, binBinIDs[bin])
size2 := r.Size()
if size2-size1 != 1 {
t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1)
}
})

t.Run("not a soc and newer timestamp", func(t *testing.T) {
size1 := r.Size()
batch := postagetesting.MustNewBatch()
ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 7))
ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 8))
Expand Down Expand Up @@ -284,6 +305,11 @@ func TestSameChunkAddress(t *testing.T) {
if !bytes.Equal(ch.Data(), ch1.Data()) {
t.Fatalf("expected chunk data to not be updated")
}

size2 := r.Size()
if size2-size1 != 1 {
t.Fatalf("expected reserve size to increase by 2, got %d", size2-size1)
}
})

t.Run("chunk with different batchID remains untouched", func(t *testing.T) {
Expand Down Expand Up @@ -327,6 +353,8 @@ func TestSameChunkAddress(t *testing.T) {
}
}

size1 := r.Size()

// soc
signer := getSigner(t)
batch := postagetesting.MustNewBatch()
Expand All @@ -350,7 +378,76 @@ func TestSameChunkAddress(t *testing.T) {
t.Fatalf("expected chunk addresses to be the same")
}
noReplace(ch1, ch2)
size2 := r.Size()
if size2-size1 != 4 {
t.Fatalf("expected reserve size to increase by 4, got %d", size2-size1)
}
})

t.Run("same address but index collision with different chunk", func(t *testing.T) {
size1 := r.Size()
batch := postagetesting.MustNewBatch()
ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
err = r.Put(ctx, ch1)
if err != nil {
t.Fatal(err)
}
bin1 := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes())
binBinIDs[bin1] += 1
ch1BinID := binBinIDs[bin1]
ch1StampHash, err := ch1.Stamp().Hash()
if err != nil {
t.Fatal(err)
}

signer := getSigner(t)
s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer)
ch2 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 1))
err = r.Put(ctx, ch2)
if err != nil {
t.Fatal(err)
}
bin2 := swarm.Proximity(baseAddr.Bytes(), ch2.Address().Bytes())
binBinIDs[bin2] += 1
ch2BinID := binBinIDs[bin2]
ch2StampHash, err := ch2.Stamp().Hash()
if err != nil {
t.Fatal(err)
}

checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin1, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, false)
checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, false)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin1, BinID: binBinIDs[bin1], StampHash: ch1StampHash}, false)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: binBinIDs[bin2], StampHash: ch2StampHash}, false)

s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer)
ch3 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 2))
err = r.Put(ctx, ch3)
if err != nil {
t.Fatal(err)
}
binBinIDs[bin2] += 1
ch3StampHash, err := ch3.Stamp().Hash()
if err != nil {
t.Fatal(err)
}
ch3BinID := binBinIDs[bin2]

checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin1, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, true)
checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, true)
checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch3.Stamp().BatchID(), Address: ch3.Address(), StampHash: ch3StampHash}, false)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin1, BinID: ch1BinID, StampHash: ch1StampHash}, true)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: ch2BinID, StampHash: ch2StampHash}, true)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: ch3BinID, StampHash: ch3StampHash}, false)

size2 := r.Size()

// (ch1 + ch2) == 2 and then ch3 reduces reserve size by 1
if size2-size1 != 1 {
t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1)
}
})

}

func TestReplaceOldIndex(t *testing.T) {
Expand Down

0 comments on commit f1ed79e

Please sign in to comment.