Skip to content

Commit

Permalink
TestBlockchainEvents
Browse files Browse the repository at this point in the history
  • Loading branch information
timwu20 committed Jan 19, 2025
1 parent 06b74e4 commit c302bd2
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 11 deletions.
4 changes: 2 additions & 2 deletions internal/client/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ type ImportNotifications[
H runtime.Hash,
N runtime.Number,
Header runtime.Header[N, H],
] chan<- BlockImportNotification[H, N, Header]
] chan BlockImportNotification[H, N, Header]

// FinalityNotifications is a channel of block finality notifications.
type FinalityNotifications[
H runtime.Hash,
N runtime.Number,
Header runtime.Header[N, H],
] chan<- FinalityNotification[H, N, Header]
] chan FinalityNotification[H, N, Header]

// BlockchainEvents is the source of blockchain events.
type BlockchainEvents[
Expand Down
18 changes: 9 additions & 9 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ type Client[
backend api.Backend[H, N, Hasher, Header, E]
storageNotifications api.StorageNotifications[H]
importNotificationChansMtx sync.Mutex
importNotificationChans map[chan<- api.BlockImportNotification[H, N, Header]]any
importNotificationChans map[chan api.BlockImportNotification[H, N, Header]]any
everyImportNotificationChansMtx sync.Mutex
everyImportNotificationChans map[chan<- api.BlockImportNotification[H, N, Header]]any
everyImportNotificationChans map[chan api.BlockImportNotification[H, N, Header]]any
finalityNotificationChansMtx sync.Mutex
finalityNotificationChans map[chan<- api.FinalityNotification[H, N, Header]]any
finalityNotificationChans map[chan api.FinalityNotification[H, N, Header]]any
// Collects auxiliary operations to be performed atomically together with block import operations.
importActionsMtx sync.Mutex
importActions []api.OnImportAction[H, N, Header]
Expand Down Expand Up @@ -62,9 +62,9 @@ func New[
return &Client[H, Hasher, N, E, Header]{
backend: backend,
storageNotifications: api.NewStorageNotifications[H](),
importNotificationChans: make(map[chan<- api.BlockImportNotification[H, N, Header]]any),
everyImportNotificationChans: make(map[chan<- api.BlockImportNotification[H, N, Header]]any),
finalityNotificationChans: make(map[chan<- api.FinalityNotification[H, N, Header]]any),
importNotificationChans: make(map[chan api.BlockImportNotification[H, N, Header]]any),
everyImportNotificationChans: make(map[chan api.BlockImportNotification[H, N, Header]]any),
finalityNotificationChans: make(map[chan api.FinalityNotification[H, N, Header]]any),
unpinWorkerChan: unpinWorkerChan,
}
}
Expand Down Expand Up @@ -238,11 +238,11 @@ func (c *Client[H, Hasher, N, E, Header]) notifyFinalized(notification *api.Fina
return nil
}

func notifyChans[M any](msg M, chans map[chan<- M]any, timeout time.Duration) {
func notifyChans[M any](msg M, chans map[chan M]any, timeout time.Duration) {
wg := sync.WaitGroup{}
for ch := range chans {
wg.Add(1)
go func(ch chan<- M) {
go func(ch chan M) {
defer wg.Done()
select {
case ch <- msg:
Expand Down Expand Up @@ -272,7 +272,7 @@ func (c *Client[H, Hasher, N, E, Header]) notifyImported(
importNotificationAction api.ImportNotificationAction,
storageChanges *api.StorageChanges,
) error {
if notification != nil {
if notification == nil {
return nil
}

Expand Down
265 changes: 265 additions & 0 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,21 @@
package client

import (
"sync"
"testing"

"github.com/ChainSafe/gossamer/internal/client/api"
"github.com/ChainSafe/gossamer/internal/client/db"
statedb "github.com/ChainSafe/gossamer/internal/client/state-db"
memorykvdb "github.com/ChainSafe/gossamer/internal/kvdb/memory-kvdb"
"github.com/ChainSafe/gossamer/internal/primitives/core/hash"
"github.com/ChainSafe/gossamer/internal/primitives/database"
"github.com/ChainSafe/gossamer/internal/primitives/runtime"
"github.com/ChainSafe/gossamer/internal/primitives/runtime/generic"
rt_testing "github.com/ChainSafe/gossamer/internal/primitives/runtime/testing"
statemachine "github.com/ChainSafe/gossamer/internal/primitives/state-machine"
"github.com/ChainSafe/gossamer/internal/primitives/storage"
"github.com/stretchr/testify/require"
)

type noopExtrinsic struct{}
Expand All @@ -28,3 +39,257 @@ var (
_ api.BlockchainEvents[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]] = &TestClient{}
_ api.PreCommitActions[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]] = &TestClient{}
)

func NewTestBackend(t *testing.T,
blocksPruning db.BlocksPruning, canonicalizationDelay uint64,
) api.Backend[
hash.H256,
uint64,
runtime.BlakeTwo256,
*generic.Header[uint64, hash.H256, runtime.BlakeTwo256],
rt_testing.ExtrinsicsWrapper[uint64],
] {
t.Helper()

kvdb := memorykvdb.New(13)
var statePruning statedb.PruningMode
switch blocksPruning := blocksPruning.(type) {
case db.BlocksPruningKeepAll:
statePruning = statedb.PruningModeArchiveAll{}
case db.BlocksPruningKeepFinalized:
statePruning = statedb.PruningModeArchiveCanonical{}
case db.BlocksPruningSome:
statePruning = statedb.NewPruningModeConstrained(uint32(blocksPruning))
default:
t.Fatalf("unreachable")
}
trieCacheMaxSize := uint(16 * 1024 * 1024)
dbSetting := db.DatabaseConfig{
TrieCacheMaximumSize: &trieCacheMaxSize,
StatePruning: statePruning,
Source: db.DatabaseSource{DB: database.NewDBAdapter[hash.H256](kvdb), RequireCreateFlag: true},
BlocksPruning: blocksPruning,
}

backend, err := db.NewBackend[
hash.H256,
uint64,
rt_testing.ExtrinsicsWrapper[uint64],
runtime.BlakeTwo256,
*generic.Header[uint64, hash.H256, runtime.BlakeTwo256],
](dbSetting, canonicalizationDelay)
if err != nil {
panic(err)
}
return backend
}

func TestNew(t *testing.T) {
c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0))
require.NotNil(t, c)
}

func TestBlockchainEvents(t *testing.T) {
t.Run("register_unregister", func(t *testing.T) {
c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0))
blockImport := c.RegisterImportNotificationStream()
require.NotNil(t, blockImport)
_, ok := c.importNotificationChans[blockImport]
require.True(t, ok)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for range blockImport {
}
wg.Done()
}()

everyImport := c.RegisterEveryImportNotificationStream()
require.NotNil(t, everyImport)
_, ok = c.everyImportNotificationChans[everyImport]
require.True(t, ok)

wg.Add(1)
go func() {
for range everyImport {
}
wg.Done()
}()

finality := c.RegisterFinalityNotificationStream()
require.NotNil(t, finality)
_, ok = c.finalityNotificationChans[finality]
require.True(t, ok)

wg.Add(1)
go func() {
for range finality {
}
wg.Done()
}()

c.UnregisterImportNotificationStream(blockImport)
_, ok = c.importNotificationChans[blockImport]
require.False(t, ok)

c.UnregisterEveryImportNotificationStream(everyImport)
_, ok = c.everyImportNotificationChans[everyImport]
require.False(t, ok)

c.UnregisterFinalityNotificationStream(finality)
_, ok = c.finalityNotificationChans[finality]
require.False(t, ok)

wg.Wait()
})

t.Run("register_receive_block_import_unregister", func(t *testing.T) {
c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0))
blockImport := c.RegisterImportNotificationStream()
require.NotNil(t, blockImport)
_, ok := c.importNotificationChans[blockImport]
require.True(t, ok)

var blockImportNotifications []api.BlockImportNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for notif := range blockImport {
blockImportNotifications = append(blockImportNotifications, notif)
}
wg.Done()
}()

var everyImportNotifications []api.BlockImportNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]
everyImport := c.RegisterEveryImportNotificationStream()
require.NotNil(t, everyImport)
_, ok = c.everyImportNotificationChans[everyImport]
require.True(t, ok)

wg.Add(1)
go func() {
for notif := range everyImport {
everyImportNotifications = append(everyImportNotifications, notif)
}
wg.Done()
}()

// sends to both
c.notifyImported(&api.BlockImportNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]{}, api.BothBlockImportNotificationAction, nil)
// sends to import
c.notifyImported(&api.BlockImportNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]{}, api.RecentBlockImportNotificationAction, nil)
// sends to every
c.notifyImported(&api.BlockImportNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]{}, api.EveryBlockImportNotificationAction, nil)

c.UnregisterImportNotificationStream(blockImport)
_, ok = c.importNotificationChans[blockImport]
require.False(t, ok)

c.UnregisterEveryImportNotificationStream(everyImport)
_, ok = c.everyImportNotificationChans[everyImport]
require.False(t, ok)

wg.Wait()

require.Len(t, blockImportNotifications, 2)
require.Len(t, everyImportNotifications, 2)
})

t.Run("register_receive_block_import_storage_changes_unregister", func(t *testing.T) {
c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0))
wg := sync.WaitGroup{}

topStorage := c.StorageChangesNotificationStream(nil, []api.ChildFilterKeys{})
wg.Add(1)
go func() {
msg := <-topStorage.Chan()
require.Len(t, msg.Changes, 1)
require.Len(t, msg.ChildChanges, 0)
wg.Done()
}()

childStorage := c.StorageChangesNotificationStream([]storage.StorageKey{}, []api.ChildFilterKeys{
{
Key: storage.StorageKey("child0"),
FilterKeys: []storage.StorageKey{storage.StorageKey("child0")},
},
})
wg.Add(1)
go func() {
msg := <-childStorage.Chan()
require.Len(t, msg.Changes, 0)
require.Len(t, msg.ChildChanges, 1)
wg.Done()
}()

wildCard := c.StorageChangesNotificationStream(nil, []api.ChildFilterKeys{
{
Key: storage.StorageKey("child0"),
FilterKeys: []storage.StorageKey{storage.StorageKey("child0")},
},
})
wg.Add(1)
go func() {
msg := <-wildCard.Chan()
require.Len(t, msg.Changes, 1)
require.Len(t, msg.ChildChanges, 1)
wg.Done()
}()

// sends to both
c.notifyImported(
&api.BlockImportNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]{},
api.BothBlockImportNotificationAction,
&api.StorageChanges{
StorageCollection: statemachine.StorageCollection{
{statemachine.StorageKey("top0"), statemachine.StorageValue("top0")},
},
ChildStorageCollection: []struct {
statemachine.StorageKey
statemachine.StorageCollection
}{
{
StorageKey: statemachine.StorageKey("child0"),
StorageCollection: statemachine.StorageCollection{
{statemachine.StorageKey("child0"), statemachine.StorageValue("child0")},
},
},
},
},
)

wg.Wait()
topStorage.Drop()
childStorage.Drop()

})

t.Run("register_receive_finality_unregister", func(t *testing.T) {
c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0))
finality := c.RegisterFinalityNotificationStream()
require.NotNil(t, finality)
_, ok := c.finalityNotificationChans[finality]
require.True(t, ok)

var finalityNotifications []api.FinalityNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for notif := range finality {
finalityNotifications = append(finalityNotifications, notif)
}
wg.Done()
}()

c.notifyFinalized(&api.FinalityNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]{})

c.UnregisterFinalityNotificationStream(finality)
_, ok = c.finalityNotificationChans[finality]
require.False(t, ok)

wg.Wait()

require.Len(t, finalityNotifications, 1)
})
}

0 comments on commit c302bd2

Please sign in to comment.