diff --git a/internal/client/api/backend.go b/internal/client/api/backend.go index 173722362b..ea2b44d243 100644 --- a/internal/client/api/backend.go +++ b/internal/client/api/backend.go @@ -6,6 +6,7 @@ package api import ( "sync" + "github.com/ChainSafe/gossamer/internal/client/consensus" "github.com/ChainSafe/gossamer/internal/primitives/blockchain" "github.com/ChainSafe/gossamer/internal/primitives/core/offchain" "github.com/ChainSafe/gossamer/internal/primitives/runtime" @@ -13,6 +14,74 @@ import ( "github.com/ChainSafe/gossamer/internal/primitives/storage" ) +// ImportNotificationAction describes which block import notification stream should be notified. +type ImportNotificationAction uint + +const ( + // RecentBlockImportNotificationAction notifies only when the node has synced to the tip or there is a re-org. + RecentBlockImportNotificationAction ImportNotificationAction = iota + // EveryBlockImportNotificationAction notifies for every single block no matter what the sync state is. + EveryBlockImportNotificationAction + // BothBlockImportNotificationAction means both [RecentBlockImportNotificationAction] and + // [EveryBlockImportNotificationAction] should be fired. + BothBlockImportNotificationAction + // NoneBlockImportNotificationAction means no block import notification should be fired. + NoneBlockImportNotificationAction +) + +// StorageChanges contains a [statemachine.StorageCollection] and [statemachine.ChildStorageCollection] +type StorageChanges struct { + statemachine.StorageCollection + statemachine.ChildStorageCollection +} + +// ImportSummary contains information about the block that just got imported, +// including storage changes, reorged blocks, etc. +type ImportSummary[ + H runtime.Hash, + N runtime.Number, + Header runtime.Header[N, H], +] struct { + Hash H // Block hash of the imported block. + Origin consensus.BlockOrigin // Import origin. + Header Header // Header of the imported block. + IsNewBest bool // Is this block a new best block. + StorageChanges *StorageChanges // Optional storage changes. + // TreeRoute from old best to new best. + // If nil, there was no re-org while importing. + TreeRoute *blockchain.TreeRoute[H, N] + ImportNotificationAction ImportNotificationAction // Which notify action to take for this import. +} + +// FinalizeSummary contains information about the block that just got finalized, including tree heads that became +// stale at the moment of finalization. +type FinalizeSummary[ + H runtime.Hash, + N runtime.Number, + Header runtime.Header[N, H], +] struct { + // Last finalized block header. + Header Header + // Blocks that were finalized. + // The last entry is the one that has been explicitly finalized. + Finalized []H + // Heads that became stale during this finalization operation. + StaleHeads []H +} + +// ClientImportOperation is an import operation wrapper. +type ClientImportOperation[ + H runtime.Hash, + Hasher runtime.Hasher[H], + N runtime.Number, + Header runtime.Header[N, H], + E runtime.Extrinsic, +] struct { + Op BlockImportOperation[N, H, Hasher, Header, E] // DB Operation. + NotifyImported *ImportSummary[H, N, Header] // Summary of imported block. + NotifyFinalized *FinalizeSummary[H, N, Header] // Summary of finalized block. +} + // NewBlockState is the state of a new block. type NewBlockState uint8 @@ -87,6 +156,21 @@ type BlockImportOperation[ UpdateTransactionIndex(index []statemachine.IndexOperation) error } +// LockImportRun is the interface for performing operations on the backend. 
+type LockImportRun[ + H runtime.Hash, + N runtime.Number, + Hasher runtime.Hasher[H], + Header runtime.Header[N, H], + E runtime.Extrinsic, +] interface { + // LockImportRun locks the import lock and runs operations inside it. + LockImportRun( + f func(*ClientImportOperation[H, Hasher, N, Header, E]) error, + ) error +} + +// KeyValue is used in [AuxStore.InsertAux]. Key and Value should not be nil. type KeyValue struct { Key []byte Value []byte diff --git a/internal/client/api/client.go b/internal/client/api/client.go index 133bd2db20..efb0d16958 100644 --- a/internal/client/api/client.go +++ b/internal/client/api/client.go @@ -3,7 +3,67 @@ package api -// AuxDataOperation is a slice of operations to be performed on storage aux data. +import ( + "github.com/ChainSafe/gossamer/internal/client/consensus" + "github.com/ChainSafe/gossamer/internal/primitives/blockchain" + "github.com/ChainSafe/gossamer/internal/primitives/runtime" + "github.com/ChainSafe/gossamer/internal/primitives/storage" +) + +// ImportNotifications is a channel of block import events. +type ImportNotifications[ + H runtime.Hash, + N runtime.Number, + Header runtime.Header[N, H], +] chan BlockImportNotification[H, N, Header] + +// FinalityNotifications is a channel of block finality notifications. +type FinalityNotifications[ + H runtime.Hash, + N runtime.Number, + Header runtime.Header[N, H], +] chan FinalityNotification[H, N, Header] + +// BlockchainEvents is the source of blockchain events. +type BlockchainEvents[ + H runtime.Hash, + N runtime.Number, + Header runtime.Header[N, H], +] interface { + // RegisterImportNotificationStream retrieves a channel of block import events. + // + // Not guaranteed to be fired for every imported block. Use + // [RegisterEveryImportNotificationStream] if you want a notification of every imported block + // regardless. + // + // The events for this notification stream are emitted: + // - During initial sync process: if there is a re-org while importing blocks. + // - After initial sync process: on every imported block, regardless of whether it is + // the new best block or not, or if it causes a re-org or not. + RegisterImportNotificationStream() ImportNotifications[H, N, Header] + // UnregisterImportNotificationStream will unregister a registered channel. + UnregisterImportNotificationStream(ImportNotifications[H, N, Header]) + + // RegisterEveryImportNotificationStream retrieves a channel of block import events for every imported block. + RegisterEveryImportNotificationStream() ImportNotifications[H, N, Header] + // UnregisterEveryImportNotificationStream will unregister a registered channel. + UnregisterEveryImportNotificationStream(ImportNotifications[H, N, Header]) + + // RegisterFinalityNotificationStream will get a channel of finality notifications. Not guaranteed to be fired for + // every finalized block. + RegisterFinalityNotificationStream() FinalityNotifications[H, N, Header] + // UnregisterFinalityNotificationStream will unregister a registered channel. + UnregisterFinalityNotificationStream(FinalityNotifications[H, N, Header]) + + // StorageChangesNotificationStream retrieves a storage changes event stream. + // Passing nil for filterKeys subscribes to all storage changes. + StorageChangesNotificationStream( + filterKeys []storage.StorageKey, + childFilterKeys []ChildFilterKeys, + ) StorageEventStream[H] +} + +// AuxDataOperation is an operation to be performed on storage aux data. // Key is the encoded data key. // Value is the encoded optional data to write.
// If Value is nil, the key and the associated data are deleted from storage. @@ -11,4 +71,163 @@ type AuxDataOperation struct { Key []byte Data []byte } + +// AuxDataOperations is a slice of [AuxDataOperation] to be performed on storage aux data. type AuxDataOperations []AuxDataOperation + +// OnImportAction is a callback invoked before committing the operations created during block import. +// This gives the opportunity to perform auxiliary pre-commit actions and optionally +// enqueue further storage write operations to be atomically performed on commit. +type OnImportAction[ + H runtime.Hash, + N runtime.Number, + Header runtime.Header[N, H], +] func(BlockImportNotification[H, N, Header]) AuxDataOperations + +// OnFinalityAction is a callback invoked before committing the operations created during block finalization. +// This gives the opportunity to perform auxiliary pre-commit actions and optionally +// enqueue further storage write operations to be atomically performed on commit. +type OnFinalityAction[ + H runtime.Hash, + N runtime.Number, + Header runtime.Header[N, H], +] func(FinalityNotification[H, N, Header]) AuxDataOperations + +// PreCommitActions is the interface to perform auxiliary actions before committing a block import or +// finality operation. +type PreCommitActions[ + H runtime.Hash, + N runtime.Number, + Header runtime.Header[N, H], +] interface { + RegisterImportAction(op OnImportAction[H, N, Header]) // Actions to be performed on block import. + RegisterFinalityAction(op OnFinalityAction[H, N, Header]) // Actions to be performed on block finalization. +} + +// unpinHandleInner sends a message to the pinning-worker once dropped to unpin a block in the backend. +type unpinHandleInner[H runtime.Hash] struct { + hash H // Hash of the block pinned by this handle + unpin func(message Unpin[H]) error +} + +func (uhi unpinHandleInner[H]) Drop() { + err := uhi.unpin(Unpin[H]{uhi.hash}) + if err != nil { + logger.Debugf("Unable to unpin block with hash: %s, error: %v", uhi.hash, err) + } +} + +// UnpinWorkerMessage is the message that signals notification-based pinning actions to the pinning-worker. +// When the notification is dropped, an [Unpin] message should be sent to the worker. +type UnpinWorkerMessage[H runtime.Hash] interface { + isUnpinWorkerMessage() +} + +// AnnouncePin should be sent when an import or finality notification is created. +type AnnouncePin[H runtime.Hash] struct { + Hash H +} + +// Unpin should be sent when an import or finality notification is dropped. +type Unpin[H runtime.Hash] struct { + Hash H +} + +func (AnnouncePin[H]) isUnpinWorkerMessage() {} +func (Unpin[H]) isUnpinWorkerMessage() {} + +// UnpinHandle keeps a specific block pinned while the handle is alive. +// Once the last handle instance for a given block is dropped, the +// block is unpinned in the [Backend]. +type UnpinHandle[H runtime.Hash] struct { + unpinHandleInner[H] +} + +// NewUnpinHandle is constructor for [UnpinHandle]. +func NewUnpinHandle[H runtime.Hash](hash H, unpin func(message Unpin[H]) error) UnpinHandle[H] { + return UnpinHandle[H]{ + unpinHandleInner[H]{ + hash: hash, + unpin: unpin, + }, + } +} + +func (up UnpinHandle[H]) Hash() H { + return up.hash +} + +// BlockImportNotification is the summary of an imported block. +type BlockImportNotification[ + H runtime.Hash, + N runtime.Number, + Header runtime.Header[N, H], +] struct { + Hash H // Imported block header hash. + Origin consensus.BlockOrigin // Imported block origin. + Header Header // Imported block header.
+ IsNewBest bool // Is this the new best block. + // TreeRoute from old best to new best. If nil, there was no re-org while importing. + TreeRoute *blockchain.TreeRoute[H, N] + unpinHandle UnpinHandle[H] // Handle to unpin the block this notification is associated with. +} + +// Drop will unpin the block from the backend. +func (bin BlockImportNotification[H, N, Header]) Drop() { + bin.unpinHandle.Drop() +} + +// NewBlockImportNotificationFromSummary is constructor of [BlockImportNotification] given an [ImportSummary]. +func NewBlockImportNotificationFromSummary[ + H runtime.Hash, + N runtime.Number, + Header runtime.Header[N, H], +](summary ImportSummary[H, N, Header], unpin func(message Unpin[H]) error) *BlockImportNotification[H, N, Header] { + return &BlockImportNotification[H, N, Header]{ + Hash: summary.Hash, + Origin: summary.Origin, + Header: summary.Header, + IsNewBest: summary.IsNewBest, + TreeRoute: summary.TreeRoute, + unpinHandle: NewUnpinHandle[H](summary.Hash, unpin), + } +} + +// FinalityNotification is the summary of a finalized block. +type FinalityNotification[ + H runtime.Hash, + N runtime.Number, + Header runtime.Header[N, H], +] struct { + Hash H // Finalized block header hash. + Header Header // Finalized block header. + // TreeRoute path from the old finalized to new finalized parent (implicitly finalized blocks). + // This maps to the range [oldFinalized, ...newFinalized]. + TreeRoute []H + StaleHeads []H // Stale branch heads. + unpinHandle UnpinHandle[H] // Handle to unpin the block this notification is associated with. +} + +// Drop will unpin the block from the backend. +func (fn FinalityNotification[H, N, Header]) Drop() { + fn.unpinHandle.Drop() +} + +// NewFinalityNotificationFromSummary is constructor of [FinalityNotification] given a [FinalizeSummary]. +func NewFinalityNotificationFromSummary[ + H runtime.Hash, + N runtime.Number, + Header runtime.Header[N, H], +](summary FinalizeSummary[H, N, Header], unpin func(message Unpin[H]) error) *FinalityNotification[H, N, Header] { + var hash H + if len(summary.Finalized) > 0 { + hash = summary.Finalized[len(summary.Finalized)-1] + } + return &FinalityNotification[H, N, Header]{ + Hash: hash, + Header: summary.Header, + TreeRoute: summary.Finalized, + StaleHeads: summary.StaleHeads, + unpinHandle: NewUnpinHandle[H](hash, unpin), + } +} diff --git a/internal/client/api/notifications.go b/internal/client/api/notifications.go new file mode 100644 index 0000000000..feb4d34cf2 --- /dev/null +++ b/internal/client/api/notifications.go @@ -0,0 +1,89 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package api + +import ( + "github.com/ChainSafe/gossamer/internal/client/utils/pubsub" + "github.com/ChainSafe/gossamer/internal/primitives/runtime" + "github.com/ChainSafe/gossamer/internal/primitives/storage" +) + +// StorageNotification is the message type delivered to subscribers. +type StorageNotification[H runtime.Hash] struct { + Block H // The hash of the block + StorageChangeSet // The set of changes +} + +// StorageChange is a helper struct that contains a [storage.StorageKey] and [storage.StorageData]. +// A nil for StorageData represents that the key should be deleted. +type StorageChange struct { + storage.StorageKey + storage.StorageData // can be nil +} + +// StorageChildChange is a helper struct that contains a [storage.StorageKey] that represents the child key, +// and a changeset which is a slice of [StorageChange].
+type StorageChildChange struct { + storage.StorageKey + ChangeSet []StorageChange +} + +// StorageChangeSet is a type that represents a storage changeset. +type StorageChangeSet struct { + // changes: Arc<[(StorageKey, Option)]>, + Changes []StorageChange + // child_changes: Arc<[(StorageKey, Vec<(StorageKey, Option)>)]>, + ChildChanges []StorageChildChange + // filter: Keys, + Filter Keys + // child_filters: ChildKeys, + ChildFilters ChildKeys +} + +type Keys map[string]any // can be nil +type ChildKeys map[string]map[string]any // can be nil + +// StorageNotifications manages storage listeners. +type StorageNotifications[H runtime.Hash] struct { + *pubsub.Hub[SubscribeOp, SubscriberMessage[H], StorageNotification[H], *registry[H]] +} + +// NewStorageNotifications is constructor for [StorageNotifications]. +func NewStorageNotifications[H runtime.Hash]() StorageNotifications[H] { + registry := newRegistry[H]() + hub := pubsub.NewHub("mpsc_storage_notification_items", registry) + return StorageNotifications[H]{ + Hub: hub, + } +} + +// Trigger notification to all listeners. +// Note the changes are going to be filtered by each listener's filter keys. +// In fact, no event might be sent if clients are not interested in the changes. +func (s StorageNotifications[H]) Trigger(hash H, changeset []StorageChange, childChangeSet []StorageChildChange) { + s.Hub.Send(SubscriberMessage[H]{ + Hash: hash, + ChangeSet: changeset, + ChildChangeSet: childChangeSet, + }) +} + +// Listen will start listening for particular storage keys. +func (s StorageNotifications[H]) Listen( + filterKeys []storage.StorageKey, + filterChildKeys []ChildFilterKeys, +) StorageEventStream[H] { + receiver := s.Hub.Subscribe(SubscribeOp{ + FilterKeys: filterKeys, + FilterChildKeys: filterChildKeys, + }) + return StorageEventStream[H]{ + Receiver: receiver, + } +} + +// StorageEventStream is the receiving side of storage change events. +type StorageEventStream[H runtime.Hash] struct { + *pubsub.Receiver[StorageNotification[H], *registry[H]] +} diff --git a/internal/client/api/registry.go b/internal/client/api/registry.go new file mode 100644 index 0000000000..1263864331 --- /dev/null +++ b/internal/client/api/registry.go @@ -0,0 +1,336 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package api + +import ( + "maps" + + "github.com/ChainSafe/gossamer/internal/log" + "github.com/ChainSafe/gossamer/internal/primitives/runtime" + "github.com/ChainSafe/gossamer/internal/primitives/storage" +) + +var logger = log.NewFromGlobal(log.AddContext("pkg", "client/api")) + +// ChildFilterKeys is a helper struct with a child key and associated filter keys. +type ChildFilterKeys struct { + Key storage.StorageKey + FilterKeys []storage.StorageKey // can be nil +} + +// SubscribeOp is a command to subscribe with the specified filters.
+type SubscribeOp struct { + FilterKeys []storage.StorageKey // can be nil + FilterChildKeys []ChildFilterKeys +} + +// SubscriberMessage is the message type returned based on a [SubscribeOp] +type SubscriberMessage[H any] struct { + Hash H + ChangeSet []StorageChange + ChildChangeSet []StorageChildChange +} + +type subscriberSink struct { + subsID uint64 + keys Keys + childKeys ChildKeys + wasTriggered bool +} + +type registry[H runtime.Hash] struct { + wildcardListeners map[uint64]any + listeners map[string]map[uint64]any + childListeners map[string]struct { + cListeners map[string]map[uint64]any + cWildcards map[uint64]any + } + sinks map[uint64]subscriberSink +} + +func newRegistry[H runtime.Hash]() *registry[H] { + return ®istry[H]{ + wildcardListeners: make(map[uint64]any), + listeners: make(map[string]map[uint64]any), + childListeners: make(map[string]struct { + cListeners map[string]map[uint64]any + cWildcards map[uint64]any + }), + sinks: make(map[uint64]subscriberSink), + } +} + +func (r *registry[H]) Subscribe(subsOp SubscribeOp, subsID uint64) { + keys := r.listenFrom(subsID, subsOp.FilterKeys, r.listeners, r.wildcardListeners) + + var childKeys map[string]map[string]any = nil + if subsOp.FilterChildKeys != nil { + for _, fck := range subsOp.FilterChildKeys { + cKey := fck.Key + oKeys := fck.FilterKeys + + _, ok := r.childListeners[string(cKey)] + if !ok { + r.childListeners[string(cKey)] = struct { + cListeners map[string]map[uint64]any + cWildcards map[uint64]any + }{ + cListeners: make(map[string]map[uint64]any), + cWildcards: make(map[uint64]any), + } + } + if childKeys == nil { + childKeys = make(map[string]map[string]any) + } + childKeys[string(cKey)] = r.listenFrom( + subsID, + oKeys, + r.childListeners[string(cKey)].cListeners, + r.childListeners[string(cKey)].cWildcards, + ) + } + } + + // TODO: metrics are added here see substrate code: + // /~https://github.com/paritytech/polkadot-sdk/blob/bc53b9a03a742f8b658806a01a7bf853cb9a86cd/substrate/client/api/src/notifications/registry.rs#L136 + + _, ok := r.sinks[subsID] + if ok { + logger.Warnf("subscribe has been passed a non-unique subsID") + } + r.sinks[subsID] = subscriberSink{ + subsID: subsID, + keys: keys, + childKeys: childKeys, + wasTriggered: false, + } +} + +func (r *registry[H]) Unsubscribe(subsID uint64) { + r.removeSubscriber(subsID) +} + +func (r *registry[H]) removeSubscriber(subscriber uint64) *struct { + Keys + ChildKeys +} { + sink, ok := r.sinks[subscriber] + if !ok { + return nil + } + delete(r.sinks, subscriber) + + r.removeSubscriberFrom(subscriber, sink.keys, r.listeners, r.wildcardListeners) + if sink.childKeys != nil { + for cKey, filters := range sink.childKeys { + _, ok := r.childListeners[cKey] + if ok { + r.removeSubscriberFrom( + subscriber, + filters, + r.childListeners[cKey].cListeners, + r.childListeners[cKey].cWildcards, + ) + } + + if len(r.childListeners[cKey].cListeners) == 0 && len(r.childListeners[cKey].cWildcards) == 0 { + delete(r.childListeners, cKey) + } + + } + } + + // TODO: metrics are added here see substrate code: + // /~https://github.com/paritytech/polkadot-sdk/blob/bc53b9a03a742f8b658806a01a7bf853cb9a86cd/substrate/client/api/src/notifications/registry.rs#L285 + + return &struct { + Keys + ChildKeys + }{ + Keys: sink.keys, + ChildKeys: sink.childKeys, + } +} + +func (r *registry[H]) removeSubscriberFrom( + subscriber uint64, + filters Keys, + listeners map[string]map[uint64]any, + wildcards map[uint64]any, +) { + if filters == nil { + delete(wildcards, subscriber) + } else 
{ + for key := range filters { + var removeKey bool + _, ok := listeners[key] + if ok { + delete(listeners[key], subscriber) + removeKey = len(listeners[key]) == 0 + } else { + removeKey = false + } + + if removeKey { + delete(listeners, key) + } + } + } +} + +func (r *registry[H]) listenFrom( + currentID uint64, + filterKeys []storage.StorageKey, + listeners map[string]map[uint64]any, + wildcards map[uint64]any, +) Keys { + if filterKeys == nil { + wildcards[currentID] = nil + return nil + } + keys := make(Keys) + for _, key := range filterKeys { + _, ok := listeners[string(key)] + if !ok { + listeners[string(key)] = make(map[uint64]any) + } + listeners[string(key)][currentID] = nil + keys[string(key)] = nil + } + return keys +} + +func (r *registry[H]) Dispatch(message SubscriberMessage[H], dispatch func(uint64, StorageNotification[H])) { + r.trigger(message.Hash, message.ChangeSet, message.ChildChangeSet, dispatch) +} + +func (r *registry[H]) trigger( //nolint:gocyclo + hash H, + changeset []StorageChange, + childChangeSet []StorageChildChange, + dispatch func(uint64, StorageNotification[H]), +) { + hasWildcard := len(r.wildcardListeners) != 0 + + // early exit if no listeners + if !hasWildcard && len(r.listeners) == 0 && len(r.childListeners) == 0 { + return + } + + subscribers := maps.Clone(r.wildcardListeners) + var changes []StorageChange + var childChanges []StorageChildChange + + // collect subscribers and changes + for _, change := range changeset { + listeners, ok := r.listeners[string(change.StorageKey)] + if ok { + for listener := range listeners { + subscribers[listener] = nil + } + } + + if hasWildcard || len(listeners) > 0 { + changes = append(changes, change) + } + } + for _, childChange := range childChangeSet { + childListener, ok := r.childListeners[string(childChange.StorageKey)] + if ok { + var changes []StorageChange + for _, change := range childChange.ChangeSet { + listeners, ok := childListener.cListeners[string(change.StorageKey)] + + if ok { + for listener := range listeners { + subscribers[listener] = nil + } + } + + for listener := range childListener.cWildcards { + subscribers[listener] = nil + } + + if len(childListener.cWildcards) > 0 || len(listeners) > 0 { + changes = append(changes, change) + } + } + if len(changes) > 0 { + childChanges = append(childChanges, StorageChildChange{ + StorageKey: childChange.StorageKey, + ChangeSet: changes, + }) + } + } + } + + // Don't send empty notifications + if len(changes) == 0 && len(childChanges) == 0 { + return + } + + // Trigger the events + for subsID, sink := range r.sinks { + _, ok := subscribers[subsID] + if !ok { + continue + } + + sink.wasTriggered = true + r.sinks[subsID] = sink + + var ( + filteredChanges []StorageChange + filteredChildChanges []StorageChildChange + ) + + if sink.keys != nil { + for _, change := range changes { + _, ok := sink.keys[string(change.StorageKey)] + if ok { + filteredChanges = append(filteredChanges, change) + } + } + } else { + filteredChanges = changes + } + + if sink.childKeys != nil { + for _, childChange := range childChanges { + filter, ok := sink.childKeys[string(childChange.StorageKey)] + if ok { + filteredChildChange := StorageChildChange{ + StorageKey: childChange.StorageKey, + ChangeSet: nil, + } + for _, change := range childChange.ChangeSet { + if filter == nil { + filteredChildChange.ChangeSet = append(filteredChildChange.ChangeSet, change) + } else { + _, ok := filter[string(change.StorageKey)] + if ok { + filteredChildChange.ChangeSet = 
append(filteredChildChange.ChangeSet, change) + } + } + } + filteredChildChanges = append(filteredChildChanges, filteredChildChange) + } + } + } + + storageChangeSet := StorageChangeSet{ + Changes: filteredChanges, + ChildChanges: filteredChildChanges, + Filter: sink.keys, + ChildFilters: sink.childKeys, + } + + notification := StorageNotification[H]{ + Block: hash, + StorageChangeSet: storageChangeSet, + } + + dispatch(subsID, notification) + } +} diff --git a/internal/client/api/registry_test.go b/internal/client/api/registry_test.go new file mode 100644 index 0000000000..3beee7fc9c --- /dev/null +++ b/internal/client/api/registry_test.go @@ -0,0 +1,189 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package api + +import ( + "sync" + "testing" + + "github.com/ChainSafe/gossamer/internal/primitives/core/hash" + "github.com/ChainSafe/gossamer/internal/primitives/storage" + "github.com/stretchr/testify/require" +) + +func Test_StorageNotifications(t *testing.T) { + t.Run("triggering_change_should_notify_wildcard_listeners", func(t *testing.T) { + notifications := NewStorageNotifications[hash.H256]() + childFilter := ChildFilterKeys{ + Key: []byte{4}, + FilterKeys: nil, + } + recv := notifications.Listen(nil, []ChildFilterKeys{childFilter}) + + changeset := []StorageChange{ + {StorageKey: []byte{2}, StorageData: []byte{3}}, + {StorageKey: []byte{3}, StorageData: nil}, + } + cChangeset1 := []StorageChange{ + {StorageKey: []byte{5}, StorageData: []byte{4}}, + {StorageKey: []byte{6}, StorageData: nil}, + } + cChangeset := []StorageChildChange{ + {StorageKey: []byte{4}, ChangeSet: cChangeset1}, + } + + done := make(chan any) + go func() { + defer close(done) + storageNotification := <-recv.Chan() + require.Equal(t, hash.NewH256FromLowUint64BigEndian(1), storageNotification.Block) + require.Equal(t, changeset, storageNotification.StorageChangeSet.Changes) + require.Equal(t, cChangeset, storageNotification.StorageChangeSet.ChildChanges) + + }() + + notifications.Trigger( + hash.NewH256FromLowUint64BigEndian(1), + changeset, + cChangeset, + ) + + <-done + recv.Drop() + notifications.Shutdown() + }) + + t.Run("should_only_notify_interested_listeners", func(t *testing.T) { + notifications := NewStorageNotifications[hash.H256]() + childFilter := ChildFilterKeys{ + Key: []byte{4}, + FilterKeys: []storage.StorageKey{[]byte{5}}, + } + recv1 := notifications.Listen([]storage.StorageKey{{1}}, nil) + recv2 := notifications.Listen([]storage.StorageKey{{2}}, nil) + recv3 := notifications.Listen([]storage.StorageKey{}, []ChildFilterKeys{childFilter}) + + changeset := []StorageChange{ + {StorageKey: []byte{2}, StorageData: []byte{3}}, + {StorageKey: []byte{1}, StorageData: nil}, + } + cChangeset1 := []StorageChange{ + {StorageKey: []byte{5}, StorageData: []byte{4}}, + {StorageKey: []byte{6}, StorageData: nil}, + } + cChangeset := []StorageChildChange{ + {StorageKey: []byte{4}, ChangeSet: cChangeset1}, + } + + var ( + wg sync.WaitGroup + notif1 StorageNotification[hash.H256] + notif2 StorageNotification[hash.H256] + notif3 StorageNotification[hash.H256] + ) + wg.Add(3) + go func() { + defer wg.Done() + notif1 = <-recv1.Chan() + }() + go func() { + defer wg.Done() + notif2 = <-recv2.Chan() + }() + go func() { + defer wg.Done() + notif3 = <-recv3.Chan() + }() + + notifications.Trigger(hash.NewH256FromLowUint64BigEndian(1), changeset, cChangeset) + + wg.Wait() + + require.Equal(t, hash.NewH256FromLowUint64BigEndian(1), notif1.Block) + require.Equal(t, 
[]StorageChange{{StorageKey: []byte{1}, StorageData: nil}}, notif1.StorageChangeSet.Changes) + require.Equal(t, []StorageChildChange(nil), notif1.StorageChangeSet.ChildChanges) + + require.Equal(t, hash.NewH256FromLowUint64BigEndian(1), notif2.Block) + require.Equal(t, []StorageChange{{StorageKey: []byte{2}, StorageData: []byte{3}}}, notif2.StorageChangeSet.Changes) + require.Equal(t, []StorageChildChange(nil), notif2.StorageChangeSet.ChildChanges) + + require.Equal(t, hash.NewH256FromLowUint64BigEndian(1), notif3.Block) + require.Equal(t, []StorageChange(nil), notif3.StorageChangeSet.Changes) + require.Equal(t, []StorageChildChange{{StorageKey: []byte{4}, ChangeSet: []StorageChange{ + {StorageKey: []byte{5}, StorageData: []byte{4}}, + }}}, notif3.StorageChangeSet.ChildChanges) + + recv1.Drop() + recv2.Drop() + recv3.Drop() + notifications.Shutdown() + }) + + t.Run("should_cleanup_subscribers_if_dropped", func(t *testing.T) { + notifications := NewStorageNotifications[hash.H256]() + { + childFilter := ChildFilterKeys{ + Key: []byte{4}, + FilterKeys: []storage.StorageKey{[]byte{5}}, + } + recv1 := notifications.Listen([]storage.StorageKey{{1}}, nil) + recv2 := notifications.Listen([]storage.StorageKey{{2}}, nil) + recv3 := notifications.Listen(nil, nil) + recv4 := notifications.Listen(nil, []ChildFilterKeys{childFilter}) + + require.Equal(t, 2, len(notifications.Registry().listeners)) + require.Equal(t, 2, len(notifications.Registry().wildcardListeners)) + require.Equal(t, 1, len(notifications.Registry().childListeners)) + + recv1.Drop() + recv2.Drop() + recv3.Drop() + recv4.Drop() + } + + changeset := []StorageChange{ + {StorageKey: []byte{2}, StorageData: []byte{3}}, + {StorageKey: []byte{1}, StorageData: nil}, + } + cChangeset := []StorageChildChange{} + notifications.Trigger(hash.NewH256FromLowUint64BigEndian(1), changeset, cChangeset) + + require.Equal(t, 0, len(notifications.Registry().listeners)) + require.Equal(t, 0, len(notifications.Registry().wildcardListeners)) + require.Equal(t, 0, len(notifications.Registry().childListeners)) + + notifications.Shutdown() + }) + + t.Run("should_cleanup_subscriber_if_stream_is_dropped", func(t *testing.T) { + notifications := NewStorageNotifications[hash.H256]() + stream := notifications.Listen(nil, nil) + require.Equal(t, 1, len(notifications.Registry().sinks)) + stream.Drop() + require.Equal(t, 0, len(notifications.Registry().sinks)) + }) + + t.Run("should_not_send_empty_subscriber", func(t *testing.T) { + notifications := NewStorageNotifications[hash.H256]() + recv := notifications.Listen(nil, nil) + + changeset := []StorageChange{} + cChangeset := []StorageChildChange{} + + var notifCount int + done := make(chan any) + go func() { + defer close(done) + for range recv.Chan() { + notifCount++ + } + }() + notifications.Trigger(hash.NewH256FromLowUint64BigEndian(1), changeset, cChangeset) + recv.Drop() + <-done + require.Zero(t, notifCount) + + notifications.Shutdown() + }) +} diff --git a/internal/client/client.go b/internal/client/client.go new file mode 100644 index 0000000000..c42741062e --- /dev/null +++ b/internal/client/client.go @@ -0,0 +1,419 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package client + +import ( + "fmt" + "sync" + "time" + + "github.com/ChainSafe/gossamer/internal/client/api" + "github.com/ChainSafe/gossamer/internal/log" + "github.com/ChainSafe/gossamer/internal/primitives/runtime" + "github.com/ChainSafe/gossamer/internal/primitives/storage" +) + +var logger = 
log.NewFromGlobal(log.AddContext("pkg", "client")) + +// Client type that implements a number of client interfaces +type Client[ + H runtime.Hash, + Hasher runtime.Hasher[H], + N runtime.Number, + E runtime.Extrinsic, + Header runtime.Header[N, H], +] struct { + backend api.Backend[H, N, Hasher, Header, E] + storageNotifications api.StorageNotifications[H] + importNotificationChansMtx sync.Mutex + importNotificationChans map[chan api.BlockImportNotification[H, N, Header]]any + everyImportNotificationChansMtx sync.Mutex + everyImportNotificationChans map[chan api.BlockImportNotification[H, N, Header]]any + finalityNotificationChansMtx sync.Mutex + finalityNotificationChans map[chan api.FinalityNotification[H, N, Header]]any + // Collects auxiliary operations to be performed atomically together with block import operations. + importActionsMtx sync.Mutex + importActions []api.OnImportAction[H, N, Header] + // Collects auxiliary operations to be performed atomically together with block finalization operations. + finalityActionsMtx sync.Mutex + finalityActions []api.OnFinalityAction[H, N, Header] + // Holds the block hash currently being imported. + importingBlockMtx sync.RWMutex + importingBlock *H + unpinWorkerChan chan<- api.UnpinWorkerMessage[H] +} + +// New is constructor for [Client] +func New[ + H runtime.Hash, + Hasher runtime.Hasher[H], + N runtime.Number, + E runtime.Extrinsic, + Header runtime.Header[N, H], +]( + backend api.Backend[H, N, Hasher, Header, E], +) *Client[H, Hasher, N, E, Header] { + + unpinWorkerChan := make(chan api.UnpinWorkerMessage[H]) + npw := newNotificationPinningWorker(unpinWorkerChan, backend) + go npw.run() + + return &Client[H, Hasher, N, E, Header]{ + backend: backend, + storageNotifications: api.NewStorageNotifications[H](), + importNotificationChans: make(map[chan api.BlockImportNotification[H, N, Header]]any), + everyImportNotificationChans: make(map[chan api.BlockImportNotification[H, N, Header]]any), + finalityNotificationChans: make(map[chan api.FinalityNotification[H, N, Header]]any), + unpinWorkerChan: unpinWorkerChan, + } +} + +func (c *Client[H, Hasher, N, E, Header]) announcePin(message api.AnnouncePin[H]) error { + select { + case c.unpinWorkerChan <- message: + return nil + default: + return fmt.Errorf("unable to send AnnouncePin message to Client.unpinWorkerChan") + } +} + +func (c *Client[H, Hasher, N, E, Header]) unpin(message api.Unpin[H]) error { + select { + case c.unpinWorkerChan <- message: + return nil + default: + return fmt.Errorf("unable to send Unpin message to Client.unpinWorkerChan") + } +} + +func (c *Client[H, Hasher, N, E, Header]) lockImportRun( + f func(*api.ClientImportOperation[H, Hasher, N, Header, E]) error, +) error { + c.backend.GetImportLock().Lock() + defer c.backend.GetImportLock().Unlock() + + blockImportOp, err := c.backend.BeginOperation() + if err != nil { + return err + } + + clientImportOp := api.ClientImportOperation[H, Hasher, N, Header, E]{ + Op: blockImportOp, + } + + err = f(&clientImportOp) + if err != nil { + return err + } + + var finalityNotification *api.FinalityNotification[H, N, Header] + if clientImportOp.NotifyFinalized != nil { + finalityNotification = api.NewFinalityNotificationFromSummary(*clientImportOp.NotifyFinalized, c.unpin) + } + + var ( + importNotification *api.BlockImportNotification[H, N, Header] + storageChanges *api.StorageChanges + importNotificationAction api.ImportNotificationAction + ) + if clientImportOp.NotifyImported != nil { + importNotification = 
api.NewBlockImportNotificationFromSummary(*clientImportOp.NotifyImported, c.unpin) + storageChanges = clientImportOp.NotifyImported.StorageChanges + importNotificationAction = clientImportOp.NotifyImported.ImportNotificationAction + } else { + importNotificationAction = api.NoneBlockImportNotificationAction + } + + if finalityNotification != nil { + c.finalityActionsMtx.Lock() + defer c.finalityActionsMtx.Unlock() + for _, action := range c.finalityActions { + err := clientImportOp.Op.InsertAux(action(*finalityNotification)) + if err != nil { + return err + } + } + } + if importNotification != nil { + c.importActionsMtx.Lock() + defer c.importActionsMtx.Unlock() + for _, action := range c.importActions { + err := clientImportOp.Op.InsertAux(action(*importNotification)) + if err != nil { + return err + } + } + } + + err = c.backend.CommitOperation(clientImportOp.Op) + if err != nil { + return err + } + + // We need to pin the block in the backend once + // for each notification. Once all notifications are + // dropped, the block will be unpinned automatically. + if finalityNotification != nil { + err := c.backend.PinBlock(finalityNotification.Hash) + if err != nil { + logger.Debugf("Unable to pin block for finality notification. hash: %s, Error: %v", + finalityNotification.Hash, err) + } else { + err := c.announcePin(api.AnnouncePin[H]{Hash: finalityNotification.Hash}) + if err != nil { + logger.Errorf("Unable to send AnnouncePin worker message for finality: %s", err) + } + } + } + + if importNotification != nil { + err := c.backend.PinBlock(importNotification.Hash) + if err != nil { + logger.Debugf("Unable to pin block for import notification. hash: %s, Error: %v", + importNotification.Hash, err) + } else { + err := c.announcePin(api.AnnouncePin[H]{Hash: importNotification.Hash}) + if err != nil { + logger.Errorf("Unable to send AnnouncePin worker message for import: %s", err) + } + } + } + + err = c.notifyFinalized(finalityNotification) + if err != nil { + return err + } + err = c.notifyImported(importNotification, importNotificationAction, storageChanges) + if err != nil { + return err + } + + return nil +} + +func (c *Client[H, Hasher, N, E, Header]) LockImportRun( + f func(*api.ClientImportOperation[H, Hasher, N, Header, E]) error, +) error { + err := c.lockImportRun(f) + c.importingBlockMtx.Lock() + c.importingBlock = nil + c.importingBlockMtx.Unlock() + return err +} + +const notifyFinalizedTimeout = 5 * time.Second +const notifyBlockImportTimeout = notifyFinalizedTimeout + +func (c *Client[H, Hasher, N, E, Header]) notifyFinalized(notification *api.FinalityNotification[H, N, Header]) error { + c.finalityNotificationChansMtx.Lock() + defer c.finalityNotificationChansMtx.Unlock() + + if notification == nil { + return nil + } + + // TODO: telemetry is implemented here. 
See substrate code: + // /~https://github.com/paritytech/polkadot-sdk/blob/72fb8bd3cd4a5051bb855415b360657d7ce247fb/substrate/client/service/src/client/client.rs#L984 + + wg := sync.WaitGroup{} + for ch := range c.finalityNotificationChans { + wg.Add(1) + go func(ch chan<- api.FinalityNotification[H, N, Header]) { + defer wg.Done() + ch <- *notification + }(ch) + } + done := make(chan any) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + break + case <-time.After(notifyFinalizedTimeout): + break + } + + return nil +} + +func notifyChans[M any](msg M, chans map[chan M]any, timeout time.Duration) { + wg := sync.WaitGroup{} + for ch := range chans { + wg.Add(1) + go func(ch chan M) { + defer wg.Done() + select { + case ch <- msg: + default: + // cleanup chan if not able to send + close(ch) + delete(chans, ch) + } + }(ch) + } + done := make(chan any) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + break + case <-time.After(timeout): + break + } +} + +func (c *Client[H, Hasher, N, E, Header]) notifyImported( + notification *api.BlockImportNotification[H, N, Header], + importNotificationAction api.ImportNotificationAction, + storageChanges *api.StorageChanges, +) error { + if notification == nil { + return nil + } + + var triggerStorageChangesNotification = func() { + if storageChanges != nil { + // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? (from substrate) + changeset := make([]api.StorageChange, len(storageChanges.StorageCollection)) + for i, kv := range storageChanges.StorageCollection { + changeset[i] = api.StorageChange{ + StorageKey: storage.StorageKey(kv.StorageKey), + StorageData: storage.StorageData(kv.StorageValue), + } + } + childChangeset := make([]api.StorageChildChange, len(storageChanges.ChildStorageCollection)) + for i, kc := range storageChanges.ChildStorageCollection { + changeset := make([]api.StorageChange, len(kc.StorageCollection)) + for i, kv := range kc.StorageCollection { + changeset[i] = api.StorageChange{ + StorageKey: storage.StorageKey(kv.StorageKey), + StorageData: storage.StorageData(kv.StorageValue), + } + } + childChangeset[i] = api.StorageChildChange{ + StorageKey: storage.StorageKey(kc.StorageKey), + ChangeSet: changeset, + } + } + c.storageNotifications.Trigger( + notification.Hash, + changeset, + childChangeset, + ) + } + } + + switch importNotificationAction { + case api.BothBlockImportNotificationAction: + triggerStorageChangesNotification() + c.importNotificationChansMtx.Lock() + defer c.importNotificationChansMtx.Unlock() + notifyChans(*notification, c.importNotificationChans, notifyBlockImportTimeout) + + c.everyImportNotificationChansMtx.Lock() + defer c.everyImportNotificationChansMtx.Unlock() + notifyChans(*notification, c.everyImportNotificationChans, notifyBlockImportTimeout) + case api.RecentBlockImportNotificationAction: + triggerStorageChangesNotification() + c.importNotificationChansMtx.Lock() + defer c.importNotificationChansMtx.Unlock() + notifyChans(*notification, c.importNotificationChans, notifyBlockImportTimeout) + case api.EveryBlockImportNotificationAction: + c.everyImportNotificationChansMtx.Lock() + defer c.everyImportNotificationChansMtx.Unlock() + notifyChans(*notification, c.everyImportNotificationChans, notifyBlockImportTimeout) + case api.NoneBlockImportNotificationAction: + // This branch is unreachable in fact because the block import notification must be + // not nil (it's already handled at the beginning of this function) at this point. 
+ default: + panic("unreachable") + } + + return nil +} + +func (c *Client[H, Hasher, N, E, Header]) RegisterImportAction(op api.OnImportAction[H, N, Header]) { + c.importActionsMtx.Lock() + defer c.importActionsMtx.Unlock() + c.importActions = append(c.importActions, op) +} + +func (c *Client[H, Hasher, N, E, Header]) RegisterFinalityAction(op api.OnFinalityAction[H, N, Header]) { + c.finalityActionsMtx.Lock() + defer c.finalityActionsMtx.Unlock() + c.finalityActions = append(c.finalityActions, op) +} + +func (c *Client[H, Hasher, N, E, Header]) RegisterImportNotificationStream() api.ImportNotifications[H, N, Header] { + ch := make(chan api.BlockImportNotification[H, N, Header]) + c.importNotificationChansMtx.Lock() + defer c.importNotificationChansMtx.Unlock() + c.importNotificationChans[ch] = nil + return ch +} + +func (c *Client[H, Hasher, N, E, Header]) UnregisterImportNotificationStream( + ch api.ImportNotifications[H, N, Header], +) { + c.importNotificationChansMtx.Lock() + defer c.importNotificationChansMtx.Unlock() + _, ok := c.importNotificationChans[ch] + if ok { + close(ch) + } + delete(c.importNotificationChans, ch) +} + +func (c *Client[H, _, N, E, Header]) RegisterEveryImportNotificationStream() api.ImportNotifications[H, N, Header] { + ch := make(chan api.BlockImportNotification[H, N, Header]) + c.everyImportNotificationChansMtx.Lock() + defer c.everyImportNotificationChansMtx.Unlock() + c.everyImportNotificationChans[ch] = nil + return ch +} + +func (c *Client[H, Hasher, N, E, Header]) UnregisterEveryImportNotificationStream( + ch api.ImportNotifications[H, N, Header], +) { + c.everyImportNotificationChansMtx.Lock() + defer c.everyImportNotificationChansMtx.Unlock() + _, ok := c.everyImportNotificationChans[ch] + if ok { + close(ch) + } + delete(c.everyImportNotificationChans, ch) +} + +func (c *Client[H, _, N, E, Header]) RegisterFinalityNotificationStream() api.FinalityNotifications[H, N, Header] { + ch := make(chan api.FinalityNotification[H, N, Header]) + c.finalityNotificationChansMtx.Lock() + defer c.finalityNotificationChansMtx.Unlock() + c.finalityNotificationChans[ch] = nil + return ch +} + +func (c *Client[H, Hasher, N, E, Header]) UnregisterFinalityNotificationStream( + ch api.FinalityNotifications[H, N, Header], +) { + c.finalityNotificationChansMtx.Lock() + defer c.finalityNotificationChansMtx.Unlock() + _, ok := c.finalityNotificationChans[ch] + if ok { + close(ch) + } + delete(c.finalityNotificationChans, ch) +} + +func (c *Client[H, Hasher, N, E, Header]) StorageChangesNotificationStream( + filterKeys []storage.StorageKey, + childFilterKeys []api.ChildFilterKeys, +) api.StorageEventStream[H] { + return c.storageNotifications.Listen(filterKeys, childFilterKeys) +} diff --git a/internal/client/client_test.go b/internal/client/client_test.go new file mode 100644 index 0000000000..577ac75377 --- /dev/null +++ b/internal/client/client_test.go @@ -0,0 +1,339 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package client + +import ( + "sync" + "testing" + + "github.com/ChainSafe/gossamer/internal/client/api" + "github.com/ChainSafe/gossamer/internal/client/db" + statedb "github.com/ChainSafe/gossamer/internal/client/state-db" + memorykvdb "github.com/ChainSafe/gossamer/internal/kvdb/memory-kvdb" + "github.com/ChainSafe/gossamer/internal/primitives/core/hash" + "github.com/ChainSafe/gossamer/internal/primitives/database" + "github.com/ChainSafe/gossamer/internal/primitives/runtime" + 
"github.com/ChainSafe/gossamer/internal/primitives/runtime/generic" + rt_testing "github.com/ChainSafe/gossamer/internal/primitives/runtime/testing" + statemachine "github.com/ChainSafe/gossamer/internal/primitives/state-machine" + "github.com/ChainSafe/gossamer/internal/primitives/storage" + "github.com/stretchr/testify/require" +) + +type noopExtrinsic struct{} + +func (noopExtrinsic) IsSigned() *bool { + return nil +} + +var _ runtime.Extrinsic = noopExtrinsic{} + +type TestClient struct { + Client[ + hash.H256, runtime.BlakeTwo256, uint64, noopExtrinsic, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256], + ] +} + +var ( + _ api.BlockchainEvents[ + hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256], + ] = &TestClient{} + _ api.PreCommitActions[ + hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256], + ] = &TestClient{} + _ api.LockImportRun[ + hash.H256, uint64, runtime.BlakeTwo256, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256], noopExtrinsic, + ] = &TestClient{} +) + +func NewTestBackend(t *testing.T, + blocksPruning db.BlocksPruning, canonicalizationDelay uint64, +) api.Backend[ + hash.H256, + uint64, + runtime.BlakeTwo256, + *generic.Header[uint64, hash.H256, runtime.BlakeTwo256], + rt_testing.ExtrinsicsWrapper[uint64], +] { + t.Helper() + + kvdb := memorykvdb.New(13) + var statePruning statedb.PruningMode + switch blocksPruning := blocksPruning.(type) { + case db.BlocksPruningKeepAll: + statePruning = statedb.PruningModeArchiveAll{} + case db.BlocksPruningKeepFinalized: + statePruning = statedb.PruningModeArchiveCanonical{} + case db.BlocksPruningSome: + statePruning = statedb.NewPruningModeConstrained(uint32(blocksPruning)) + default: + t.Fatalf("unreachable") + } + trieCacheMaxSize := uint(16 * 1024 * 1024) + dbSetting := db.DatabaseConfig{ + TrieCacheMaximumSize: &trieCacheMaxSize, + StatePruning: statePruning, + Source: db.DatabaseSource{DB: database.NewDBAdapter[hash.H256](kvdb), RequireCreateFlag: true}, + BlocksPruning: blocksPruning, + } + + backend, err := db.NewBackend[ + hash.H256, + uint64, + rt_testing.ExtrinsicsWrapper[uint64], + runtime.BlakeTwo256, + *generic.Header[uint64, hash.H256, runtime.BlakeTwo256], + ](dbSetting, canonicalizationDelay) + if err != nil { + panic(err) + } + return backend +} + +func TestNew(t *testing.T) { + c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0)) + require.NotNil(t, c) +} + +type BlockImportOperation = api.BlockImportNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]] //nolint:lll +type FinalityNotification = api.FinalityNotification[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]] //nolint:lll + +func TestBlockchainEvents(t *testing.T) { + t.Run("register_unregister", func(t *testing.T) { + c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0)) + blockImport := c.RegisterImportNotificationStream() + require.NotNil(t, blockImport) + _, ok := c.importNotificationChans[blockImport] + require.True(t, ok) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + for range blockImport { + } + wg.Done() + }() + + everyImport := c.RegisterEveryImportNotificationStream() + require.NotNil(t, everyImport) + _, ok = c.everyImportNotificationChans[everyImport] + require.True(t, ok) + + wg.Add(1) + go func() { + for range everyImport { + } + wg.Done() + }() + + finality := c.RegisterFinalityNotificationStream() + require.NotNil(t, finality) + _, ok = c.finalityNotificationChans[finality] + 
require.True(t, ok) + + wg.Add(1) + go func() { + for range finality { + } + wg.Done() + }() + + c.UnregisterImportNotificationStream(blockImport) + _, ok = c.importNotificationChans[blockImport] + require.False(t, ok) + + c.UnregisterEveryImportNotificationStream(everyImport) + _, ok = c.everyImportNotificationChans[everyImport] + require.False(t, ok) + + c.UnregisterFinalityNotificationStream(finality) + _, ok = c.finalityNotificationChans[finality] + require.False(t, ok) + + wg.Wait() + }) + + t.Run("register_receive_block_import_unregister", func(t *testing.T) { + c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0)) + blockImport := c.RegisterImportNotificationStream() + require.NotNil(t, blockImport) + _, ok := c.importNotificationChans[blockImport] + require.True(t, ok) + + var blockImportNotifications []BlockImportOperation + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + for notif := range blockImport { + blockImportNotifications = append(blockImportNotifications, notif) + } + wg.Done() + }() + + var everyImportNotifications []BlockImportOperation + everyImport := c.RegisterEveryImportNotificationStream() + require.NotNil(t, everyImport) + _, ok = c.everyImportNotificationChans[everyImport] + require.True(t, ok) + + wg.Add(1) + go func() { + for notif := range everyImport { + everyImportNotifications = append(everyImportNotifications, notif) + } + wg.Done() + }() + + // sends to both + c.notifyImported(&BlockImportOperation{}, api.BothBlockImportNotificationAction, nil) + // sends to import + c.notifyImported(&BlockImportOperation{}, api.RecentBlockImportNotificationAction, nil) + // sends to every + c.notifyImported(&BlockImportOperation{}, api.EveryBlockImportNotificationAction, nil) + + c.UnregisterImportNotificationStream(blockImport) + _, ok = c.importNotificationChans[blockImport] + require.False(t, ok) + + c.UnregisterEveryImportNotificationStream(everyImport) + _, ok = c.everyImportNotificationChans[everyImport] + require.False(t, ok) + + wg.Wait() + + require.Len(t, blockImportNotifications, 2) + require.Len(t, everyImportNotifications, 2) + }) + + t.Run("register_receive_block_import_storage_changes_unregister", func(t *testing.T) { + c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0)) + wg := sync.WaitGroup{} + + topStorage := c.StorageChangesNotificationStream(nil, []api.ChildFilterKeys{}) + wg.Add(1) + go func() { + msg := <-topStorage.Chan() + require.Len(t, msg.Changes, 1) + require.Len(t, msg.ChildChanges, 0) + wg.Done() + }() + + childStorage := c.StorageChangesNotificationStream([]storage.StorageKey{}, []api.ChildFilterKeys{ + {Key: storage.StorageKey("child0"), FilterKeys: []storage.StorageKey{storage.StorageKey("child0")}}, + }) + wg.Add(1) + go func() { + msg := <-childStorage.Chan() + require.Len(t, msg.Changes, 0) + require.Len(t, msg.ChildChanges, 1) + wg.Done() + }() + + wildCard := c.StorageChangesNotificationStream(nil, []api.ChildFilterKeys{ + {Key: storage.StorageKey("child0"), FilterKeys: []storage.StorageKey{storage.StorageKey("child0")}}, + }) + wg.Add(1) + go func() { + msg := <-wildCard.Chan() + require.Len(t, msg.Changes, 1) + require.Len(t, msg.ChildChanges, 1) + wg.Done() + }() + + // sends to both + c.notifyImported( + &BlockImportOperation{}, + api.BothBlockImportNotificationAction, + &api.StorageChanges{ + StorageCollection: statemachine.StorageCollection{ + {StorageKey: statemachine.StorageKey("top0"), StorageValue: statemachine.StorageValue("top0")}, + }, + ChildStorageCollection: []struct { + 
statemachine.StorageKey + statemachine.StorageCollection + }{ + { + StorageKey: statemachine.StorageKey("child0"), + StorageCollection: statemachine.StorageCollection{ + {StorageKey: statemachine.StorageKey("child0"), StorageValue: statemachine.StorageValue("child0")}, + }, + }, + }, + }, + ) + + wg.Wait() + topStorage.Drop() + childStorage.Drop() + }) + + t.Run("register_receive_finality_unregister", func(t *testing.T) { + c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0)) + finality := c.RegisterFinalityNotificationStream() + require.NotNil(t, finality) + _, ok := c.finalityNotificationChans[finality] + require.True(t, ok) + + var finalityNotifications []FinalityNotification + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + for notif := range finality { + finalityNotifications = append(finalityNotifications, notif) + } + wg.Done() + }() + + c.notifyFinalized(&FinalityNotification{}) + + c.UnregisterFinalityNotificationStream(finality) + _, ok = c.finalityNotificationChans[finality] + require.False(t, ok) + + wg.Wait() + + require.Len(t, finalityNotifications, 1) + }) +} + +type ClientImportOperation = api.ClientImportOperation[ + hash.H256, + runtime.BlakeTwo256, + uint64, + *generic.Header[uint64, hash.H256, runtime.BlakeTwo256], + rt_testing.ExtrinsicsWrapper[uint64], +] + +func TestLockImportRun(t *testing.T) { + c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0)) + err := c.LockImportRun(func(cio *ClientImportOperation) error { + return nil + }) + require.NoError(t, err) +} + +func TestPreCommitActions(t *testing.T) { + t.Run("register_import_and_finality_actions", func(t *testing.T) { + c := New(NewTestBackend(t, db.BlocksPruningKeepFinalized{}, 0)) + + var count int + c.RegisterImportAction(func(bin BlockImportOperation) api.AuxDataOperations { + count++ + return api.AuxDataOperations{} + }) + c.RegisterFinalityAction(func(fn FinalityNotification) api.AuxDataOperations { + count++ + return api.AuxDataOperations{} + }) + + err := c.LockImportRun(func(cio *ClientImportOperation) error { + cio.NotifyFinalized = &api.FinalizeSummary[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]{} //nolint:lll + cio.NotifyImported = &api.ImportSummary[hash.H256, uint64, *generic.Header[uint64, hash.H256, runtime.BlakeTwo256]]{} //nolint:lll + return nil + }) + require.NoError(t, err) + + require.Equal(t, 2, count) + }) +} diff --git a/internal/client/consensus/consensus.go b/internal/client/consensus/consensus.go new file mode 100644 index 0000000000..8f48346353 --- /dev/null +++ b/internal/client/consensus/consensus.go @@ -0,0 +1,21 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package consensus + +type BlockOrigin uint + +const ( + // Genesis block built into the client. + GenesisBlockOrigin BlockOrigin = iota + // Block is part of the initial sync with the network. + NetworkInitialSyncBlockOrigin + // Block was broadcasted on the network. + NetworkBroadcastBlockOrigin + // Block that was received from the network and validated in the consensus process. + ConsensusBroadcastBlockOrigin + // Block that was collated by this node. + OwnBlockOrigin + // Block was imported from a file. 
+ FileBlockOrigin +) diff --git a/internal/client/notification_pinning.go b/internal/client/notification_pinning.go new file mode 100644 index 0000000000..3b845da64a --- /dev/null +++ b/internal/client/notification_pinning.go @@ -0,0 +1,108 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package client + +import ( + "github.com/ChainSafe/gossamer/internal/client/api" + "github.com/ChainSafe/gossamer/internal/primitives/runtime" + "github.com/dolthub/maphash" + "github.com/elastic/go-freelru" +) + +const notificationPinningLimit = uint32(1024) + +type unpinBlock[H runtime.Hash] interface { + // Pin the block to keep body, justification and state available after pruning. + // Pins are reference counted. Users need to make sure to perform + // one call to UnpinBlock per call to PinBlock. + PinBlock(hash H) error + + // Unpin the block to allow pruning. + UnpinBlock(hash H) +} + +type notificationPinningWorker[ + H runtime.Hash, +] struct { + unpinMessageChan <-chan api.UnpinWorkerMessage[H] + backend unpinBlock[H] + pinnedBlocks freelru.Cache[H, uint32] +} + +func (npw *notificationPinningWorker[H]) handleAnnounceMessage(hash H) { + count, _ := npw.pinnedBlocks.Peek(hash) + count++ // count defaults to 0 when the hash is not cached, so this sets it to 1 + npw.pinnedBlocks.Add(hash, count) +} + +func (npw *notificationPinningWorker[H]) handleUnpinMessage(hash H) { + count, ok := npw.pinnedBlocks.Peek(hash) + if ok { + count-- + npw.pinnedBlocks.Add(hash, count) + if count == 0 { + npw.pinnedBlocks.Remove(hash) + } + logger.Debugf("Reducing pinning refcount for block hash = %s", hash) + npw.backend.UnpinBlock(hash) + } else { + logger.Debugf("Received unpin message for already unpinned block. hash = %s", hash) + } +} + +func (npw *notificationPinningWorker[H]) run() { + for msg := range npw.unpinMessageChan { + switch msg := msg.(type) { + case api.Unpin[H]: + npw.handleUnpinMessage(msg.Hash) + case api.AnnouncePin[H]: + npw.handleAnnounceMessage(msg.Hash) + } + } + logger.Debugf("Terminating unpin-worker, stream terminated.") +} + +type hasher[K comparable] struct { + maphash.Hasher[K] +} + +func (h hasher[K]) Hash(key K) uint32 { + return uint32(h.Hasher.Hash(key)) +} + +func newNotificationPinningWorker[H runtime.Hash]( + unpinMessageChan <-chan api.UnpinWorkerMessage[H], + backend unpinBlock[H], +) *notificationPinningWorker[H] { + return newNotificationPinningWorkerWithLimit(unpinMessageChan, backend, notificationPinningLimit) +} + +func newNotificationPinningWorkerWithLimit[H runtime.Hash]( + unpinMessageChan <-chan api.UnpinWorkerMessage[H], + backend unpinBlock[H], + limit uint32, +) *notificationPinningWorker[H] { + h := hasher[H]{maphash.NewHasher[H]()} + pinnedBlocks, err := freelru.New[H, uint32](limit, h.Hash) + if err != nil { + panic(err) + } + pinnedBlocks.SetOnEvict(func(h H, references uint32) { + if references > 0 { + logger.Warnf("Notification block pinning limit reached. Unpinning block with hash = %s", h) + for i := uint32(0); i < references; i++ { + backend.UnpinBlock(h) + } + } else { + logger.Tracef("Unpinned block. hash = %s", h) + } + }) + worker := notificationPinningWorker[H]{ + unpinMessageChan: unpinMessageChan, + backend: backend, + pinnedBlocks: pinnedBlocks, + } + go worker.run() + return &worker +} diff --git a/internal/client/notification_pinning_test.go b/internal/client/notification_pinning_test.go new file mode 100644 index 0000000000..5ee8bf5417 --- /dev/null +++ b/internal/client/notification_pinning_test.go @@ -0,0 +1,182 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package client + +import ( + "testing" + + "github.com/ChainSafe/gossamer/internal/client/api" + "github.com/ChainSafe/gossamer/internal/primitives/core/hash" + "github.com/ChainSafe/gossamer/internal/primitives/runtime" + "github.com/stretchr/testify/require" +) + +type TestUnpinBlock[H runtime.Hash] struct { + counts map[H]uint32 +} + +func (tub *TestUnpinBlock[H]) PinBlock(hash H) error { + _, ok := tub.counts[hash] + if !ok { + tub.counts[hash] = 0 + } + tub.counts[hash]++ + return nil +} + +func (tub *TestUnpinBlock[H]) UnpinBlock(hash H) { + if tub.counts[hash] != 0 { + tub.counts[hash]-- + } +} + +func Test_notificationPinningWorker(t *testing.T) { + t.Run("pin_unpin", func(t *testing.T) { + unpinMsgChan := make(chan api.UnpinWorkerMessage[hash.H256]) + backend := TestUnpinBlock[hash.H256]{counts: make(map[hash.H256]uint32)} + worker := newNotificationPinningWorker(unpinMsgChan, &backend) + require.NotNil(t, worker) + + h := hash.NewRandomH256() + + err := backend.PinBlock(h) + require.NoError(t, err) + require.Equal(t, uint32(1), backend.counts[h]) + + worker.handleAnnounceMessage(h) + require.Equal(t, 1, worker.pinnedBlocks.Len()) + + worker.handleUnpinMessage(h) + require.Equal(t, uint32(0), backend.counts[h]) + require.Equal(t, 0, worker.pinnedBlocks.Len()) + + close(unpinMsgChan) + }) + + t.Run("multiple_pins", func(t *testing.T) { + unpinMsgChan := make(chan api.UnpinWorkerMessage[hash.H256]) + backend := TestUnpinBlock[hash.H256]{counts: make(map[hash.H256]uint32)} + worker := newNotificationPinningWorker(unpinMsgChan, &backend) + require.NotNil(t, worker) + + h := hash.NewRandomH256() + + // Block got pinned multiple times. + err := backend.PinBlock(h) + require.NoError(t, err) + err = backend.PinBlock(h) + require.NoError(t, err) + err = backend.PinBlock(h) + require.NoError(t, err) + require.Equal(t, uint32(3), backend.counts[h]) + + worker.handleAnnounceMessage(h) + worker.handleAnnounceMessage(h) + worker.handleAnnounceMessage(h) + require.Equal(t, 1, worker.pinnedBlocks.Len()) + + worker.handleUnpinMessage(h) + require.Equal(t, uint32(2), backend.counts[h]) + worker.handleUnpinMessage(h) + require.Equal(t, uint32(1), backend.counts[h]) + worker.handleUnpinMessage(h) + require.Equal(t, uint32(0), backend.counts[h]) + require.Equal(t, 0, worker.pinnedBlocks.Len()) + + close(unpinMsgChan) + }) + + t.Run("too_many_unpins", func(t *testing.T) { + unpinMsgChan := make(chan api.UnpinWorkerMessage[hash.H256]) + backend := TestUnpinBlock[hash.H256]{counts: make(map[hash.H256]uint32)} + worker := newNotificationPinningWorker(unpinMsgChan, &backend) + require.NotNil(t, worker) + + h := hash.NewRandomH256() + h2 := hash.NewRandomH256() + + // Block was announced once but unpinned multiple times. The worker should ignore the + // additional unpins.
+ err := backend.PinBlock(h) + require.NoError(t, err) + err = backend.PinBlock(h) + require.NoError(t, err) + err = backend.PinBlock(h) + require.NoError(t, err) + require.Equal(t, uint32(3), backend.counts[h]) + + worker.handleAnnounceMessage(h) + require.Equal(t, 1, worker.pinnedBlocks.Len()) + + worker.handleUnpinMessage(h) + require.Equal(t, uint32(2), backend.counts[h]) + worker.handleUnpinMessage(h) + require.Equal(t, uint32(2), backend.counts[h]) + require.Equal(t, 0, worker.pinnedBlocks.Len()) + + worker.handleUnpinMessage(h2) + require.Equal(t, 0, worker.pinnedBlocks.Len()) + require.NotContains(t, backend.counts, h2) + + close(unpinMsgChan) + }) + + t.Run("should_evict_when_limit_reached", func(t *testing.T) { + unpinMsgChan := make(chan api.UnpinWorkerMessage[hash.H256]) + backend := TestUnpinBlock[hash.H256]{counts: make(map[hash.H256]uint32)} + worker := newNotificationPinningWorkerWithLimit(unpinMsgChan, &backend, 2) + require.NotNil(t, worker) + + h := hash.NewRandomH256() + h2 := hash.NewRandomH256() + h3 := hash.NewRandomH256() + h4 := hash.NewRandomH256() + + // Multiple blocks are announced but the cache size is too small. We expect that blocks + // are evicted by the cache and unpinned in the backend. + err := backend.PinBlock(h) + require.NoError(t, err) + err = backend.PinBlock(h2) + require.NoError(t, err) + err = backend.PinBlock(h3) + require.NoError(t, err) + require.Equal(t, uint32(1), backend.counts[h]) + require.Equal(t, uint32(1), backend.counts[h2]) + require.Equal(t, uint32(1), backend.counts[h3]) + + worker.handleAnnounceMessage(h) + _, ok := worker.pinnedBlocks.Peek(h) + require.True(t, ok) + worker.handleAnnounceMessage(h2) + _, ok = worker.pinnedBlocks.Peek(h2) + require.True(t, ok) + worker.handleAnnounceMessage(h3) + _, ok = worker.pinnedBlocks.Peek(h2) + require.True(t, ok) + _, ok = worker.pinnedBlocks.Peek(h3) + require.True(t, ok) + require.Equal(t, 2, worker.pinnedBlocks.Len()) + + // Hash 1 should have gotten unpinned, since it is the oldest. + require.Equal(t, uint32(0), backend.counts[h]) + require.Equal(t, uint32(1), backend.counts[h2]) + require.Equal(t, uint32(1), backend.counts[h3]) + + // Hash 2 is getting bumped. + worker.handleAnnounceMessage(h2) + _, ok = worker.pinnedBlocks.Peek(h2) + require.True(t, ok) + + // Since hash 2 was accessed, evict hash 3. + worker.handleAnnounceMessage(h4) + _, ok = worker.pinnedBlocks.Peek(h4) + require.True(t, ok) + _, ok = worker.pinnedBlocks.Peek(h2) + require.True(t, ok) + _, ok = worker.pinnedBlocks.Peek(h3) + require.False(t, ok) + + close(unpinMsgChan) + }) +} diff --git a/internal/client/utils/pubsub/pubsub.go b/internal/client/utils/pubsub/pubsub.go new file mode 100644 index 0000000000..1dc43b1633 --- /dev/null +++ b/internal/client/utils/pubsub/pubsub.go @@ -0,0 +1,170 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +// Package pubsub provides the means to implement a typical Pub/Sub mechanism. +// +// This package provides a type [Hub] which can be used both to subscribe +// and to send broadcast messages. +// +// The [Hub] type is parametrized by two other types: +// - M — the type of a message that shall be delivered to the subscribers; +// - Registry — implementation of the subscription/dispatch logic. +// +// A Registry is implemented by implementing the following interfaces: +// - [Subscribe] +// - [Dispatch] +// - [Unsubscribe] +// +// Subscribing via [Hub.Subscribe] returns an instance of +// [Receiver].
The receiver can be used to retrieve a channel of messages. +// Upon [Receiver.Drop] the [Receiver] unregisters itself from the [Hub]. +package pubsub + +import ( + "fmt" + "sync" + + "github.com/ChainSafe/gossamer/internal/log" +) + +var logger = log.NewFromGlobal(log.AddContext("pkg", "client/utils/pubsub")) + +// Unsubscribe unregisters a previously created subscription. +type Unsubscribe interface { + // Remove all registrations of the subscriber with ID subsID. + Unsubscribe(subsID uint64) +} + +// Subscribe registers interest in a key of type K. +type Subscribe[K any] interface { + // Register the subscriber with ID subsID as having an interest in the key subKey. + Subscribe(subKey K, subsID uint64) +} + +// Dispatch routes a message of type M. A value of type Item is sent through the channel as a result of the dispatch. +type Dispatch[M, Item any] interface { + // Dispatch the message of type M. + // + // The implementation is given an instance of M and is supposed to invoke Dispatch for + // each matching subscriber, with an argument of type Item matching that subscriber. + // + // Note that M does not have to be the same type as the item that will be sent through + // to the subscribers. The subscribers will receive a message of type Item. + Dispatch(message M, dispatch func(seqID uint64, item Item)) +} + +// Registry combines the subscription, unsubscription and dispatch logic. +type Registry[K any, M, Item any] interface { + Subscribe[K] + Unsubscribe + Dispatch[M, Item] +} + +// Hub is a subscription hub. +// +// It performs the subscription and dispatch. +// The exact subscription and routing behaviour is to be implemented by the Registry (of type R). +type Hub[K, M, Item any, R Registry[K, M, Item]] struct { + tracingKey string + shared[Item, R] +} + +// NewHub creates a new instance of Hub over the initialised Registry. +func NewHub[K any, M, Item any, R Registry[K, M, Item]](tracingKey string, registry R) *Hub[K, M, Item, R] { + return &Hub[K, M, Item, R]{ + tracingKey: tracingKey, + shared: shared[Item, R]{ + channels: make(map[uint64]chan<- Item), + registry: registry, + }, + } +} + +// Subscribe to this Hub using the subsKey. +// +// A subscription with a key K is possible if the Registry implements [Subscribe]. +func (h *Hub[K, M, Item, R]) Subscribe(subsKey K) *Receiver[Item, R] { + h.shared.Lock() + defer h.shared.Unlock() + + subsID := h.shared.idSequence + h.shared.idSequence++ + + h.shared.registry.Subscribe(subsKey, subsID) + + _, ok := h.shared.channels[subsID] + if ok { + panic("subsID already in use; IDs must be unique until uint64 overflows") + } + ch := make(chan Item) + h.shared.channels[subsID] = ch + + return &Receiver[Item, R]{ + channel: ch, + shared: &h.shared, + subsID: subsID, + } +} + +// Send dispatches the trigger message via the Registry to all matching subscribers. +func (h *Hub[K, M, Item, R]) Send(trigger M) { + h.shared.Lock() + defer h.shared.Unlock() + + h.shared.registry.Dispatch(trigger, func(subsID uint64, item Item) { + ch, ok := h.shared.channels[subsID] + if !ok { + logger.Warnf("No Sink for SubsID = %d", subsID) + return + } + ch <- item + }) +} + +// Shutdown closes all subscriber channels. +func (h *Hub[K, M, Item, R]) Shutdown() { + h.shared.Lock() + defer h.shared.Unlock() + + for _, ch := range h.shared.channels { + close(ch) + } + h.shared.channels = nil +} + +// Registry returns the underlying Registry. +func (h *Hub[K, M, Item, R]) Registry() R { + return h.shared.registry +} + +// Receiver is the receiving side of the subscription. +// +// The messages are delivered as items from a channel. +// Upon calling Drop this receiver unsubscribes itself from the [Hub].
+type Receiver[M any, Registry Unsubscribe] struct { + channel <-chan M + shared *shared[M, Registry] + subsID uint64 +} + +func (r *Receiver[M, Registry]) Chan() <-chan M { + return r.channel +} + +func (r *Receiver[M, Registry]) Drop() { + r.shared.Unsubscribe(r.subsID) +} + +type shared[M any, Registry Unsubscribe] struct { + idSequence uint64 + registry Registry + channels map[uint64]chan<- M + sync.Mutex +} + +func (s *shared[M, Registry]) Unsubscribe(subsID uint64) { + s.Lock() + defer s.Unlock() + _, ok := s.channels[subsID] + if !ok { + panic(fmt.Sprintf("invalid subsID: %d", subsID)) + } + close(s.channels[subsID]) + delete(s.channels, subsID) + s.registry.Unsubscribe(subsID) +} diff --git a/internal/client/utils/pubsub/pubsub_test.go b/internal/client/utils/pubsub/pubsub_test.go new file mode 100644 index 0000000000..41859a3353 --- /dev/null +++ b/internal/client/utils/pubsub/pubsub_test.go @@ -0,0 +1,123 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package pubsub_test + +import ( + "testing" + + "github.com/ChainSafe/gossamer/internal/client/utils/pubsub" + "github.com/stretchr/testify/require" +) + +type Message uint64 +type TestHub struct { + *pubsub.Hub[SubsKey, Message, Message, *Registry[Message]] +} +type TestReceiver = pubsub.Receiver[Message, *Registry[Message]] + +type Registry[M any] struct { + subscribers map[uint64]SubsKey +} + +func (r *Registry[M]) Subscribe(subsKey SubsKey, subsID uint64) { + r.subscribers[subsID] = subsKey +} + +func (r *Registry[M]) Unsubscribe(subsID uint64) { + delete(r.subscribers, subsID) +} + +func (r *Registry[M]) Dispatch(message M, dispatch func(uint64, M)) { + for id, subsKey := range r.subscribers { + _ = subsKey + dispatch(id, message) + } +} + +type SubsKey struct { + // receiver *TestReceiver +} + +func NewTestHub() *TestHub { + return &TestHub{ + pubsub.NewHub("a_tracing_key", &Registry[Message]{subscribers: map[uint64]SubsKey{}}), + } +} + +func (th *TestHub) SubsCount() int { + r := th.Hub.Registry() + return len(r.subscribers) +} + +func Test_Hub(t *testing.T) { + t.Run("receives_relevant_messages_and_chan_closes_on_hub_shutdown", func(t *testing.T) { + hub := NewTestHub() + require.Equal(t, 0, hub.SubsCount()) + + // No subscribers yet. That message is not supposed to get to anyone. + hub.Send(0) + + rx01 := hub.Subscribe(SubsKey{}) + require.Equal(t, 1, hub.SubsCount()) + + // That message is sent after subscription. Should be delivered into rx_01. + done := make(chan Message) + go func() { + m := <-rx01.Chan() + require.Equal(t, Message(1), m) + close(done) + }() + hub.Send(1) + <-done + + // Hub is disposed, so rx01 should be closed. 
+ hub.Shutdown() + + done = make(chan Message) + go func() { + for range rx01.Chan() { + } + close(done) + }() + <-done + }) + + t.Run("subs_count_is_modified_on_rx_drop", func(t *testing.T) { + hub := NewTestHub() + require.Equal(t, 0, hub.SubsCount()) + + rx01 := hub.Subscribe(SubsKey{}) + require.Equal(t, 1, hub.SubsCount()) + rx02 := hub.Subscribe(SubsKey{}) + require.Equal(t, 2, hub.SubsCount()) + + rx01.Drop() + require.Equal(t, 1, hub.SubsCount()) + rx02.Drop() + require.Equal(t, 0, hub.SubsCount()) + }) + + t.Run("positive_subs_count_is_correct_upon_drop_of_rxs_on_cloned_hubs", func(t *testing.T) { + hub := NewTestHub() + hub2 := hub + require.Equal(t, 0, hub.SubsCount()) + require.Equal(t, 0, hub2.SubsCount()) + + rx01 := hub2.Subscribe(SubsKey{}) + require.Equal(t, 1, hub.SubsCount()) + require.Equal(t, 1, hub2.SubsCount()) + + rx02 := hub2.Subscribe(SubsKey{}) + require.Equal(t, 2, hub.SubsCount()) + require.Equal(t, 2, hub2.SubsCount()) + + rx01.Drop() + require.Equal(t, 1, hub.SubsCount()) + require.Equal(t, 1, hub2.SubsCount()) + + rx02.Drop() + require.Equal(t, 0, hub.SubsCount()) + require.Equal(t, 0, hub2.SubsCount()) + }) +} diff --git a/internal/primitives/consensus/grandpa/grandpa.go b/internal/primitives/consensus/grandpa/grandpa.go index 01141a4683..7ac237db0a 100644 --- a/internal/primitives/consensus/grandpa/grandpa.go +++ b/internal/primitives/consensus/grandpa/grandpa.go @@ -12,7 +12,7 @@ import ( "golang.org/x/exp/constraints" ) -var logger = log.NewFromGlobal(log.AddContext("pkg", "consensus/grandpa")) +var logger = log.NewFromGlobal(log.AddContext("pkg", "primitives/consensus/grandpa")) // AuthorityID is the identity of a Grandpa authority. type AuthorityID = app.Public diff --git a/internal/primitives/storage/storage.go b/internal/primitives/storage/storage.go index ff95d2b727..c983e3e668 100644 --- a/internal/primitives/storage/storage.go +++ b/internal/primitives/storage/storage.go @@ -17,7 +17,10 @@ type StorageKey []byte // PrefixedStorageKey is a storage key of a child trie, it contains the prefix to the key. type PrefixedStorageKey []byte -// StorageChild is child trie storage data. +// StorageData is storage data associated to a [StorageKey]. +type StorageData []byte + +// Child trie storage data. type StorageChild struct { Data btree.Map[string, []byte] // Child data for storage. ChildInfo ChildInfo // Associated child info for a child trie. diff --git a/internal/primitives/trie/cache/cache.go b/internal/primitives/trie/cache/cache.go index 3a03742baa..b5706b062c 100644 --- a/internal/primitives/trie/cache/cache.go +++ b/internal/primitives/trie/cache/cache.go @@ -13,7 +13,7 @@ import ( "github.com/elastic/go-freelru" ) -var logger = log.NewFromGlobal(log.AddContext("pkg", "primitives/cache")) +var logger = log.NewFromGlobal(log.AddContext("pkg", "primitives/trie/cache")) // The maximum number of existing keys in the shared cache that a single local cache // can promote to the front of the LRU cache in one go.
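
In production the notification pinning worker is driven only through its channel; the tests above call the handlers directly. Below is a hedged sketch of the intended message flow, written as it would sit alongside notification_pinning_test.go (it uses the unexported constructor and the TestUnpinBlock helper). It assumes api.AnnouncePin and api.Unpin are struct variants carrying just a Hash field, as suggested by the type switch in run; the test name Test_pinningFlowSketch is invented for this example.

```go
package client

import (
	"testing"

	"github.com/ChainSafe/gossamer/internal/client/api"
	"github.com/ChainSafe/gossamer/internal/primitives/core/hash"
)

func Test_pinningFlowSketch(t *testing.T) {
	unpinMsgChan := make(chan api.UnpinWorkerMessage[hash.H256])
	backend := TestUnpinBlock[hash.H256]{counts: make(map[hash.H256]uint32)}
	_ = newNotificationPinningWorker(unpinMsgChan, &backend) // starts the worker goroutine

	h := hash.NewRandomH256()

	// A block import notification pins the block in the backend and announces the pin,
	// which bumps the worker's reference count for h to 1.
	if err := backend.PinBlock(h); err != nil {
		t.Fatal(err)
	}
	unpinMsgChan <- api.AnnouncePin[hash.H256]{Hash: h} // assumption: struct variant with a Hash field

	// Dropping the notification sends an Unpin message; once the count reaches zero the
	// worker removes h from its cache and calls backend.UnpinBlock exactly once.
	unpinMsgChan <- api.Unpin[hash.H256]{Hash: h} // assumption: struct variant with a Hash field

	// The worker consumes messages asynchronously, so a real test would synchronize
	// (for example by polling backend.counts) before asserting on the pin count.

	// Closing the channel terminates the worker's run loop.
	close(unpinMsgChan)
}
```

If more than notificationPinningLimit blocks are pinned at once, the LRU eviction callback unpins the evicted block in the backend once per outstanding reference, which is what the limit test above exercises.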
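
The pubsub_test.go file exercises the Hub with a registry that fans every message out to all subscribers. As a complementary illustration, here is a minimal sketch, not part of the diff, of a Registry keyed by topic so that Send only reaches subscribers of the matching topic. The topicRegistry and topicMessage names are invented for this example, and explicit type arguments are passed to NewHub rather than relying on inference.

```go
package main

import (
	"fmt"

	"github.com/ChainSafe/gossamer/internal/client/utils/pubsub"
)

// topicMessage is the broadcast type M; plain strings are the per-subscriber Item type.
type topicMessage struct {
	topic   string
	payload string
}

// topicRegistry implements pubsub.Registry[string, topicMessage, string].
type topicRegistry struct {
	topics map[uint64]string // subsID -> subscribed topic
}

func (r *topicRegistry) Subscribe(topic string, subsID uint64) { r.topics[subsID] = topic }
func (r *topicRegistry) Unsubscribe(subsID uint64)             { delete(r.topics, subsID) }
func (r *topicRegistry) Dispatch(m topicMessage, dispatch func(uint64, string)) {
	for id, topic := range r.topics {
		if topic == m.topic {
			dispatch(id, m.payload) // only matching subscribers receive the payload
		}
	}
}

func main() {
	hub := pubsub.NewHub[string, topicMessage, string, *topicRegistry](
		"example_tracing_key", &topicRegistry{topics: map[uint64]string{}})

	rx := hub.Subscribe("blocks")

	// Receivers use unbuffered channels, so drain them before (or concurrently with) Send.
	done := make(chan struct{})
	go func() {
		fmt.Println(<-rx.Chan()) // prints "block imported"
		close(done)
	}()

	hub.Send(topicMessage{topic: "blocks", payload: "block imported"})
	hub.Send(topicMessage{topic: "transactions", payload: "not delivered to this subscriber"})
	<-done

	rx.Drop()      // unsubscribes from the registry and closes the channel
	hub.Shutdown() // closes any remaining subscriber channels
}
```

Because Send performs blocking sends on unbuffered channels while holding the hub lock, subscribers must already be reading from their channels when Send is called, which is also how the test above structures its goroutines.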