Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(internal/client): Introduction of Client that implements LockImportRun, BlockchainEvents, and PrecommitActions. #4477

Open
wants to merge 16 commits into
base: refactor/client-db
Choose a base branch
from
84 changes: 84 additions & 0 deletions internal/client/api/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,82 @@ package api
import (
"sync"

"github.com/ChainSafe/gossamer/internal/client/consensus"
"github.com/ChainSafe/gossamer/internal/primitives/blockchain"
"github.com/ChainSafe/gossamer/internal/primitives/core/offchain"
"github.com/ChainSafe/gossamer/internal/primitives/runtime"
statemachine "github.com/ChainSafe/gossamer/internal/primitives/state-machine"
"github.com/ChainSafe/gossamer/internal/primitives/storage"
)

// ImportNotificationAction describes which block import notification stream should be notified.
type ImportNotificationAction uint

const (
// RecentBlockImportNotificationAction notifies only when the node has synced to the tip or there is a re-org.
RecentBlockImportNotificationAction ImportNotificationAction = iota
// EveryBlockImportNotificationAction notifies for every single block no matter what the sync state is.
EveryBlockImportNotificationAction
// BothBlockImportNotificationAction means both [RecentBlockImportNotificationAction] and
// [EveryBlockImportNotificationAction] should be fired.
BothBlockImportNotificationAction
// NoneBlockImportNotificationAction means no block import notification should be fired.
NoneBlockImportNotificationAction
)

// StorageChanges contains a [statemachine.StorageCollection] and [statemachine.ChildStorageCollection]
type StorageChanges struct {
statemachine.StorageCollection
statemachine.ChildStorageCollection
}

// ImportSummary contains information about the block that just got imported,
// including storage changes, reorged blocks, etc.
type ImportSummary[
H runtime.Hash,
N runtime.Number,
Header runtime.Header[N, H],
] struct {
Hash H // Block hash of the imported block.
Origin consensus.BlockOrigin // Import origin.
Header Header // Header of the imported block.
IsNewBest bool // Is this block a new best block.
StorageChanges *StorageChanges // Optional storage changes.
// TreeRoute from old best to new best.
// If nil, there was no re-org while importing.
TreeRoute *blockchain.TreeRoute[H, N]
ImportNotificationAction ImportNotificationAction // Which notify action to take for this import.
}

// FinalizeSummary contains information about the block that just got finalized, including tree heads that became
// stale at the moment of finalization.
type FinalizeSummary[
H runtime.Hash,
N runtime.Number,
Header runtime.Header[N, H],
] struct {
// Last finalized block header.
Header Header
// Blocks that were finalized.
// The last entry is the one that has been explicitly finalized.
Finalized []H
// Heads that became stale during this finalization operation.
StaleHeads []H
}

// ClientImportOperation is an import operation wrapper.
type ClientImportOperation[
H runtime.Hash,
Hasher runtime.Hasher[H],
N runtime.Number,
Header runtime.Header[N, H],
E runtime.Extrinsic,
] struct {
Op BlockImportOperation[N, H, Hasher, Header, E] // DB Operation.
NotifyImported *ImportSummary[H, N, Header] // Summary of imported block.
NotifyFinalized *FinalizeSummary[H, N, Header] // Summary of finalized block.
}

// NewBlockState is the state of a new block.
type NewBlockState uint8

Expand Down Expand Up @@ -87,6 +156,21 @@ type BlockImportOperation[
UpdateTransactionIndex(index []statemachine.IndexOperation) error
}

// LockImportRun is the interface for performing operations on the backend.
type LockImportRun[
H runtime.Hash,
N runtime.Number,
Hasher runtime.Hasher[H],
Header runtime.Header[N, H],
E runtime.Extrinsic,
] interface {
/// LockImportRun locks the import lock, and run operations inside.
LockImportRun(
f func(*ClientImportOperation[H, Hasher, N, Header, E]) error,
) error
}

// KeyValue is used in [AuxStore.InsertAux]. Key and Value should not be nil.
type KeyValue struct {
Key []byte
Value []byte
Expand Down
221 changes: 220 additions & 1 deletion internal/client/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,231 @@

package api

// AuxDataOperation is a slice of operations to be performed on storage aux data.
import (
"github.com/ChainSafe/gossamer/internal/client/consensus"
"github.com/ChainSafe/gossamer/internal/primitives/blockchain"
"github.com/ChainSafe/gossamer/internal/primitives/runtime"
"github.com/ChainSafe/gossamer/internal/primitives/storage"
)

// ImportNotifications is a channel of block import events.
type ImportNotifications[
H runtime.Hash,
N runtime.Number,
Header runtime.Header[N, H],
] chan BlockImportNotification[H, N, Header]

// FinalityNotifications is a channel of block finality notifications.
type FinalityNotifications[
H runtime.Hash,
N runtime.Number,
Header runtime.Header[N, H],
] chan FinalityNotification[H, N, Header]

// BlockchainEvents is the source of blockchain events.
type BlockchainEvents[
H runtime.Hash,
N runtime.Number,
Header runtime.Header[N, H],
] interface {
// RegisterImportNotificationStream retrieves a channel of block import events.
//
// Not guaranteed to be fired for every imported block. Use
// [RegisterEveryImportNotificationStream] if you want a notification of every imported block
// regardless.
//
// The events for this notification stream are emitted:
// - During initial sync process: if there is a re-org while importing blocks.
// - After initial sync process: on every imported block, regardless of whether it is
// the new best block or not, or if it causes a re-org or not.
RegisterImportNotificationStream() ImportNotifications[H, N, Header]
// UnregisterImportNotificationStream will unregister a registered channel.
UnregisterImportNotificationStream(ImportNotifications[H, N, Header])

// RegisterEveryImportNotificationStream retrieves a channel of block import events for every imported block.
RegisterEveryImportNotificationStream() ImportNotifications[H, N, Header]
// UnregisterEveryImportNotificationStream will unregister a registered channel.
UnregisterEveryImportNotificationStream(ImportNotifications[H, N, Header])

// RegisterFinalityNotificationStream will get a channel of finality notifications. Not guaranteed to be fired for
//every finalized block.
RegisterFinalityNotificationStream() FinalityNotifications[H, N, Header]
// UnregisterFinalityNotificationStream will unregister a registered channel.
UnregisterFinalityNotificationStream(FinalityNotifications[H, N, Header])

// StorageChangesNotificationStream retrieves a storage changes event stream.
// Passing nil for filterKeys subscribes to all storage changes.
StorageChangesNotificationStream(
filterKeys []storage.StorageKey,
childFilterKeys []ChildFilterKeys,
) StorageEventStream[H]
}

// AuxDataOperation is an operation to be performed on storage aux data.
// Key is the encoded data key.
// Value is the encoded optional data to write.
// If Value is nil, the key and the associated data are deleted from storage.
type AuxDataOperation struct {
Key []byte
Data []byte
}

// AuxDataOperations is a slice of [AuxDataOperation] to be performed on storage aux data.
type AuxDataOperations []AuxDataOperation

// OnImportAction is a callback invoked before committing the operations created during block import.
// This gives the opportunity to perform auxiliary pre-commit actions and optionally
// enqueue further storage write operations to be atomically performed on commit.
type OnImportAction[
H runtime.Hash,
N runtime.Number,
Header runtime.Header[N, H],
] func(BlockImportNotification[H, N, Header]) AuxDataOperations

// OnFinalityAction is a callback invoked before committing the operations created during block finalization.
// This gives the opportunity to perform auxiliary pre-commit actions and optionally
// enqueue further storage write operations to be atomically performed on commit.
type OnFinalityAction[
H runtime.Hash,
N runtime.Number,
Header runtime.Header[N, H],
] func(FinalityNotification[H, N, Header]) AuxDataOperations

// PreCommitActions is the interface to perform auxiliary actions before committing a block import or
// finality operation.
type PreCommitActions[
H runtime.Hash,
N runtime.Number,
Header runtime.Header[N, H],
] interface {
RegisterImportAction(op OnImportAction[H, N, Header]) // Actions to be performed on block import.
RegisterFinalityAction(op OnFinalityAction[H, N, Header]) // Actions to be performed on block finalization.
}

// Sends a message to the pinning-worker once dropped to unpin a block in the backend.
type unpinHandleInner[H runtime.Hash] struct {
hash H // Hash of the block pinned by this handle
unpin func(message Unpin[H]) error
}

func (uhi unpinHandleInner[H]) Drop() {
err := uhi.unpin(Unpin[H]{uhi.hash})
if err != nil {
logger.Debugf("Unable to unpin block with hash: %s, error: %v", uhi.hash, err)
}
}

// UnpinWorkerMessage is the message that signals notification-based pinning actions to the pinning-worker.
// When the notification is dropped, an [Unpin] message should be sent to the worker.
type UnpinWorkerMessage[H runtime.Hash] interface {
isUnpinWorkerMessage()
}

// AnnouncePin should be sent when a import or finality notification is created.
type AnnouncePin[H runtime.Hash] struct {
Hash H
}

// Unpin should be sent when a import or finality notification is dropped.
type Unpin[H runtime.Hash] struct {
Hash H
}

func (AnnouncePin[H]) isUnpinWorkerMessage() {}
func (Unpin[H]) isUnpinWorkerMessage() {}

// UnpinHandle keeps a specific block pinned while the handle is alive.
// Once the last handle instance for a given block is dropped, the
// block is unpinned in the [Backend].
type UnpinHandle[H runtime.Hash] struct {
unpinHandleInner[H]
}

// NewUnpinHandle is constructor for [UnpinHandle].
func NewUnpinHandle[H runtime.Hash](hash H, unpin func(message Unpin[H]) error) UnpinHandle[H] {
return UnpinHandle[H]{
unpinHandleInner[H]{
hash: hash,
unpin: unpin,
},
}
}

func (up UnpinHandle[H]) Hash() H {
return up.hash
}

// BlockImportNotification is the summary of an imported block.
type BlockImportNotification[
H runtime.Hash,
N runtime.Number,
Header runtime.Header[N, H],
] struct {
Hash H // Imported block header hash.
Origin consensus.BlockOrigin // Imported block origin.
Header Header // Imported block header.
IsNewBest bool // Is this the new best block.
// TreeRoute from old best to new best. If nil, there was no re-org while importing.
TreeRoute *blockchain.TreeRoute[H, N]
unpinHandle UnpinHandle[H] // Handle to unpin the block this notification is associated with.
}

// Drop will unpin the block from the backend.
func (bin BlockImportNotification[H, N, Header]) Drop() {
bin.unpinHandle.Drop()
}

// NewBlockImportNotificationFromSummary is constructor of [BlockImportNotification] given an [ImportSummary].
func NewBlockImportNotificationFromSummary[
H runtime.Hash,
N runtime.Number,
Header runtime.Header[N, H],
](summary ImportSummary[H, N, Header], unpin func(message Unpin[H]) error) *BlockImportNotification[H, N, Header] {
return &BlockImportNotification[H, N, Header]{
Hash: summary.Hash,
Origin: summary.Origin,
Header: summary.Header,
IsNewBest: summary.IsNewBest,
TreeRoute: summary.TreeRoute,
unpinHandle: NewUnpinHandle[H](summary.Hash, unpin),
}
}

// FinalityNotification is the summary of a finalized block.
type FinalityNotification[
H runtime.Hash,
N runtime.Number,
Header runtime.Header[N, H],
] struct {
Hash H // Finalized block header hash.
Header Header // Finalized block header.
// TreeRoute path from the old finalized to new finalized parent (implicitly finalized blocks).
// This maps to the range [oldFinalized, ...newFinalized].
TreeRoute []H
StaleHeads []H // Stale branches heads.
unpinHandle UnpinHandle[H] // Handle to unpin the block this notification associated with.
}

// Drop will unpin the block from the backend.
func (fn FinalityNotification[H, N, Header]) Drop() {
fn.unpinHandle.Drop()
}

// NewFinalityNotificationFromSummary is constructor of [FinalityNotification] given an [FinalizeSummary].
func NewFinalityNotificationFromSummary[
H runtime.Hash,
N runtime.Number,
Header runtime.Header[N, H],
](summary FinalizeSummary[H, N, Header], unpin func(message Unpin[H]) error) *FinalityNotification[H, N, Header] {
var hash H
if len(summary.Finalized) > 0 {
hash = summary.Finalized[len(summary.Finalized)-1]
}
return &FinalityNotification[H, N, Header]{
Hash: hash,
Header: summary.Header,
TreeRoute: summary.Finalized,
StaleHeads: summary.StaleHeads,
unpinHandle: NewUnpinHandle[H](hash, unpin),
}
}
Loading
Loading