From 668eb63fadba8b092faa80831fc600ef805d9d71 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Tue, 22 Jun 2021 11:32:43 +0200 Subject: [PATCH] Fast sync (#8884) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * State sync * Importing state fixes * Bugfixes * Sync with proof * Status reporting * Unsafe sync mode * Sync test * Cleanup * Apply suggestions from code review Co-authored-by: cheme Co-authored-by: Pierre Krieger * set_genesis_storage * Extract keys from range proof * Detect iter completion * Download and import bodies with fast sync * Replaced meta updates tuple with a struct * Fixed reverting finalized state * Reverted timeout * Typo * Doc * Doc * Fixed light client test * Fixed error handling * Tweaks * More UpdateMeta changes * Rename convert_transaction * Apply suggestions from code review Co-authored-by: Bastian Köcher * Apply suggestions from code review Co-authored-by: Bastian Köcher * Code review suggestions * Fixed count handling Co-authored-by: cheme Co-authored-by: Pierre Krieger Co-authored-by: Bastian Köcher --- Cargo.lock | 5 +- client/api/src/backend.rs | 5 + client/api/src/in_mem.rs | 57 ++- client/api/src/lib.rs | 1 + client/api/src/proof_provider.rs | 27 ++ .../authority-discovery/src/worker/tests.rs | 1 + client/cli/src/arg_enums.rs | 24 + client/cli/src/params/network_params.rs | 9 + client/consensus/aura/src/lib.rs | 6 +- client/consensus/babe/src/lib.rs | 12 +- .../consensus/manual-seal/src/seal_block.rs | 6 +- client/consensus/pow/src/worker.rs | 7 +- client/db/src/bench.rs | 12 + client/db/src/lib.rs | 454 +++++++++++++----- client/db/src/light.rs | 7 +- client/db/src/storage_cache.rs | 22 + client/db/src/utils.rs | 14 +- client/finality-grandpa/src/import.rs | 6 +- client/informant/src/display.rs | 17 +- client/light/src/backend.rs | 26 +- client/network/src/behaviour.rs | 25 +- client/network/src/chain.rs | 1 + client/network/src/config.rs | 30 ++ client/network/src/gossip/tests.rs | 12 + client/network/src/lib.rs | 6 +- client/network/src/protocol.rs | 157 ++++-- client/network/src/protocol/sync.rs | 328 ++++++++++--- client/network/src/protocol/sync/state.rs | 187 ++++++++ client/network/src/schema/api.v1.proto | 25 + client/network/src/service.rs | 14 +- client/network/src/service/tests.rs | 12 + client/network/src/state_request_handler.rs | 246 ++++++++++ client/network/test/src/block_import.rs | 2 + client/network/test/src/lib.rs | 45 +- client/network/test/src/sync.rs | 40 ++ client/service/Cargo.toml | 1 + client/service/src/builder.rs | 22 +- client/service/src/chain_ops/import_blocks.rs | 2 + client/service/src/client/client.rs | 223 ++++++--- client/service/test/src/client/light.rs | 2 +- primitives/blockchain/src/backend.rs | 4 +- primitives/blockchain/src/error.rs | 4 +- .../consensus/common/src/block_import.rs | 65 ++- .../consensus/common/src/import_queue.rs | 18 +- .../common/src/import_queue/basic_queue.rs | 2 + primitives/consensus/common/src/lib.rs | 3 +- primitives/runtime/src/generic/block.rs | 13 + primitives/state-machine/src/backend.rs | 16 + primitives/state-machine/src/lib.rs | 146 +++++- .../src/overlayed_changes/mod.rs | 2 +- .../state-machine/src/proving_backend.rs | 19 + primitives/state-machine/src/trie_backend.rs | 11 + .../state-machine/src/trie_backend_essence.rs | 81 +++- test-utils/client/src/lib.rs | 9 + 54 files changed, 2120 insertions(+), 371 deletions(-) create mode 100644 client/network/src/protocol/sync/state.rs create mode 100644 
client/network/src/state_request_handler.rs diff --git a/Cargo.lock b/Cargo.lock index a33cb02f7f0d4..ffcf95820342d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8022,6 +8022,7 @@ dependencies = [ "sp-runtime", "sp-session", "sp-state-machine", + "sp-storage", "sp-tracing", "sp-transaction-pool", "sp-transaction-storage-proof", @@ -10551,9 +10552,9 @@ dependencies = [ [[package]] name = "trie-db" -version = "0.22.3" +version = "0.22.5" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "ec051edf7f0fc9499a2cb0947652cab2148b9d7f61cee7605e312e9f970dacaf" +checksum = "cd81fe0c8bc2b528a51c9d2c31dae4483367a26a723a3c9a4a8120311d7774e3" dependencies = [ "hash-db", "hashbrown", diff --git a/client/api/src/backend.rs b/client/api/src/backend.rs index 09e9e0cb2e173..1f1ad13067b34 100644 --- a/client/api/src/backend.rs +++ b/client/api/src/backend.rs @@ -41,6 +41,7 @@ use sp_consensus::BlockOrigin; use parking_lot::RwLock; pub use sp_state_machine::Backend as StateBackend; +pub use sp_consensus::ImportedState; use std::marker::PhantomData; /// Extracts the state backend type for the given backend. @@ -161,6 +162,10 @@ pub trait BlockImportOperation { update: TransactionForSB, ) -> sp_blockchain::Result<()>; + /// Set genesis state. If `commit` is `false` the state is saved in memory, but is not written + /// to the database. + fn set_genesis_state(&mut self, storage: Storage, commit: bool) -> sp_blockchain::Result; + /// Inject storage data into the database replacing any existing data. fn reset_storage(&mut self, storage: Storage) -> sp_blockchain::Result; diff --git a/client/api/src/in_mem.rs b/client/api/src/in_mem.rs index 0d40bb3354cc3..916b830f6189d 100644 --- a/client/api/src/in_mem.rs +++ b/client/api/src/in_mem.rs @@ -347,6 +347,11 @@ impl HeaderBackend for Blockchain { genesis_hash: storage.genesis_hash, finalized_hash: storage.finalized_hash, finalized_number: storage.finalized_number, + finalized_state: if storage.finalized_hash != Default::default() { + Some((storage.finalized_hash.clone(), storage.finalized_number)) + } else { + None + }, number_leaves: storage.leaves.count() } } @@ -528,6 +533,32 @@ pub struct BlockImportOperation { set_head: Option>, } +impl BlockImportOperation where + Block::Hash: Ord, +{ + fn apply_storage(&mut self, storage: Storage, commit: bool) -> sp_blockchain::Result { + check_genesis_storage(&storage)?; + + let child_delta = storage.children_default.iter() + .map(|(_storage_key, child_content)| + ( + &child_content.child_info, + child_content.data.iter().map(|(k, v)| (k.as_ref(), Some(v.as_ref()))) + ) + ); + + let (root, transaction) = self.old_state.full_storage_root( + storage.top.iter().map(|(k, v)| (k.as_ref(), Some(v.as_ref()))), + child_delta, + ); + + if commit { + self.new_state = Some(transaction); + } + Ok(root) + } +} + impl backend::BlockImportOperation for BlockImportOperation where Block::Hash: Ord, { @@ -569,24 +600,12 @@ impl backend::BlockImportOperation for BlockImportOperatio Ok(()) } - fn reset_storage(&mut self, storage: Storage) -> sp_blockchain::Result { - check_genesis_storage(&storage)?; - - let child_delta = storage.children_default.iter() - .map(|(_storage_key, child_content)| - ( - &child_content.child_info, - child_content.data.iter().map(|(k, v)| (k.as_ref(), Some(v.as_ref()))) - ) - ); - - let (root, transaction) = self.old_state.full_storage_root( - storage.top.iter().map(|(k, v)| (k.as_ref(), Some(v.as_ref()))), - child_delta, - ); + fn set_genesis_state(&mut self, storage: Storage, commit: bool) -> 
sp_blockchain::Result { + self.apply_storage(storage, commit) + } - self.new_state = Some(transaction); - Ok(root) + fn reset_storage(&mut self, storage: Storage) -> sp_blockchain::Result { + self.apply_storage(storage, true) } fn insert_aux(&mut self, ops: I) -> sp_blockchain::Result<()> @@ -806,12 +825,12 @@ impl backend::RemoteBackend for Backend where Block /// Check that genesis storage is valid. pub fn check_genesis_storage(storage: &Storage) -> sp_blockchain::Result<()> { if storage.top.iter().any(|(k, _)| well_known_keys::is_child_storage_key(k)) { - return Err(sp_blockchain::Error::GenesisInvalid.into()); + return Err(sp_blockchain::Error::InvalidState.into()); } if storage.children_default.keys() .any(|child_key| !well_known_keys::is_child_storage_key(&child_key)) { - return Err(sp_blockchain::Error::GenesisInvalid.into()); + return Err(sp_blockchain::Error::InvalidState.into()); } Ok(()) diff --git a/client/api/src/lib.rs index f3cef0e36ff47..71cf499f79943 100644 --- a/client/api/src/lib.rs +++ b/client/api/src/lib.rs @@ -41,6 +41,7 @@ pub use proof_provider::*; pub use sp_blockchain::HeaderBackend; pub use sp_state_machine::{StorageProof, ExecutionStrategy}; +pub use sp_storage::{StorageData, StorageKey, PrefixedStorageKey, ChildInfo}; /// Usage Information Provider interface /// diff --git a/client/api/src/proof_provider.rs index a0dbcf1d1e807..0e9fd5318ba90 100644 --- a/client/api/src/proof_provider.rs +++ b/client/api/src/proof_provider.rs @@ -70,4 +70,31 @@ pub trait ProofProvider { storage_key: Option<&PrefixedStorageKey>, key: &StorageKey, ) -> sp_blockchain::Result>; + + /// Given a `BlockId`, iterate over all storage values starting at `start_key` exclusively, + /// building proofs until the size limit is reached. Returns the combined proof and the number of collected keys. + fn read_proof_collection( + &self, + id: &BlockId, + start_key: &[u8], + size_limit: usize, + ) -> sp_blockchain::Result<(StorageProof, u32)>; + + /// Given a `BlockId`, iterate over all storage values starting at `start_key`. + /// Returns collected keys and values. + fn storage_collection( + &self, + id: &BlockId, + start_key: &[u8], + size_limit: usize, + ) -> sp_blockchain::Result, Vec)>>; + + /// Verify a read storage proof for a set of keys. + /// Returns collected key-value pairs and a flag indicating if iteration is complete. + fn verify_range_proof( + &self, + root: Block::Hash, + proof: StorageProof, + start_key: &[u8], + ) -> sp_blockchain::Result<(Vec<(Vec, Vec)>, bool)>; } diff --git a/client/authority-discovery/src/worker/tests.rs index b702cd8c40085..8be23e4840bde 100644 --- a/client/authority-discovery/src/worker/tests.rs +++ b/client/authority-discovery/src/worker/tests.rs @@ -69,6 +69,7 @@ impl HeaderBackend for TestApi { finalized_number: Zero::zero(), genesis_hash: Default::default(), number_leaves: Default::default(), + finalized_state: None, } } diff --git a/client/cli/src/arg_enums.rs index fb2f8fdbc21d8..1bca67e782a3b 100644 --- a/client/cli/src/arg_enums.rs +++ b/client/cli/src/arg_enums.rs @@ -232,6 +232,30 @@ arg_enum! { } } +arg_enum! { + /// Syncing mode. + #[allow(missing_docs)] + #[derive(Debug, Clone, Copy)] + pub enum SyncMode { + // Full sync. Download and verify all blocks. + Full, + // Download blocks without executing them. Download latest state with proofs. + Fast, + // Download blocks without executing them.
Download latest state without proofs. FastUnsafe, } } +impl Into for SyncMode { + fn into(self) -> sc_network::config::SyncMode { + match self { + SyncMode::Full => sc_network::config::SyncMode::Full, + SyncMode::Fast => sc_network::config::SyncMode::Fast { skip_proofs: false }, + SyncMode::FastUnsafe => sc_network::config::SyncMode::Fast { skip_proofs: true }, + } + } +} + /// Default value for the `--execution-syncing` parameter. pub const DEFAULT_EXECUTION_SYNCING: ExecutionStrategy = ExecutionStrategy::NativeElseWasm; /// Default value for the `--execution-import-block` parameter. diff --git a/client/cli/src/params/network_params.rs index 7549c76378bea..69f4c9d1ba74b 100644 --- a/client/cli/src/params/network_params.rs +++ b/client/cli/src/params/network_params.rs @@ -17,6 +17,7 @@ // along with this program. If not, see . use crate::params::node_key_params::NodeKeyParams; +use crate::arg_enums::SyncMode; use sc_network::{ config::{NetworkConfiguration, NodeKeyConfig, NonReservedPeerMode, SetConfig, TransportConfig}, multiaddr::Protocol, @@ -125,6 +126,13 @@ pub struct NetworkParams { /// Join the IPFS network and serve transactions over bitswap protocol. #[structopt(long)] pub ipfs_server: bool, + + /// Blockchain syncing mode. + /// Full - Download and validate full blockchain history (Default). + /// Fast - Download blocks and the latest state only. + /// FastUnsafe - Same as Fast, but skips downloading state proofs. + #[structopt(long, default_value = "Full")] + pub sync: SyncMode, } impl NetworkParams { @@ -218,6 +226,7 @@ impl NetworkParams { kademlia_disjoint_query_paths: self.kademlia_disjoint_query_paths, yamux_window_size: None, ipfs_server: self.ipfs_server, + sync_mode: self.sync.into(), } } } diff --git a/client/consensus/aura/src/lib.rs index 845e920cfc11a..d08ce5dfee259 100644 --- a/client/consensus/aura/src/lib.rs +++ b/client/consensus/aura/src/lib.rs @@ -42,7 +42,7 @@ use codec::{Encode, Decode, Codec}; use sp_consensus::{ BlockImport, Environment, Proposer, CanAuthorWith, ForkChoiceStrategy, BlockImportParams, - BlockOrigin, Error as ConsensusError, SelectChain, + BlockOrigin, Error as ConsensusError, SelectChain, StateAction, }; use sc_client_api::{backend::AuxStore, BlockOf, UsageProvider}; use sp_blockchain::{Result as CResult, ProvideCache, HeaderBackend}; @@ -421,7 +421,9 @@ where let mut import_block = BlockImportParams::new(BlockOrigin::Own, header); import_block.post_digests.push(signature_digest_item); import_block.body = Some(body); - import_block.storage_changes = Some(storage_changes); + import_block.state_action = StateAction::ApplyChanges( + sp_consensus::StorageChanges::Changes(storage_changes) + ); import_block.fork_choice = Some(ForkChoiceStrategy::LongestChain); Ok(import_block) diff --git a/client/consensus/babe/src/lib.rs index 15d16c91f4304..61b58bf1b5999 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -101,6 +101,7 @@ use sp_consensus::{ import_queue::{BasicQueue, CacheKeyId, DefaultImportQueue, Verifier}, BlockCheckParams, BlockImport, BlockImportParams, BlockOrigin, Environment, Error as ConsensusError, ForkChoiceStrategy, Proposer, SelectChain, SlotData, + StateAction, }; use sp_consensus_babe::inherents::BabeInherentData; use sp_consensus_slots::Slot; @@ -790,7 +791,9 @@ where let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(digest_item); import_block.body = Some(body); - import_block.storage_changes = Some(storage_changes); + import_block.state_action = StateAction::ApplyChanges( + sp_consensus::StorageChanges::Changes(storage_changes) + ); import_block.intermediates.insert( Cow::from(INTERMEDIATE_KEY), Box::new(BabeIntermediate:: { epoch_descriptor }) as Box<_>, ); @@ -1295,7 +1298,12 @@ impl BlockImport for BabeBlockImport return Ok(ImportResult::AlreadyInChain), + Ok(sp_blockchain::BlockStatus::InChain) => { + // When re-importing an existing block, strip away intermediates. + let _ = block.take_intermediate::>(INTERMEDIATE_KEY)?; + block.fork_choice = Some(ForkChoiceStrategy::Custom(false)); + return self.inner.import_block(block, new_cache).await.map_err(Into::into) + }, Ok(sp_blockchain::BlockStatus::Unknown) => {}, Err(e) => return Err(ConsensusError::ClientImport(e.to_string())), } diff --git a/client/consensus/manual-seal/src/seal_block.rs index 6ddd2cb05d498..89da02ac49612 100644 --- a/client/consensus/manual-seal/src/seal_block.rs +++ b/client/consensus/manual-seal/src/seal_block.rs @@ -28,7 +28,7 @@ use futures::prelude::*; use sc_transaction_pool::txpool; use sp_consensus::{ self, BlockImport, Environment, Proposer, ForkChoiceStrategy, - BlockImportParams, BlockOrigin, ImportResult, SelectChain, + BlockImportParams, BlockOrigin, ImportResult, SelectChain, StateAction, }; use sp_blockchain::HeaderBackend; use std::collections::HashMap; @@ -145,7 +145,9 @@ pub async fn seal_block( params.body = Some(body); params.finalized = finalize; params.fork_choice = Some(ForkChoiceStrategy::LongestChain); - params.storage_changes = Some(proposal.storage_changes); + params.state_action = StateAction::ApplyChanges( + sp_consensus::StorageChanges::Changes(proposal.storage_changes) + ); if let Some(digest_provider) = digest_provider { digest_provider.append_block_import(&parent, &mut params, &inherent_data)?; diff --git a/client/consensus/pow/src/worker.rs index e5d76592b7fd1..74fbcce81341d 100644 --- a/client/consensus/pow/src/worker.rs +++ b/client/consensus/pow/src/worker.rs @@ -18,7 +18,8 @@ use std::{pin::Pin, time::Duration, collections::HashMap, borrow::Cow}; use sc_client_api::ImportNotifications; -use sp_consensus::{Proposal, BlockOrigin, BlockImportParams, import_queue::BoxBlockImport}; +use sp_consensus::{Proposal, BlockOrigin, BlockImportParams, StorageChanges, + StateAction, import_queue::BoxBlockImport}; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, Header as HeaderT}, @@ -136,7 +137,9 @@ where let mut import_block = BlockImportParams::new(BlockOrigin::Own, header); import_block.post_digests.push(seal); import_block.body = Some(body); - import_block.storage_changes = Some(build.proposal.storage_changes); + import_block.state_action = StateAction::ApplyChanges( + StorageChanges::Changes(build.proposal.storage_changes) + ); let intermediate = PowIntermediate:: { difficulty: Some(build.metadata.difficulty), diff --git a/client/db/src/bench.rs index 1f2f46af0079e..470448df76f0b 100644 --- a/client/db/src/bench.rs +++ b/client/db/src/bench.rs @@ -373,6 +373,18 @@ impl StateBackend> for BenchmarkingState { } } + fn apply_to_key_values_while, Vec) -> bool>( + &self, + child_info: Option<&ChildInfo>, + prefix: Option<&[u8]>, + start_at: Option<&[u8]>, + f: F, + allow_missing: bool, + ) -> Result { + self.state.borrow().as_ref().ok_or_else(state_err)?
+ .apply_to_key_values_while(child_info, prefix, start_at, f, allow_missing) + } + fn apply_to_keys_while bool>( &self, child_info: Option<&ChildInfo>, diff --git a/client/db/src/lib.rs b/client/db/src/lib.rs index 38b9d7a7adff4..024f2e5f4e649 100644 --- a/client/db/src/lib.rs +++ b/client/db/src/lib.rs @@ -205,6 +205,17 @@ impl StateBackend> for RefTrackingState { self.state.for_key_values_with_prefix(prefix, f) } + fn apply_to_key_values_while, Vec) -> bool>( + &self, + child_info: Option<&ChildInfo>, + prefix: Option<&[u8]>, + start_at: Option<&[u8]>, + f: F, + allow_missing: bool, + ) -> Result { + self.state.apply_to_key_values_while(child_info, prefix, start_at, f, allow_missing) + } + fn apply_to_keys_while bool>( &self, child_info: Option<&ChildInfo>, @@ -387,6 +398,14 @@ impl<'a> sc_state_db::MetaDb for StateMetaDb<'a> { } } +struct MetaUpdate { + pub hash: Block::Hash, + pub number: NumberFor, + pub is_best: bool, + pub is_finalized: bool, + pub with_state: bool, +} + fn cache_header( cache: &mut LinkedHashMap>, hash: Hash, @@ -427,11 +446,9 @@ impl BlockchainDb { fn update_meta( &self, - hash: Block::Hash, - number: ::Number, - is_best: bool, - is_finalized: bool + update: MetaUpdate, ) { + let MetaUpdate { hash, number, is_best, is_finalized, with_state } = update; let mut meta = self.meta.write(); if number.is_zero() { meta.genesis_hash = hash; @@ -444,6 +461,9 @@ impl BlockchainDb { } if is_finalized { + if with_state { + meta.finalized_state = Some((hash.clone(), number)); + } meta.finalized_number = number; meta.finalized_hash = hash; } @@ -484,6 +504,7 @@ impl sc_client_api::blockchain::HeaderBackend for Blockcha genesis_hash: meta.genesis_hash, finalized_hash: meta.finalized_hash, finalized_number: meta.finalized_number, + finalized_state: meta.finalized_state.clone(), number_leaves: self.leaves.read().count(), } } @@ -754,6 +775,42 @@ impl BlockImportOperation { } } } + + fn apply_new_state( + &mut self, + storage: Storage, + ) -> ClientResult { + if storage.top.keys().any(|k| well_known_keys::is_child_storage_key(&k)) { + return Err(sp_blockchain::Error::InvalidState.into()); + } + + let child_delta = storage.children_default.iter().map(|(_storage_key, child_content)|( + &child_content.child_info, + child_content.data.iter().map(|(k, v)| (&k[..], Some(&v[..]))), + )); + + let mut changes_trie_config = None; + let (root, transaction) = self.old_state.full_storage_root( + storage.top.iter().map(|(k, v)| { + if &k[..] 
== well_known_keys::CHANGES_TRIE_CONFIG { + changes_trie_config = Some(Decode::decode(&mut &v[..])); + } + (&k[..], Some(&v[..])) + }), + child_delta + ); + + let changes_trie_config = match changes_trie_config { + Some(Ok(c)) => Some(c), + Some(Err(_)) => return Err(sp_blockchain::Error::InvalidState.into()), + None => None, + }; + + self.db_updates = transaction; + self.changes_trie_config_update = Some(changes_trie_config); + Ok(root) + } + } impl sc_client_api::backend::BlockImportOperation for BlockImportOperation { @@ -796,35 +853,21 @@ impl sc_client_api::backend::BlockImportOperation for Bloc &mut self, storage: Storage, ) -> ClientResult { - if storage.top.keys().any(|k| well_known_keys::is_child_storage_key(&k)) { - return Err(sp_blockchain::Error::GenesisInvalid.into()); - } - - let child_delta = storage.children_default.iter().map(|(_storage_key, child_content)|( - &child_content.child_info, - child_content.data.iter().map(|(k, v)| (&k[..], Some(&v[..]))), - )); - - let mut changes_trie_config: Option = None; - let (root, transaction) = self.old_state.full_storage_root( - storage.top.iter().map(|(k, v)| { - if &k[..] == well_known_keys::CHANGES_TRIE_CONFIG { - changes_trie_config = Some( - Decode::decode(&mut &v[..]) - .expect("changes trie configuration is encoded properly at genesis") - ); - } - (&k[..], Some(&v[..])) - }), - child_delta - ); - - self.db_updates = transaction; - self.changes_trie_config_update = Some(changes_trie_config); + let root = self.apply_new_state(storage)?; self.commit_state = true; Ok(root) } + fn set_genesis_state( + &mut self, + storage: Storage, + commit: bool, + ) -> ClientResult { + let root = self.apply_new_state(storage)?; + self.commit_state = commit; + Ok(root) + } + fn update_changes_trie( &mut self, update: ChangesTrieTransaction, NumberFor>, @@ -907,18 +950,39 @@ impl sc_state_db::NodeDb for StorageDb { } } -struct DbGenesisStorage(pub Block::Hash); +struct DbGenesisStorage { + root: Block::Hash, + storage: PrefixedMemoryDB>, +} impl DbGenesisStorage { + pub fn new(root: Block::Hash, storage: PrefixedMemoryDB>) -> Self { + DbGenesisStorage { + root, + storage, + } + } +} + +impl sp_state_machine::Storage> for DbGenesisStorage { + fn get(&self, key: &Block::Hash, prefix: Prefix) -> Result, String> { + use hash_db::HashDB; + Ok(self.storage.get(key, prefix)) + } +} + +struct EmptyStorage(pub Block::Hash); + +impl EmptyStorage { pub fn new() -> Self { let mut root = Block::Hash::default(); let mut mdb = MemoryDB::>::default(); sp_state_machine::TrieDBMut::>::new(&mut mdb, &mut root); - DbGenesisStorage(root) + EmptyStorage(root) } } -impl sp_state_machine::Storage> for DbGenesisStorage { +impl sp_state_machine::Storage> for EmptyStorage { fn get(&self, _key: &Block::Hash, _prefix: Prefix) -> Result, String> { Ok(None) } @@ -980,6 +1044,7 @@ pub struct Backend { transaction_storage: TransactionStorageMode, io_stats: FrozenForDuration<(kvdb::IoStats, StateUsageInfo)>, state_usage: Arc, + genesis_state: RwLock>>>, } impl Backend { @@ -1058,7 +1123,7 @@ impl Backend { }, )?; - Ok(Backend { + let backend = Backend { storage: Arc::new(storage_db), offchain_storage, changes_tries_storage, @@ -1074,7 +1139,24 @@ impl Backend { state_usage: Arc::new(StateUsageStats::new()), keep_blocks: config.keep_blocks.clone(), transaction_storage: config.transaction_storage.clone(), - }) + genesis_state: RwLock::new(None), + }; + + // Older DB versions have no last state key. Check if the state is available and set it. 
+ let info = backend.blockchain.info(); + if info.finalized_state.is_none() + && info.finalized_hash != Default::default() + && sc_client_api::Backend::have_state_at(&backend, &info.finalized_hash, info.finalized_number) + { + backend.blockchain.update_meta(MetaUpdate { + hash: info.finalized_hash, + number: info.finalized_number, + is_best: info.finalized_hash == info.best_hash, + is_finalized: true, + with_state: true, + }); + } + Ok(backend) } /// Handle setting head within a transaction. `route_to` should be the last @@ -1170,10 +1252,11 @@ impl Backend { justification: Option, changes_trie_cache_ops: &mut Option>, finalization_displaced: &mut Option>>, - ) -> ClientResult<(Block::Hash, ::Number, bool, bool)> { + ) -> ClientResult> { // TODO: ensure best chain contains this block. let number = *header.number(); self.ensure_sequential_finalization(header, last_finalized)?; + let with_state = sc_client_api::Backend::have_state_at(self, &hash, number); self.note_finalized( transaction, @@ -1182,6 +1265,7 @@ impl Backend { *hash, changes_trie_cache_ops, finalization_displaced, + with_state, )?; if let Some(justification) = justification { @@ -1191,7 +1275,13 @@ impl Backend { Justifications::from(justification).encode(), ); } - Ok((*hash, number, false, true)) + Ok(MetaUpdate { + hash: *hash, + number, + is_best: false, + is_finalized: true, + with_state, + }) } // performs forced canonicalization with a delay after importing a non-finalized block. @@ -1219,6 +1309,9 @@ impl Backend { )?.expect("existence of block with number `new_canonical` \ implies existence of blocks with all numbers before it; qed") }; + if !sc_client_api::Backend::have_state_at(self, &hash, new_canonical.saturated_into()) { + return Ok(()) + } trace!(target: "db", "Canonicalize block #{} ({:?})", new_canonical, hash); let commit = self.storage.state_db.canonicalize_block(&hash) @@ -1240,12 +1333,13 @@ impl Backend { let mut meta_updates = Vec::with_capacity(operation.finalized_blocks.len()); let mut last_finalized_hash = self.blockchain.meta.read().finalized_hash; + let mut last_finalized_num = self.blockchain.meta.read().finalized_number; + let best_num = self.blockchain.meta.read().best_number; let mut changes_trie_cache_ops = None; for (block, justification) in operation.finalized_blocks { let block_hash = self.blockchain.expect_block_hash_from_id(&block)?; let block_header = self.blockchain.expect_header(BlockId::Hash(block_hash))?; - meta_updates.push(self.finalize_block_with_transaction( &mut transaction, &block_hash, @@ -1256,12 +1350,16 @@ impl Backend { &mut finalization_displaced_leaves, )?); last_finalized_hash = block_hash; + last_finalized_num = block_header.number().clone(); } let imported = if let Some(pending_block) = operation.pending_block { + let hash = pending_block.header.hash(); + let parent_hash = *pending_block.header.parent_hash(); let number = pending_block.header.number().clone(); + let existing_header = number <= best_num && self.blockchain.header(BlockId::hash(hash))?.is_some(); // blocks are keyed by number + hash. 
let lookup_key = utils::number_and_hash_to_lookup_key(number, hash)?; @@ -1296,13 +1394,24 @@ impl Backend { } if number.is_zero() { - transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, lookup_key); + transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, lookup_key.clone()); transaction.set(columns::META, meta_keys::GENESIS_HASH, hash.as_ref()); // for tests, because config is set from within the reset_storage if operation.changes_trie_config_update.is_none() { operation.changes_trie_config_update = Some(None); } + + if operation.commit_state { + transaction.set_from_vec(columns::META, meta_keys::FINALIZED_STATE, lookup_key); + } else { + // When we don't want to commit the genesis state, we still preserve it in memory + // to bootstrap consensus. It is queried for an initial list of authorities, etc. + *self.genesis_state.write() = Some(Arc::new(DbGenesisStorage::new( + pending_block.header.state_root().clone(), + operation.db_updates.clone() + ))); + } } let finalized = if operation.commit_state { @@ -1361,79 +1470,111 @@ impl Backend { changeset, ).map_err(|e: sc_state_db::Error| sp_blockchain::Error::from_state_db(e))?; apply_state_commit(&mut transaction, commit); + if number <= last_finalized_num { + // Canonicalize in the db when re-importing existing blocks with state. + let commit = self.storage.state_db.canonicalize_block(&hash) + .map_err(|e: sc_state_db::Error| sp_blockchain::Error::from_state_db(e))?; + apply_state_commit(&mut transaction, commit); + meta_updates.push(MetaUpdate { + hash, + number, + is_best: false, + is_finalized: true, + with_state: true, + }); + } + // Check if we need to finalize. Genesis is always finalized instantly. let finalized = number_u64 == 0 || pending_block.leaf_state.is_final(); finalized } else { - false + number.is_zero() || pending_block.leaf_state.is_final() }; let header = &pending_block.header; let is_best = pending_block.leaf_state.is_best(); let changes_trie_updates = operation.changes_trie_updates; - let changes_trie_config_update = operation.changes_trie_config_update; - changes_trie_cache_ops = Some(self.changes_tries_storage.commit( - &mut transaction, - changes_trie_updates, - cache::ComplexBlockId::new( - *header.parent_hash(), - if number.is_zero() { Zero::zero() } else { number - One::one() }, - ), - cache::ComplexBlockId::new(hash, number), - header, - finalized, - changes_trie_config_update, - changes_trie_cache_ops, - )?); - self.state_usage.merge_sm(operation.old_state.usage_info()); - // release state reference so that it can be finalized - let cache = operation.old_state.into_cache_changes(); - - if finalized { - // TODO: ensure best chain contains this block. - self.ensure_sequential_finalization(header, Some(last_finalized_hash))?; - self.note_finalized( + debug!(target: "db", + "DB Commit {:?} ({}), best={}, state={}, existing={}", + hash, number, is_best, operation.commit_state, existing_header, + ); + + if !existing_header { + let changes_trie_config_update = operation.changes_trie_config_update; + changes_trie_cache_ops = Some(self.changes_tries_storage.commit( &mut transaction, - true, + changes_trie_updates, + cache::ComplexBlockId::new( + *header.parent_hash(), + if number.is_zero() { Zero::zero() } else { number - One::one() }, + ), + cache::ComplexBlockId::new(hash, number), header, - hash, - &mut changes_trie_cache_ops, - &mut finalization_displaced_leaves, - )?; - } else { - // canonicalize blocks which are old enough, regardless of finality.
- self.force_delayed_canonicalize(&mut transaction, hash, *header.number())? - } + finalized, + changes_trie_config_update, + changes_trie_cache_ops, + )?); + + self.state_usage.merge_sm(operation.old_state.usage_info()); + // release state reference so that it can be finalized + let cache = operation.old_state.into_cache_changes(); + + if finalized { + // TODO: ensure best chain contains this block. + self.ensure_sequential_finalization(header, Some(last_finalized_hash))?; + self.note_finalized( + &mut transaction, + true, + header, + hash, + &mut changes_trie_cache_ops, + &mut finalization_displaced_leaves, + operation.commit_state, + )?; + } else { + // canonicalize blocks which are old enough, regardless of finality. + self.force_delayed_canonicalize(&mut transaction, hash, *header.number())? + } - debug!(target: "db", "DB Commit {:?} ({}), best = {}", hash, number, is_best); - let displaced_leaf = { - let mut leaves = self.blockchain.leaves.write(); - let displaced_leaf = leaves.import(hash, number, parent_hash); - leaves.prepare_transaction(&mut transaction, columns::META, meta_keys::LEAF_PREFIX); + let displaced_leaf = { + let mut leaves = self.blockchain.leaves.write(); + let displaced_leaf = leaves.import(hash, number, parent_hash); + leaves.prepare_transaction(&mut transaction, columns::META, meta_keys::LEAF_PREFIX); - displaced_leaf - }; + displaced_leaf + }; - let mut children = children::read_children( - &*self.storage.db, - columns::META, - meta_keys::CHILDREN_PREFIX, - parent_hash, - )?; - children.push(hash); - children::write_children( - &mut transaction, - columns::META, - meta_keys::CHILDREN_PREFIX, - parent_hash, - children, - ); + let mut children = children::read_children( + &*self.storage.db, + columns::META, + meta_keys::CHILDREN_PREFIX, + parent_hash, + )?; + if !children.contains(&hash) { + children.push(hash); + } + children::write_children( + &mut transaction, + columns::META, + meta_keys::CHILDREN_PREFIX, + parent_hash, + children, + ); - meta_updates.push((hash, number, pending_block.leaf_state.is_best(), finalized)); + meta_updates.push(MetaUpdate { + hash, + number, + is_best: pending_block.leaf_state.is_best(), + is_finalized: finalized, + with_state: operation.commit_state, + }); - Some((pending_block.header, number, hash, enacted, retracted, displaced_leaf, is_best, cache)) + Some((pending_block.header, number, hash, enacted, retracted, displaced_leaf, is_best, cache)) + } else { + None + } } else { None }; @@ -1448,7 +1589,13 @@ impl Backend { hash.clone(), (number.clone(), hash.clone()) )?; - meta_updates.push((hash, *number, true, false)); + meta_updates.push(MetaUpdate { + hash, + number: *number, + is_best: true, + is_finalized: false, + with_state: false, + }); Some((enacted, retracted)) } else { return Err(sp_blockchain::Error::UnknownBlock(format!("Cannot set head {:?}", set_head))) @@ -1472,6 +1619,7 @@ impl Backend { is_best, mut cache, )) = imported { + trace!(target: "db", "DB Commit done {:?}", hash); let header_metadata = CachedHeaderMetadata::from(&header); self.blockchain.insert_header_metadata( header_metadata.hash, @@ -1498,8 +1646,8 @@ impl Backend { self.shared_cache.lock().sync(&enacted, &retracted); } - for (hash, number, is_best, is_finalized) in meta_updates { - self.blockchain.update_meta(hash, number, is_best, is_finalized); + for m in meta_updates { + self.blockchain.update_meta(m); } Ok(()) @@ -1515,29 +1663,35 @@ impl Backend { f_header: &Block::Header, f_hash: Block::Hash, changes_trie_cache_ops: &mut Option>, - displaced: 
&mut Option>> + displaced: &mut Option>>, + with_state: bool, ) -> ClientResult<()> { let f_num = f_header.number().clone(); - if self.storage.state_db.best_canonical().map(|c| f_num.saturated_into::() > c).unwrap_or(true) { - let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone())?; - transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, lookup_key); + let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone())?; + if with_state { + transaction.set_from_vec(columns::META, meta_keys::FINALIZED_STATE, lookup_key.clone()); + } + transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, lookup_key); + if sc_client_api::Backend::have_state_at(self, &f_hash, f_num) && + self.storage.state_db.best_canonical().map(|c| f_num.saturated_into::() > c).unwrap_or(true) + { let commit = self.storage.state_db.canonicalize_block(&f_hash) .map_err(|e: sc_state_db::Error| sp_blockchain::Error::from_state_db(e))?; apply_state_commit(transaction, commit); + } - if !f_num.is_zero() { - let new_changes_trie_cache_ops = self.changes_tries_storage.finalize( - transaction, - *f_header.parent_hash(), - f_hash, - f_num, - if is_inserted { Some(&f_header) } else { None }, - changes_trie_cache_ops.take(), - )?; - *changes_trie_cache_ops = Some(new_changes_trie_cache_ops); - } + if !f_num.is_zero() { + let new_changes_trie_cache_ops = self.changes_tries_storage.finalize( + transaction, + *f_header.parent_hash(), + f_hash, + f_num, + if is_inserted { Some(&f_header) } else { None }, + changes_trie_cache_ops.take(), + )?; + *changes_trie_cache_ops = Some(new_changes_trie_cache_ops); } let new_displaced = self.blockchain.leaves.write().finalize_height(f_num); @@ -1628,6 +1782,23 @@ impl Backend { } Ok(()) } + + fn empty_state(&self) -> ClientResult, Block>> { + let root = EmptyStorage::::new().0; // Empty trie + let db_state = DbState::::new(self.storage.clone(), root); + let state = RefTrackingState::new(db_state, self.storage.clone(), None); + let caching_state = CachingState::new( + state, + self.shared_cache.clone(), + None, + ); + Ok(SyncingCachingState::new( + caching_state, + self.state_usage.clone(), + self.blockchain.meta.clone(), + self.import_lock.clone(), + )) + } } @@ -1737,7 +1908,7 @@ impl sc_client_api::backend::Backend for Backend { type OffchainStorage = offchain::LocalStorage; fn begin_operation(&self) -> ClientResult { - let mut old_state = self.state_at(BlockId::Hash(Default::default()))?; + let mut old_state = self.empty_state()?; old_state.disable_syncing(); Ok(BlockImportOperation { @@ -1763,7 +1934,11 @@ impl sc_client_api::backend::Backend for Backend { operation: &mut Self::BlockImportOperation, block: BlockId, ) -> ClientResult<()> { - operation.old_state = self.state_at(block)?; + if block.is_pre_genesis() { + operation.old_state = self.empty_state()?; + } else { + operation.old_state = self.state_at(block)?; + } operation.old_state.disable_syncing(); operation.commit_state = true; @@ -1800,7 +1975,7 @@ impl sc_client_api::backend::Backend for Backend { let mut displaced = None; let mut changes_trie_cache_ops = None; - let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction( + let m = self.finalize_block_with_transaction( &mut transaction, &hash, &header, @@ -1810,7 +1985,7 @@ impl sc_client_api::backend::Backend for Backend { &mut displaced, )?; self.storage.db.commit(transaction)?; - self.blockchain.update_meta(hash, number, is_best, is_finalized); + self.blockchain.update_meta(m); 
self.changes_tries_storage.post_commit(changes_trie_cache_ops); Ok(()) } @@ -1967,14 +2142,36 @@ impl sc_client_api::backend::Backend for Backend { meta_keys::FINALIZED_BLOCK, key.clone() ); + reverted_finalized.insert(removed_hash); + if let Some((hash, _)) = self.blockchain.info().finalized_state { + if hash == best_hash { + if !best_number.is_zero() + && self.have_state_at(&prev_hash, best_number - One::one()) + { + let lookup_key = utils::number_and_hash_to_lookup_key( + best_number - One::one(), + prev_hash + )?; + transaction.set_from_vec(columns::META, meta_keys::FINALIZED_STATE, lookup_key); + } else { + transaction.remove(columns::META, meta_keys::FINALIZED_STATE); + } + } + } } transaction.set_from_vec(columns::META, meta_keys::BEST_BLOCK, key); transaction.remove(columns::KEY_LOOKUP, removed.hash().as_ref()); children::remove_children(&mut transaction, columns::META, meta_keys::CHILDREN_PREFIX, best_hash); self.storage.db.commit(transaction)?; self.changes_tries_storage.post_commit(Some(changes_trie_cache_ops)); - self.blockchain.update_meta(best_hash, best_number, true, update_finalized); + self.blockchain.update_meta(MetaUpdate { + hash: best_hash, + number: best_number, + is_best: true, + is_finalized: update_finalized, + with_state: false + }); } None => return Ok(c.saturated_into::>()) } @@ -2061,26 +2258,30 @@ impl sc_client_api::backend::Backend for Backend { fn state_at(&self, block: BlockId) -> ClientResult { use sc_client_api::blockchain::HeaderBackend as BcHeaderBackend; - // special case for genesis initialization - match block { - BlockId::Hash(h) if h == Default::default() => { - let genesis_storage = DbGenesisStorage::::new(); - let root = genesis_storage.0.clone(); - let db_state = DbState::::new(Arc::new(genesis_storage), root); + let is_genesis = match &block { + BlockId::Number(n) if n.is_zero() => true, + BlockId::Hash(h) if h == &self.blockchain.meta.read().genesis_hash => true, + _ => false, + }; + if is_genesis { + if let Some(genesis_state) = &*self.genesis_state.read() { + let root = genesis_state.root.clone(); + let db_state = DbState::::new(genesis_state.clone(), root); let state = RefTrackingState::new(db_state, self.storage.clone(), None); let caching_state = CachingState::new( state, self.shared_cache.clone(), None, ); - return Ok(SyncingCachingState::new( + let mut state = SyncingCachingState::new( caching_state, self.state_usage.clone(), self.blockchain.meta.clone(), self.import_lock.clone(), - )); - }, - _ => {} + ); + state.disable_syncing(); + return Ok(state) + } } let hash = match block { @@ -2305,7 +2506,6 @@ pub(crate) mod tests { let db = Backend::::new_test(2, 0); let hash = { let mut op = db.begin_operation().unwrap(); - db.begin_state_operation(&mut op, BlockId::Hash(Default::default())).unwrap(); let mut header = Header { number: 0, parent_hash: Default::default(), diff --git a/client/db/src/light.rs b/client/db/src/light.rs index bf24197c5b5d9..4e61a9c2ee03d 100644 --- a/client/db/src/light.rs +++ b/client/db/src/light.rs @@ -151,9 +151,14 @@ impl BlockchainHeaderBackend for LightStorage BlockchainInfo { best_hash: meta.best_hash, best_number: meta.best_number, - genesis_hash: meta.genesis_hash, + genesis_hash: meta.genesis_hash.clone(), finalized_hash: meta.finalized_hash, finalized_number: meta.finalized_number, + finalized_state: if meta.finalized_hash != Default::default() { + Some((meta.genesis_hash, Zero::zero())) + } else { + None + }, number_leaves: 1, } } diff --git a/client/db/src/storage_cache.rs 
b/client/db/src/storage_cache.rs index 788e011fb2f05..9934cccd155a1 100644 --- a/client/db/src/storage_cache.rs +++ b/client/db/src/storage_cache.rs @@ -605,6 +605,17 @@ impl>, B: BlockT> StateBackend> for Cachin self.state.exists_child_storage(child_info, key) } + fn apply_to_key_values_while, Vec) -> bool>( + &self, + child_info: Option<&ChildInfo>, + prefix: Option<&[u8]>, + start_at: Option<&[u8]>, + f: F, + allow_missing: bool, + ) -> Result { + self.state.apply_to_key_values_while(child_info, prefix, start_at, f, allow_missing) + } + fn apply_to_keys_while bool>( &self, child_info: Option<&ChildInfo>, @@ -788,6 +799,17 @@ impl>, B: BlockT> StateBackend> for Syncin self.caching_state().exists_child_storage(child_info, key) } + fn apply_to_key_values_while, Vec) -> bool>( + &self, + child_info: Option<&ChildInfo>, + prefix: Option<&[u8]>, + start_at: Option<&[u8]>, + f: F, + allow_missing: bool, + ) -> Result { + self.caching_state().apply_to_key_values_while(child_info, prefix, start_at, f, allow_missing) + } + fn apply_to_keys_while bool>( &self, child_info: Option<&ChildInfo>, diff --git a/client/db/src/utils.rs b/client/db/src/utils.rs index 7f82cb8489121..bd6dc9841aa63 100644 --- a/client/db/src/utils.rs +++ b/client/db/src/utils.rs @@ -49,6 +49,8 @@ pub mod meta_keys { pub const BEST_BLOCK: &[u8; 4] = b"best"; /// Last finalized block key. pub const FINALIZED_BLOCK: &[u8; 5] = b"final"; + /// Last finalized state key. + pub const FINALIZED_STATE: &[u8; 6] = b"fstate"; /// Meta information prefix for list-based caches. pub const CACHE_META_PREFIX: &[u8; 5] = b"cache"; /// Meta information for changes tries key. @@ -74,6 +76,8 @@ pub struct Meta { pub finalized_number: N, /// Hash of the genesis block. pub genesis_hash: H, + /// Finalized state, if any + pub finalized_state: Option<(H, N)>, } /// A block lookup key: used for canonical lookup from block number to hash @@ -391,6 +395,7 @@ pub fn read_meta(db: &dyn Database, col_header: u32) -> Result< finalized_hash: Default::default(), finalized_number: Zero::zero(), genesis_hash: Default::default(), + finalized_state: None, }), }; @@ -408,12 +413,18 @@ pub fn read_meta(db: &dyn Database, col_header: u32) -> Result< ); Ok((hash, *header.number())) } else { - Ok((genesis_hash.clone(), Zero::zero())) + Ok((Default::default(), Zero::zero())) } }; let (best_hash, best_number) = load_meta_block("best", meta_keys::BEST_BLOCK)?; let (finalized_hash, finalized_number) = load_meta_block("final", meta_keys::FINALIZED_BLOCK)?; + let (finalized_state_hash, finalized_state_number) = load_meta_block("final_state", meta_keys::FINALIZED_STATE)?; + let finalized_state = if finalized_state_hash != Default::default() { + Some((finalized_state_hash, finalized_state_number)) + } else { + None + }; Ok(Meta { best_hash, @@ -421,6 +432,7 @@ pub fn read_meta(db: &dyn Database, col_header: u32) -> Result< finalized_hash, finalized_number, genesis_hash, + finalized_state, }) } diff --git a/client/finality-grandpa/src/import.rs b/client/finality-grandpa/src/import.rs index 481f38b617eaf..c287cc0b3b896 100644 --- a/client/finality-grandpa/src/import.rs +++ b/client/finality-grandpa/src/import.rs @@ -456,7 +456,11 @@ where // early exit if block already in chain, otherwise the check for // authority changes will error when trying to re-import a change block match self.inner.status(BlockId::Hash(hash)) { - Ok(BlockStatus::InChain) => return Ok(ImportResult::AlreadyInChain), + Ok(BlockStatus::InChain) => { + // Strip justifications when re-importing an existing 
block. + let _justifications = block.justifications.take(); + return (&*self.inner).import_block(block, new_cache).await + } Ok(BlockStatus::Unknown) => {}, Err(e) => return Err(ConsensusError::ClientImport(e.to_string())), } diff --git a/client/informant/src/display.rs b/client/informant/src/display.rs index 0caef4e5fbae8..00c2116fac60a 100644 --- a/client/informant/src/display.rs +++ b/client/informant/src/display.rs @@ -93,10 +93,19 @@ impl InformantDisplay { (diff_bytes_inbound, diff_bytes_outbound) }; - let (level, status, target) = match (net_status.sync_state, net_status.best_seen_block) { - (SyncState::Idle, _) => ("💤", "Idle".into(), "".into()), - (SyncState::Downloading, None) => ("⚙️ ", format!("Preparing{}", speed), "".into()), - (SyncState::Downloading, Some(n)) => ( + let (level, status, target) = match ( + net_status.sync_state, + net_status.best_seen_block, + net_status.state_sync + ) { + (_, _, Some(state)) => ( + "⚙️ ", + "Downloading state".into(), + format!(", {}%, ({:.2}) Mib", state.percentage, (state.size as f32) / (1024f32 * 1024f32)), + ), + (SyncState::Idle, _, _) => ("💤", "Idle".into(), "".into()), + (SyncState::Downloading, None, _) => ("⚙️ ", format!("Preparing{}", speed), "".into()), + (SyncState::Downloading, Some(n), None) => ( "⚙️ ", format!("Syncing{}", speed), format!(", target=#{}", n), diff --git a/client/light/src/backend.rs b/client/light/src/backend.rs index a7f1b8e0c1696..3e53d3b81cc77 100644 --- a/client/light/src/backend.rs +++ b/client/light/src/backend.rs @@ -321,7 +321,7 @@ impl BlockImportOperation for ImportOperation Ok(()) } - fn reset_storage(&mut self, input: Storage) -> ClientResult { + fn set_genesis_state(&mut self, input: Storage, commit: bool) -> ClientResult { check_genesis_storage(&input)?; // changes trie configuration @@ -347,11 +347,17 @@ impl BlockImportOperation for ImportOperation let storage_update = InMemoryBackend::from(storage); let (storage_root, _) = storage_update.full_storage_root(std::iter::empty(), child_delta); - self.storage_update = Some(storage_update); + if commit { + self.storage_update = Some(storage_update); + } Ok(storage_root) } + fn reset_storage(&mut self, _input: Storage) -> ClientResult { + Err(ClientError::NotAvailableOnLightClient) + } + fn insert_aux(&mut self, ops: I) -> ClientResult<()> where I: IntoIterator, Option>)> { @@ -461,6 +467,22 @@ impl StateBackend for GenesisOrUnavailableState } } + fn apply_to_key_values_while, Vec) -> bool>( + &self, + child_info: Option<&ChildInfo>, + prefix: Option<&[u8]>, + start_at: Option<&[u8]>, + action: A, + allow_missing: bool, + ) -> ClientResult { + match *self { + GenesisOrUnavailableState::Genesis(ref state) => + Ok(state.apply_to_key_values_while(child_info, prefix, start_at, action, allow_missing) + .expect(IN_MEMORY_EXPECT_PROOF)), + GenesisOrUnavailableState::Unavailable => Err(ClientError::NotAvailableOnLightClient), + } + } + fn apply_to_keys_while bool>( &self, child_info: Option<&ChildInfo>, diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 17c38b6f95456..576c49d1da366 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -79,6 +79,11 @@ pub struct Behaviour { /// [`request_responses::RequestResponsesBehaviour`]. #[behaviour(ignore)] block_request_protocol_name: String, + + /// Protocol name used to send out state requests via + /// [`request_responses::RequestResponsesBehaviour`]. + #[behaviour(ignore)] + state_request_protocol_name: String, } /// Event generated by `Behaviour`. 
@@ -186,6 +191,7 @@ impl Behaviour { light_client_request_sender: light_client_requests::sender::LightClientRequestSender, disco_config: DiscoveryConfig, block_request_protocol_config: request_responses::ProtocolConfig, + state_request_protocol_config: request_responses::ProtocolConfig, bitswap: Option>, light_client_request_protocol_config: request_responses::ProtocolConfig, // All remaining request protocol configs. @@ -193,7 +199,9 @@ impl Behaviour { ) -> Result { // Extract protocol name and add to `request_response_protocols`. let block_request_protocol_name = block_request_protocol_config.name.to_string(); + let state_request_protocol_name = state_request_protocol_config.name.to_string(); request_response_protocols.push(block_request_protocol_config); + request_response_protocols.push(state_request_protocol_config); request_response_protocols.push(light_client_request_protocol_config); @@ -206,8 +214,8 @@ impl Behaviour { request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?, light_client_request_sender, events: VecDeque::new(), - block_request_protocol_name, + state_request_protocol_name, }) } @@ -329,6 +337,21 @@ Behaviour { &target, &self.block_request_protocol_name, buf, pending_response, IfDisconnected::ImmediateError, ); }, + CustomMessageOutcome::StateRequest { target, request, pending_response } => { + let mut buf = Vec::with_capacity(request.encoded_len()); + if let Err(err) = request.encode(&mut buf) { + log::warn!( + target: "sync", + "Failed to encode state request {:?}: {:?}", + request, err + ); + return + } + + self.request_responses.send_request( + &target, &self.state_request_protocol_name, buf, pending_response, IfDisconnected::ImmediateError, + ); + }, CustomMessageOutcome::NotificationStreamOpened { remote, protocol, negotiated_fallback, roles, notifications_sink } => { diff --git a/client/network/src/chain.rs b/client/network/src/chain.rs index 081d4b0d3ac3d..32d4cc9ff024f 100644 --- a/client/network/src/chain.rs +++ b/client/network/src/chain.rs @@ -21,6 +21,7 @@ use sp_blockchain::{Error, HeaderBackend, HeaderMetadata}; use sc_client_api::{BlockBackend, ProofProvider}; use sp_runtime::traits::{Block as BlockT, BlockIdTo}; +pub use sc_client_api::{StorageKey, StorageData, ImportedState}; /// Local client abstraction for the network. pub trait Client: HeaderBackend + ProofProvider + BlockIdTo diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 4942d1b0fb878..36ae1e831b8ce 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -123,6 +123,15 @@ pub struct Params { /// [`crate::light_client_requests::handler::LightClientRequestHandler::new`] allowing /// both outgoing and incoming requests. pub light_client_request_protocol_config: RequestResponseConfig, + + /// Request response configuration for the state request protocol. + /// + /// Can be constructed either via + /// [`crate::state_requests::generate_protocol_config`] allowing outgoing but not + /// incoming requests, or constructed via + /// [`crate::state_requests::handler::StateRequestHandler::new`] allowing + /// both outgoing and incoming requests. + pub state_request_protocol_config: RequestResponseConfig, } /// Role of the local node. @@ -373,6 +382,24 @@ impl From for ParseErr { } } +#[derive(Clone, Debug, Eq, PartialEq)] +/// Sync operation mode. +pub enum SyncMode { + /// Full block download and verification. + Full, + /// Download blocks and the latest state. 
+ Fast { + /// Skip state proof download and verification. + skip_proofs: bool + }, +} + +impl Default for SyncMode { + fn default() -> Self { + SyncMode::Full + } +} + /// Network service configuration. #[derive(Clone, Debug)] pub struct NetworkConfiguration { @@ -400,6 +427,8 @@ pub struct NetworkConfiguration { pub transport: TransportConfig, /// Maximum number of peers to ask the same blocks in parallel. pub max_parallel_downloads: u32, + /// Initial syncing mode. + pub sync_mode: SyncMode, /// True if Kademlia random discovery should be enabled. /// @@ -462,6 +491,7 @@ impl NetworkConfiguration { wasm_external_transport: None, }, max_parallel_downloads: 5, + sync_mode: SyncMode::Full, enable_dht_random_walk: true, allow_non_globals_in_dht: false, kademlia_disjoint_query_paths: false, diff --git a/client/network/src/gossip/tests.rs b/client/network/src/gossip/tests.rs index 19ac002aac869..bdef28f9bebe5 100644 --- a/client/network/src/gossip/tests.rs +++ b/client/network/src/gossip/tests.rs @@ -17,6 +17,7 @@ // along with this program. If not, see . use crate::block_request_handler::BlockRequestHandler; +use crate::state_request_handler::StateRequestHandler; use crate::light_client_requests::handler::LightClientRequestHandler; use crate::gossip::QueuedSender; use crate::{config, Event, NetworkService, NetworkWorker}; @@ -107,6 +108,16 @@ fn build_test_full_node(network_config: config::NetworkConfiguration) protocol_config }; + let state_request_protocol_config = { + let (handler, protocol_config) = StateRequestHandler::new( + &protocol_id, + client.clone(), + 50, + ); + async_std::task::spawn(handler.run().boxed()); + protocol_config + }; + let light_client_request_protocol_config = { let (handler, protocol_config) = LightClientRequestHandler::new( &protocol_id, @@ -131,6 +142,7 @@ fn build_test_full_node(network_config: config::NetworkConfiguration) ), metrics_registry: None, block_request_protocol_config, + state_request_protocol_config, light_client_request_protocol_config, }) .unwrap(); diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 556e71da23831..11e235bb81ae7 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -260,6 +260,7 @@ mod utils; pub mod block_request_handler; pub mod bitswap; pub mod light_client_requests; +pub mod state_request_handler; pub mod config; pub mod error; pub mod gossip; @@ -268,7 +269,8 @@ pub mod transactions; #[doc(inline)] pub use libp2p::{multiaddr, Multiaddr, PeerId}; -pub use protocol::{event::{DhtEvent, Event, ObservedRole}, sync::SyncState, PeerInfo}; +pub use protocol::{event::{DhtEvent, Event, ObservedRole}, PeerInfo}; +pub use protocol::sync::{SyncState, StateDownloadProgress}; pub use service::{ NetworkService, NetworkWorker, RequestFailure, OutboundFailure, NotificationSender, NotificationSenderReady, IfDisconnected, @@ -321,4 +323,6 @@ pub struct NetworkStatus { pub total_bytes_inbound: u64, /// The total number of bytes sent. pub total_bytes_outbound: u64, + /// State sync in progress. 
+ pub state_sync: Option, } diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index a3a490e097780..b9a189a0f384f 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -22,6 +22,7 @@ use crate::{ error, request_responses::RequestFailure, utils::{interval, LruHashSet}, + schema::v1::StateResponse, }; use bytes::Bytes; @@ -49,7 +50,7 @@ use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub}, }; use sp_arithmetic::traits::SaturatedConversion; -use sync::{ChainSync, SyncState}; +use sync::{ChainSync, Status as SyncStatus}; use std::borrow::Cow; use std::convert::TryFrom as _; use std::collections::{HashMap, HashSet, VecDeque}; @@ -179,13 +180,19 @@ pub struct Protocol { block_announce_data_cache: lru::LruCache>, } +#[derive(Debug)] +enum PeerRequest { + Block(message::BlockRequest), + State, +} + /// Peer information #[derive(Debug)] struct Peer { info: PeerInfo, - /// Current block request, if any. Started by emitting [`CustomMessageOutcome::BlockRequest`]. - block_request: Option<( - message::BlockRequest, + /// Current request, if any. Started by emitting [`CustomMessageOutcome::BlockRequest`]. + request: Option<( + PeerRequest, oneshot::Receiver, RequestFailure>>, )>, /// Holds a set of blocks known to this peer. @@ -210,6 +217,21 @@ pub struct ProtocolConfig { pub roles: Roles, /// Maximum number of peers to ask the same blocks in parallel. pub max_parallel_downloads: u32, + /// Enable state sync. + pub sync_mode: config::SyncMode, +} + +impl ProtocolConfig { + fn sync_mode(&self) -> sync::SyncMode { + if self.roles.is_light() { + sync::SyncMode::Light + } else { + match self.sync_mode { + config::SyncMode::Full => sync::SyncMode::Full, + config::SyncMode::Fast { skip_proofs } => sync::SyncMode::LightState { skip_proofs }, + } + } + } } impl Default for ProtocolConfig { @@ -217,6 +239,7 @@ impl Default for ProtocolConfig { ProtocolConfig { roles: Roles::FULL, max_parallel_downloads: 5, + sync_mode: config::SyncMode::Full, } } } @@ -263,12 +286,11 @@ impl Protocol { ) -> error::Result<(Protocol, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { let info = chain.info(); let sync = ChainSync::new( - config.roles, + config.sync_mode(), chain.clone(), - &info, block_announce_validator, config.max_parallel_downloads, - ); + ).map_err(Box::new)?; let boot_node_ids = { let mut list = HashSet::new(); @@ -454,13 +476,13 @@ impl Protocol { pub fn num_active_peers(&self) -> usize { self.peers .values() - .filter(|p| p.block_request.is_some()) + .filter(|p| p.request.is_some()) .count() } /// Current global sync state. - pub fn sync_state(&self) -> SyncState { - self.sync.status().state + pub fn sync_state(&self) -> SyncStatus { + self.sync.status() } /// Target sync block number. @@ -656,6 +678,27 @@ impl Protocol { } } + /// Must be called in response to a [`CustomMessageOutcome::StateRequest`] being emitted. + /// Must contain the same `PeerId` and request that have been emitted. 
+ pub fn on_state_response( + &mut self, + peer_id: PeerId, + response: StateResponse, + ) -> CustomMessageOutcome { + match self.sync.on_state_data(&peer_id, response) { + Ok(sync::OnStateData::Import(origin, block)) => + CustomMessageOutcome::BlockImport(origin, vec![block]), + Ok(sync::OnStateData::Request(peer, req)) => { + prepare_state_request::(&mut self.peers, peer, req) + } + Err(sync::BadPeer(id, repu)) => { + self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); + self.peerset_handle.report_peer(id, repu); + CustomMessageOutcome::None + } + } + } + /// Perform time based maintenance. /// /// > **Note**: This method normally doesn't have to be called except for testing purposes. @@ -736,7 +779,7 @@ impl Protocol { best_hash: status.best_hash, best_number: status.best_number }, - block_request: None, + request: None, known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS) .expect("Constant is nonzero")), }; @@ -1137,7 +1180,7 @@ fn prepare_block_request( let (tx, rx) = oneshot::channel(); if let Some(ref mut peer) = peers.get_mut(&who) { - peer.block_request = Some((request.clone(), rx)); + peer.request = Some((PeerRequest::Block(request.clone()), rx)); } let request = crate::schema::v1::BlockRequest { @@ -1161,6 +1204,23 @@ fn prepare_block_request( } } +fn prepare_state_request( + peers: &mut HashMap>, + who: PeerId, + request: crate::schema::v1::StateRequest, +) -> CustomMessageOutcome { + let (tx, rx) = oneshot::channel(); + + if let Some(ref mut peer) = peers.get_mut(&who) { + peer.request = Some((PeerRequest::State, rx)); + } + CustomMessageOutcome::StateRequest { + target: who, + request: request, + pending_response: tx, + } +} + /// Outcome of an incoming custom message. #[derive(Debug)] #[must_use] @@ -1192,6 +1252,12 @@ pub enum CustomMessageOutcome { request: crate::schema::v1::BlockRequest, pending_response: oneshot::Sender, RequestFailure>>, }, + /// A new storage request must be emitted. + StateRequest { + target: PeerId, + request: crate::schema::v1::StateRequest, + pending_response: oneshot::Sender, RequestFailure>>, + }, /// Peer has a reported a new head of chain. PeerNewBest(PeerId, NumberFor), /// Now connected to a new peer for syncing purposes. @@ -1254,27 +1320,54 @@ impl NetworkBehaviour for Protocol { // Check for finished outgoing requests. let mut finished_block_requests = Vec::new(); + let mut finished_state_requests = Vec::new(); for (id, peer) in self.peers.iter_mut() { - if let Peer { block_request: Some((_, pending_response)), .. } = peer { + if let Peer { request: Some((_, pending_response)), .. 
} = peer { match pending_response.poll_unpin(cx) { Poll::Ready(Ok(Ok(resp))) => { - let (req, _) = peer.block_request.take().unwrap(); + let (req, _) = peer.request.take().unwrap(); + match req { + PeerRequest::Block(req) => { + let protobuf_response = match crate::schema::v1::BlockResponse::decode(&resp[..]) { + Ok(proto) => proto, + Err(e) => { + debug!( + target: "sync", + "Failed to decode block response from peer {:?}: {:?}.", + id, + e + ); + self.peerset_handle.report_peer(id.clone(), rep::BAD_MESSAGE); + self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC); + continue; + } + }; - let protobuf_response = match crate::schema::v1::BlockResponse::decode(&resp[..]) { - Ok(proto) => proto, - Err(e) => { - debug!(target: "sync", "Failed to decode block request to peer {:?}: {:?}.", id, e); - self.peerset_handle.report_peer(id.clone(), rep::BAD_MESSAGE); - self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC); - continue; - } - }; + finished_block_requests.push((id.clone(), req, protobuf_response)); + }, + PeerRequest::State => { + let protobuf_response = match crate::schema::v1::StateResponse::decode(&resp[..]) { + Ok(proto) => proto, + Err(e) => { + debug!( + target: "sync", + "Failed to decode state response from peer {:?}: {:?}.", + id, + e + ); + self.peerset_handle.report_peer(id.clone(), rep::BAD_MESSAGE); + self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC); + continue; + } + }; - finished_block_requests.push((id.clone(), req, protobuf_response)); + finished_state_requests.push((id.clone(), protobuf_response)); + }, + } }, Poll::Ready(Ok(Err(e))) => { - peer.block_request.take(); - debug!(target: "sync", "Block request to peer {:?} failed: {:?}.", id, e); + peer.request.take(); + debug!(target: "sync", "Request to peer {:?} failed: {:?}.", id, e); match e { RequestFailure::Network(OutboundFailure::Timeout) => { @@ -1309,10 +1402,10 @@ impl NetworkBehaviour for Protocol { } }, Poll::Ready(Err(oneshot::Canceled)) => { - peer.block_request.take(); + peer.request.take(); trace!( target: "sync", - "Block request to peer {:?} failed due to oneshot being canceled.", + "Request to peer {:?} failed due to oneshot being canceled.", id, ); self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC); @@ -1325,6 +1418,10 @@ impl NetworkBehaviour for Protocol { let ev = self.on_block_response(id, req, protobuf_response); self.pending_messages.push_back(ev); } + for (id, protobuf_response) in finished_state_requests { + let ev = self.on_state_response(id, protobuf_response); + self.pending_messages.push_back(ev); + } while let Poll::Ready(Some(())) = self.tick_timeout.poll_next_unpin(cx) { self.tick(); @@ -1334,6 +1431,10 @@ impl NetworkBehaviour for Protocol { let event = prepare_block_request(&mut self.peers, id.clone(), request); self.pending_messages.push_back(event); } + if let Some((id, request)) = self.sync.state_request() { + let event = prepare_state_request(&mut self.peers, id, request); + self.pending_messages.push_back(event); + } for (id, request) in self.sync.justification_requests() { let event = prepare_block_request(&mut self.peers, id, request); self.pending_messages.push_back(event); diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index 7b7ac721b5b47..82df21fe9d044 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -31,14 +31,16 @@ use codec::Encode; use blocks::BlockCollection; -use sp_blockchain::{Error as ClientError, Info as BlockchainInfo, HeaderMetadata}; +use 
state::StateSync;
+use sp_blockchain::{Error as ClientError, HeaderMetadata};
 use sp_consensus::{BlockOrigin, BlockStatus,
 	block_validation::{BlockAnnounceValidator, Validation},
 	import_queue::{IncomingBlock, BlockImportResult, BlockImportError}
 };
 use crate::protocol::message::{
-	self, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse, Roles,
+	self, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse,
 };
+use crate::schema::v1::{StateResponse, StateRequest};
 use either::Either;
 use extra_requests::ExtraRequests;
 use libp2p::PeerId;
@@ -59,6 +61,7 @@ use futures::{task::Poll, Future, stream::FuturesUnordered, FutureExt, StreamExt
 mod blocks;
 mod extra_requests;
+mod state;
 
 /// Maximum blocks to request in a single packet.
 const MAX_BLOCKS_TO_REQUEST: usize = 128;
@@ -84,6 +87,9 @@
 /// See [`MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS`] for more information.
 const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER: usize = 4;
 
+/// Pick the state to sync as the latest finalized number minus this.
+const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;
+
 /// We use a heuristic that with a high likelihood, by the time
 /// `MAJOR_SYNC_BLOCKS` have been imported we'll be on the same
 /// chain as (or at least closer to) the peer so we want to delay
@@ -183,11 +189,8 @@ pub struct ChainSync {
 	best_queued_number: NumberFor<B>,
 	/// The best block hash in our queue of blocks to import
 	best_queued_hash: B::Hash,
-	/// The role of this node, e.g. light or full
-	role: Roles,
-	/// What block attributes we require for this node, usually derived from
-	/// what role we are, but could be customized
-	required_block_attributes: message::BlockAttributes,
+	/// Current mode (full/light)
+	mode: SyncMode,
 	/// Any extra justification requests.
 	extra_justifications: ExtraRequests<B>,
 	/// A set of hashes of blocks that are being downloaded or have been
@@ -209,6 +212,11 @@
 	>,
 	/// Stats per peer about the number of concurrent block announce validations.
 	block_announce_validation_per_peer_stats: HashMap,
+	/// State sync in progress, if any.
+	state_sync: Option<StateSync<B>>,
+	/// Enable importing existing blocks. This is used after the state download to
+	/// catch up to the latest state while re-importing blocks.
+	import_existing: bool,
 }
 
 /// All the data we have about a Peer that we are trying to sync with
@@ -281,6 +289,8 @@ pub enum PeerSyncState {
 	DownloadingStale(B::Hash),
 	/// Downloading justification for given block hash.
 	DownloadingJustification(B::Hash),
+	/// Downloading state.
+	DownloadingState,
 }
 
 impl PeerSyncState {
@@ -298,6 +308,15 @@ pub enum SyncState {
 	Downloading
 }
 
+/// Reported state download progress.
+#[derive(Clone, Eq, PartialEq, Debug)]
+pub struct StateDownloadProgress {
+	/// Estimated download percentage.
+	pub percentage: u32,
+	/// Total state size in bytes downloaded so far.
+	pub size: u64,
+}
+
 /// Syncing status and statistics.
 #[derive(Clone)]
 pub struct Status {
@@ -309,6 +328,8 @@
 	pub num_peers: u32,
 	/// Number of blocks queued for import
 	pub queued_blocks: u32,
+	/// State sync status in progress, if any.
+	pub state_sync: Option<StateDownloadProgress>,
 }
 
 /// A peer did not behave as expected and should be reported.
@@ -344,6 +365,15 @@ impl OnBlockData {
 	}
 }
 
+/// Result of [`ChainSync::on_state_data`].
+#[derive(Debug)]
+pub enum OnStateData<B: BlockT> {
+	/// The block and state that should be imported.
+	Import(BlockOrigin, IncomingBlock<B>),
+	/// A new state request needs to be made to the given peer.
+	Request(PeerId, StateRequest)
+}
+
 /// Result of [`ChainSync::poll_block_announce_validation`].
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum PollBlockAnnounceValidation {
@@ -429,6 +459,20 @@ pub enum OnBlockJustification {
 	}
 }
 
+/// Operation mode.
+#[derive(Debug, PartialEq, Eq)]
+pub enum SyncMode {
+	// Sync headers only
+	Light,
+	// Sync headers and block bodies
+	Full,
+	// Sync headers and the last finalized state
+	LightState {
+		skip_proofs: bool
+	},
+}
+
 /// Result of [`ChainSync::has_slot_for_block_announce_validation`].
 enum HasSlotForBlockAnnounceValidation {
 	/// Yes, there is a slot for the block announce validation.
@@ -442,27 +486,19 @@ enum HasSlotForBlockAnnounceValidation {
 impl<B: BlockT> ChainSync<B> {
 	/// Create a new instance.
 	pub fn new(
-		role: Roles,
+		mode: SyncMode,
 		client: Arc<dyn crate::chain::Client<B>>,
-		info: &BlockchainInfo<B>,
 		block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
 		max_parallel_downloads: u32,
-	) -> Self {
-		let mut required_block_attributes = BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION;
-
-		if role.is_full() {
-			required_block_attributes |= BlockAttributes::BODY
-		}
-
-		ChainSync {
+	) -> Result<Self, ClientError> {
+		let mut sync = ChainSync {
 			client,
 			peers: HashMap::new(),
 			blocks: BlockCollection::new(),
-			best_queued_hash: info.best_hash,
-			best_queued_number: info.best_number,
+			best_queued_hash: Default::default(),
+			best_queued_number: Zero::zero(),
 			extra_justifications: ExtraRequests::new("justification"),
-			role,
-			required_block_attributes,
+			mode,
 			queue_blocks: Default::default(),
 			fork_targets: Default::default(),
 			pending_requests: Default::default(),
@@ -471,6 +507,27 @@
 			downloaded_blocks: 0,
 			block_announce_validation: Default::default(),
 			block_announce_validation_per_peer_stats: Default::default(),
+			state_sync: None,
+			import_existing: false,
+		};
+		sync.reset_sync_start_point()?;
+		Ok(sync)
+	}
+
+	fn required_block_attributes(&self) -> BlockAttributes {
+		match self.mode {
+			SyncMode::Full => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
+			SyncMode::Light => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
+			SyncMode::LightState { .. } =>
+				BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
+		}
+	}
+
+	fn skip_execution(&self) -> bool {
+		match self.mode {
+			SyncMode::Full => false,
+			SyncMode::Light => true,
+			SyncMode::LightState { .. } => true,
+		}
 	}
 
@@ -502,6 +559,7 @@
 			best_seen_block: best_seen,
 			num_peers: self.peers.len() as u32,
 			queued_blocks: self.queue_blocks.len() as u32,
+			state_sync: self.state_sync.as_ref().map(|s| s.progress()),
 		}
 	}
 
@@ -607,7 +665,7 @@
 		);
 		self.peers.insert(who.clone(), PeerSync {
 			peer_id: who.clone(),
-			common_number: best_number,
+			common_number: std::cmp::min(self.best_queued_number, best_number),
 			best_hash,
 			best_number,
 			state: PeerSyncState::Available,
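Since the constructor now probes the backend for its sync start point, it is fallible;
call sites migrate along the lines of the updated tests further below (a minimal sketch):

	let mut sync = ChainSync::new(
		SyncMode::LightState { skip_proofs: false },
		client.clone(),
		Box::new(DefaultBlockAnnounceValidator),
		1, // max_parallel_downloads
	).expect("sync start point is readable from the backend");

@@ -718,7 +776,7 @@
 	/// Get an iterator over all block requests of all peers.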
pub fn block_requests(&mut self) -> impl Iterator)> + '_ { - if self.pending_requests.is_empty() { + if self.pending_requests.is_empty() || self.state_sync.is_some() { return Either::Left(std::iter::empty()) } if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { @@ -726,10 +784,10 @@ impl ChainSync { return Either::Left(std::iter::empty()) } let major_sync = self.status().state == SyncState::Downloading; + let attrs = self.required_block_attributes(); let blocks = &mut self.blocks; - let attrs = &self.required_block_attributes; let fork_targets = &mut self.fork_targets; - let last_finalized = self.client.info().finalized_number; + let last_finalized = std::cmp::min(self.best_queued_number, self.client.info().finalized_number); let best_queued = self.best_queued_number; let client = &self.client; let queue = &self.queue_blocks; @@ -804,6 +862,28 @@ impl ChainSync { Either::Right(iter) } + /// Get a state request, if any + pub fn state_request(&mut self) -> Option<(PeerId, StateRequest)> { + if let Some(sync) = &self.state_sync { + if sync.is_complete() { + return None; + } + if self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) { + // Only one pending state request is allowed. + return None; + } + for (id, peer) in self.peers.iter_mut() { + if peer.state.is_available() && peer.common_number >= sync.target_block_num() { + trace!(target: "sync", "New StateRequest for {}", id); + peer.state = PeerSyncState::DownloadingState; + let request = sync.next_request(); + return Some((id.clone(), request)) + } + } + } + None + } + /// Handle a response from the remote to a block request that we made. /// /// `request` must be the original request that triggered `response`. @@ -848,7 +928,9 @@ impl ChainSync { justifications, origin: block_data.origin, allow_missing_state: true, - import_existing: false, + import_existing: self.import_existing, + skip_execution: self.skip_execution(), + state: None, } }).collect() } @@ -870,7 +952,9 @@ impl ChainSync { justifications, origin: Some(who.clone()), allow_missing_state: true, - import_existing: false, + import_existing: self.import_existing, + skip_execution: self.skip_execution(), + state: None, } }).collect() } @@ -963,10 +1047,11 @@ impl ChainSync { peer.state = PeerSyncState::Available; Vec::new() } - } - - | PeerSyncState::Available - | PeerSyncState::DownloadingJustification(..) => Vec::new() + }, + PeerSyncState::Available + | PeerSyncState::DownloadingJustification(..) + | PeerSyncState::DownloadingState + => Vec::new() } } else { // When request.is_none() this is a block announcement. Just accept blocks. @@ -983,6 +1068,8 @@ impl ChainSync { origin: Some(who.clone()), allow_missing_state: true, import_existing: false, + skip_execution: true, + state: None, } }).collect() } @@ -994,6 +1081,60 @@ impl ChainSync { Ok(self.validate_and_queue_blocks(new_blocks)) } + /// Handle a response from the remote to a state request that we made. + /// + /// Returns next request if any. 
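	// For reference, how the new `IncomingBlock` fields combine during fast sync:
	//
	//     ordinary downloaded block:       skip_execution: true, state: None
	//         (bodies are stored without execution and re-imported later,
	//          which is what `import_existing` enables),
	//     block built by `on_state_data`:  import_existing: true,
	//         state: Some(state) -- the downloaded key-value set rides along.
	//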
+ pub fn on_state_data( + &mut self, + who: &PeerId, + response: StateResponse, + ) -> Result, BadPeer> { + let import_result = if let Some(sync) = &mut self.state_sync { + debug!( + target: "sync", + "Importing state data from {} with {} keys, {} proof nodes.", + who, + response.entries.len(), + response.proof.len(), + ); + sync.import(response) + } else { + debug!(target: "sync", "Ignored obsolete state response from {}", who); + return Err(BadPeer(who.clone(), rep::NOT_REQUESTED)); + }; + + match import_result { + state::ImportResult::Import(hash, header, state) => { + let origin = if self.status().state != SyncState::Downloading { + BlockOrigin::NetworkBroadcast + } else { + BlockOrigin::NetworkInitialSync + }; + + let block = IncomingBlock { + hash, + header: Some(header), + body: None, + justifications: None, + origin: None, + allow_missing_state: true, + import_existing: true, + skip_execution: self.skip_execution(), + state: Some(state), + }; + debug!(target: "sync", "State sync is complete. Import is queued"); + Ok(OnStateData::Import(origin, block)) + } + state::ImportResult::Continue(request) => { + Ok(OnStateData::Request(who.clone(), request)) + } + state::ImportResult::BadResponse => { + debug!(target: "sync", "Bad state data received from {}", who); + Err(BadPeer(who.clone(), rep::BAD_BLOCK)) + } + } + } + fn validate_and_queue_blocks( &mut self, mut new_blocks: Vec>, @@ -1048,7 +1189,7 @@ impl ChainSync { // We only request one justification at a time let justification = if let Some(block) = response.blocks.into_iter().next() { if hash != block.hash { - info!( + warn!( target: "sync", "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}", who, hash, block.hash ); @@ -1137,7 +1278,7 @@ impl ChainSync { if aux.bad_justification { if let Some(ref peer) = who { - info!("💔 Sent block with bad justification to import"); + warn!("💔 Sent block with bad justification to import"); output.push(Err(BadPeer(peer.clone(), rep::BAD_JUSTIFICATION))); } } @@ -1145,6 +1286,17 @@ impl ChainSync { if let Some(peer) = who.and_then(|p| self.peers.get_mut(&p)) { peer.update_common_number(number); } + let state_sync_complete = self.state_sync.as_ref().map_or(false, |s| s.target() == hash); + if state_sync_complete { + info!( + target: "sync", + "State sync is complete ({} MiB), restarting block sync.", + self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)), + ); + self.state_sync = None; + self.mode = SyncMode::Full; + output.extend(self.restart()); + } }, Err(BlockImportError::IncompleteHeader(who)) => { if let Some(peer) = who { @@ -1171,7 +1323,7 @@ impl ChainSync { }, Err(BlockImportError::BadBlock(who)) => { if let Some(peer) = who { - info!( + warn!( target: "sync", "💔 Block {:?} received from peer {} has been blacklisted", hash, @@ -1189,6 +1341,7 @@ impl ChainSync { e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => { warn!(target: "sync", "💔 Error importing block {:?}: {:?}", hash, e); + self.state_sync = None; output.extend(self.restart()); }, Err(BlockImportError::Cancelled) => {} @@ -1214,6 +1367,29 @@ impl ChainSync { is_descendent_of(&**client, base, block) }); + if let SyncMode::LightState { skip_proofs } = &self.mode { + if self.state_sync.is_none() + && !self.peers.is_empty() + && self.queue_blocks.is_empty() + { + // Finalized a recent block. 
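			// Worked example with hypothetical peer best numbers: sorted heads
			// [100, 103, 110] give median = 103; a block finalized at #96
			// qualifies as a state sync target (96 + 8 >= 103), while one
			// finalized at #90 does not (90 + 8 < 103).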
+					let mut heads: Vec<_> = self.peers.iter().map(|(_, peer)| peer.best_number).collect();
+					heads.sort();
+					let median = heads[heads.len() / 2];
+					if number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
+						if let Ok(Some(header)) = self.client.header(BlockId::hash(hash.clone())) {
+							log::debug!(
+								target: "sync",
+								"Starting state sync for #{} ({})",
+								number,
+								hash,
+							);
+							self.state_sync = Some(StateSync::new(self.client.clone(), header, *skip_proofs));
+						}
+					}
+				}
+			}
+
 		if let Err(err) = r {
 			warn!(
 				target: "sync",
@@ -1536,7 +1712,7 @@
 			return PollBlockAnnounceValidation::Nothing { is_best, who, announce }
 		}
 
-		let requires_additional_data = !self.role.is_light() || !known_parent;
+		let requires_additional_data = self.mode != SyncMode::Light || !known_parent;
 		if !requires_additional_data {
 			trace!(
 				target: "sync",
@@ -1595,6 +1771,8 @@
 				origin: block_data.origin,
 				allow_missing_state: true,
 				import_existing: false,
+				skip_execution: self.skip_execution(),
+				state: None,
 			}
 		}).collect();
 		if !blocks.is_empty() {
@@ -1611,9 +1789,9 @@
 		&'a mut self,
 	) -> impl Iterator<Item = Result<(BlockOrigin, IncomingBlock<B>), BadPeer>> + 'a {
 		self.blocks.clear();
-		let info = self.client.info();
-		self.best_queued_hash = info.best_hash;
-		self.best_queued_number = info.best_number;
+		if let Err(e) = self.reset_sync_start_point() {
+			warn!(target: "sync", "💔 Unable to restart sync: {:?}", e);
+		}
 		self.pending_requests.set_all();
 		debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
 		let old_peers = std::mem::take(&mut self.peers);
@@ -1624,7 +1802,7 @@
 			match p.state {
 				PeerSyncState::DownloadingJustification(_) => {
 					// We make sure our common number is at least something we have.
-					p.common_number = info.best_number;
+					p.common_number = self.best_queued_number;
 					self.peers.insert(id, p);
 					return None;
 				}
@@ -1640,6 +1818,38 @@
 		})
 	}
 
+	/// Find a block to start sync from. If we sync with state, that's the latest block we have state for.
+	fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
+		let info = self.client.info();
+		if matches!(self.mode, SyncMode::LightState {..}) && info.finalized_state.is_some() {
+			log::warn!(
+				target: "sync",
+				"Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
+			);
+			self.mode = SyncMode::Full;
+		}
+		self.import_existing = false;
+		self.best_queued_hash = info.best_hash;
+		self.best_queued_number = info.best_number;
+		if self.mode == SyncMode::Full {
+			if self.client.block_status(&BlockId::hash(info.best_hash))? != BlockStatus::InChainWithState {
+				self.import_existing = true;
+				// Latest state is missing, start with the last finalized state or genesis instead.
+				if let Some((hash, number)) = info.finalized_state {
+					log::debug!(target: "sync", "Starting from finalized state #{}", number);
+					self.best_queued_hash = hash;
+					self.best_queued_number = number;
+				} else {
+					log::debug!(target: "sync", "Restarting from genesis");
+					self.best_queued_hash = Default::default();
+					self.best_queued_number = Zero::zero();
+				}
+			}
+		}
+		log::trace!(target: "sync", "Restarted sync at #{} ({:?})", self.best_queued_number, self.best_queued_hash);
+		Ok(())
+	}
+
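To summarize `reset_sync_start_point` as a decision table (derived from the code above):

	LightState mode, finalized_state already present -> warn, fall back to full sync
	Full mode, best block has state                  -> start from (best_hash, best_number)
	Full mode, state missing, finalized_state Some   -> import_existing = true, start there
	Full mode, state missing, no finalized_state     -> import_existing = true, start from genesis

	/// What is the status of the block corresponding to the given hash?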
fn block_status(&self, hash: &B::Hash) -> Result { if self.queue_blocks.contains(hash) { @@ -1764,7 +1974,7 @@ fn peer_block_request( id: &PeerId, peer: &PeerSync, blocks: &mut BlockCollection, - attrs: &message::BlockAttributes, + attrs: message::BlockAttributes, max_parallel_downloads: u32, finalized: NumberFor, best_num: NumberFor, @@ -1815,7 +2025,7 @@ fn fork_sync_request( targets: &mut HashMap>, best_num: NumberFor, finalized: NumberFor, - attributes: &message::BlockAttributes, + attributes: message::BlockAttributes, check_block: impl Fn(&B::Hash) -> BlockStatus, ) -> Option<(B::Hash, BlockRequest)> { targets.retain(|hash, r| { @@ -1994,17 +2204,15 @@ mod test { // internally we should process the response as the justification not being available. let client = Arc::new(TestClientBuilder::new().build()); - let info = client.info(); let block_announce_validator = Box::new(DefaultBlockAnnounceValidator); let peer_id = PeerId::random(); let mut sync = ChainSync::new( - Roles::AUTHORITY, + SyncMode::Full, client.clone(), - &info, block_announce_validator, 1, - ); + ).unwrap(); let (a1_hash, a1_number) = { let a1 = client.new_block(Default::default()).unwrap().build().unwrap().block; @@ -2067,15 +2275,12 @@ mod test { #[test] fn restart_doesnt_affect_peers_downloading_finality_data() { let mut client = Arc::new(TestClientBuilder::new().build()); - let info = client.info(); - let mut sync = ChainSync::new( - Roles::AUTHORITY, + SyncMode::Full, client.clone(), - &info, Box::new(DefaultBlockAnnounceValidator), 1, - ); + ).unwrap(); let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); @@ -2242,15 +2447,13 @@ mod test { sp_tracing::try_init_simple(); let mut client = Arc::new(TestClientBuilder::new().build()); - let info = client.info(); let mut sync = ChainSync::new( - Roles::AUTHORITY, + SyncMode::Full, client.clone(), - &info, Box::new(DefaultBlockAnnounceValidator), 5, - ); + ).unwrap(); let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); @@ -2359,12 +2562,11 @@ mod test { let info = client.info(); let mut sync = ChainSync::new( - Roles::AUTHORITY, + SyncMode::Full, client.clone(), - &info, Box::new(DefaultBlockAnnounceValidator), 5, - ); + ).unwrap(); let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); @@ -2481,12 +2683,11 @@ mod test { let info = client.info(); let mut sync = ChainSync::new( - Roles::AUTHORITY, + SyncMode::Full, client.clone(), - &info, Box::new(DefaultBlockAnnounceValidator), 5, - ); + ).unwrap(); let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone(); let just = (*b"TEST", Vec::new()); @@ -2592,15 +2793,12 @@ mod test { .map(|_| build_block(&mut client, None, false)) .collect::>(); - let info = client.info(); - let mut sync = ChainSync::new( - Roles::AUTHORITY, + SyncMode::Full, client.clone(), - &info, Box::new(DefaultBlockAnnounceValidator), 1, - ); + ).unwrap(); let peer_id1 = PeerId::random(); let common_block = blocks[1].clone(); diff --git a/client/network/src/protocol/sync/state.rs b/client/network/src/protocol/sync/state.rs new file mode 100644 index 0000000000000..fc9dfdbb8c376 --- /dev/null +++ b/client/network/src/protocol/sync/state.rs @@ -0,0 +1,187 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2021 Parity Technologies (UK) Ltd. 
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use std::sync::Arc;
+use codec::{Encode, Decode};
+use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
+use sc_client_api::StorageProof;
+use crate::schema::v1::{StateRequest, StateResponse, StateEntry};
+use crate::chain::{Client, ImportedState};
+use super::StateDownloadProgress;
+
+/// State sync support.
+
+/// State sync state machine. Accumulates partial state data until it
+/// is ready to be imported.
+pub struct StateSync<B: BlockT> {
+	target_block: B::Hash,
+	target_header: B::Header,
+	target_root: B::Hash,
+	last_key: Vec<u8>,
+	state: Vec<(Vec<u8>, Vec<u8>)>,
+	complete: bool,
+	client: Arc<dyn Client<B>>,
+	imported_bytes: u64,
+	skip_proof: bool,
+}
+
+/// Import state chunk result.
+pub enum ImportResult<B: BlockT> {
+	/// State is complete and ready for import.
+	Import(B::Hash, B::Header, ImportedState<B>),
+	/// Continue downloading.
+	Continue(StateRequest),
+	/// Bad state chunk.
+	BadResponse,
+}
+
+impl<B: BlockT> StateSync<B> {
+	/// Create a new instance.
+	pub fn new(client: Arc<dyn Client<B>>, target: B::Header, skip_proof: bool) -> Self {
+		StateSync {
+			client,
+			target_block: target.hash(),
+			target_root: target.state_root().clone(),
+			target_header: target,
+			last_key: Vec::default(),
+			state: Vec::default(),
+			complete: false,
+			imported_bytes: 0,
+			skip_proof,
+		}
+	}
+
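The machine is driven request-by-request; a typical loop, sketched (transport elided,
`send_and_await` is a hypothetical helper that ships a request and waits for the reply):

	let mut sync = StateSync::new(client.clone(), target_header, false);
	let mut request = sync.next_request();
	while !sync.is_complete() {
		let response: StateResponse = send_and_await(&request);
		match sync.import(response) {
			ImportResult::Continue(next) => request = next,
			ImportResult::Import(_hash, _header, _state) => break, // hand off to the import queue
			ImportResult::BadResponse => break, // report and disconnect the peer
		}
	}

+	/// Validate and import a state response.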
+ pub fn import(&mut self, response: StateResponse) -> ImportResult { + if response.entries.is_empty() && response.proof.is_empty() && !response.complete { + log::debug!( + target: "sync", + "Bad state response", + ); + return ImportResult::BadResponse; + } + if !self.skip_proof && response.proof.is_empty() { + log::debug!( + target: "sync", + "Missing proof", + ); + return ImportResult::BadResponse; + } + let complete = if !self.skip_proof { + log::debug!( + target: "sync", + "Importing state from {} trie nodes", + response.proof.len(), + ); + let proof_size = response.proof.len() as u64; + let proof = match StorageProof::decode(&mut response.proof.as_ref()) { + Ok(proof) => proof, + Err(e) => { + log::debug!(target: "sync", "Error decoding proof: {:?}", e); + return ImportResult::BadResponse; + } + }; + let (values, complete) = match self.client.verify_range_proof( + self.target_root, + proof, + &self.last_key + ) { + Err(e) => { + log::debug!( + target: "sync", + "StateResponse failed proof verification: {:?}", + e, + ); + return ImportResult::BadResponse; + }, + Ok(values) => values, + }; + log::debug!(target: "sync", "Imported with {} keys", values.len()); + + if let Some(last) = values.last().map(|(k, _)| k) { + self.last_key = last.clone(); + } + + for (key, value) in values { + self.imported_bytes += key.len() as u64; + self.state.push((key, value)) + }; + self.imported_bytes += proof_size; + complete + } else { + log::debug!( + target: "sync", + "Importing state from {:?} to {:?}", + response.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)), + response.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)), + ); + + if let Some(e) = response.entries.last() { + self.last_key = e.key.clone(); + } + for StateEntry { key, value } in response.entries { + self.imported_bytes += (key.len() + value.len()) as u64; + self.state.push((key, value)) + } + response.complete + }; + if complete { + self.complete = true; + ImportResult::Import(self.target_block.clone(), self.target_header.clone(), ImportedState { + block: self.target_block.clone(), + state: std::mem::take(&mut self.state) + }) + } else { + ImportResult::Continue(self.next_request()) + } + } + + /// Produce next state request. + pub fn next_request(&self) -> StateRequest { + StateRequest { + block: self.target_block.encode(), + start: self.last_key.clone(), + no_proof: self.skip_proof, + } + } + + /// Check if the state is complete. + pub fn is_complete(&self) -> bool { + self.complete + } + + /// Returns target block number. + pub fn target_block_num(&self) -> NumberFor { + self.target_header.number().clone() + } + + /// Returns target block hash. + pub fn target(&self) -> B::Hash { + self.target_block.clone() + } + + /// Returns state sync estimated progress. + pub fn progress(&self) -> StateDownloadProgress { + let percent_done = (*self.last_key.get(0).unwrap_or(&0u8) as u32) * 100 / 256; + StateDownloadProgress { + percentage: percent_done, + size: self.imported_bytes, + } + } +} + diff --git a/client/network/src/schema/api.v1.proto b/client/network/src/schema/api.v1.proto index 23d585b05e9cd..a16fdbaebc81b 100644 --- a/client/network/src/schema/api.v1.proto +++ b/client/network/src/schema/api.v1.proto @@ -68,3 +68,28 @@ message BlockData { bytes justifications = 8; // optional } +// Request storage data from a peer. +message StateRequest { + // Block header hash. + bytes block = 1; + // Start from this key. Equivalent to if omitted. 
+ bytes start = 2; // optional + // if 'true' indicates that response should contain raw key-values, rather than proof. + bool no_proof = 3; +} + +message StateResponse { + // A collection of keys-values. Only populated if `no_proof` is `true` + repeated StateEntry entries = 1; + // If `no_proof` is false in request, this contains proof nodes. + bytes proof = 2; + // Set to true when there are no more keys to return. + bool complete = 3; +} + +// A key-value pair +message StateEntry { + bytes key = 1; + bytes value = 2; +} + diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 666108363f640..0bc28288501a4 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -48,7 +48,7 @@ use crate::{ Protocol, Ready, event::Event, - sync::SyncState, + sync::{SyncState, Status as SyncStatus}, }, transactions, transport, ReputationChange, @@ -196,6 +196,7 @@ impl NetworkWorker { protocol::ProtocolConfig { roles: From::from(¶ms.role), max_parallel_downloads: params.network_config.max_parallel_downloads, + sync_mode: params.network_config.sync_mode.clone(), }, params.chain.clone(), params.protocol_id.clone(), @@ -331,7 +332,7 @@ impl NetworkWorker { }; let behaviour = { - let bitswap = if params.network_config.ipfs_server { Some(Bitswap::new(client)) } else { None }; + let bitswap = params.network_config.ipfs_server.then(|| Bitswap::new(client)); let result = Behaviour::new( protocol, user_agent, @@ -339,6 +340,7 @@ impl NetworkWorker { light_client_request_sender, discovery_config, params.block_request_protocol_config, + params.state_request_protocol_config, bitswap, params.light_client_request_protocol_config, params.network_config.request_response_protocols, @@ -442,14 +444,16 @@ impl NetworkWorker { /// High-level network status information. pub fn status(&self) -> NetworkStatus { + let status = self.sync_state(); NetworkStatus { - sync_state: self.sync_state(), + sync_state: status.state, best_seen_block: self.best_seen_block(), num_sync_peers: self.num_sync_peers(), num_connected_peers: self.num_connected_peers(), num_active_peers: self.num_active_peers(), total_bytes_inbound: self.total_bytes_inbound(), total_bytes_outbound: self.total_bytes_outbound(), + state_sync: status.state_sync, } } @@ -474,7 +478,7 @@ impl NetworkWorker { } /// Current global sync state. 
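	// Callers needing only a yes/no "are we syncing" read `.state` from the
	// returned status; the informant can additionally surface fast sync
	// progress along these lines (sketch):
	//
	//     let status = worker.status();
	//     if let Some(p) = status.state_sync {
	//         log::info!("State sync {}% ({} bytes)", p.percentage, p.size);
	//     }
	//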
-	pub fn sync_state(&self) -> SyncState {
+	pub fn sync_state(&self) -> SyncStatus {
 		self.network_service.behaviour().user_protocol().sync_state()
 	}
 
@@ -1869,7 +1873,7 @@ impl Future for NetworkWorker {
 			*this.external_addresses.lock() = external_addresses;
 		}
 
-		let is_major_syncing = match this.network_service.behaviour_mut().user_protocol_mut().sync_state() {
+		let is_major_syncing = match this.network_service.behaviour_mut().user_protocol_mut().sync_state().state {
 			SyncState::Idle => false,
 			SyncState::Downloading => true,
 		};
diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs
index 4e5bba8f7d33f..c2e3844849f5c 100644
--- a/client/network/src/service/tests.rs
+++ b/client/network/src/service/tests.rs
@@ -18,6 +18,7 @@
 use crate::{config, Event, NetworkService, NetworkWorker};
 use crate::block_request_handler::BlockRequestHandler;
+use crate::state_request_handler::StateRequestHandler;
 use crate::light_client_requests::handler::LightClientRequestHandler;
 
 use libp2p::PeerId;
@@ -107,6 +108,16 @@ fn build_test_full_node(config: config::NetworkConfiguration)
 		protocol_config
 	};
 
+	let state_request_protocol_config = {
+		let (handler, protocol_config) = StateRequestHandler::new(
+			&protocol_id,
+			client.clone(),
+			50,
+		);
+		async_std::task::spawn(handler.run().boxed());
+		protocol_config
+	};
+
 	let light_client_request_protocol_config = {
 		let (handler, protocol_config) = LightClientRequestHandler::new(
 			&protocol_id,
@@ -131,6 +142,7 @@
 		),
 		metrics_registry: None,
 		block_request_protocol_config,
+		state_request_protocol_config,
 		light_client_request_protocol_config,
 	})
 	.unwrap();
diff --git a/client/network/src/state_request_handler.rs b/client/network/src/state_request_handler.rs
new file mode 100644
index 0000000000000..bf47b412f46d5
--- /dev/null
+++ b/client/network/src/state_request_handler.rs
@@ -0,0 +1,246 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see <https://www.gnu.org/licenses/>.
+
+//! Helper for handling (i.e. answering) state requests from a remote peer via the
+//! [`crate::request_responses::RequestResponsesBehaviour`].
+
+use codec::{Encode, Decode};
+use crate::chain::Client;
+use crate::config::ProtocolId;
+use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
+use crate::schema::v1::{StateResponse, StateRequest, StateEntry};
+use crate::{PeerId, ReputationChange};
+use futures::channel::{mpsc, oneshot};
+use futures::stream::StreamExt;
+use log::debug;
+use lru::LruCache;
+use prost::Message;
+use sp_runtime::generic::BlockId;
+use sp_runtime::traits::Block as BlockT;
+use std::sync::Arc;
+use std::time::Duration;
+use std::hash::{Hasher, Hash};
+
+const LOG_TARGET: &str = "sync";
+const MAX_RESPONSE_BYTES: usize = 2 * 1024 * 1024; // Actual response may be bigger.
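The protocol name is derived from the chain's `ProtocolId`; for an id of "dot"
(illustrative) it comes out as "/dot/state/1". A quick check against
`generate_protocol_name`, defined just below:

	#[test]
	fn state_protocol_name_includes_chain_id() {
		let protocol_id = ProtocolId::from("dot");
		assert_eq!(generate_protocol_name(&protocol_id), "/dot/state/1");
	}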
+const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2; + +mod rep { + use super::ReputationChange as Rep; + + /// Reputation change when a peer sent us the same request multiple times. + pub const SAME_REQUEST: Rep = Rep::new(i32::min_value(), "Same state request multiple times"); +} + +/// Generates a [`ProtocolConfig`] for the block request protocol, refusing incoming requests. +pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig { + ProtocolConfig { + name: generate_protocol_name(protocol_id).into(), + max_request_size: 1024 * 1024, + max_response_size: 16 * 1024 * 1024, + request_timeout: Duration::from_secs(40), + inbound_queue: None, + } +} + +/// Generate the state protocol name from chain specific protocol identifier. +fn generate_protocol_name(protocol_id: &ProtocolId) -> String { + let mut s = String::new(); + s.push_str("/"); + s.push_str(protocol_id.as_ref()); + s.push_str("/state/1"); + s +} + +/// The key of [`BlockRequestHandler::seen_requests`]. +#[derive(Eq, PartialEq, Clone)] +struct SeenRequestsKey { + peer: PeerId, + block: B::Hash, + start: Vec, +} + +impl Hash for SeenRequestsKey { + fn hash(&self, state: &mut H) { + self.peer.hash(state); + self.block.hash(state); + self.start.hash(state); + } +} + +/// The value of [`StateRequestHandler::seen_requests`]. +enum SeenRequestsValue { + /// First time we have seen the request. + First, + /// We have fulfilled the request `n` times. + Fulfilled(usize), +} + +/// Handler for incoming block requests from a remote peer. +pub struct StateRequestHandler { + client: Arc>, + request_receiver: mpsc::Receiver, + /// Maps from request to number of times we have seen this request. + /// + /// This is used to check if a peer is spamming us with the same request. + seen_requests: LruCache, SeenRequestsValue>, +} + +impl StateRequestHandler { + /// Create a new [`StateRequestHandler`]. + pub fn new( + protocol_id: &ProtocolId, + client: Arc>, + num_peer_hint: usize, + ) -> (Self, ProtocolConfig) { + // Reserve enough request slots for one request per peer when we are at the maximum + // number of peers. + let (tx, request_receiver) = mpsc::channel(num_peer_hint); + + let mut protocol_config = generate_protocol_config(protocol_id); + protocol_config.inbound_queue = Some(tx); + + let seen_requests = LruCache::new(num_peer_hint * 2); + + (Self { client, request_receiver, seen_requests }, protocol_config) + } + + /// Run [`StateRequestHandler`]. 
+	pub async fn run(mut self) {
+		while let Some(request) = self.request_receiver.next().await {
+			let IncomingRequest { peer, payload, pending_response } = request;
+
+			match self.handle_request(payload, pending_response, &peer) {
+				Ok(()) => debug!(target: LOG_TARGET, "Handled state request from {}.", peer),
+				Err(e) => debug!(
+					target: LOG_TARGET,
+					"Failed to handle state request from {}: {}",
+					peer,
+					e,
+				),
+			}
+		}
+	}
+
+	fn handle_request(
+		&mut self,
+		payload: Vec<u8>,
+		pending_response: oneshot::Sender<OutgoingResponse>,
+		peer: &PeerId,
+	) -> Result<(), HandleRequestError> {
+		let request = StateRequest::decode(&payload[..])?;
+		let block: B::Hash = Decode::decode(&mut request.block.as_ref())?;
+
+		let key = SeenRequestsKey {
+			peer: *peer,
+			block: block.clone(),
+			start: request.start.clone(),
+		};
+
+		let mut reputation_changes = Vec::new();
+
+		match self.seen_requests.get_mut(&key) {
+			Some(SeenRequestsValue::First) => {},
+			Some(SeenRequestsValue::Fulfilled(ref mut requests)) => {
+				*requests = requests.saturating_add(1);
+
+				if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER {
+					reputation_changes.push(rep::SAME_REQUEST);
+				}
+			},
+			None => {
+				self.seen_requests.put(key.clone(), SeenRequestsValue::First);
+			}
+		}
+
+		log::trace!(
+			target: LOG_TARGET,
+			"Handling state request from {}: Block {:?}, Starting at {:?}, no_proof={}",
+			peer,
+			request.block,
+			sp_core::hexdisplay::HexDisplay::from(&request.start),
+			request.no_proof,
+		);
+
+		let result = if reputation_changes.is_empty() {
+			let mut response = StateResponse::default();
+
+			if !request.no_proof {
+				let (proof, count) = self.client.read_proof_collection(
+					&BlockId::hash(block),
+					&request.start,
+					MAX_RESPONSE_BYTES,
+				)?;
+				response.proof = proof.encode();
+				if count == 0 {
+					response.complete = true;
+				}
+			} else {
+				let entries = self.client.storage_collection(
+					&BlockId::hash(block),
+					&request.start,
+					MAX_RESPONSE_BYTES,
+				)?;
+				response.entries = entries.into_iter().map(|(key, value)| StateEntry { key, value }).collect();
+				if response.entries.is_empty() {
+					response.complete = true;
+				}
+			}
+
+			log::trace!(
+				target: LOG_TARGET,
+				"StateResponse contains {} keys, {} proof nodes, complete={}, from {:?} to {:?}",
+				response.entries.len(),
+				response.proof.len(),
+				response.complete,
+				response.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
+				response.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
+			);
+			if let Some(value) = self.seen_requests.get_mut(&key) {
+				// If this is the first time we have processed this request, we need to change
+				// it to `Fulfilled`.
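				// Worked trace of the guard above: request #1 is recorded as
				// `First` and served (the value then becomes `Fulfilled(1)`
				// below); request #2 bumps the counter to 2, still within the
				// limit; request #3 reaches 3 > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER,
				// earns `rep::SAME_REQUEST`, and the response is refused.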
+ if let SeenRequestsValue::First = value { + *value = SeenRequestsValue::Fulfilled(1); + } + } + + let mut data = Vec::with_capacity(response.encoded_len()); + response.encode(&mut data)?; + Ok(data) + } else { + Err(()) + }; + + pending_response.send(OutgoingResponse { + result, + reputation_changes, + sent_feedback: None, + }).map_err(|_| HandleRequestError::SendResponse) + } +} + +#[derive(derive_more::Display, derive_more::From)] +enum HandleRequestError { + #[display(fmt = "Failed to decode request: {}.", _0)] + DecodeProto(prost::DecodeError), + #[display(fmt = "Failed to encode response: {}.", _0)] + EncodeProto(prost::EncodeError), + #[display(fmt = "Failed to decode block hash: {}.", _0)] + InvalidHash(codec::Error), + Client(sp_blockchain::Error), + #[display(fmt = "Failed to send response.")] + SendResponse, +} diff --git a/client/network/test/src/block_import.rs b/client/network/test/src/block_import.rs index b3641d4b41214..05169aba8d730 100644 --- a/client/network/test/src/block_import.rs +++ b/client/network/test/src/block_import.rs @@ -46,6 +46,8 @@ fn prepare_good_block() -> (TestClient, Hash, u64, PeerId, IncomingBlock) origin: Some(peer_id.clone()), allow_missing_state: false, import_existing: false, + state: None, + skip_execution: false, }) } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index f55444f8cf121..b6e8f897bb809 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -29,6 +29,7 @@ use std::{ use libp2p::build_multiaddr; use log::trace; use sc_network::block_request_handler::{self, BlockRequestHandler}; +use sc_network::state_request_handler::{self, StateRequestHandler}; use sc_network::light_client_requests::{self, handler::LightClientRequestHandler}; use sp_blockchain::{ HeaderBackend, Result as ClientResult, @@ -55,7 +56,7 @@ use sc_network::{ NetworkWorker, NetworkService, config::{ProtocolId, MultiaddrWithPeerId, NonReservedPeerMode}, Multiaddr, }; -use sc_network::config::{NetworkConfiguration, NonDefaultSetConfig, TransportConfig}; +use sc_network::config::{NetworkConfiguration, NonDefaultSetConfig, TransportConfig, SyncMode}; use libp2p::PeerId; use parking_lot::Mutex; use sp_core::H256; @@ -179,6 +180,19 @@ impl PeersClient { } } + pub fn has_state_at(&self, block: &BlockId) -> bool { + let header = match self.header(block).unwrap() { + Some(header) => header, + None => return false, + }; + match self { + PeersClient::Full(_client, backend) => + backend.have_state_at(&header.hash(), *header.number()), + PeersClient::Light(_client, backend) => + backend.have_state_at(&header.hash(), *header.number()), + } + } + pub fn justifications(&self, block: &BlockId) -> ClientResult> { match *self { PeersClient::Full(ref client, ref _backend) => client.justifications(block), @@ -235,9 +249,9 @@ impl BlockImport for PeersClient { ) -> Result { match self { PeersClient::Full(client, _) => - client.import_block(block.convert_transaction(), cache).await, + client.import_block(block.clear_storage_changes_and_mutate(), cache).await, PeersClient::Light(client, _) => - client.import_block(block.convert_transaction(), cache).await, + client.import_block(block.clear_storage_changes_and_mutate(), cache).await, } } } @@ -584,7 +598,7 @@ impl BlockImport for BlockImportAdapter where block: BlockImportParams, cache: HashMap>, ) -> Result { - self.inner.import_block(block.convert_transaction(), cache).await + self.inner.import_block(block.clear_storage_changes_and_mutate(), cache).await } } @@ -644,6 +658,8 @@ pub 
struct FullPeerConfig { pub connect_to_peers: Option>, /// Whether the full peer should have the authority role. pub is_authority: bool, + /// Syncing mode + pub sync_mode: SyncMode, } pub trait TestNetFactory: Sized where >::Transaction: Send { @@ -699,10 +715,13 @@ pub trait TestNetFactory: Sized where >: /// Add a full peer. fn add_full_peer_with_config(&mut self, config: FullPeerConfig) { - let test_client_builder = match config.keep_blocks { + let mut test_client_builder = match config.keep_blocks { Some(keep_blocks) => TestClientBuilder::with_pruning_window(keep_blocks), None => TestClientBuilder::with_default_backend(), }; + if matches!(config.sync_mode, SyncMode::Fast{..}) { + test_client_builder = test_client_builder.set_no_genesis(); + } let backend = test_client_builder.backend(); let (c, longest_chain) = test_client_builder.build_with_longest_chain(); let client = Arc::new(c); @@ -736,6 +755,7 @@ pub trait TestNetFactory: Sized where >: Default::default(), None, ); + network_config.sync_mode = config.sync_mode; network_config.transport = TransportConfig::MemoryOnly; network_config.listen_addresses = vec![listen_addr.clone()]; network_config.allow_non_globals_in_dht = true; @@ -769,6 +789,16 @@ pub trait TestNetFactory: Sized where >: protocol_config }; + let state_request_protocol_config = { + let (handler, protocol_config) = StateRequestHandler::new( + &protocol_id, + client.clone(), + 50, + ); + self.spawn_task(handler.run().boxed()); + protocol_config + }; + let light_client_request_protocol_config = { let (handler, protocol_config) = LightClientRequestHandler::new(&protocol_id, client.clone()); self.spawn_task(handler.run().boxed()); @@ -789,6 +819,7 @@ pub trait TestNetFactory: Sized where >: .unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)), metrics_registry: None, block_request_protocol_config, + state_request_protocol_config, light_client_request_protocol_config, }).unwrap(); @@ -862,6 +893,9 @@ pub trait TestNetFactory: Sized where >: let block_request_protocol_config = block_request_handler::generate_protocol_config( &protocol_id, ); + let state_request_protocol_config = state_request_handler::generate_protocol_config( + &protocol_id, + ); let light_client_request_protocol_config = light_client_requests::generate_protocol_config(&protocol_id); @@ -879,6 +913,7 @@ pub trait TestNetFactory: Sized where >: block_announce_validator: Box::new(DefaultBlockAnnounceValidator), metrics_registry: None, block_request_protocol_config, + state_request_protocol_config, light_client_request_protocol_config, }).unwrap(); diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 553a769ec14a4..56cec7e4cdfd9 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1087,3 +1087,43 @@ fn syncs_after_missing_announcement() { net.block_until_sync(); assert!(net.peer(1).client().header(&BlockId::Hash(final_block)).unwrap().is_some()); } + +#[test] +fn syncs_state() { + sp_tracing::try_init_simple(); + for skip_proofs in &[ false, true ] { + let mut net = TestNet::new(0); + net.add_full_peer_with_config(Default::default()); + net.add_full_peer_with_config(FullPeerConfig { + sync_mode: SyncMode::Fast { skip_proofs: *skip_proofs }, + ..Default::default() + }); + net.peer(0).push_blocks(64, false); + // Wait for peer 1 to sync header chain. 
+ net.block_until_sync(); + assert!(!net.peer(1).client().has_state_at(&BlockId::Number(64))); + + let just = (*b"FRNK", Vec::new()); + net.peer(1).client().finalize_block(BlockId::Number(60), Some(just), true).unwrap(); + // Wait for state sync. + block_on(futures::future::poll_fn::<(), _>(|cx| { + net.poll(cx); + if net.peer(1).client.info().finalized_state.is_some() { + Poll::Ready(()) + } else { + Poll::Pending + } + })); + assert!(!net.peer(1).client().has_state_at(&BlockId::Number(64))); + // Wait for the rest of the states to be imported. + block_on(futures::future::poll_fn::<(), _>(|cx| { + net.poll(cx); + if net.peer(1).client().has_state_at(&BlockId::Number(64)) { + Poll::Ready(()) + } else { + Poll::Pending + } + })); + } +} + diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index 6a98cf82f3e55..a90efb02dc5f2 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -55,6 +55,7 @@ sp-state-machine = { version = "0.9.0", path = "../../primitives/state-machine" sp-application-crypto = { version = "3.0.0", path = "../../primitives/application-crypto" } sp-consensus = { version = "0.9.0", path = "../../primitives/consensus/common" } sp-inherents = { version = "3.0.0", path = "../../primitives/inherents" } +sp-storage = { version = "3.0.0", path = "../../primitives/storage" } sc-network = { version = "0.9.0", path = "../network" } sc-chain-spec = { version = "3.0.0", path = "../chain-spec" } sc-light = { version = "3.0.0", path = "../light" } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index ca22322798463..b0bffc3c4e12d 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -43,6 +43,7 @@ use log::info; use sc_network::config::{Role, OnDemand}; use sc_network::NetworkService; use sc_network::block_request_handler::{self, BlockRequestHandler}; +use sc_network::state_request_handler::{self, StateRequestHandler}; use sc_network::light_client_requests::{self, handler::LightClientRequestHandler}; use sp_runtime::generic::BlockId; use sp_runtime::traits::{ @@ -70,7 +71,7 @@ use sp_keystore::{CryptoStore, SyncCryptoStore, SyncCryptoStorePtr}; use sp_runtime::BuildStorage; use sc_client_api::{ BlockBackend, BlockchainEvents, - backend::StorageProvider, + StorageProvider, proof_provider::ProofProvider, execution_extensions::ExecutionExtensions }; @@ -377,6 +378,7 @@ pub fn new_full_parts( offchain_worker_enabled : config.offchain_worker.enabled, offchain_indexing_api: config.offchain_worker.indexing_enabled, wasm_runtime_overrides: config.wasm_runtime_overrides.clone(), + no_genesis: matches!(config.network.sync_mode, sc_network::config::SyncMode::Fast {..}), wasm_runtime_substitutes, }, )?; @@ -912,6 +914,23 @@ pub fn build_network( } }; + let state_request_protocol_config = { + if matches!(config.role, Role::Light) { + // Allow outgoing requests but deny incoming requests. + state_request_handler::generate_protocol_config(&protocol_id) + } else { + // Allow both outgoing and incoming requests. + let (handler, protocol_config) = StateRequestHandler::new( + &protocol_id, + client.clone(), + config.network.default_peers_set.in_peers as usize + + config.network.default_peers_set.out_peers as usize, + ); + spawn_handle.spawn("state_request_handler", handler.run()); + protocol_config + } + }; + let light_client_request_protocol_config = { if matches!(config.role, Role::Light) { // Allow outgoing requests but deny incoming requests. 
@@ -950,6 +969,7 @@ pub fn build_network( block_announce_validator, metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_request_protocol_config, + state_request_protocol_config, light_client_request_protocol_config, }; diff --git a/client/service/src/chain_ops/import_blocks.rs b/client/service/src/chain_ops/import_blocks.rs index defa4128702a8..90bcc94cb8996 100644 --- a/client/service/src/chain_ops/import_blocks.rs +++ b/client/service/src/chain_ops/import_blocks.rs @@ -172,6 +172,8 @@ fn import_block_to_queue( origin: None, allow_missing_state: false, import_existing: force, + state: None, + skip_execution: false, } ]); } diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 06d9aec4e4fd3..4a998a12d2b7f 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -52,11 +52,12 @@ use sp_state_machine::{ DBValue, Backend as StateBackend, ChangesTrieAnchorBlockId, prove_read, prove_child_read, ChangesTrieRootsStorage, ChangesTrieStorage, ChangesTrieConfigurationRange, key_changes, key_changes_proof, + prove_range_read_with_size, read_range_proof_check, }; use sc_executor::RuntimeVersion; use sp_consensus::{ Error as ConsensusError, BlockStatus, BlockImportParams, BlockCheckParams, - ImportResult, BlockOrigin, ForkChoiceStrategy, + ImportResult, BlockOrigin, ForkChoiceStrategy, StateAction, }; use sp_blockchain::{ self as blockchain, @@ -86,7 +87,7 @@ use sc_client_api::{ execution_extensions::ExecutionExtensions, notifications::{StorageNotifications, StorageEventStream}, KeyIterator, CallExecutor, ExecutorProvider, ProofProvider, - cht, UsageProvider + cht, UsageProvider, }; use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded}; use sp_blockchain::Error; @@ -150,6 +151,11 @@ impl PrePostHeader { } } +enum PrepareStorageChangesResult, Block: BlockT> { + Discard(ImportResult), + Import(Option>>), +} + /// Create an instance of in-memory client. #[cfg(feature="test-helpers")] pub fn new_in_mem( @@ -191,6 +197,8 @@ pub struct ClientConfig { pub offchain_indexing_api: bool, /// Path where WASM files exist to override the on-chain WASM. pub wasm_runtime_overrides: Option, + /// Skip writing genesis state on first start. + pub no_genesis: bool, /// Map of WASM runtime substitute starting at the child of the given block until the runtime /// version doesn't match anymore. 
pub wasm_runtime_substitutes: HashMap>,
 }
 
@@ -202,6 +210,7 @@
 			offchain_worker_enabled: false,
 			offchain_indexing_api: false,
 			wasm_runtime_overrides: None,
+			no_genesis: false,
 			wasm_runtime_substitutes: HashMap::new(),
 		}
 	}
@@ -324,22 +333,29 @@ impl Client where
 		telemetry: Option,
 		config: ClientConfig,
 	) -> sp_blockchain::Result {
-		if backend.blockchain().header(BlockId::Number(Zero::zero()))?.is_none() {
+		let info = backend.blockchain().info();
+		if info.finalized_state.is_none() {
 			let genesis_storage = build_genesis_storage.build_storage()
 				.map_err(sp_blockchain::Error::Storage)?;
 			let mut op = backend.begin_operation()?;
-			backend.begin_state_operation(&mut op, BlockId::Hash(Default::default()))?;
-			let state_root = op.reset_storage(genesis_storage)?;
+			let state_root = op.set_genesis_state(genesis_storage, !config.no_genesis)?;
 			let genesis_block = genesis::construct_genesis_block::<Block>(state_root.into());
 			info!("🔨 Initializing Genesis block/state (state: {}, header-hash: {})",
 				genesis_block.header().state_root(),
 				genesis_block.header().hash()
 			);
+			// Genesis may be written after some blocks have been imported and finalized.
+			// So we only finalize it when the database is empty.
+			let block_state = if info.best_hash == Default::default() {
+				NewBlockState::Final
+			} else {
+				NewBlockState::Normal
+			};
 			op.set_block_data(
 				genesis_block.deconstruct().0,
 				Some(vec![]),
 				None,
-				NewBlockState::Final
+				block_state,
 			)?;
 			backend.commit_operation(op)?;
 		}
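For orientation, the path by which a fast-synced state reaches the client: the sync
layer wraps the downloaded key-value set in `sp_consensus::StorageChanges::Import`,
carried by the block import parameters (a sketch using this patch's types; `origin`,
`header` and `state` are the values produced by `OnStateData::Import`):

	let mut import = BlockImportParams::new(origin, header);
	// Apply the downloaded state directly instead of re-executing the block.
	import.state_action = StateAction::ApplyChanges(
		sp_consensus::StorageChanges::Import(state),
	);
	import.import_existing = true;

The `StorageChanges::Import` arm handled below then calls `reset_storage` with those
key-values and checks the resulting root against the header's state root.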
- if *import_headers.post().number() <= info.finalized_number { + if status == blockchain::BlockStatus::Unknown + && *import_headers.post().number() <= info.finalized_number + { return Err(sp_blockchain::Error::NotInFinalizedChain); } @@ -757,7 +774,48 @@ impl Client where let storage_changes = match storage_changes { Some(storage_changes) => { - self.backend.begin_state_operation(&mut operation.op, BlockId::Hash(parent_hash))?; + let storage_changes = match storage_changes { + sp_consensus::StorageChanges::Changes(storage_changes) => { + self.backend.begin_state_operation(&mut operation.op, BlockId::Hash(parent_hash))?; + let ( + main_sc, + child_sc, + offchain_sc, + tx, _, + changes_trie_tx, + tx_index, + ) = storage_changes.into_inner(); + + if self.config.offchain_indexing_api { + operation.op.update_offchain_storage(offchain_sc)?; + } + + operation.op.update_db_storage(tx)?; + operation.op.update_storage(main_sc.clone(), child_sc.clone())?; + operation.op.update_transaction_index(tx_index)?; + + if let Some(changes_trie_transaction) = changes_trie_tx { + operation.op.update_changes_trie(changes_trie_transaction)?; + } + + Some((main_sc, child_sc)) + } + sp_consensus::StorageChanges::Import(changes) => { + let storage = sp_storage::Storage { + top: changes.state.into_iter().collect(), + children_default: Default::default(), + }; + + let state_root = operation.op.reset_storage(storage)?; + if state_root != *import_headers.post().state_root() { + // State root mismatch when importing state. This should not happen in safe fast sync mode, + // but may happen in unsafe mode. + warn!("Error importing state: State root mismatch."); + return Err(Error::InvalidStateRoot); + } + None + } + }; // ensure parent block is finalized to maintain invariant that // finality is called sequentially. @@ -772,29 +830,8 @@ } operation.op.update_cache(new_cache); + storage_changes - let ( - main_sc, - child_sc, - offchain_sc, - tx, _, - changes_trie_tx, - tx_index, - ) = storage_changes.into_inner(); - - if self.config.offchain_indexing_api { - operation.op.update_offchain_storage(offchain_sc)?; - } - - operation.op.update_db_storage(tx)?; - operation.op.update_storage(main_sc.clone(), child_sc.clone())?; - operation.op.update_transaction_index(tx_index)?; - - if let Some(changes_trie_transaction) = changes_trie_tx { - operation.op.update_changes_trie(changes_trie_transaction)?; - } - - Some((main_sc, child_sc)) }, None => None, }; @@ -867,7 +904,7 @@ impl Client where fn prepare_block_storage_changes( &self, import_block: &mut BlockImportParams>, - ) -> sp_blockchain::Result> + ) -> sp_blockchain::Result> where Self: ProvideRuntimeApi, >::Api: CoreApi + @@ -875,21 +912,28 @@ { let parent_hash = import_block.header.parent_hash(); let at = BlockId::Hash(*parent_hash); - let enact_state = match self.block_status(&at)?
{ - BlockStatus::Unknown => return Ok(Some(ImportResult::UnknownParent)), - BlockStatus::InChainWithState | BlockStatus::Queued => true, - BlockStatus::InChainPruned if import_block.allow_missing_state => false, - BlockStatus::InChainPruned => return Ok(Some(ImportResult::MissingState)), - BlockStatus::KnownBad => return Ok(Some(ImportResult::KnownBad)), + let state_action = std::mem::replace(&mut import_block.state_action, StateAction::Skip); + let (enact_state, storage_changes) = match (self.block_status(&at)?, state_action) { + (BlockStatus::Unknown, _) => return Ok(PrepareStorageChangesResult::Discard(ImportResult::UnknownParent)), + (BlockStatus::KnownBad, _) => return Ok(PrepareStorageChangesResult::Discard(ImportResult::KnownBad)), + (_, StateAction::Skip) => (false, None), + (BlockStatus::InChainPruned, StateAction::ApplyChanges(sp_consensus::StorageChanges::Changes(_))) => + return Ok(PrepareStorageChangesResult::Discard(ImportResult::MissingState)), + (BlockStatus::InChainPruned, StateAction::Execute) => + return Ok(PrepareStorageChangesResult::Discard(ImportResult::MissingState)), + (BlockStatus::InChainPruned, StateAction::ExecuteIfPossible) => (false, None), + (_, StateAction::Execute) => (true, None), + (_, StateAction::ExecuteIfPossible) => (true, None), + (_, StateAction::ApplyChanges(changes)) => (true, Some(changes)), }; - match (enact_state, &mut import_block.storage_changes, &mut import_block.body) { + let storage_changes = match (enact_state, storage_changes, &import_block.body) { // We have storage changes and should enact the state, so we don't need to do anything // here - (true, Some(_), _) => {}, + (true, changes @ Some(_), _) => changes, // We should enact state, but don't have any storage changes, so we need to execute the // block. - (true, ref mut storage_changes @ None, Some(ref body)) => { + (true, None, Some(ref body)) => { let runtime_api = self.runtime_api(); let execution_context = if import_block.origin == BlockOrigin::NetworkInitialSync { ExecutionContext::Syncing @@ -919,19 +963,16 @@ impl Client where != &gen_storage_changes.transaction_storage_root { return Err(Error::InvalidStateRoot) - } else { - **storage_changes = Some(gen_storage_changes); } + Some(sp_consensus::StorageChanges::Changes(gen_storage_changes)) }, // No block body, no storage changes - (true, None, None) => {}, + (true, None, None) => None, // We should not enact the state, so we set the storage changes to `None`. - (false, changes, _) => { - changes.take(); - } + (false, _, _) => None, }; - Ok(None) + Ok(PrepareStorageChangesResult::Import(storage_changes)) } fn apply_finality_with_block_hash( @@ -1307,6 +1348,68 @@ impl ProofProvider for Client where cht::size(), ) } + + fn read_proof_collection( + &self, + id: &BlockId, + start_key: &[u8], + size_limit: usize, + ) -> sp_blockchain::Result<(StorageProof, u32)> { + let state = self.state_at(id)?; + Ok(prove_range_read_with_size::<_, HashFor>( + state, + None, + None, + size_limit, + Some(start_key) + )?) + } + + fn storage_collection( + &self, + id: &BlockId, + start_key: &[u8], + size_limit: usize, + ) -> sp_blockchain::Result, Vec)>> { + let state = self.state_at(id)?; + let mut current_key = start_key.to_vec(); + let mut total_size = 0; + let mut entries = Vec::new(); + while let Some(next_key) = state + .next_storage_key(¤t_key) + .map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))? + { + let value = state + .storage(next_key.as_ref()) + .map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))? 
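+ // A key returned by `next_storage_key` should have a value; treat a missing one as empty rather than failing.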
+ .unwrap_or_default(); + let size = value.len() + next_key.len(); + if total_size + size > size_limit && !entries.is_empty() { + break; + } + total_size += size; + entries.push((next_key.clone(), value)); + current_key = next_key; + } + Ok(entries) + + } + + fn verify_range_proof( + &self, + root: Block::Hash, + proof: StorageProof, + start_key: &[u8], + ) -> sp_blockchain::Result<(Vec<(Vec, Vec)>, bool)> { + Ok(read_range_proof_check::>( + root, + proof, + None, + None, + None, + Some(start_key), + )?) + } } @@ -1751,15 +1854,16 @@ impl sp_consensus::BlockImport for &Client return Ok(res), + PrepareStorageChangesResult::Import(storage_changes) => storage_changes, + }; self.lock_import_and_run(|operation| { - self.apply_block(operation, import_block, new_cache) + self.apply_block(operation, import_block, new_cache, storage_changes) }).map_err(|e| { warn!("Block import error:\n{:?}", e); ConsensusError::ClientImport(e.to_string()).into() @@ -1801,9 +1905,14 @@ impl sp_consensus::BlockImport for &Client return Ok(ImportResult::AlreadyInChain), + BlockStatus::InChainWithState | BlockStatus::Queued if !import_existing => { + return Ok(ImportResult::AlreadyInChain) + }, BlockStatus::InChainWithState | BlockStatus::Queued => {}, - BlockStatus::InChainPruned => return Ok(ImportResult::AlreadyInChain), + BlockStatus::InChainPruned if !import_existing => { + return Ok(ImportResult::AlreadyInChain) + }, + BlockStatus::InChainPruned => {}, BlockStatus::Unknown => {}, BlockStatus::KnownBad => return Ok(ImportResult::KnownBad), } diff --git a/client/service/test/src/client/light.rs b/client/service/test/src/client/light.rs index a183cbce62bdb..8841d498ecfb0 100644 --- a/client/service/test/src/client/light.rs +++ b/client/service/test/src/client/light.rs @@ -272,7 +272,7 @@ fn local_state_is_created_when_genesis_state_is_available() { ); let mut op = backend.begin_operation().unwrap(); op.set_block_data(header0, None, None, NewBlockState::Final).unwrap(); - op.reset_storage(Default::default()).unwrap(); + op.set_genesis_state(Default::default(), true).unwrap(); backend.commit_operation(op).unwrap(); match backend.state_at(BlockId::Number(0)).unwrap() { diff --git a/primitives/blockchain/src/backend.rs b/primitives/blockchain/src/backend.rs index 3441a4f6cf544..dbce364ce7987 100644 --- a/primitives/blockchain/src/backend.rs +++ b/primitives/blockchain/src/backend.rs @@ -269,12 +269,14 @@ pub struct Info { pub finalized_hash: Block::Hash, /// Last finalized block number. pub finalized_number: <::Header as HeaderT>::Number, + /// Last finalized state. + pub finalized_state: Option<(Block::Hash, <::Header as HeaderT>::Number)>, /// Number of concurrent leave forks. pub number_leaves: usize } /// Block status. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum BlockStatus { /// Already in the blockchain. 
InChain, diff --git a/primitives/blockchain/src/error.rs b/primitives/blockchain/src/error.rs index 58d08d06f049e..0d6ac10a8800e 100644 --- a/primitives/blockchain/src/error.rs +++ b/primitives/blockchain/src/error.rs @@ -90,8 +90,8 @@ pub enum Error { #[error("Failed to get runtime version: {0}")] VersionInvalid(String), - #[error("Genesis config provided is invalid")] - GenesisInvalid, + #[error("Provided state is invalid")] + InvalidState, #[error("error decoding justification for header")] JustificationDecode, diff --git a/primitives/consensus/common/src/block_import.rs b/primitives/consensus/common/src/block_import.rs index 67978232009e8..447ea5761f767 100644 --- a/primitives/consensus/common/src/block_import.rs +++ b/primitives/consensus/common/src/block_import.rs @@ -135,6 +135,43 @@ pub struct BlockCheckParams { pub import_existing: bool, } +/// Precomputed storage. +pub enum StorageChanges { + /// Changes coming from block execution. + Changes(sp_state_machine::StorageChanges, NumberFor>), + /// Whole new state. + Import(ImportedState), +} + +/// Imported state data. A vector of key-value pairs that should form a trie. +#[derive(PartialEq, Eq, Clone)] +pub struct ImportedState { + /// Target block hash. + pub block: B::Hash, + /// State keys and values. + pub state: Vec<(Vec, Vec)>, +} + +impl std::fmt::Debug for ImportedState { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + fmt.debug_struct("ImportedState") + .field("block", &self.block) + .finish() + } +} + +/// Defines how a new state is computed for a given imported block. +pub enum StateAction { + /// Apply precomputed changes coming from block execution or state sync. + ApplyChanges(StorageChanges), + /// Execute block body (required) and compute state. + Execute, + /// Execute block body if parent state is available and compute state. + ExecuteIfPossible, + /// Don't execute or import state. + Skip, +} + /// Data required to import a Block. #[non_exhaustive] pub struct BlockImportParams { @@ -159,11 +196,8 @@ pub struct BlockImportParams { pub post_digests: Vec>, /// The body of the block. pub body: Option>, - /// The changes to the storage to create the state for the block. If this is `Some(_)`, - /// the block import will not need to re-execute the block for importing it. - pub storage_changes: Option< - sp_state_machine::StorageChanges, NumberFor> - >, + /// Specify how the new state is computed. + pub state_action: StateAction, /// Is this block finalized already? /// `true` implies instant finality. pub finalized: bool, @@ -182,8 +216,6 @@ pub struct BlockImportParams { /// to modify it. If `None` is passed all the way down to bottom block /// importer, the import fails with an `IncompletePipeline` error. pub fork_choice: Option, - /// Allow importing the block skipping state verification if parent state is missing. - pub allow_missing_state: bool, /// Re-validate existing block. pub import_existing: bool, /// Cached full header hash (with post-digests applied). @@ -201,12 +233,11 @@ impl BlockImportParams { justifications: None, post_digests: Vec::new(), body: None, - storage_changes: None, + state_action: StateAction::Execute, finalized: false, intermediates: HashMap::new(), auxiliary: Vec::new(), fork_choice: None, - allow_missing_state: false, import_existing: false, post_hash: None, } @@ -237,20 +268,28 @@ impl BlockImportParams { /// Auxiliary function for "converting" the transaction type. 
/// - /// Actually this just sets `storage_changes` to `None` and makes rustc think that `Self` now + /// Actually this just drops any `StorageChanges::Changes` and makes rustc think that `Self` now /// uses a different transaction type. - pub fn convert_transaction(self) -> BlockImportParams { + pub fn clear_storage_changes_and_mutate(self) -> BlockImportParams { + // Preserve imported state. + let state_action = match self.state_action { + StateAction::ApplyChanges(StorageChanges::Import(state)) => + StateAction::ApplyChanges(StorageChanges::Import(state)), + StateAction::ApplyChanges(StorageChanges::Changes(_)) => StateAction::Skip, + StateAction::Execute => StateAction::Execute, + StateAction::ExecuteIfPossible => StateAction::ExecuteIfPossible, + StateAction::Skip => StateAction::Skip, + }; BlockImportParams { origin: self.origin, header: self.header, justifications: self.justifications, post_digests: self.post_digests, body: self.body, - storage_changes: None, + state_action, finalized: self.finalized, auxiliary: self.auxiliary, intermediates: self.intermediates, - allow_missing_state: self.allow_missing_state, fork_choice: self.fork_choice, import_existing: self.import_existing, post_hash: self.post_hash, diff --git a/primitives/consensus/common/src/import_queue.rs b/primitives/consensus/common/src/import_queue.rs index 4220c7b14162d..fba5b51e921ca 100644 --- a/primitives/consensus/common/src/import_queue.rs +++ b/primitives/consensus/common/src/import_queue.rs @@ -34,7 +34,7 @@ use crate::{ error::Error as ConsensusError, block_import::{ BlockImport, BlockOrigin, BlockImportParams, ImportedAux, JustificationImport, ImportResult, - BlockCheckParams, + BlockCheckParams, ImportedState, StateAction, }, metrics::Metrics, }; @@ -74,8 +74,12 @@ pub struct IncomingBlock { pub origin: Option, /// Allow importing the block skipping state verification if parent state is missing. pub allow_missing_state: bool, + /// Skip block execution and state verification. + pub skip_execution: bool, /// Re-validate existing block. pub import_existing: bool, + /// Do not compute a new state, but rather set it to the given one. + pub state: Option>, } /// Type of keys in the blockchain cache that consensus module could use for its needs.
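The three new `IncomingBlock` fields are turned into a `StateAction` by the import queue with a fixed precedence: downloaded state first, then explicit skipping, then best-effort execution, as the next hunk implements. A minimal, self-contained sketch of that precedence, using simplified stand-ins for the Substrate types:

enum Action { ApplyImportedState(Vec<(Vec<u8>, Vec<u8>)>), Skip, ExecuteIfPossible, Execute }

struct Incoming {
    state: Option<Vec<(Vec<u8>, Vec<u8>)>>, // whole state downloaded by fast sync
    skip_execution: bool,                   // import header/body without running the block
    allow_missing_state: bool,              // execute only if the parent state is available
}

fn state_action(block: Incoming) -> Action {
    if let Some(state) = block.state {
        Action::ApplyImportedState(state)   // fast sync: install the downloaded state
    } else if block.skip_execution {
        Action::Skip
    } else if block.allow_missing_state {
        Action::ExecuteIfPossible
    } else {
        Action::Execute // simplification: the real code keeps whatever action the verifier set
    }
}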
@@ -264,9 +268,17 @@ pub(crate) async fn import_single_block_metered, Trans if let Some(keys) = maybe_keys { cache.extend(keys.into_iter()); } - import_block.allow_missing_state = block.allow_missing_state; + import_block.import_existing = block.import_existing; + let mut import_block = import_block.clear_storage_changes_and_mutate(); + if let Some(state) = block.state { + import_block.state_action = StateAction::ApplyChanges(crate::StorageChanges::Import(state)); + } else if block.skip_execution { + import_block.state_action = StateAction::Skip; + } else if block.allow_missing_state { + import_block.state_action = StateAction::ExecuteIfPossible; + } - let imported = import_handle.import_block(import_block.convert_transaction(), cache).await; + let imported = import_handle.import_block(import_block, cache).await; if let Some(metrics) = metrics.as_ref() { metrics.report_verification_and_import(started.elapsed()); } diff --git a/primitives/consensus/common/src/import_queue/basic_queue.rs b/primitives/consensus/common/src/import_queue/basic_queue.rs index 3af983952af75..5767b72dd8084 100644 --- a/primitives/consensus/common/src/import_queue/basic_queue.rs +++ b/primitives/consensus/common/src/import_queue/basic_queue.rs @@ -564,6 +564,8 @@ mod tests { origin: None, allow_missing_state: false, import_existing: false, + state: None, + skip_execution: false, }], ))) .unwrap(); diff --git a/primitives/consensus/common/src/lib.rs b/primitives/consensus/common/src/lib.rs index 37df7230fd62b..60e260a892829 100644 --- a/primitives/consensus/common/src/lib.rs +++ b/primitives/consensus/common/src/lib.rs @@ -50,7 +50,8 @@ mod metrics; pub use self::error::Error; pub use block_import::{ BlockCheckParams, BlockImport, BlockImportParams, BlockOrigin, ForkChoiceStrategy, - ImportResult, ImportedAux, JustificationImport, JustificationSyncLink, + ImportResult, ImportedAux, ImportedState, JustificationImport, JustificationSyncLink, + StateAction, StorageChanges, }; pub use select_chain::SelectChain; pub use sp_state_machine::Backend as StateBackend; diff --git a/primitives/runtime/src/generic/block.rs b/primitives/runtime/src/generic/block.rs index 1b30d43ccaca7..af4f9e4521e3b 100644 --- a/primitives/runtime/src/generic/block.rs +++ b/primitives/runtime/src/generic/block.rs @@ -54,6 +54,19 @@ impl BlockId { pub fn number(number: NumberFor) -> Self { BlockId::Number(number) } + + /// Check if this block ID refers to the pre-genesis state. + pub fn is_pre_genesis(&self) -> bool { + match self { + BlockId::Hash(hash) => hash == &Default::default(), + BlockId::Number(_) => false, + } + } + + /// Create a block ID for a pre-genesis state. + pub fn pre_genesis() -> Self { + BlockId::Hash(Default::default()) + } } impl Copy for BlockId {} diff --git a/primitives/state-machine/src/backend.rs b/primitives/state-machine/src/backend.rs index 18b89acbc6f13..9b99537130364 100644 --- a/primitives/state-machine/src/backend.rs +++ b/primitives/state-machine/src/backend.rs @@ -93,6 +93,22 @@ pub trait Backend: sp_std::fmt::Debug { key: &[u8] ) -> Result, Self::Error>; + /// Iterate over storage starting at key, for a given prefix and child trie. + /// Aborts as soon as `f` returns false. + /// Warning: this fails at the first error, whereas usual iteration skips errors. + /// If `allow_missing` is true, iteration stops when it reaches a missing trie node. + /// Otherwise an error is produced. + /// + /// Returns `true` if trie end is reached.
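+ /// # Example
+ ///
+ /// ```ignore
+ /// // Illustrative: collect at most 10 entries of the main trie, resuming at `start`.
+ /// let mut out = Vec::new();
+ /// let reached_end = backend.apply_to_key_values_while(
+ ///     None,         // main trie rather than a child trie
+ ///     None,         // no prefix filter
+ ///     Some(&start), // resume point
+ ///     |key, value| { out.push((key, value)); out.len() < 10 },
+ ///     false,        // missing trie nodes are an error
+ /// )?;
+ /// ```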
+ fn apply_to_key_values_while, Vec) -> bool>( + &self, + child_info: Option<&ChildInfo>, + prefix: Option<&[u8]>, + start_at: Option<&[u8]>, + f: F, + allow_missing: bool, + ) -> Result; + /// Retrieve all entries keys of storage and call `f` for each of those keys. /// Aborts as soon as `f` returns false. fn apply_to_keys_while bool>( &self, child_info: Option<&ChildInfo>, diff --git a/primitives/state-machine/src/lib.rs b/primitives/state-machine/src/lib.rs index c4ba39e160160..bc5b48f02db4e 100644 --- a/primitives/state-machine/src/lib.rs +++ b/primitives/state-machine/src/lib.rs @@ -726,6 +726,50 @@ mod execution { prove_read_on_trie_backend(trie_backend, keys) } + /// Generate range storage read proof. + pub fn prove_range_read_with_size( + mut backend: B, + child_info: Option<&ChildInfo>, + prefix: Option<&[u8]>, + size_limit: usize, + start_at: Option<&[u8]>, + ) -> Result<(StorageProof, u32), Box> + where + B: Backend, + H: Hasher, + H::Out: Ord + Codec, + { + let trie_backend = backend.as_trie_backend() + .ok_or_else(|| Box::new(ExecutionError::UnableToGenerateProof) as Box)?; + prove_range_read_with_size_on_trie_backend(trie_backend, child_info, prefix, size_limit, start_at) + } + + /// Generate range storage read proof on an existing trie backend. + pub fn prove_range_read_with_size_on_trie_backend( + trie_backend: &TrieBackend, + child_info: Option<&ChildInfo>, + prefix: Option<&[u8]>, + size_limit: usize, + start_at: Option<&[u8]>, + ) -> Result<(StorageProof, u32), Box> + where + S: trie_backend_essence::TrieBackendStorage, + H: Hasher, + H::Out: Ord + Codec, + { + let proving_backend = proving_backend::ProvingBackend::::new(trie_backend); + let mut count = 0; + proving_backend.apply_to_key_values_while(child_info, prefix, start_at, |_key, _value| { + if count == 0 || proving_backend.estimate_encoded_size() <= size_limit { + count += 1; + true + } else { + false + } + }, false).map_err(|e| Box::new(e) as Box)?; + Ok((proving_backend.extract_proof(), count)) + } + /// Generate child storage read proof. pub fn prove_child_read( mut backend: B, @@ -808,6 +852,29 @@ mod execution { Ok(result) } + /// Check a storage range proof, generated by a `prove_range_read_with_size` call. + pub fn read_range_proof_check( + root: H::Out, + proof: StorageProof, + child_info: Option<&ChildInfo>, + prefix: Option<&[u8]>, + count: Option, + start_at: Option<&[u8]>, + ) -> Result<(Vec<(Vec, Vec)>, bool), Box> + where + H: Hasher, + H::Out: Ord + Codec, + { + let proving_backend = create_proof_check_backend::(root, proof)?; + read_range_proof_check_on_proving_backend( + &proving_backend, + child_info, + prefix, + count, + start_at, + ) + } + /// Check child storage read proof, generated by `prove_child_read` call. pub fn read_child_proof_check( root: H::Out, @@ -859,6 +926,32 @@ mod execution { proving_backend.child_storage(child_info, key) .map_err(|e| Box::new(e) as Box) } + + /// Check storage range proof on pre-created proving backend. + /// + /// Returns a vector with the read `key => value` pairs and a `bool` that is set to `true` when + /// all `key => value` pairs could be read and no more are left.
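+ /// # Example
+ ///
+ /// ```ignore
+ /// // Illustrative chunked check: verify one response and resume from the last key.
+ /// let (entries, complete) = read_range_proof_check_on_proving_backend(
+ ///     &proving_backend,
+ ///     None,            // main trie
+ ///     None,            // no prefix
+ ///     None,            // no entry-count limit
+ ///     Some(&last_key), // where the previous chunk stopped
+ /// )?;
+ /// if !complete { /* request another chunk starting at `entries.last()` */ }
+ /// ```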
+ pub fn read_range_proof_check_on_proving_backend( + proving_backend: &TrieBackend, H>, + child_info: Option<&ChildInfo>, + prefix: Option<&[u8]>, + count: Option, + start_at: Option<&[u8]>, + ) -> Result<(Vec<(Vec, Vec)>, bool), Box> + where + H: Hasher, + H::Out: Ord + Codec, + { + let mut values = Vec::new(); + let result = proving_backend.apply_to_key_values_while(child_info, prefix, start_at, |key, value| { + values.push((key.to_vec(), value.to_vec())); + count.as_ref().map_or(true, |c| (values.len() as u32) < *c) + }, true); + match result { + Ok(completed) => Ok((values, completed)), + Err(e) => Err(Box::new(e) as Box), + } + } } #[cfg(test)] @@ -1457,7 +1550,7 @@ mod tests { remote_proof.clone(), &[&[0xff]], ).is_ok(); - // check that results are correct + // check that results are correct assert_eq!( local_result1.into_iter().collect::>(), vec![(b"value2".to_vec(), Some(vec![24]))], ); @@ -1494,6 +1587,57 @@ ); } + #[test] + fn prove_read_with_size_limit_works() { + let remote_backend = trie_backend::tests::test_trie(); + let remote_root = remote_backend.storage_root(::std::iter::empty()).0; + let (proof, count) = prove_range_read_with_size(remote_backend, None, None, 0, None).unwrap(); + // Always contains at least some nodes. + assert_eq!(proof.into_memory_db::().drain().len(), 3); + assert_eq!(count, 1); + + let remote_backend = trie_backend::tests::test_trie(); + let (proof, count) = prove_range_read_with_size(remote_backend, None, None, 800, Some(&[])).unwrap(); + assert_eq!(proof.clone().into_memory_db::().drain().len(), 9); + assert_eq!(count, 85); + let (results, completed) = read_range_proof_check::( + remote_root, + proof.clone(), + None, + None, + Some(count), + None, + ).unwrap(); + assert_eq!(results.len() as u32, count); + assert_eq!(completed, false); + // When checking without count limit, proof may actually contain extra values. + let (results, completed) = read_range_proof_check::( + remote_root, + proof, + None, + None, + None, + None, + ).unwrap(); + assert_eq!(results.len() as u32, 101); + assert_eq!(completed, false); + + let remote_backend = trie_backend::tests::test_trie(); + let (proof, count) = prove_range_read_with_size(remote_backend, None, None, 50000, Some(&[])).unwrap(); + assert_eq!(proof.clone().into_memory_db::().drain().len(), 11); + assert_eq!(count, 132); + let (results, completed) = read_range_proof_check::( + remote_root, + proof.clone(), + None, + None, + None, + None, + ).unwrap(); + assert_eq!(results.len() as u32, count); + assert_eq!(completed, true); + } + #[test] fn compact_multiple_child_trie() { // this root will be queried diff --git a/primitives/state-machine/src/overlayed_changes/mod.rs b/primitives/state-machine/src/overlayed_changes/mod.rs index c01d56ab919a0..a261e084eeda9 100644 --- a/primitives/state-machine/src/overlayed_changes/mod.rs +++ b/primitives/state-machine/src/overlayed_changes/mod.rs @@ -303,7 +303,7 @@ impl OverlayedChanges { /// Set a new value for the specified key. /// /// Can be rolled back or committed when called inside a transaction.
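/// # Example
///
/// ```ignore
/// overlay.set_storage(b"key".to_vec(), Some(b"value".to_vec())); // insert/overwrite
/// overlay.set_storage(b"key".to_vec(), None);                    // mark as deleted
/// ```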
- pub(crate) fn set_storage(&mut self, key: StorageKey, val: Option) { + pub fn set_storage(&mut self, key: StorageKey, val: Option) { let size_write = val.as_ref().map(|x| x.len() as u64).unwrap_or(0); self.stats.tally_write_overlay(size_write); self.top.set(key, val, self.extrinsic_index()); diff --git a/primitives/state-machine/src/proving_backend.rs b/primitives/state-machine/src/proving_backend.rs index d68a87f9f56a5..5275aa82521c5 100644 --- a/primitives/state-machine/src/proving_backend.rs +++ b/primitives/state-machine/src/proving_backend.rs @@ -212,6 +212,14 @@ impl<'a, S: 'a + TrieBackendStorage, H: 'a + Hasher> ProvingBackend<'a, S, H> pub fn extract_proof(&self) -> StorageProof { self.0.essence().backend_storage().proof_recorder.to_storage_proof() } + + /// Returns the estimated encoded size of the proof. + /// + /// The estimation may be bigger (by at most 4 bytes), but is never smaller than the actual + /// encoded proof. + pub fn estimate_encoded_size(&self) -> usize { + self.0.essence().backend_storage().proof_recorder.estimate_encoded_size() + } } impl<'a, S: 'a + TrieBackendStorage, H: 'a + Hasher> TrieBackendStorage @@ -260,6 +268,17 @@ impl<'a, S, H> Backend for ProvingBackend<'a, S, H> self.0.child_storage(child_info, key) } + fn apply_to_key_values_while, Vec) -> bool>( + &self, + child_info: Option<&ChildInfo>, + prefix: Option<&[u8]>, + start_at: Option<&[u8]>, + f: F, + allow_missing: bool, + ) -> Result { + self.0.apply_to_key_values_while(child_info, prefix, start_at, f, allow_missing) + } + fn apply_to_keys_while bool>( &self, child_info: Option<&ChildInfo>, diff --git a/primitives/state-machine/src/trie_backend.rs b/primitives/state-machine/src/trie_backend.rs index 98deca23a9570..6162a9866a46c 100644 --- a/primitives/state-machine/src/trie_backend.rs +++ b/primitives/state-machine/src/trie_backend.rs @@ -113,6 +113,17 @@ impl, H: Hasher> Backend for TrieBackend where self.essence.for_key_values_with_prefix(prefix, f) } + fn apply_to_key_values_while, Vec) -> bool>( + &self, + child_info: Option<&ChildInfo>, + prefix: Option<&[u8]>, + start_at: Option<&[u8]>, + f: F, + allow_missing: bool, + ) -> Result { + self.essence.apply_to_key_values_while(child_info, prefix, start_at, f, allow_missing) + } + fn apply_to_keys_while bool>( &self, child_info: Option<&ChildInfo>, diff --git a/primitives/state-machine/src/trie_backend_essence.rs b/primitives/state-machine/src/trie_backend_essence.rs index e0a24c08393c7..54124e6754a52 100644 --- a/primitives/state-machine/src/trie_backend_essence.rs +++ b/primitives/state-machine/src/trie_backend_essence.rs @@ -189,6 +189,43 @@ impl, H: Hasher> TrieBackendEssence where H::Out: .map_err(map_e) } + /// Retrieve all storage entries (key-value pairs) and call `f` for each of them. + /// Aborts as soon as `f` returns false. + /// + /// Returns `true` when all keys were iterated. + pub fn apply_to_key_values_while( + &self, + child_info: Option<&ChildInfo>, + prefix: Option<&[u8]>, + start_at: Option<&[u8]>, + f: impl FnMut(Vec, Vec) -> bool, + allow_missing_nodes: bool, + ) -> Result { + let mut child_root; + let root = if let Some(child_info) = child_info.as_ref() { + if let Some(fetched_child_root) = self.child_root(child_info)? { + child_root = H::Out::default(); + // root is fetched from DB, not writable by runtime, so it's always valid.
+ child_root.as_mut().copy_from_slice(fetched_child_root.as_slice()); + + &child_root + } else { + return Ok(true); + } + } else { + &self.root + }; + + self.trie_iter_inner( + &root, + prefix, + f, + child_info, + start_at, + allow_missing_nodes, + ) + } + /// Retrieve all entries keys of a storage and call `f` for each of those keys. /// Aborts as soon as `f` returns false. pub fn apply_to_keys_while bool>( @@ -212,15 +249,15 @@ impl, H: Hasher> TrieBackendEssence where H::Out: &self.root }; - self.trie_iter_inner(root, prefix, |k, _v| f(k), child_info) + let _ = self.trie_iter_inner(root, prefix, |k, _v| { f(&k); true}, child_info, None, false); } /// Execute given closure for all keys starting with prefix. - pub fn for_child_keys_with_prefix( + pub fn for_child_keys_with_prefix( &self, child_info: &ChildInfo, prefix: &[u8], - mut f: F, + mut f: impl FnMut(&[u8]), ) { let root_vec = match self.child_root(child_info) { Ok(v) => v.unwrap_or_else(|| empty_child_trie_root::>().encode()), @@ -231,41 +268,43 @@ impl, H: Hasher> TrieBackendEssence where H::Out: }; let mut root = H::Out::default(); root.as_mut().copy_from_slice(&root_vec); - self.trie_iter_inner(&root, Some(prefix), |k, _v| { f(k); true }, Some(child_info)) + let _ = self.trie_iter_inner(&root, Some(prefix), |k, _v| { f(&k); true }, Some(child_info), None, false); } /// Execute given closure for all keys starting with prefix. pub fn for_keys_with_prefix(&self, prefix: &[u8], mut f: F) { - self.trie_iter_inner(&self.root, Some(prefix), |k, _v| { f(k); true }, None) + let _ = self.trie_iter_inner(&self.root, Some(prefix), |k, _v| { f(&k); true }, None, None, false); } - fn trie_iter_inner bool>( + fn trie_iter_inner, Vec) -> bool>( &self, root: &H::Out, prefix: Option<&[u8]>, mut f: F, child_info: Option<&ChildInfo>, - ) { - let mut iter = move |db| -> sp_std::result::Result<(), Box>> { + start_at: Option<&[u8]>, + allow_missing_nodes: bool, + ) -> Result { + let mut iter = move |db| -> sp_std::result::Result>> { let trie = TrieDB::::new(db, root)?; - let iter = if let Some(prefix) = prefix.as_ref() { - TrieDBIterator::new_prefixed(&trie, prefix)? + let prefix = prefix.unwrap_or(&[]); + let iterator = if let Some(start_at) = start_at { + TrieDBIterator::new_prefixed_then_seek(&trie, prefix, start_at)? } else { - TrieDBIterator::new(&trie)? + TrieDBIterator::new_prefixed(&trie, prefix)? }; - - for x in iter { + for x in iterator { let (key, value) = x?; - debug_assert!(prefix.as_ref().map(|prefix| key.starts_with(prefix)).unwrap_or(true)); + debug_assert!(key.starts_with(prefix)); - if !f(&key, &value) { - break; + if !f(key, value) { + return Ok(false) } } - Ok(()) + Ok(true) }; let result = if let Some(child_info) = child_info { @@ -274,14 +313,16 @@ impl, H: Hasher> TrieBackendEssence where H::Out: } else { iter(self) }; - if let Err(e) = result { - debug!(target: "trie", "Error while iterating by prefix: {}", e); + match result { + Ok(completed) => Ok(completed), + Err(e) if matches!(*e, TrieError::IncompleteDatabase(_)) && allow_missing_nodes => Ok(false), + Err(e) => Err(format!("TrieDB iteration error: {}", e)), } } /// Execute given closure for all key and values starting with prefix. 
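/// # Example
///
/// ```ignore
/// // Illustrative: visit every entry under a prefix.
/// essence.for_key_values_with_prefix(b"prefix", |key, value| {
///     println!("{:?} => {:?}", key, value);
/// });
/// ```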
pub fn for_key_values_with_prefix(&self, prefix: &[u8], mut f: F) { - self.trie_iter_inner(&self.root, Some(prefix), |k, v| { f(k, v); true }, None) + let _ = self.trie_iter_inner(&self.root, Some(prefix), |k, v| {f(&k, &v); true}, None, None, false); } } diff --git a/test-utils/client/src/lib.rs b/test-utils/client/src/lib.rs index e343181505c98..eb810e0360588 100644 --- a/test-utils/client/src/lib.rs +++ b/test-utils/client/src/lib.rs @@ -80,6 +80,7 @@ pub struct TestClientBuilder { fork_blocks: ForkBlocks, bad_blocks: BadBlocks, enable_offchain_indexing_api: bool, + no_genesis: bool, } impl Default @@ -116,6 +117,7 @@ impl TestClientBuilder TestClientBuilder Self { + self.no_genesis = true; + self + } + /// Build the test client with the given native executor. pub fn build_with_executor( self, @@ -232,6 +240,7 @@ impl TestClientBuilder
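Taken together, these APIs give fast sync its download loop: a serving node builds a size-limited proof for a key range with `prove_range_read_with_size`, the syncing node verifies it via `verify_range_proof` and requests the next range starting at the last verified key, until the proof reports that the trie end was reached. A self-contained sketch of the client side, with placeholder stubs for the networking and verification calls (names and types here are illustrative, not the actual Substrate signatures):

type Key = Vec<u8>;
type Value = Vec<u8>;
struct Proof; // opaque proof as received from a peer

// Stub: a real node sends a StateRequest for keys starting at `start_key` and
// receives a proof produced by `prove_range_read_with_size` on the serving side.
fn request_state_chunk(_start_key: &[u8]) -> Proof { Proof }

// Stub for `Client::verify_range_proof`: returns the verified entries plus a
// flag telling whether the end of the trie was reached.
fn verify_range_proof(_proof: Proof, _start_key: &[u8]) -> (Vec<(Key, Value)>, bool) {
    (Vec::new(), true)
}

fn download_state() -> Vec<(Key, Value)> {
    let mut state = Vec::new();
    let mut start_key: Key = Vec::new();
    loop {
        let proof = request_state_chunk(&start_key);
        let (entries, complete) = verify_range_proof(proof, &start_key);
        if let Some((last, _)) = entries.last() {
            start_key = last.clone(); // resume where this chunk ended
        }
        state.extend(entries);
        if complete {
            return state; // whole state downloaded and verified
        }
    }
}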