Skip to content

Commit

Permalink
TestPreCommitActions
Browse files Browse the repository at this point in the history
  • Loading branch information
timwu20 committed Jan 19, 2025
1 parent c302bd2 commit fdc527e
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 102 deletions.
168 changes: 85 additions & 83 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,113 +87,115 @@ func (c *Client[H, Hasher, N, E, Header]) unpin(message api.Unpin[H]) error {
}
}

func (c *Client[H, Hasher, N, E, Header]) LockImportRun(
func (c *Client[H, Hasher, N, E, Header]) lockImportRun(
f func(*api.ClientImportOperation[H, Hasher, N, Header, E]) error,
) error {
var inner = func() error {
c.backend.GetImportLock().Lock()
defer c.backend.GetImportLock().Unlock()
c.backend.GetImportLock().Lock()
defer c.backend.GetImportLock().Unlock()

blockImportOp, err := c.backend.BeginOperation()
if err != nil {
return err
}
blockImportOp, err := c.backend.BeginOperation()
if err != nil {
return err
}

clientImportOp := api.ClientImportOperation[H, Hasher, N, Header, E]{
Op: blockImportOp,
}
clientImportOp := api.ClientImportOperation[H, Hasher, N, Header, E]{
Op: blockImportOp,
}

err = f(&clientImportOp)
if err != nil {
return err
}
err = f(&clientImportOp)
if err != nil {
return err
}

var finalityNotification *api.FinalityNotification[H, N, Header]
if clientImportOp.NotifyFinalized != nil {
finalityNotification = api.NewFinalityNotificationFromSummary(*clientImportOp.NotifyFinalized, c.unpin)
}
var finalityNotification *api.FinalityNotification[H, N, Header]
if clientImportOp.NotifyFinalized != nil {
finalityNotification = api.NewFinalityNotificationFromSummary(*clientImportOp.NotifyFinalized, c.unpin)
}

var (
importNotification *api.BlockImportNotification[H, N, Header]
storageChanges *api.StorageChanges
importNotificationAction api.ImportNotificationAction
)
if clientImportOp.NotifyImported != nil {
importNotification = api.NewBlockImportNotificationFromSummary(*clientImportOp.NotifyImported, c.unpin)
storageChanges = clientImportOp.NotifyImported.StorageChanges
importNotificationAction = clientImportOp.NotifyImported.ImportNotificationAction
} else {
importNotificationAction = api.NoneBlockImportNotificationAction
}
var (
importNotification *api.BlockImportNotification[H, N, Header]
storageChanges *api.StorageChanges
importNotificationAction api.ImportNotificationAction
)
if clientImportOp.NotifyImported != nil {
importNotification = api.NewBlockImportNotificationFromSummary(*clientImportOp.NotifyImported, c.unpin)
storageChanges = clientImportOp.NotifyImported.StorageChanges
importNotificationAction = clientImportOp.NotifyImported.ImportNotificationAction
} else {
importNotificationAction = api.NoneBlockImportNotificationAction
}

if finalityNotification != nil {
c.finalityActionsMtx.Lock()
defer c.finalityActionsMtx.Unlock()
for _, action := range c.finalityActions {
err := clientImportOp.Op.InsertAux(action(*finalityNotification))
if err != nil {
return err
}
if finalityNotification != nil {
c.finalityActionsMtx.Lock()
defer c.finalityActionsMtx.Unlock()
for _, action := range c.finalityActions {
err := clientImportOp.Op.InsertAux(action(*finalityNotification))
if err != nil {
return err
}
}
if importNotification != nil {
c.importActionsMtx.Lock()
defer c.importActionsMtx.Unlock()
for _, action := range c.importActions {
err := clientImportOp.Op.InsertAux(action(*importNotification))
if err != nil {
return err
}
}
if importNotification != nil {
c.importActionsMtx.Lock()
defer c.importActionsMtx.Unlock()
for _, action := range c.importActions {
err := clientImportOp.Op.InsertAux(action(*importNotification))
if err != nil {
return err
}
}
}

err = c.backend.CommitOperation(clientImportOp.Op)
if err != nil {
return err
}
err = c.backend.CommitOperation(clientImportOp.Op)
if err != nil {
return err
}

// We need to pin the block in the backend once
// for each notification. Once all notifications are
// dropped, the block will be unpinned automatically.
if finalityNotification != nil {
err := c.backend.PinBlock(finalityNotification.Hash)
// We need to pin the block in the backend once
// for each notification. Once all notifications are
// dropped, the block will be unpinned automatically.
if finalityNotification != nil {
err := c.backend.PinBlock(finalityNotification.Hash)
if err != nil {
logger.Debugf("Unable to pin block for finality notification. hash: %s, Error: %v",
finalityNotification.Hash, err)
} else {
err := c.announcePin(api.AnnouncePin[H]{Hash: finalityNotification.Hash})
if err != nil {
logger.Debugf("Unable to pin block for finality notification. hash: %s, Error: %v",
finalityNotification.Hash, err)
} else {
err := c.announcePin(api.AnnouncePin[H]{Hash: finalityNotification.Hash})
if err != nil {
logger.Errorf("Unable to send AnnouncePin worker message for finality: %s", err)
}
logger.Errorf("Unable to send AnnouncePin worker message for finality: %s", err)
}
}
}

if importNotification != nil {
err := c.backend.PinBlock(importNotification.Hash)
if importNotification != nil {
err := c.backend.PinBlock(importNotification.Hash)
if err != nil {
logger.Debugf("Unable to pin block for import notification. hash: %s, Error: %v",
importNotification.Hash, err)
} else {
err := c.announcePin(api.AnnouncePin[H]{Hash: importNotification.Hash})
if err != nil {
logger.Debugf("Unable to pin block for import notification. hash: %s, Error: %v",
finalityNotification.Hash, err)
} else {
err := c.announcePin(api.AnnouncePin[H]{Hash: finalityNotification.Hash})
if err != nil {
logger.Errorf("Unable to send AnnouncePin worker message for import: %s", err)
}
logger.Errorf("Unable to send AnnouncePin worker message for import: %s", err)
}
}
}

err = c.notifyFinalized(finalityNotification)
if err != nil {
return err
}
err = c.notifyImported(importNotification, importNotificationAction, storageChanges)
if err != nil {
return err
}

return nil
err = c.notifyFinalized(finalityNotification)
if err != nil {
return err
}
err = c.notifyImported(importNotification, importNotificationAction, storageChanges)
if err != nil {
return err
}

err := inner()
return nil
}

func (c *Client[H, Hasher, N, E, Header]) LockImportRun(
f func(*api.ClientImportOperation[H, Hasher, N, Header, E]) error,
) error {
err := c.lockImportRun(f)
c.importingBlockMtx.Lock()
c.importingBlock = nil
c.importingBlockMtx.Unlock()
Expand Down
75 changes: 56 additions & 19 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func TestNew(t *testing.T) {
require.NotNil(t, c)
}

type BlockImportOperation = api.BlockImportNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]] //nolint:lll
type FinalityNotification = api.FinalityNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]] //nolint:lll

func TestBlockchainEvents(t *testing.T) {
t.Run("register_unregister", func(t *testing.T) {
c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0))
Expand Down Expand Up @@ -151,7 +154,7 @@ func TestBlockchainEvents(t *testing.T) {
_, ok := c.importNotificationChans[blockImport]
require.True(t, ok)

var blockImportNotifications []api.BlockImportNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]
var blockImportNotifications []BlockImportOperation
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
Expand All @@ -161,7 +164,7 @@ func TestBlockchainEvents(t *testing.T) {
wg.Done()
}()

var everyImportNotifications []api.BlockImportNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]
var everyImportNotifications []BlockImportOperation
everyImport := c.RegisterEveryImportNotificationStream()
require.NotNil(t, everyImport)
_, ok = c.everyImportNotificationChans[everyImport]
Expand All @@ -176,11 +179,11 @@ func TestBlockchainEvents(t *testing.T) {
}()

// sends to both
c.notifyImported(&api.BlockImportNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]{}, api.BothBlockImportNotificationAction, nil)
c.notifyImported(&BlockImportOperation{}, api.BothBlockImportNotificationAction, nil)
// sends to import
c.notifyImported(&api.BlockImportNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]{}, api.RecentBlockImportNotificationAction, nil)
c.notifyImported(&BlockImportOperation{}, api.RecentBlockImportNotificationAction, nil)
// sends to every
c.notifyImported(&api.BlockImportNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]{}, api.EveryBlockImportNotificationAction, nil)
c.notifyImported(&BlockImportOperation{}, api.EveryBlockImportNotificationAction, nil)

c.UnregisterImportNotificationStream(blockImport)
_, ok = c.importNotificationChans[blockImport]
Expand Down Expand Up @@ -210,10 +213,7 @@ func TestBlockchainEvents(t *testing.T) {
}()

childStorage := c.StorageChangesNotificationStream([]storage.StorageKey{}, []api.ChildFilterKeys{
{
Key: storage.StorageKey("child0"),
FilterKeys: []storage.StorageKey{storage.StorageKey("child0")},
},
{Key: storage.StorageKey("child0"), FilterKeys: []storage.StorageKey{storage.StorageKey("child0")}},
})
wg.Add(1)
go func() {
Expand All @@ -224,10 +224,7 @@ func TestBlockchainEvents(t *testing.T) {
}()

wildCard := c.StorageChangesNotificationStream(nil, []api.ChildFilterKeys{
{
Key: storage.StorageKey("child0"),
FilterKeys: []storage.StorageKey{storage.StorageKey("child0")},
},
{Key: storage.StorageKey("child0"), FilterKeys: []storage.StorageKey{storage.StorageKey("child0")}},
})
wg.Add(1)
go func() {
Expand All @@ -239,11 +236,11 @@ func TestBlockchainEvents(t *testing.T) {

// sends to both
c.notifyImported(
&api.BlockImportNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]{},
&BlockImportOperation{},
api.BothBlockImportNotificationAction,
&api.StorageChanges{
StorageCollection: statemachine.StorageCollection{
{statemachine.StorageKey("top0"), statemachine.StorageValue("top0")},
{StorageKey: statemachine.StorageKey("top0"), StorageValue: statemachine.StorageValue("top0")},
},
ChildStorageCollection: []struct {
statemachine.StorageKey
Expand All @@ -252,7 +249,7 @@ func TestBlockchainEvents(t *testing.T) {
{
StorageKey: statemachine.StorageKey("child0"),
StorageCollection: statemachine.StorageCollection{
{statemachine.StorageKey("child0"), statemachine.StorageValue("child0")},
{StorageKey: statemachine.StorageKey("child0"), StorageValue: statemachine.StorageValue("child0")},
},
},
},
Expand All @@ -262,7 +259,6 @@ func TestBlockchainEvents(t *testing.T) {
wg.Wait()
topStorage.Drop()
childStorage.Drop()

})

t.Run("register_receive_finality_unregister", func(t *testing.T) {
Expand All @@ -272,7 +268,7 @@ func TestBlockchainEvents(t *testing.T) {
_, ok := c.finalityNotificationChans[finality]
require.True(t, ok)

var finalityNotifications []api.FinalityNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]
var finalityNotifications []FinalityNotification
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
Expand All @@ -282,7 +278,7 @@ func TestBlockchainEvents(t *testing.T) {
wg.Done()
}()

c.notifyFinalized(&api.FinalityNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]{})
c.notifyFinalized(&FinalityNotification{})

c.UnregisterFinalityNotificationStream(finality)
_, ok = c.finalityNotificationChans[finality]
Expand All @@ -293,3 +289,44 @@ func TestBlockchainEvents(t *testing.T) {
require.Len(t, finalityNotifications, 1)
})
}

type ClientImportOperation = api.ClientImportOperation[
hash.H256,
runtime.BlakeTwo256,
uint64,
*generic.Header[uint64, hash.H256, runtime.BlakeTwo256],
rt_testing.ExtrinsicsWrapper[uint64],
]

func TestLockImportRun(t *testing.T) {
c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0))
err := c.LockImportRun(func(cio *ClientImportOperation) error {
return nil
})
require.NoError(t, err)
}

func TestPreCommitActions(t *testing.T) {
t.Run("register_import_and_finality_actions", func(t *testing.T) {
c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0))

var count int
c.RegisterImportAction(func(bin BlockImportOperation) api.AuxDataOperations {
count++
return api.AuxDataOperations{}
})
c.RegisterFinalityAction(func(fn FinalityNotification) api.AuxDataOperations {
count++
return api.AuxDataOperations{}
})

err := c.LockImportRun(func(cio *ClientImportOperation) error {
cio.NotifyFinalized = &api.FinalizeSummary[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]{} //nolint:lll
cio.NotifyImported = &api.ImportSummary[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]{} //nolint:lll
return nil
})
require.NoError(t, err)

require.Equal(t, 2, count)
})
}

0 comments on commit fdc527e

Please sign in to comment.