Skip to content

Commit

Permalink
Introduce thread-safe committee map to the synchronizer (#96)
Browse files Browse the repository at this point in the history
* Introduce thread safe committee map to synchronizer

* Rename

* Move the instantiation

* Implement AsSlice and remove Range function

* Change TestStoreAndLoad

* Rely on sync.Map instead of plain map feat RWMutex
  • Loading branch information
Stefan-Ethernal authored Jun 26, 2024
1 parent bce3046 commit be68a80
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 18 deletions.
31 changes: 14 additions & 17 deletions synchronizer/batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ type BatchSynchronizer struct {
blockBatchSize uint
self common.Address
db db.DB
committee map[common.Address]etherman.DataCommitteeMember
comitteeLock sync.Mutex
committee *CommitteeMapSafe
syncLock sync.Mutex
reorgs <-chan BlockReorg
sequencer SequencerTracker
Expand Down Expand Up @@ -76,20 +75,20 @@ func NewBatchSynchronizer(
}

func (bs *BatchSynchronizer) resolveCommittee() error {
bs.comitteeLock.Lock()
defer bs.comitteeLock.Unlock()

committee := make(map[common.Address]etherman.DataCommitteeMember)
current, err := bs.client.GetCurrentDataCommittee()
if err != nil {
return err
}
for _, member := range current.Members {
if bs.self != member.Addr {
committee[member.Addr] = member

filteredMembers := make([]etherman.DataCommitteeMember, 0, len(current.Members))
for _, m := range current.Members {
if m.Addr != bs.self {
filteredMembers = append(filteredMembers, m)
}
}
bs.committee = committee

bs.committee = NewCommitteeMapSafe()
bs.committee.StoreBatch(filteredMembers)
return nil
}

Expand Down Expand Up @@ -316,7 +315,7 @@ func (bs *BatchSynchronizer) resolve(batch types.BatchKey) (*types.OffChainData,
}

// If the sequencer failed to produce data, try the other nodes
if len(bs.committee) == 0 {
if bs.committee.Length() == 0 {
// committee is resolved again once all members are evicted. They can be evicted
// for not having data, or their config being malformed
err := bs.resolveCommittee()
Expand All @@ -326,21 +325,19 @@ func (bs *BatchSynchronizer) resolve(batch types.BatchKey) (*types.OffChainData,
}

// pull out the members, iterating will change the map on error
members := make([]etherman.DataCommitteeMember, len(bs.committee))
for _, member := range bs.committee {
members = append(members, member)
}
members := bs.committee.AsSlice()

// iterate through them randomly until data is resolved
for _, r := range rand.Perm(len(members)) {
member := members[r]
if member.URL == "" || member.Addr == common.HexToAddress("0x0") || member.Addr == bs.self {
delete(bs.committee, member.Addr)
bs.committee.Delete(member.Addr)
continue // malformed committee, skip what is known to be wrong
}
value, err := bs.resolveWithMember(batch.Hash, member)
if err != nil {
log.Warnf("error resolving, continuing: %v", err)
delete(bs.committee, member.Addr)
bs.committee.Delete(member.Addr)
continue // did not have data or errored out
}

Expand Down
3 changes: 2 additions & 1 deletion synchronizer/batches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestBatchSynchronizer_ResolveCommittee(t *testing.T) {
}

require.NoError(t, batchSyncronizer.resolveCommittee())
require.Len(t, batchSyncronizer.committee, len(committee.Members))
require.Equal(t, len(committee.Members), batchSyncronizer.committee.Length())

ethermanMock.AssertExpectations(t)
})
Expand Down Expand Up @@ -130,6 +130,7 @@ func TestBatchSynchronizer_Resolve(t *testing.T) {
client: ethermanMock,
sequencer: sequencerMock,
rpcClientFactory: clientFactoryMock,
committee: NewCommitteeMapSafe(),
}

offChainData, err := batchSyncronizer.resolve(batchKey)
Expand Down
70 changes: 70 additions & 0 deletions synchronizer/committee_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package synchronizer

import (
"sync"
"sync/atomic"

"github.com/0xPolygon/cdk-data-availability/etherman"
"github.com/ethereum/go-ethereum/common"
)

// CommitteeMapSafe represents a thread-safe implementation for the data availability committee members map.
type CommitteeMapSafe struct {
members sync.Map
membersCount int32
}

// NewCommitteeMapSafe creates a new CommitteeMapSafe.
func NewCommitteeMapSafe() *CommitteeMapSafe {
return &CommitteeMapSafe{members: sync.Map{}}
}

// Store sets the value for a key.
func (t *CommitteeMapSafe) Store(member etherman.DataCommitteeMember) {
_, exists := t.members.LoadOrStore(member.Addr, member)
if !exists {
atomic.AddInt32(&t.membersCount, 1)
} else {
t.members.Store(member.Addr, member)
}
}

// StoreBatch sets the range of values and keys.
func (t *CommitteeMapSafe) StoreBatch(members []etherman.DataCommitteeMember) {
for _, m := range members {
t.Store(m)
}
}

// Load returns the value stored in the map for a key, or false if no value is present.
func (t *CommitteeMapSafe) Load(addr common.Address) (etherman.DataCommitteeMember, bool) {
rawValue, exists := t.members.Load(addr)
if !exists {
return etherman.DataCommitteeMember{}, false
}
return rawValue.(etherman.DataCommitteeMember), exists
}

// Delete deletes the value for a key.
func (t *CommitteeMapSafe) Delete(key common.Address) {
_, exists := t.members.LoadAndDelete(key)
if exists {
atomic.AddInt32(&t.membersCount, -1)
}
}

// AsSlice returns a slice of etherman.DataCommitteeMembers.
func (t *CommitteeMapSafe) AsSlice() []etherman.DataCommitteeMember {
membersSlice := make([]etherman.DataCommitteeMember, 0, atomic.LoadInt32(&t.membersCount))
t.members.Range(func(_, rawMember any) bool {
membersSlice = append(membersSlice, rawMember.(etherman.DataCommitteeMember))

return true
})
return membersSlice
}

// Length returns the current length of the map.
func (t *CommitteeMapSafe) Length() int {
return int(atomic.LoadInt32(&t.membersCount))
}
139 changes: 139 additions & 0 deletions synchronizer/committee_map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package synchronizer

import (
"sync"
"testing"

"github.com/0xPolygon/cdk-data-availability/etherman"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)

func TestStoreAndLoad(t *testing.T) {
committee := NewCommitteeMapSafe()

members := []etherman.DataCommitteeMember{
{Addr: common.HexToAddress("0x1"), URL: "Member 1"},
{Addr: common.HexToAddress("0x2"), URL: "Member 2"},
{Addr: common.HexToAddress("0x3"), URL: "Member 3"},
{Addr: common.HexToAddress("0x4"), URL: "Member 4"},
{Addr: common.HexToAddress("0x5"), URL: "Member 5"},
{Addr: common.HexToAddress("0x6"), URL: "Member 6"},
}

var wg sync.WaitGroup
wg.Add(2)

ch := make(chan etherman.DataCommitteeMember)

go func() {
defer wg.Done()

for _, m := range members {
committee.Store(m)
ch <- m
}
close(ch)
}()

var actualMembers []etherman.DataCommitteeMember
go func() {
defer wg.Done()

for m := range ch {
member, ok := committee.Load(m.Addr)
require.True(t, ok)
actualMembers = append(actualMembers, member)
}
}()

wg.Wait()

require.Len(t, actualMembers, len(members))
for i, m := range members {
require.Equal(t, m, actualMembers[i])
}

// replace the single committee member
replacedMember := etherman.DataCommitteeMember{Addr: members[0].Addr, URL: "New Member 1"}
committee.Store(replacedMember)
require.Equal(t, len(members), committee.Length())
actualReplacedMember, exists := committee.Load(replacedMember.Addr)
require.True(t, exists)
require.Equal(t, replacedMember, actualReplacedMember)
// skip the first member, because it is replaced and already asserted
for i, m := range members[1:] {
require.Equal(t, m, actualMembers[i+1])
}
}

func TestDelete(t *testing.T) {
committee := NewCommitteeMapSafe()

member := etherman.DataCommitteeMember{Addr: common.HexToAddress("0x1"), URL: "Member 1"}

committee.Store(member)
committee.Delete(member.Addr)

_, ok := committee.Load(member.Addr)
require.False(t, ok)
}

func TestStoreBatch(t *testing.T) {
committee := NewCommitteeMapSafe()

members := []etherman.DataCommitteeMember{
{Addr: common.HexToAddress("0x1"), URL: "http://localhost:1001"},
{Addr: common.HexToAddress("0x2"), URL: "http://localhost:1002"},
{Addr: common.HexToAddress("0x3"), URL: "http://localhost:1003"},
}

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
committee.StoreBatch(members)
}()

wg.Wait()

for _, member := range members {
loadedMember, ok := committee.Load(member.Addr)
require.True(t, ok)
require.Equal(t, member, loadedMember)
}
}

func TestAsSlice(t *testing.T) {
committee := NewCommitteeMapSafe()
committee.StoreBatch(
[]etherman.DataCommitteeMember{
{Addr: common.HexToAddress("0x1"), URL: "Member 1"},
{Addr: common.HexToAddress("0x2"), URL: "Member 2"},
{Addr: common.HexToAddress("0x3"), URL: "Member 3"},
{Addr: common.HexToAddress("0x4"), URL: "Member 4"},
})

membersSlice := committee.AsSlice()

require.Equal(t, committee.Length(), len(membersSlice))
for _, member := range membersSlice {
foundMember, ok := committee.Load(member.Addr)
require.True(t, ok)
require.Equal(t, foundMember, member)
}
}

func TestLength(t *testing.T) {
committee := NewCommitteeMapSafe()

members := []etherman.DataCommitteeMember{
{Addr: common.HexToAddress("0x1"), URL: "http://localhost:1001"},
{Addr: common.HexToAddress("0x2"), URL: "http://localhost:1002"},
}

committee.StoreBatch(members)

require.Equal(t, len(members), committee.Length())
}

0 comments on commit be68a80

Please sign in to comment.