diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs index dd14ca514b3f..c95c72c370a1 100644 --- a/cumulus/client/service/src/lib.rs +++ b/cumulus/client/service/src/lib.rs @@ -40,7 +40,10 @@ use sc_consensus::{ use sc_network::{config::SyncMode, service::traits::NetworkService, NetworkBackend}; use sc_network_sync::SyncingService; use sc_network_transactions::TransactionsHandlerController; -use sc_service::{Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, WarpSyncConfig}; +use sc_service::{ + build_polkadot_syncing_strategy, Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, + WarpSyncConfig, +}; use sc_telemetry::{log, TelemetryWorkerHandle}; use sc_utils::mpsc::TracingUnboundedSender; use sp_api::ProvideRuntimeApi; @@ -425,7 +428,7 @@ pub struct BuildNetworkParams< pub async fn build_network<'a, Block, Client, RCInterface, IQ, Network>( BuildNetworkParams { parachain_config, - net_config, + mut net_config, client, transaction_pool, para_id, @@ -462,7 +465,7 @@ where IQ: ImportQueue + 'static, Network: NetworkBackend::Hash>, { - let warp_sync_params = match parachain_config.network.sync_mode { + let warp_sync_config = match parachain_config.network.sync_mode { SyncMode::Warp => { log::debug!(target: LOG_TARGET_SYNC, "waiting for announce block..."); @@ -493,9 +496,19 @@ where }, }; let metrics = Network::register_notification_metrics( - parachain_config.prometheus_config.as_ref().map(|cfg| &cfg.registry), + parachain_config.prometheus_config.as_ref().map(|config| &config.registry), ); + let syncing_strategy = build_polkadot_syncing_strategy( + parachain_config.protocol_id(), + parachain_config.chain_spec.fork_id(), + &mut net_config, + warp_sync_config, + client.clone(), + &spawn_handle, + parachain_config.prometheus_config.as_ref().map(|config| &config.registry), + )?; + sc_service::build_network(sc_service::BuildNetworkParams { config: parachain_config, net_config, @@ -504,7 +517,7 @@ where spawn_handle, import_queue, block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)), - warp_sync_config: warp_sync_params, + syncing_strategy, block_relay: None, metrics, }) diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index fe96d29c1ceb..dd35423e18e1 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -84,7 +84,7 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; use prometheus_endpoint::Registry; #[cfg(feature = "full-node")] use sc_service::KeystoreContainer; -use sc_service::{RpcHandlers, SpawnTaskHandle}; +use sc_service::{build_polkadot_syncing_strategy, RpcHandlers, SpawnTaskHandle}; use sc_telemetry::TelemetryWorker; #[cfg(feature = "full-node")] use sc_telemetry::{Telemetry, TelemetryWorkerHandle}; @@ -1028,6 +1028,16 @@ pub fn new_full< }) }; + let syncing_strategy = build_polkadot_syncing_strategy( + config.protocol_id(), + config.chain_spec.fork_id(), + &mut net_config, + Some(WarpSyncConfig::WithProvider(warp_sync)), + client.clone(), + &task_manager.spawn_handle(), + config.prometheus_config.as_ref().map(|config| &config.registry), + )?; + let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, @@ -1037,7 +1047,7 @@ pub fn new_full< spawn_handle: task_manager.spawn_handle(), import_queue, block_announce_validator_builder: None, - warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)), + syncing_strategy, block_relay: None, metrics, })?; diff --git a/prdoc/pr_5666.prdoc b/prdoc/pr_5666.prdoc new file mode 100644 index 000000000000..08bd9815cdd4 --- /dev/null +++ b/prdoc/pr_5666.prdoc @@ -0,0 +1,19 @@ +title: Make syncing strategy an argument of the syncing engine + +doc: + - audience: Node Dev + description: | + Syncing strategy is no longer implicitly created when building network, but needs to be instantiated explicitly. + Previously default implementation can be created with new function `build_polkadot_syncing_strategy` or custom + syncing strategy could be implemented and used instead if desired, providing greater flexibility for chain + developers. + +crates: + - name: cumulus-client-service + bump: patch + - name: polkadot-service + bump: patch + - name: sc-service + bump: major + - name: sc-network-sync + bump: major diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index 1b345a23f27e..69e953f54e42 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -32,6 +32,7 @@ use frame_system_rpc_runtime_api::AccountNonceApi; use futures::prelude::*; use kitchensink_runtime::RuntimeApi; use node_primitives::Block; +use polkadot_sdk::sc_service::build_polkadot_syncing_strategy; use sc_client_api::{Backend, BlockBackend}; use sc_consensus_babe::{self, SlotProportion}; use sc_network::{ @@ -506,6 +507,16 @@ pub fn new_full_base::Hash>>( Vec::default(), )); + let syncing_strategy = build_polkadot_syncing_strategy( + config.protocol_id(), + config.chain_spec.fork_id(), + &mut net_config, + Some(WarpSyncConfig::WithProvider(warp_sync)), + client.clone(), + &task_manager.spawn_handle(), + config.prometheus_config.as_ref().map(|config| &config.registry), + )?; + let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, @@ -515,7 +526,7 @@ pub fn new_full_base::Hash>>( spawn_handle: task_manager.spawn_handle(), import_queue, block_announce_validator_builder: None, - warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)), + syncing_strategy, block_relay: None, metrics, })?; diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 86c1a7abf744..aafbd950202d 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -24,7 +24,6 @@ use crate::{ BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream, }, block_relay_protocol::{BlockDownloader, BlockResponseError}, - block_request_handler::MAX_BLOCKS_IN_RESPONSE, pending_responses::{PendingResponses, ResponseEvent}, schema::v1::{StateRequest, StateResponse}, service::{ @@ -32,8 +31,8 @@ use crate::{ syncing_service::{SyncingService, ToServiceCommand}, }, strategy::{ - warp::{EncodedProof, WarpProofRequest, WarpSyncConfig}, - PolkadotSyncingStrategy, StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy, + warp::{EncodedProof, WarpProofRequest}, + StrategyKey, SyncingAction, SyncingStrategy, }, types::{ BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent, @@ -189,7 +188,7 @@ pub struct Peer { pub struct SyncingEngine { /// Syncing strategy. - strategy: PolkadotSyncingStrategy, + strategy: Box>, /// Blockchain client. client: Arc, @@ -271,12 +270,6 @@ pub struct SyncingEngine { /// Block downloader block_downloader: Arc>, - /// Protocol name used to send out state requests - state_request_protocol_name: ProtocolName, - - /// Protocol name used to send out warp sync requests - warp_sync_protocol_name: Option, - /// Handle to import queue. import_queue: Box>, } @@ -301,35 +294,15 @@ where protocol_id: ProtocolId, fork_id: &Option, block_announce_validator: Box + Send>, - warp_sync_config: Option>, + syncing_strategy: Box>, network_service: service::network::NetworkServiceHandle, import_queue: Box>, block_downloader: Arc>, - state_request_protocol_name: ProtocolName, - warp_sync_protocol_name: Option, peer_store_handle: Arc, ) -> Result<(Self, SyncingService, N::NotificationProtocolConfig), ClientError> where N: NetworkBackend::Hash>, { - let mode = net_config.network_config.sync_mode; - let max_parallel_downloads = net_config.network_config.max_parallel_downloads; - let max_blocks_per_request = - if net_config.network_config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 { - log::info!( - target: LOG_TARGET, - "clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}", - ); - MAX_BLOCKS_IN_RESPONSE as u32 - } else { - net_config.network_config.max_blocks_per_request - }; - let syncing_config = SyncingConfig { - mode, - max_parallel_downloads, - max_blocks_per_request, - metrics_registry: metrics_registry.cloned(), - }; let cache_capacity = (net_config.network_config.default_peers_set.in_peers + net_config.network_config.default_peers_set.out_peers) .max(1); @@ -388,10 +361,6 @@ where Arc::clone(&peer_store_handle), ); - // Initialize syncing strategy. - let strategy = - PolkadotSyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?; - let block_announce_protocol_name = block_announce_config.protocol_name().clone(); let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000); let num_connected = Arc::new(AtomicUsize::new(0)); @@ -413,7 +382,7 @@ where Self { roles, client, - strategy, + strategy: syncing_strategy, network_service, peers: HashMap::new(), block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)), @@ -450,8 +419,6 @@ where }, pending_responses: PendingResponses::new(), block_downloader, - state_request_protocol_name, - warp_sync_protocol_name, import_queue, }, SyncingService::new(tx, num_connected, is_major_syncing), @@ -652,16 +619,16 @@ where "Processed {action:?}, response removed: {removed}.", ); }, - SyncingAction::SendStateRequest { peer_id, key, request } => { - self.send_state_request(peer_id, key, request); + SyncingAction::SendStateRequest { peer_id, key, protocol_name, request } => { + self.send_state_request(peer_id, key, protocol_name, request); trace!( target: LOG_TARGET, "Processed `ChainSyncAction::SendStateRequest` to {peer_id}.", ); }, - SyncingAction::SendWarpProofRequest { peer_id, key, request } => { - self.send_warp_proof_request(peer_id, key, request.clone()); + SyncingAction::SendWarpProofRequest { peer_id, key, protocol_name, request } => { + self.send_warp_proof_request(peer_id, key, protocol_name, request.clone()); trace!( target: LOG_TARGET, @@ -1054,6 +1021,7 @@ where &mut self, peer_id: PeerId, key: StrategyKey, + protocol_name: ProtocolName, request: OpaqueStateRequest, ) { if !self.peers.contains_key(&peer_id) { @@ -1070,7 +1038,7 @@ where Ok(data) => { self.network_service.start_request( peer_id, - self.state_request_protocol_name.clone(), + protocol_name, data, tx, IfDisconnected::ImmediateError, @@ -1089,6 +1057,7 @@ where &mut self, peer_id: PeerId, key: StrategyKey, + protocol_name: ProtocolName, request: WarpProofRequest, ) { if !self.peers.contains_key(&peer_id) { @@ -1101,21 +1070,13 @@ where self.pending_responses.insert(peer_id, key, PeerRequest::WarpProof, rx.boxed()); - match &self.warp_sync_protocol_name { - Some(name) => self.network_service.start_request( - peer_id, - name.clone(), - request.encode(), - tx, - IfDisconnected::ImmediateError, - ), - None => { - log::warn!( - target: LOG_TARGET, - "Trying to send warp sync request when no protocol is configured {request:?}", - ); - }, - } + self.network_service.start_request( + peer_id, + protocol_name, + request.encode(), + tx, + IfDisconnected::ImmediateError, + ); } fn encode_state_request(request: &OpaqueStateRequest) -> Result, String> { diff --git a/substrate/client/network/sync/src/strategy.rs b/substrate/client/network/sync/src/strategy.rs index f8d6976bbaa0..81998b7576bb 100644 --- a/substrate/client/network/sync/src/strategy.rs +++ b/substrate/client/network/sync/src/strategy.rs @@ -26,6 +26,7 @@ pub mod state_sync; pub mod warp; use crate::{ + block_request_handler::MAX_BLOCKS_IN_RESPONSE, types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncStatus}, LOG_TARGET, }; @@ -34,6 +35,7 @@ use log::{debug, error, info}; use prometheus_endpoint::Registry; use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; +use sc_network::ProtocolName; use sc_network_common::sync::{ message::{BlockAnnounce, BlockData, BlockRequest}, SyncMode, @@ -172,6 +174,8 @@ pub struct SyncingConfig { pub max_blocks_per_request: u32, /// Prometheus metrics registry. pub metrics_registry: Option, + /// Protocol name used to send out state requests + pub state_request_protocol_name: ProtocolName, } /// The key identifying a specific strategy for responses routing. @@ -190,9 +194,19 @@ pub enum SyncingAction { /// Send block request to peer. Always implies dropping a stale block request to the same peer. SendBlockRequest { peer_id: PeerId, key: StrategyKey, request: BlockRequest }, /// Send state request to peer. - SendStateRequest { peer_id: PeerId, key: StrategyKey, request: OpaqueStateRequest }, + SendStateRequest { + peer_id: PeerId, + key: StrategyKey, + protocol_name: ProtocolName, + request: OpaqueStateRequest, + }, /// Send warp proof request to peer. - SendWarpProofRequest { peer_id: PeerId, key: StrategyKey, request: WarpProofRequest }, + SendWarpProofRequest { + peer_id: PeerId, + key: StrategyKey, + protocol_name: ProtocolName, + request: WarpProofRequest, + }, /// Drop stale request. CancelRequest { peer_id: PeerId, key: StrategyKey }, /// Peer misbehaved. Disconnect, report it and cancel any requests to it. @@ -219,8 +233,13 @@ impl SyncingAction { impl From> for SyncingAction { fn from(action: WarpSyncAction) -> Self { match action { - WarpSyncAction::SendWarpProofRequest { peer_id, request } => - SyncingAction::SendWarpProofRequest { peer_id, key: StrategyKey::Warp, request }, + WarpSyncAction::SendWarpProofRequest { peer_id, protocol_name, request } => + SyncingAction::SendWarpProofRequest { + peer_id, + key: StrategyKey::Warp, + protocol_name, + request, + }, WarpSyncAction::SendBlockRequest { peer_id, request } => SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::Warp, request }, WarpSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), @@ -232,8 +251,13 @@ impl From> for SyncingAction { impl From> for SyncingAction { fn from(action: StateStrategyAction) -> Self { match action { - StateStrategyAction::SendStateRequest { peer_id, request } => - SyncingAction::SendStateRequest { peer_id, key: StrategyKey::State, request }, + StateStrategyAction::SendStateRequest { peer_id, protocol_name, request } => + SyncingAction::SendStateRequest { + peer_id, + key: StrategyKey::State, + protocol_name, + request, + }, StateStrategyAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), StateStrategyAction::ImportBlocks { origin, blocks } => SyncingAction::ImportBlocks { origin, blocks }, @@ -509,14 +533,24 @@ where { /// Initialize a new syncing strategy. pub fn new( - config: SyncingConfig, + mut config: SyncingConfig, client: Arc, warp_sync_config: Option>, + warp_sync_protocol_name: Option, ) -> Result { + if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 { + info!( + target: LOG_TARGET, + "clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}", + ); + config.max_blocks_per_request = MAX_BLOCKS_IN_RESPONSE as u32; + } + if let SyncMode::Warp = config.mode { let warp_sync_config = warp_sync_config .expect("Warp sync configuration must be supplied in warp sync mode."); - let warp_sync = WarpSync::new(client.clone(), warp_sync_config); + let warp_sync = + WarpSync::new(client.clone(), warp_sync_config, warp_sync_protocol_name); Ok(Self { config, client, @@ -531,6 +565,7 @@ where client.clone(), config.max_parallel_downloads, config.max_blocks_per_request, + config.state_request_protocol_name.clone(), config.metrics_registry.as_ref(), std::iter::empty(), )?; @@ -564,6 +599,7 @@ where self.peer_best_blocks .iter() .map(|(peer_id, (_, best_number))| (*peer_id, *best_number)), + self.config.state_request_protocol_name.clone(), ); self.warp = None; @@ -580,6 +616,7 @@ where self.client.clone(), self.config.max_parallel_downloads, self.config.max_blocks_per_request, + self.config.state_request_protocol_name.clone(), self.config.metrics_registry.as_ref(), self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| { (*peer_id, *best_hash, *best_number) @@ -608,6 +645,7 @@ where self.client.clone(), self.config.max_parallel_downloads, self.config.max_blocks_per_request, + self.config.state_request_protocol_name.clone(), self.config.metrics_registry.as_ref(), self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| { (*peer_id, *best_hash, *best_number) diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index a8ba5558d1bc..fd0e3ea1a76c 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -47,6 +47,7 @@ use log::{debug, error, info, trace, warn}; use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64}; use sc_client_api::{blockchain::BlockGap, BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; +use sc_network::ProtocolName; use sc_network_common::sync::message::{ BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock, }; @@ -318,6 +319,8 @@ pub struct ChainSync { max_parallel_downloads: u32, /// Maximum blocks per request. max_blocks_per_request: u32, + /// Protocol name used to send out state requests + state_request_protocol_name: ProtocolName, /// Total number of downloaded blocks. downloaded_blocks: usize, /// State sync in progress, if any. @@ -880,7 +883,12 @@ where self.actions.extend(justification_requests); let state_request = self.state_request().into_iter().map(|(peer_id, request)| { - SyncingAction::SendStateRequest { peer_id, key: StrategyKey::ChainSync, request } + SyncingAction::SendStateRequest { + peer_id, + key: StrategyKey::ChainSync, + protocol_name: self.state_request_protocol_name.clone(), + request, + } }); self.actions.extend(state_request); @@ -905,6 +913,7 @@ where client: Arc, max_parallel_downloads: u32, max_blocks_per_request: u32, + state_request_protocol_name: ProtocolName, metrics_registry: Option<&Registry>, initial_peers: impl Iterator)>, ) -> Result { @@ -923,6 +932,7 @@ where allowed_requests: Default::default(), max_parallel_downloads, max_blocks_per_request, + state_request_protocol_name, downloaded_blocks: 0, state_sync: None, import_existing: false, diff --git a/substrate/client/network/sync/src/strategy/chain_sync/test.rs b/substrate/client/network/sync/src/strategy/chain_sync/test.rs index 59436f387db6..d13f034e2e8d 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync/test.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync/test.rs @@ -38,9 +38,16 @@ fn processes_empty_response_on_justification_request_for_unknown_block() { let client = Arc::new(TestClientBuilder::new().build()); let peer_id = PeerId::random(); - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let (a1_hash, a1_number) = { let a1 = BlockBuilderBuilder::new(&*client) @@ -95,9 +102,16 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { // we request max 8 blocks to always initiate block requests to both peers for the test to be // deterministic - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 8, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 8, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); @@ -291,9 +305,16 @@ fn do_ancestor_search_when_common_block_to_best_queued_gap_is_to_big() { let client = Arc::new(TestClientBuilder::new().build()); let info = client.info(); - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 5, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); @@ -438,9 +459,16 @@ fn can_sync_huge_fork() { let info = client.info(); - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 5, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone(); let just = (*b"TEST", Vec::new()); @@ -572,9 +600,16 @@ fn syncs_fork_without_duplicate_requests() { let info = client.info(); - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 5, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone(); let just = (*b"TEST", Vec::new()); @@ -709,9 +744,16 @@ fn removes_target_fork_on_disconnect() { let client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..3).map(|_| build_block(&client, None, false)).collect::>(); - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let peer_id1 = PeerId::random(); let common_block = blocks[1].clone(); @@ -736,9 +778,16 @@ fn can_import_response_with_missing_blocks() { let empty_client = Arc::new(TestClientBuilder::new().build()); - let mut sync = - ChainSync::new(ChainSyncMode::Full, empty_client.clone(), 1, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + empty_client.clone(), + 1, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let peer_id1 = PeerId::random(); let best_block = blocks[3].clone(); @@ -769,9 +818,16 @@ fn ancestor_search_repeat() { #[test] fn sync_restart_removes_block_but_not_justification_requests() { let client = Arc::new(TestClientBuilder::new().build()); - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let peers = vec![PeerId::random(), PeerId::random()]; @@ -913,9 +969,16 @@ fn request_across_forks() { fork_blocks }; - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 5, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); // Add the peers, all at the common ancestor 100. let common_block = blocks.last().unwrap(); diff --git a/substrate/client/network/sync/src/strategy/state.rs b/substrate/client/network/sync/src/strategy/state.rs index 6f06f238fe3a..a04ab8be4fea 100644 --- a/substrate/client/network/sync/src/strategy/state.rs +++ b/substrate/client/network/sync/src/strategy/state.rs @@ -30,6 +30,7 @@ use crate::{ use log::{debug, error, trace}; use sc_client_api::ProofProvider; use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; +use sc_network::ProtocolName; use sc_network_common::sync::message::BlockAnnounce; use sc_network_types::PeerId; use sp_consensus::BlockOrigin; @@ -52,7 +53,7 @@ mod rep { /// Action that should be performed on [`StateStrategy`]'s behalf. pub enum StateStrategyAction { /// Send state request to peer. - SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest }, + SendStateRequest { peer_id: PeerId, protocol_name: ProtocolName, request: OpaqueStateRequest }, /// Disconnect and report peer. DropPeer(BadPeer), /// Import blocks. @@ -83,6 +84,7 @@ pub struct StateStrategy { peers: HashMap>, disconnected_peers: DisconnectedPeers, actions: Vec>, + protocol_name: ProtocolName, succeeded: bool, } @@ -95,6 +97,7 @@ impl StateStrategy { target_justifications: Option, skip_proof: bool, initial_peers: impl Iterator)>, + protocol_name: ProtocolName, ) -> Self where Client: ProofProvider + Send + Sync + 'static, @@ -115,6 +118,7 @@ impl StateStrategy { peers, disconnected_peers: DisconnectedPeers::new(), actions: Vec::new(), + protocol_name, succeeded: false, } } @@ -125,6 +129,7 @@ impl StateStrategy { fn new_with_provider( state_sync_provider: Box>, initial_peers: impl Iterator)>, + protocol_name: ProtocolName, ) -> Self { Self { state_sync: state_sync_provider, @@ -135,6 +140,7 @@ impl StateStrategy { .collect(), disconnected_peers: DisconnectedPeers::new(), actions: Vec::new(), + protocol_name, succeeded: false, } } @@ -349,10 +355,13 @@ impl StateStrategy { /// Get actions that should be performed by the owner on [`WarpSync`]'s behalf #[must_use] pub fn actions(&mut self) -> impl Iterator> { - let state_request = self - .state_request() - .into_iter() - .map(|(peer_id, request)| StateStrategyAction::SendStateRequest { peer_id, request }); + let state_request = self.state_request().into_iter().map(|(peer_id, request)| { + StateStrategyAction::SendStateRequest { + peer_id, + protocol_name: self.protocol_name.clone(), + request, + } + }); self.actions.extend(state_request); std::mem::take(&mut self.actions).into_iter() @@ -409,8 +418,15 @@ mod test { .block; let target_header = target_block.header().clone(); - let mut state_strategy = - StateStrategy::new(client, target_header, None, None, false, std::iter::empty()); + let mut state_strategy = StateStrategy::new( + client, + target_header, + None, + None, + false, + std::iter::empty(), + ProtocolName::Static(""), + ); assert!(state_strategy .schedule_next_peer(PeerState::DownloadingState, Zero::zero()) @@ -442,6 +458,7 @@ mod test { None, false, initial_peers, + ProtocolName::Static(""), ); let peer_id = @@ -475,6 +492,7 @@ mod test { None, false, initial_peers, + ProtocolName::Static(""), ); let peer_id = state_strategy.schedule_next_peer(PeerState::DownloadingState, 10); @@ -508,6 +526,7 @@ mod test { None, false, initial_peers, + ProtocolName::Static(""), ); // Disconnecting a peer without an inflight request has no effect on persistent states. @@ -557,6 +576,7 @@ mod test { None, false, initial_peers, + ProtocolName::Static(""), ); let (_peer_id, mut opaque_request) = state_strategy.state_request().unwrap(); @@ -587,6 +607,7 @@ mod test { None, false, initial_peers, + ProtocolName::Static(""), ); // First request is sent. @@ -602,8 +623,11 @@ mod test { state_sync_provider.expect_import().return_once(|_| ImportResult::Continue); let peer_id = PeerId::random(); let initial_peers = std::iter::once((peer_id, 10)); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers, + ProtocolName::Static(""), + ); // Manually set the peer's state. state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState; @@ -620,8 +644,11 @@ mod test { state_sync_provider.expect_import().return_once(|_| ImportResult::BadResponse); let peer_id = PeerId::random(); let initial_peers = std::iter::once((peer_id, 10)); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers, + ProtocolName::Static(""), + ); // Manually set the peer's state. state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState; let dummy_response = OpaqueStateResponse(Box::new(StateResponse::default())); @@ -639,8 +666,11 @@ mod test { state_sync_provider.expect_import().return_once(|_| ImportResult::Continue); let peer_id = PeerId::random(); let initial_peers = std::iter::once((peer_id, 10)); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers, + ProtocolName::Static(""), + ); // Manually set the peer's state . state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState; @@ -698,8 +728,11 @@ mod test { // Prepare `StateStrategy`. let peer_id = PeerId::random(); let initial_peers = std::iter::once((peer_id, 10)); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers, + ProtocolName::Static(""), + ); // Manually set the peer's state . state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState; @@ -722,8 +755,11 @@ mod test { let mut state_sync_provider = MockStateSync::::new(); state_sync_provider.expect_target_hash().return_const(target_hash); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), std::iter::empty()); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + std::iter::empty(), + ProtocolName::Static(""), + ); // Unknown block imported. state_strategy.on_blocks_processed( @@ -745,8 +781,11 @@ mod test { let mut state_sync_provider = MockStateSync::::new(); state_sync_provider.expect_target_hash().return_const(target_hash); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), std::iter::empty()); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + std::iter::empty(), + ProtocolName::Static(""), + ); // Target block imported. state_strategy.on_blocks_processed( @@ -769,8 +808,11 @@ mod test { let mut state_sync_provider = MockStateSync::::new(); state_sync_provider.expect_target_hash().return_const(target_hash); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), std::iter::empty()); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + std::iter::empty(), + ProtocolName::Static(""), + ); // Target block import failed. state_strategy.on_blocks_processed( @@ -797,8 +839,11 @@ mod test { // Get enough peers for possible spurious requests. let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number)); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers, + ProtocolName::Static(""), + ); state_strategy.on_blocks_processed( 1, diff --git a/substrate/client/network/sync/src/strategy/warp.rs b/substrate/client/network/sync/src/strategy/warp.rs index 99405c2e5f08..cce6a93caf43 100644 --- a/substrate/client/network/sync/src/strategy/warp.rs +++ b/substrate/client/network/sync/src/strategy/warp.rs @@ -26,7 +26,8 @@ use crate::{ LOG_TARGET, }; use codec::{Decode, Encode}; -use log::{debug, error, trace}; +use log::{debug, error, trace, warn}; +use sc_network::ProtocolName; use sc_network_common::sync::message::{ BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock, }; @@ -188,7 +189,11 @@ struct Peer { /// Action that should be performed on [`WarpSync`]'s behalf. pub enum WarpSyncAction { /// Send warp proof request to peer. - SendWarpProofRequest { peer_id: PeerId, request: WarpProofRequest }, + SendWarpProofRequest { + peer_id: PeerId, + protocol_name: ProtocolName, + request: WarpProofRequest, + }, /// Send block request to peer. Always implies dropping a stale block request to the same peer. SendBlockRequest { peer_id: PeerId, request: BlockRequest }, /// Disconnect and report peer. @@ -211,6 +216,7 @@ pub struct WarpSync { total_state_bytes: u64, peers: HashMap>, disconnected_peers: DisconnectedPeers, + protocol_name: Option, actions: Vec>, result: Option>, } @@ -223,7 +229,11 @@ where /// Create a new instance. When passing a warp sync provider we will be checking for proof and /// authorities. Alternatively we can pass a target block when we want to skip downloading /// proofs, in this case we will continue polling until the target block is known. - pub fn new(client: Arc, warp_sync_config: WarpSyncConfig) -> Self { + pub fn new( + client: Arc, + warp_sync_config: WarpSyncConfig, + protocol_name: Option, + ) -> Self { if client.info().finalized_state.is_some() { error!( target: LOG_TARGET, @@ -236,6 +246,7 @@ where total_state_bytes: 0, peers: HashMap::new(), disconnected_peers: DisconnectedPeers::new(), + protocol_name, actions: vec![WarpSyncAction::Finished], result: None, } @@ -254,6 +265,7 @@ where total_state_bytes: 0, peers: HashMap::new(), disconnected_peers: DisconnectedPeers::new(), + protocol_name, actions: Vec::new(), result: None, } @@ -469,7 +481,7 @@ where } /// Produce warp proof request. - fn warp_proof_request(&mut self) -> Option<(PeerId, WarpProofRequest)> { + fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest)> { let Phase::WarpProof { last_hash, .. } = &self.phase else { return None }; // Copy `last_hash` early to cut the borrowing tie. @@ -487,7 +499,17 @@ where let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?; trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}."); - Some((peer_id, WarpProofRequest { begin })) + let request = WarpProofRequest { begin }; + + let Some(protocol_name) = self.protocol_name.clone() else { + warn!( + target: LOG_TARGET, + "Trying to send warp sync request when no protocol is configured {request:?}", + ); + return None; + }; + + Some((peer_id, protocol_name, request)) } /// Produce target block request. @@ -585,10 +607,10 @@ where /// Get actions that should be performed by the owner on [`WarpSync`]'s behalf #[must_use] pub fn actions(&mut self) -> impl Iterator> { - let warp_proof_request = self - .warp_proof_request() - .into_iter() - .map(|(peer_id, request)| WarpSyncAction::SendWarpProofRequest { peer_id, request }); + let warp_proof_request = + self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| { + WarpSyncAction::SendWarpProofRequest { peer_id, protocol_name, request } + }); self.actions.extend(warp_proof_request); let target_block_request = self @@ -694,7 +716,7 @@ mod test { let client = mock_client_with_state(); let provider = MockWarpSyncProvider::::new(); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); // Warp sync instantly finishes let actions = warp_sync.actions().collect::>(); @@ -715,7 +737,7 @@ mod test { Default::default(), Default::default(), )); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); // Warp sync instantly finishes let actions = warp_sync.actions().collect::>(); @@ -731,7 +753,7 @@ mod test { let client = mock_client_without_state(); let provider = MockWarpSyncProvider::::new(); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); // No actions are emitted. assert_eq!(warp_sync.actions().count(), 0) @@ -747,7 +769,7 @@ mod test { Default::default(), Default::default(), )); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); // No actions are emitted. assert_eq!(warp_sync.actions().count(), 0) @@ -762,7 +784,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); // Warp sync is not started when there is not enough peers. for _ in 0..(MIN_PEERS_TO_START_WARP_SYNC - 1) { @@ -780,7 +802,7 @@ mod test { let client = mock_client_without_state(); let provider = MockWarpSyncProvider::::new(); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); assert!(warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None).is_none()); } @@ -804,7 +826,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); for best_number in 1..11 { warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); @@ -825,7 +847,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); for best_number in 1..11 { warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); @@ -845,7 +867,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); for best_number in 1..11 { warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); @@ -889,7 +911,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Some(ProtocolName::Static(""))); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -918,7 +940,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Some(ProtocolName::Static(""))); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -936,7 +958,7 @@ mod test { _ => panic!("Invalid phase."), } - let (_peer_id, request) = warp_sync.warp_proof_request().unwrap(); + let (_peer_id, _protocol_name, request) = warp_sync.warp_proof_request().unwrap(); assert_eq!(request.begin, known_last_hash); } @@ -949,7 +971,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Some(ProtocolName::Static(""))); // Make sure we have enough peers to make requests. for best_number in 1..11 { @@ -976,7 +998,7 @@ mod test { Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure"))) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Some(ProtocolName::Static(""))); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1017,7 +1039,7 @@ mod test { Ok(VerificationResult::Partial(set_id, authorities, Hash::random())) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Some(ProtocolName::Static(""))); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1061,7 +1083,7 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, Some(ProtocolName::Static(""))); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1094,7 +1116,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1129,7 +1151,7 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1161,7 +1183,7 @@ mod test { .block; let target_header = target_block.header().clone(); let config = WarpSyncConfig::WithTarget(target_header); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1201,7 +1223,7 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1239,7 +1261,7 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1293,7 +1315,7 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1370,7 +1392,7 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1423,7 +1445,7 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index f84f353fb4a0..0f73e3194baa 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -66,8 +66,12 @@ use sc_network_sync::{ block_request_handler::BlockRequestHandler, service::{network::NetworkServiceProvider, syncing_service::SyncingService}, state_request_handler::StateRequestHandler, - strategy::warp::{ - AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncConfig, WarpSyncProvider, + strategy::{ + warp::{ + AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncConfig, + WarpSyncProvider, + }, + PolkadotSyncingStrategy, SyncingConfig, }, warp_request_handler, }; @@ -905,6 +909,24 @@ pub trait TestNetFactory: Default + Sized + Send { ::Hash, >>::register_notification_metrics(None); + let syncing_config = SyncingConfig { + mode: network_config.sync_mode, + max_parallel_downloads: network_config.max_parallel_downloads, + max_blocks_per_request: network_config.max_blocks_per_request, + metrics_registry: None, + state_request_protocol_name: state_request_protocol_config.name.clone(), + }; + // Initialize syncing strategy. + let syncing_strategy = Box::new( + PolkadotSyncingStrategy::new( + syncing_config, + client.clone(), + Some(warp_sync_config), + Some(warp_protocol_config.name.clone()), + ) + .unwrap(), + ); + let (engine, sync_service, block_announce_config) = sc_network_sync::engine::SyncingEngine::new( Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }), @@ -915,12 +937,10 @@ pub trait TestNetFactory: Default + Sized + Send { protocol_id.clone(), &fork_id, block_announce_validator, - Some(warp_sync_config), + syncing_strategy, chain_sync_network_handle, import_queue.service(), block_relay_params.downloader, - state_request_protocol_config.name.clone(), - Some(warp_protocol_config.name.clone()), peer_store_handle.clone(), ) .unwrap(); diff --git a/substrate/client/network/test/src/service.rs b/substrate/client/network/test/src/service.rs index a5cee97531ca..ad2d1d9ec24d 100644 --- a/substrate/client/network/test/src/service.rs +++ b/substrate/client/network/test/src/service.rs @@ -34,6 +34,7 @@ use sc_network_sync::{ engine::SyncingEngine, service::network::{NetworkServiceHandle, NetworkServiceProvider}, state_request_handler::StateRequestHandler, + strategy::{PolkadotSyncingStrategy, SyncingConfig}, }; use sp_blockchain::HeaderBackend; use sp_runtime::traits::{Block as BlockT, Zero}; @@ -202,6 +203,18 @@ impl TestNetworkBuilder { let peer_store_handle: Arc = Arc::new(peer_store.handle()); tokio::spawn(peer_store.run().boxed()); + let syncing_config = SyncingConfig { + mode: network_config.sync_mode, + max_parallel_downloads: network_config.max_parallel_downloads, + max_blocks_per_request: network_config.max_blocks_per_request, + metrics_registry: None, + state_request_protocol_name: state_request_protocol_config.name.clone(), + }; + // Initialize syncing strategy. + let syncing_strategy = Box::new( + PolkadotSyncingStrategy::new(syncing_config, client.clone(), None, None).unwrap(), + ); + let (engine, chain_sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config::Role::Full), client.clone(), @@ -211,12 +224,10 @@ impl TestNetworkBuilder { protocol_id.clone(), &None, Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator), - None, + syncing_strategy, chain_sync_network_handle, import_queue.service(), block_relay_params.downloader, - state_request_protocol_config.name.clone(), - None, Arc::clone(&peer_store_handle), ) .unwrap(); diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 28a76847ac06..f27b7ec6fbad 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -42,7 +42,7 @@ use sc_executor::{ }; use sc_keystore::LocalKeystore; use sc_network::{ - config::{FullNetworkConfiguration, SyncMode}, + config::{FullNetworkConfiguration, ProtocolId, SyncMode}, multiaddr::Protocol, service::{ traits::{PeerStore, RequestResponseConfig}, @@ -53,10 +53,14 @@ use sc_network::{ use sc_network_common::role::Roles; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ - block_relay_protocol::BlockRelayParams, block_request_handler::BlockRequestHandler, - engine::SyncingEngine, service::network::NetworkServiceProvider, + block_relay_protocol::BlockRelayParams, + block_request_handler::BlockRequestHandler, + engine::SyncingEngine, + service::network::NetworkServiceProvider, state_request_handler::StateRequestHandler, - warp_request_handler::RequestHandler as WarpSyncRequestHandler, SyncingService, WarpSyncConfig, + strategy::{PolkadotSyncingStrategy, SyncingConfig, SyncingStrategy}, + warp_request_handler::RequestHandler as WarpSyncRequestHandler, + SyncingService, WarpSyncConfig, }; use sc_rpc::{ author::AuthorApiServer, @@ -777,65 +781,63 @@ where } /// Parameters to pass into `build_network`. -pub struct BuildNetworkParams< - 'a, - TBl: BlockT, - TNet: NetworkBackend::Hash>, - TExPool, - TImpQu, - TCl, -> { +pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client> +where + Block: BlockT, + Net: NetworkBackend::Hash>, +{ /// The service configuration. pub config: &'a Configuration, /// Full network configuration. - pub net_config: FullNetworkConfiguration::Hash, TNet>, + pub net_config: FullNetworkConfiguration::Hash, Net>, /// A shared client returned by `new_full_parts`. - pub client: Arc, + pub client: Arc, /// A shared transaction pool. - pub transaction_pool: Arc, + pub transaction_pool: Arc, /// A handle for spawning tasks. pub spawn_handle: SpawnTaskHandle, /// An import queue. - pub import_queue: TImpQu, + pub import_queue: IQ, /// A block announce validator builder. - pub block_announce_validator_builder: - Option) -> Box + Send> + Send>>, - /// Optional warp sync config. - pub warp_sync_config: Option>, + pub block_announce_validator_builder: Option< + Box) -> Box + Send> + Send>, + >, + /// Syncing strategy to use in syncing engine. + pub syncing_strategy: Box>, /// User specified block relay params. If not specified, the default /// block request handler will be used. - pub block_relay: Option>, + pub block_relay: Option>, /// Metrics. pub metrics: NotificationMetrics, } /// Build the network service, the network status sinks and an RPC sender. -pub fn build_network( - params: BuildNetworkParams, +pub fn build_network( + params: BuildNetworkParams, ) -> Result< ( Arc, - TracingUnboundedSender>, - sc_network_transactions::TransactionsHandlerController<::Hash>, + TracingUnboundedSender>, + sc_network_transactions::TransactionsHandlerController<::Hash>, NetworkStarter, - Arc>, + Arc>, ), Error, > where - TBl: BlockT, - TCl: ProvideRuntimeApi - + HeaderMetadata - + Chain - + BlockBackend - + BlockIdTo - + ProofProvider - + HeaderBackend - + BlockchainEvents + Block: BlockT, + Client: ProvideRuntimeApi + + HeaderMetadata + + Chain + + BlockBackend + + BlockIdTo + + ProofProvider + + HeaderBackend + + BlockchainEvents + 'static, - TExPool: TransactionPool::Hash> + 'static, - TImpQu: ImportQueue + 'static, - TNet: NetworkBackend::Hash>, + TxPool: TransactionPool::Hash> + 'static, + IQ: ImportQueue + 'static, + Net: NetworkBackend::Hash>, { let BuildNetworkParams { config, @@ -845,30 +847,13 @@ where spawn_handle, import_queue, block_announce_validator_builder, - warp_sync_config, + syncing_strategy, block_relay, metrics, } = params; - if warp_sync_config.is_none() && config.network.sync_mode.is_warp() { - return Err("Warp sync enabled, but no warp sync provider configured.".into()) - } - - if client.requires_full_sync() { - match config.network.sync_mode { - SyncMode::LightState { .. } => - return Err("Fast sync doesn't work for archive nodes".into()), - SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()), - SyncMode::Full => {}, - } - } - let protocol_id = config.protocol_id(); - let genesis_hash = client - .block_hash(0u32.into()) - .ok() - .flatten() - .expect("Genesis block exists; qed"); + let genesis_hash = client.info().genesis_hash; let block_announce_validator = if let Some(f) = block_announce_validator_builder { f(client.clone()) @@ -882,7 +867,7 @@ where None => { // Custom protocol was not specified, use the default block handler. // Allow both outgoing and incoming requests. - let params = BlockRequestHandler::new::( + let params = BlockRequestHandler::new::( chain_sync_network_handle.clone(), &protocol_id, config.chain_spec.fork_id(), @@ -897,42 +882,9 @@ where block_server.run().await; }); - let (state_request_protocol_config, state_request_protocol_name) = { - let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize + - net_config.network_config.default_peers_set.reserved_nodes.len(); - // Allow both outgoing and incoming requests. - let (handler, protocol_config) = StateRequestHandler::new::( - &protocol_id, - config.chain_spec.fork_id(), - client.clone(), - num_peer_hint, - ); - let config_name = protocol_config.protocol_name().clone(); - - spawn_handle.spawn("state-request-handler", Some("networking"), handler.run()); - (protocol_config, config_name) - }; - - let (warp_sync_protocol_config, warp_request_protocol_name) = match warp_sync_config.as_ref() { - Some(WarpSyncConfig::WithProvider(warp_with_provider)) => { - // Allow both outgoing and incoming requests. - let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, TNet>( - protocol_id.clone(), - genesis_hash, - config.chain_spec.fork_id(), - warp_with_provider.clone(), - ); - let config_name = protocol_config.protocol_name().clone(); - - spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); - (Some(protocol_config), Some(config_name)) - }, - _ => (None, None), - }; - let light_client_request_protocol_config = { // Allow both outgoing and incoming requests. - let (handler, protocol_config) = LightClientRequestHandler::new::( + let (handler, protocol_config) = LightClientRequestHandler::new::( &protocol_id, config.chain_spec.fork_id(), client.clone(), @@ -943,15 +895,10 @@ where // install request handlers to `FullNetworkConfiguration` net_config.add_request_response_protocol(block_request_protocol_config); - net_config.add_request_response_protocol(state_request_protocol_config); net_config.add_request_response_protocol(light_client_request_protocol_config); - if let Some(config) = warp_sync_protocol_config { - net_config.add_request_response_protocol(config); - } - let bitswap_config = config.network.ipfs_server.then(|| { - let (handler, config) = TNet::bitswap_server(client.clone()); + let (handler, config) = Net::bitswap_server(client.clone()); spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler); config @@ -960,7 +907,7 @@ where // create transactions protocol and add it to the list of supported protocols of let peer_store_handle = net_config.peer_store_handle(); let (transactions_handler_proto, transactions_config) = - sc_network_transactions::TransactionsHandlerPrototype::new::<_, TBl, TNet>( + sc_network_transactions::TransactionsHandlerPrototype::new::<_, Block, Net>( protocol_id.clone(), genesis_hash, config.chain_spec.fork_id(), @@ -983,19 +930,16 @@ where protocol_id.clone(), &config.chain_spec.fork_id().map(ToOwned::to_owned), block_announce_validator, - warp_sync_config, + syncing_strategy, chain_sync_network_handle, import_queue.service(), block_downloader, - state_request_protocol_name, - warp_request_protocol_name, Arc::clone(&peer_store_handle), )?; let sync_service_import_queue = sync_service.clone(); let sync_service = Arc::new(sync_service); - let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed"); - let network_params = sc_network::config::Params::::Hash, TNet> { + let network_params = sc_network::config::Params::::Hash, Net> { role: config.role, executor: { let spawn_handle = Clone::clone(&spawn_handle); @@ -1005,7 +949,7 @@ where }, network_config: net_config, genesis_hash, - protocol_id: protocol_id.clone(), + protocol_id, fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned), metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_announce_config, @@ -1014,7 +958,7 @@ where }; let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty(); - let network_mut = TNet::new(network_params)?; + let network_mut = Net::new(network_params)?; let network = network_mut.network_service().clone(); let (tx_handler, tx_handler_controller) = transactions_handler_proto.build( @@ -1041,7 +985,7 @@ where spawn_handle.spawn( "system-rpc-handler", Some("networking"), - build_system_rpc_future::<_, _, ::Hash>( + build_system_rpc_future::<_, _, ::Hash>( config.role, network_mut.network_service(), sync_service.clone(), @@ -1051,7 +995,7 @@ where ), ); - let future = build_network_future::<_, _, ::Hash, _>( + let future = build_network_future::<_, _, ::Hash, _>( network_mut, client, sync_service.clone(), @@ -1103,6 +1047,91 @@ where )) } +/// Build standard polkadot syncing strategy +pub fn build_polkadot_syncing_strategy( + protocol_id: ProtocolId, + fork_id: Option<&str>, + net_config: &mut FullNetworkConfiguration::Hash, Net>, + warp_sync_config: Option>, + client: Arc, + spawn_handle: &SpawnTaskHandle, + metrics_registry: Option<&Registry>, +) -> Result>, Error> +where + Block: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, + + Net: NetworkBackend::Hash>, +{ + if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() { + return Err("Warp sync enabled, but no warp sync provider configured.".into()) + } + + if client.requires_full_sync() { + match net_config.network_config.sync_mode { + SyncMode::LightState { .. } => + return Err("Fast sync doesn't work for archive nodes".into()), + SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()), + SyncMode::Full => {}, + } + } + + let genesis_hash = client.info().genesis_hash; + + let (state_request_protocol_config, state_request_protocol_name) = { + let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize + + net_config.network_config.default_peers_set.reserved_nodes.len(); + // Allow both outgoing and incoming requests. + let (handler, protocol_config) = + StateRequestHandler::new::(&protocol_id, fork_id, client.clone(), num_peer_hint); + let config_name = protocol_config.protocol_name().clone(); + + spawn_handle.spawn("state-request-handler", Some("networking"), handler.run()); + (protocol_config, config_name) + }; + net_config.add_request_response_protocol(state_request_protocol_config); + + let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() { + Some(WarpSyncConfig::WithProvider(warp_with_provider)) => { + // Allow both outgoing and incoming requests. + let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>( + protocol_id, + genesis_hash, + fork_id, + warp_with_provider.clone(), + ); + let config_name = protocol_config.protocol_name().clone(); + + spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); + (Some(protocol_config), Some(config_name)) + }, + _ => (None, None), + }; + if let Some(config) = warp_sync_protocol_config { + net_config.add_request_response_protocol(config); + } + + let syncing_config = SyncingConfig { + mode: net_config.network_config.sync_mode, + max_parallel_downloads: net_config.network_config.max_parallel_downloads, + max_blocks_per_request: net_config.network_config.max_blocks_per_request, + metrics_registry: metrics_registry.cloned(), + state_request_protocol_name, + }; + Ok(Box::new(PolkadotSyncingStrategy::new( + syncing_config, + client, + warp_sync_config, + warp_sync_protocol_name, + )?)) +} + /// Object used to start the network. #[must_use] pub struct NetworkStarter(oneshot::Sender<()>); diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index babb76f022f0..b6acdb8ed002 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -59,11 +59,11 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; pub use self::{ builder::{ - build_network, gen_rpc_module, init_telemetry, new_client, new_db_backend, new_full_client, - new_full_parts, new_full_parts_record_import, new_full_parts_with_genesis_builder, - new_wasm_executor, propagate_transaction_notifications, spawn_tasks, BuildNetworkParams, - KeystoreContainer, NetworkStarter, SpawnTasksParams, TFullBackend, TFullCallExecutor, - TFullClient, + build_network, build_polkadot_syncing_strategy, gen_rpc_module, init_telemetry, new_client, + new_db_backend, new_full_client, new_full_parts, new_full_parts_record_import, + new_full_parts_with_genesis_builder, new_wasm_executor, + propagate_transaction_notifications, spawn_tasks, BuildNetworkParams, KeystoreContainer, + NetworkStarter, SpawnTasksParams, TFullBackend, TFullCallExecutor, TFullClient, }, client::{ClientConfig, LocalCallExecutor}, error::Error, diff --git a/templates/minimal/node/src/service.rs b/templates/minimal/node/src/service.rs index a42eb10ccec6..08cd345f1e3e 100644 --- a/templates/minimal/node/src/service.rs +++ b/templates/minimal/node/src/service.rs @@ -15,12 +15,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::cli::Consensus; use futures::FutureExt; use minimal_template_runtime::{interface::OpaqueBlock as Block, RuntimeApi}; use polkadot_sdk::{ sc_client_api::backend::Backend, sc_executor::WasmExecutor, - sc_service::{error::Error as ServiceError, Configuration, TaskManager}, + sc_service::{ + build_polkadot_syncing_strategy, error::Error as ServiceError, Configuration, TaskManager, + }, sc_telemetry::{Telemetry, TelemetryWorker}, sc_transaction_pool_api::OffchainTransactionPoolFactory, sp_runtime::traits::Block as BlockT, @@ -28,8 +31,6 @@ use polkadot_sdk::{ }; use std::sync::Arc; -use crate::cli::Consensus; - type HostFunctions = sp_io::SubstrateHostFunctions; #[docify::export] @@ -120,7 +121,7 @@ pub fn new_full::Ha other: mut telemetry, } = new_partial(&config)?; - let net_config = sc_network::config::FullNetworkConfiguration::< + let mut net_config = sc_network::config::FullNetworkConfiguration::< Block, ::Hash, Network, @@ -132,6 +133,16 @@ pub fn new_full::Ha config.prometheus_config.as_ref().map(|cfg| &cfg.registry), ); + let syncing_strategy = build_polkadot_syncing_strategy( + config.protocol_id(), + config.chain_spec.fork_id(), + &mut net_config, + None, + client.clone(), + &task_manager.spawn_handle(), + config.prometheus_config.as_ref().map(|config| &config.registry), + )?; + let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, @@ -141,7 +152,7 @@ pub fn new_full::Ha import_queue, net_config, block_announce_validator_builder: None, - warp_sync_config: None, + syncing_strategy, block_relay: None, metrics, })?; diff --git a/templates/solochain/node/src/service.rs b/templates/solochain/node/src/service.rs index 7d37c5ce87f8..2de543235ec8 100644 --- a/templates/solochain/node/src/service.rs +++ b/templates/solochain/node/src/service.rs @@ -4,7 +4,10 @@ use futures::FutureExt; use sc_client_api::{Backend, BlockBackend}; use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams}; use sc_consensus_grandpa::SharedVoterState; -use sc_service::{error::Error as ServiceError, Configuration, TaskManager, WarpSyncConfig}; +use sc_service::{ + build_polkadot_syncing_strategy, error::Error as ServiceError, Configuration, TaskManager, + WarpSyncConfig, +}; use sc_telemetry::{Telemetry, TelemetryWorker}; use sc_transaction_pool_api::OffchainTransactionPoolFactory; use solochain_template_runtime::{self, apis::RuntimeApi, opaque::Block}; @@ -166,6 +169,16 @@ pub fn new_full< Vec::default(), )); + let syncing_strategy = build_polkadot_syncing_strategy( + config.protocol_id(), + config.chain_spec.fork_id(), + &mut net_config, + Some(WarpSyncConfig::WithProvider(warp_sync)), + client.clone(), + &task_manager.spawn_handle(), + config.prometheus_config.as_ref().map(|config| &config.registry), + )?; + let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, @@ -175,7 +188,7 @@ pub fn new_full< spawn_handle: task_manager.spawn_handle(), import_queue, block_announce_validator_builder: None, - warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)), + syncing_strategy, block_relay: None, metrics, })?;