Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/sync): Implement state sync strategy #4425

Open
wants to merge 21 commits into
base: diego/warpsync/strategy
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chain/kusama/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.Sync = "full"
config.Core.SyncMode = cfg.FullSync

return config
}
2 changes: 1 addition & 1 deletion chain/paseo/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.Sync = "full"
config.Core.SyncMode = cfg.FullSync

return config
}
2 changes: 1 addition & 1 deletion chain/polkadot/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.Sync = "full"
config.Core.SyncMode = cfg.FullSync

return config
}
2 changes: 1 addition & 1 deletion chain/westend-dev/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func DefaultConfig() *cfg.Config {
config.RPC.UnsafeRPC = true
config.RPC.WSExternal = true
config.RPC.UnsafeWSExternal = true
config.Core.Sync = "full"
config.Core.SyncMode = cfg.FullSync

return config
}
2 changes: 1 addition & 1 deletion chain/westend-local/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func DefaultConfig() *cfg.Config {
config.RPC.UnsafeRPC = true
config.RPC.WSExternal = true
config.RPC.UnsafeWSExternal = true
config.Core.Sync = "full"
config.Core.SyncMode = cfg.FullSync

return config
}
Expand Down
2 changes: 1 addition & 1 deletion chain/westend/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.Sync = "full"
config.Core.SyncMode = cfg.FullSync

return config
}
15 changes: 9 additions & 6 deletions cmd/gossamer/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ var (
role string
// validator when set, the node will be an authority
validator bool
// Sync mode [warp | full]
syncMode string

// Account Config
// key to use for the node
Expand Down Expand Up @@ -102,6 +104,10 @@ Usage:
return fmt.Errorf("failed to parse role: %s", err)
}

if err := parseSyncMode(); err != nil {
return fmt.Errorf("failed to parse sync mode: %s", err)
}

if err := parseTelemetryURL(); err != nil {
return fmt.Errorf("failed to parse telemetry-url: %s", err.Error())
}
Expand Down Expand Up @@ -529,13 +535,10 @@ func addCoreFlags(cmd *cobra.Command) error {
return fmt.Errorf("failed to add --grandpa-interval flag: %s", err)
}

if err := addStringFlagBindViper(cmd,
cmd.Flags().StringVar(&syncMode,
"sync",
config.Core.Sync,
"sync mode [warp | full]",
"core.sync"); err != nil {
return fmt.Errorf("failed to add --sync flag: %s", err)
}
cfg.FullSync.String(),
"Sync mode. One of 'full' or 'warp'.")

return nil
}
Expand Down
17 changes: 17 additions & 0 deletions cmd/gossamer/commands/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,23 @@ func parseRole() error {
return nil
}

// parseSyncMode parses the sync mode from the command line flags
func parseSyncMode() error {
var selectedSyncMode cfg.SyncMode
switch syncMode {
case cfg.FullSync.String():
selectedSyncMode = cfg.FullSync
case cfg.WarpSync.String():
selectedSyncMode = cfg.WarpSync
default:
return fmt.Errorf("invalid sync mode: %s", role)
}

config.Core.SyncMode = selectedSyncMode
viper.Set("core.syncMode", config.Core.SyncMode)
return nil
}

// parseTelemetryURL parses the telemetry-url from the command line flag
func parseTelemetryURL() error {
if telemetryURLs == "" {
Expand Down
23 changes: 18 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ const (
DefaultSystemVersion = "0.0.0"

// DefaultSyncMode is the default block sync mode
DefaultSyncMode = "full"
DefaultSyncMode = FullSync
)

// DefaultRPCModules the default RPC modules
Expand Down Expand Up @@ -191,7 +191,7 @@ type CoreConfig struct {
GrandpaAuthority bool `mapstructure:"grandpa-authority"`
WasmInterpreter string `mapstructure:"wasm-interpreter,omitempty"`
GrandpaInterval time.Duration `mapstructure:"grandpa-interval,omitempty"`
Sync string `mapstructure:"sync,omitempty"`
SyncMode SyncMode `mapstructure:"sync,omitempty"`
}

// StateConfig contains the configuration for the state.
Expand Down Expand Up @@ -367,7 +367,7 @@ func DefaultConfig() *Config {
GrandpaAuthority: true,
WasmInterpreter: DefaultWasmInterpreter,
GrandpaInterval: DefaultDiscoveryInterval,
Sync: DefaultSyncMode,
SyncMode: DefaultSyncMode,
},
Network: &NetworkConfig{
Port: DefaultNetworkPort,
Expand Down Expand Up @@ -449,7 +449,7 @@ func DefaultConfigFromSpec(nodeSpec *genesis.Genesis) *Config {
GrandpaAuthority: true,
WasmInterpreter: DefaultWasmInterpreter,
GrandpaInterval: DefaultDiscoveryInterval,
Sync: DefaultSyncMode,
SyncMode: DefaultSyncMode,
},
Network: &NetworkConfig{
Port: DefaultNetworkPort,
Expand Down Expand Up @@ -531,7 +531,7 @@ func Copy(c *Config) Config {
GrandpaAuthority: c.Core.GrandpaAuthority,
WasmInterpreter: c.Core.WasmInterpreter,
GrandpaInterval: c.Core.GrandpaInterval,
Sync: c.Core.Sync,
SyncMode: c.Core.SyncMode,
},
Network: &NetworkConfig{
Port: c.Network.Port,
Expand Down Expand Up @@ -611,6 +611,19 @@ func (c Chain) String() string {
return string(c)
}

// SyncMode is a string representing a sync mode
type SyncMode string

const (
FullSync SyncMode = "full"
WarpSync SyncMode = "warp"
StateSync SyncMode = "state"
)

func (n SyncMode) String() string {
return string(n)
}

// NetworkRole is a string representing a network role
type NetworkRole string

Expand Down
2 changes: 1 addition & 1 deletion dot/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (d *discovery) advertise() {

ttl, err = d.rd.Advertise(d.ctx, string(d.pid))
if err != nil {
logger.Warnf("failed to advertise in the DHT: %s", err)
logger.Debugf("failed to advertise in the DHT: %s", err)
ttl = tryAdvertiseTimeout
}
}
Expand Down
23 changes: 21 additions & 2 deletions dot/network/messages/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,18 @@ type StateRequest struct {
NoProof bool
}

func NewStateRequest(block common.Hash, start [][]byte, noProof bool) *StateRequest {
return &StateRequest{
Block: block,
Start: start,
NoProof: noProof,
}
}

func (s *StateRequest) String() string {
return fmt.Sprintf("StateRequest Block=%s Start=[0x%x, 0x%x] NoProof=%v",
return fmt.Sprintf("StateRequest Block=%s Start=[%v] NoProof=%v",
s.Block.String(),
s.Start[0], s.Start[1],
s.Start,
s.NoProof,
)
}
Expand Down Expand Up @@ -98,3 +106,14 @@ func (s *StateResponse) Decode(in []byte) error {

return nil
}

func (s *StateResponse) Encode() ([]byte, error) {
panic("not implemented")
dimartiro marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *StateResponse) String() string {
return fmt.Sprintf("StateResponse Entries=[%v] Proof=[%v]",
s.Entries,
s.Proof,
)
}
2 changes: 1 addition & 1 deletion dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
// we've completed the handshake with the peer, send message directly
logger.Tracef("sending message to peer %s using protocol %s: %s", peer, info.protocolID, msg)
if err := s.host.writeToStream(stream, msg); err != nil {
logger.Errorf("failed to send message to peer %s: %s", peer, err)
logger.Debugf("failed to send message to peer %s: %s", peer, err)

// the stream was closed or reset, close it on our end and delete it from our peer's data
if errors.Is(err, io.EOF) || errors.Is(err, network.ErrReset) {
Expand Down
13 changes: 13 additions & 0 deletions dot/network/request_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ type RequestResponseProtocol struct {
responseBuf []byte
}

func NewRequestResponseProtocol(ctx context.Context, host *host, protocolID protocol.ID,
requestTimeout time.Duration, maxResponseSize uint64) *RequestResponseProtocol {
return &RequestResponseProtocol{
ctx: ctx,
host: host,
requestTimeout: requestTimeout,
maxResponseSize: maxResponseSize,
protocolID: protocolID,
responseBuf: make([]byte, maxResponseSize),
responseBufMu: sync.Mutex{},
}
}

func (rrp *RequestResponseProtocol) Do(to peer.ID, req, res messages.P2PMessage) error {
rrp.host.p2pHost.ConnManager().Protect(to, "")
defer rrp.host.p2pHost.ConnManager().Unprotect(to, "")
Expand Down
13 changes: 3 additions & 10 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
// the following are sub-protocols used by the node
SyncID = "/sync/2"
WarpSyncID = "/sync/warp"
StateSyncID = "/state/2"
lightID = "/light/2"
blockAnnounceID = "/block-announces/1"
transactionsID = "/transactions/1"
Expand Down Expand Up @@ -629,15 +630,7 @@ func (s *Service) GetRequestResponseProtocol(subprotocol string, requestTimeout
genesisHash = strings.TrimPrefix(genesisHash, "0x")
protocolId := fmt.Sprintf("/%s%s", genesisHash, subprotocol)

return &RequestResponseProtocol{
ctx: s.ctx,
host: s.host,
requestTimeout: requestTimeout,
maxResponseSize: maxResponseSize,
protocolID: protocol.ID(protocolId),
responseBuf: make([]byte, maxResponseSize),
responseBufMu: sync.Mutex{},
}
return NewRequestResponseProtocol(s.ctx, s.host, protocol.ID(protocolId), requestTimeout, maxResponseSize)
}

// Health returns information about host needed for the rpc server
Expand Down Expand Up @@ -765,7 +758,7 @@ func (s *Service) processMessage(msg peerset.Message) {
err := s.host.connect(addrInfo)
if err != nil {
// TODO: if error happens here outgoing (?) slot is occupied but no peer is really connected
logger.Warnf("failed to open connection for peer %s: %s", peerID, err)
logger.Debugf("failed to open connection for peer %s: %s", peerID, err)
return
}
logger.Debugf("connection successful with peer %s", peerID)
Expand Down
5 changes: 5 additions & 0 deletions dot/peerset/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,9 @@ const (
BadWarpProofValue Reputation = -(1 << 29)
// BadWarpProofReason is used when peer send invalid warp sync proof.
BadWarpProofReason = "Bad warp proof"

// BadStateValue is used when peer send invalid state response.
BadStateValue Reputation = -(1 << 29)
// BadStateReason is used when peer send invalid state response.
BadStateReason = "Bad state"
)
52 changes: 9 additions & 43 deletions dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"strings"
"time"

cfg "github.com/ChainSafe/gossamer/config"

Expand Down Expand Up @@ -38,8 +37,6 @@ import (
wazero_runtime "github.com/ChainSafe/gossamer/lib/runtime/wazero"
)

const blockRequestTimeout = 20 * time.Second

// BlockProducer to produce blocks
type BlockProducer interface {
Pause() error
Expand Down Expand Up @@ -524,51 +521,20 @@ func (nodeBuilder) newSyncService(config *cfg.Config, st *state.Service, fg sync
return nil, fmt.Errorf("failed to parse sync log level: %w", err)
}

// Should be shared between all sync strategies
peersView := sync.NewPeerViewSet()

var warpSyncStrategy sync.Strategy

if config.Core.Sync == "warp" {
warpSyncProvider := warpsync.NewWarpSyncProofProvider(st.Block, st.Grandpa)

warpSyncCfg := &sync.WarpSyncConfig{
Telemetry: telemetryMailer,
BadBlocks: genesisData.BadBlocks,
WarpSyncProvider: warpSyncProvider,
WarpSyncRequestMaker: net.GetRequestResponseProtocol(network.WarpSyncID,
blockRequestTimeout, network.MaxBlockResponseSize),
SyncRequestMaker: net.GetRequestResponseProtocol(network.SyncID,
blockRequestTimeout, network.MaxBlockResponseSize),
BlockState: st.Block,
Peers: peersView,
}

warpSyncStrategy = sync.NewWarpSyncStrategy(warpSyncCfg)
}

syncCfg := &sync.FullSyncConfig{
BlockState: st.Block,
StorageState: st.Storage,
TransactionState: st.Transaction,
FinalityGadget: fg,
BabeVerifier: verifier,
BlockImportHandler: cs,
Telemetry: telemetryMailer,
BadBlocks: genesisData.BadBlocks,
RequestMaker: net.GetRequestResponseProtocol(network.SyncID,
blockRequestTimeout, network.MaxBlockResponseSize),
Peers: peersView,
}
fullSync := sync.NewFullSyncStrategy(syncCfg)

return sync.NewSyncService(
syncLogLevel,
sync.WithNetwork(net),
sync.WithBlockState(st.Block),
sync.WithGrandpaState(st.Grandpa),
sync.WithStorageState(st.Storage),
sync.WithFinalityGadget(fg),
sync.WithBabeVerifier(verifier),
sync.WithBlockImportHandler(cs),
sync.WithTelemetry(telemetryMailer),
sync.WithBadBlocks(genesisData.BadBlocks),
sync.WithSyncMethod(config.Core.SyncMode),
sync.WithTransactionState(st.Transaction),
sync.WithSlotDuration(slotDuration),
sync.WithWarpSyncStrategy(warpSyncStrategy),
sync.WithFullSyncStrategy(fullSync),
sync.WithMinPeers(config.Network.MinPeers),
), nil
}
Expand Down
1 change: 1 addition & 0 deletions dot/sync/block_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type (

// StorageState is the interface for the storage state
StorageState interface {
StoreTrie(ts *rtstorage.TrieState, header *types.Header) error
TrieState(root *common.Hash) (*rtstorage.TrieState, error)
sync.Locker
}
Expand Down
Loading
Loading