From f7a9bca411f774c89d5e5971921c2198d3faa93a Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Wed, 4 Dec 2024 09:27:55 -0500 Subject: [PATCH] state sync: improvements from mocknet testing (#12507) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR contains several fixes which improve the speed and robustness of state sync: - **All part requests to peers are now made before all cloud attempts**. Previously we focused on obtaining specific parts one by one, which could cause a thread to block for a long time until a particular part was uploaded to cloud storage. It takes tens of minutes after the epoch ends for dumper nodes to write all state parts to cloud storage, whereas peer hosts are ready to serve all requests as soon as the epoch ends. - **Part request order is randomized at each syncing node**, preventing spikes in demand to specific hosts. - **Removes an unnecessary check for state headers when serving state parts**. In some cases this was preventing peer hosts which do not track all shards from responding successfully to part requests. Before these changes, it took up to 75 minutes for nodes to download parts for the largest shard (38.8 GiB in 1324 parts). After these changes: * Nodes consistently finish downloading parts in under 15 min, * State requests to peer hosts have a failure rate below 1%, * and 100% of parts are successfully obtained from peer hosts within three requests. Screenshot 2024-11-24 at 7 17 39 AM ----- Additional minor improvements: - Adds a separate config parameter to specify how long to wait after a failed cloud download. This allows nodes to avoid spamming requests to cloud storage before parts have been uploaded. - Adds metrics recording the number of cache hits and misses when serving state part requests. - Distinguishes different types of errors collected in the `near_state_sync_download_result` metric. Closes issues #12497, #12498, #12499 Once merged this PR should be cherry-picked into the 2.4 release. cc @staffik --- chain/chain/src/chain.rs | 5 +- chain/chain/src/metrics.rs | 14 +++++ chain/client/src/client.rs | 6 ++- chain/client/src/metrics.rs | 2 +- chain/client/src/sync/state/downloader.rs | 63 +++++++++++----------- chain/client/src/sync/state/external.rs | 29 +++++++--- chain/client/src/sync/state/mod.rs | 6 ++- chain/client/src/sync/state/network.rs | 8 +-- chain/client/src/sync/state/shard.rs | 56 ++++++++++++++----- core/chain-configs/src/client_config.rs | 15 ++++-- core/chain-configs/src/lib.rs | 19 +++---- integration-tests/src/test_loop/builder.rs | 3 +- nearcore/src/config.rs | 41 +++++++------- pytest/lib/cluster.py | 1 + pytest/lib/state_sync_lib.py | 8 +++ pytest/tests/sanity/epoch_switches.py | 6 ++- pytest/tests/sanity/rpc_tx_forwarding.py | 24 +++++++-- pytest/tests/sanity/staking2.py | 18 +++++-- pytest/tests/sanity/transactions.py | 30 +++++++++-- 19 files changed, 244 insertions(+), 110 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index b59fcc90310..f268d3d24bb 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -2765,8 +2765,10 @@ impl Chain { // Check cache let key = borsh::to_vec(&StatePartKey(sync_hash, shard_id, part_id))?; if let Ok(Some(state_part)) = self.chain_store.store().get(DBCol::StateParts, &key) { + metrics::STATE_PART_CACHE_HIT.inc(); return Ok(state_part.into()); } + metrics::STATE_PART_CACHE_MISS.inc(); let block = self .get_block(&sync_hash) @@ -2812,9 +2814,6 @@ impl Chain { self.requested_state_parts .save_state_part_elapsed(&sync_hash, &shard_id, &part_id, elapsed_ms); - // Before saving State Part data, we need to make sure we can calculate and save State Header - self.get_state_response_header(shard_id, sync_hash)?; - // Saving the part data let mut store_update = self.chain_store.store().store_update(); store_update.set(DBCol::StateParts, &key, &state_part); diff --git a/chain/chain/src/metrics.rs b/chain/chain/src/metrics.rs index f46cfb44a02..ba4ccdbbafc 100644 --- a/chain/chain/src/metrics.rs +++ b/chain/chain/src/metrics.rs @@ -103,6 +103,20 @@ pub static STATE_PART_ELAPSED: LazyLock = LazyLock::new(|| { ) .unwrap() }); +pub static STATE_PART_CACHE_HIT: LazyLock = LazyLock::new(|| { + try_create_int_counter( + "near_state_part_cache_hit", + "Total number of state parts served from db cache", + ) + .unwrap() +}); +pub static STATE_PART_CACHE_MISS: LazyLock = LazyLock::new(|| { + try_create_int_counter( + "near_state_part_cache_miss", + "Total number of state parts built on-demand", + ) + .unwrap() +}); pub static NUM_INVALID_BLOCKS: LazyLock = LazyLock::new(|| { try_create_int_gauge_vec("near_num_invalid_blocks", "Number of invalid blocks", &["error"]) .unwrap() diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index cca83f9521c..fbe825541df 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -324,7 +324,8 @@ impl Client { network_adapter.clone().into_sender(), config.state_sync_external_timeout, config.state_sync_p2p_timeout, - config.state_sync_retry_timeout, + config.state_sync_retry_backoff, + config.state_sync_external_backoff, &config.chain_id, &config.state_sync.sync, chain_sender_for_state_sync.clone(), @@ -2566,7 +2567,8 @@ impl Client { self.network_adapter.clone().into_sender(), self.config.state_sync_external_timeout, self.config.state_sync_p2p_timeout, - self.config.state_sync_retry_timeout, + self.config.state_sync_retry_backoff, + self.config.state_sync_external_backoff, &self.config.chain_id, &self.config.state_sync.sync, self.chain_sender_for_state_sync.clone(), diff --git a/chain/client/src/metrics.rs b/chain/client/src/metrics.rs index b38d0efc434..d40d3422ad2 100644 --- a/chain/client/src/metrics.rs +++ b/chain/client/src/metrics.rs @@ -473,7 +473,7 @@ pub(crate) static STATE_SYNC_STAGE: LazyLock = LazyLock::new(|| { pub(crate) static STATE_SYNC_DOWNLOAD_RESULT: LazyLock = LazyLock::new(|| { try_create_int_counter_vec( - "near_state_sync_header_download_result", + "near_state_sync_download_result", "Count of number of state sync downloads by type (header, part), source (network, external), and result (timeout, error, success)", &["shard_id", "type", "source", "result"], diff --git a/chain/client/src/sync/state/downloader.rs b/chain/client/src/sync/state/downloader.rs index 59fb714c4cf..17ba4638ff9 100644 --- a/chain/client/src/sync/state/downloader.rs +++ b/chain/client/src/sync/state/downloader.rs @@ -33,7 +33,7 @@ pub(super) struct StateSyncDownloader { pub header_validation_sender: AsyncSender>, pub runtime: Arc, - pub retry_timeout: Duration, + pub retry_backoff: Duration, pub task_tracker: TaskTracker, } @@ -56,7 +56,7 @@ impl StateSyncDownloader { let num_attempts_before_fallback = self.num_attempts_before_fallback; let task_tracker = self.task_tracker.clone(); let clock = self.clock.clone(); - let retry_timeout = self.retry_timeout; + let retry_backoff = self.retry_backoff; async move { let handle = task_tracker.get_handle(&format!("shard {} header", shard_id)).await; handle.set_status("Reading existing header"); @@ -104,9 +104,9 @@ impl StateSyncDownloader { Err(err) => { handle.set_status(&format!( "Error: {}, will retry in {}", - err, retry_timeout + err, retry_backoff )); - let deadline = clock.now() + retry_timeout; + let deadline = clock.now() + retry_backoff; tokio::select! { _ = cancel.cancelled() => { return Err(near_chain::Error::Other("Cancelled".to_owned())); @@ -122,17 +122,19 @@ impl StateSyncDownloader { .boxed() } - /// Ensures that the shard part is downloaded and validated. If the part exists on disk, - /// just returns. Otherwise, downloads the part, validates it, and retries if needed. + /// Attempts once to ensure that the shard part is downloaded and validated. + /// If the part exists on disk, just returns. Otherwise, makes one attempt + /// to download the part and validate it. /// - /// This method will only return an error if the download cannot be completed even - /// with retries, or if the download is cancelled. - pub fn ensure_shard_part_downloaded( + /// This method will return an error if the download fails or is cancelled. + pub fn ensure_shard_part_downloaded_single_attempt( &self, shard_id: ShardId, sync_hash: CryptoHash, + state_root: CryptoHash, + num_state_parts: u64, part_id: u64, - header: ShardStateSyncResponseHeader, + num_prior_attempts: usize, cancel: CancellationToken, ) -> BoxFuture<'static, Result<(), near_chain::Error>> { let store = self.store.clone(); @@ -142,8 +144,11 @@ impl StateSyncDownloader { let num_attempts_before_fallback = self.num_attempts_before_fallback; let clock = self.clock.clone(); let task_tracker = self.task_tracker.clone(); - let retry_timeout = self.retry_timeout; + let retry_backoff = self.retry_backoff; async move { + if cancel.is_cancelled() { + return Err(near_chain::Error::Other("Cancelled".to_owned())); + } let handle = task_tracker.get_handle(&format!("shard {} part {}", shard_id, part_id)).await; handle.set_status("Reading existing part"); @@ -151,15 +156,15 @@ impl StateSyncDownloader { return Ok(()); } - let i = AtomicUsize::new(0); // for easier Rust async capture let attempt = || async { let source = if fallback_source.is_some() - && i.load(Ordering::Relaxed) >= num_attempts_before_fallback + && num_prior_attempts >= num_attempts_before_fallback { fallback_source.as_ref().unwrap().as_ref() } else { preferred_source.as_ref() }; + let part = source .download_shard_part( shard_id, @@ -169,10 +174,9 @@ impl StateSyncDownloader { cancel.clone(), ) .await?; - let state_root = header.chunk_prev_state_root(); if runtime_adapter.validate_state_part( &state_root, - PartId { idx: part_id, total: header.num_state_parts() }, + PartId { idx: part_id, total: num_state_parts }, &part, ) { let mut store_update = store.store_update(); @@ -187,27 +191,20 @@ impl StateSyncDownloader { Ok(()) }; - loop { - match attempt().await { - Ok(()) => return Ok(()), - Err(err) => { - handle.set_status(&format!( - "Error: {}, will retry in {}", - err, retry_timeout - )); - let deadline = clock.now() + retry_timeout; - tokio::select! { - _ = cancel.cancelled() => { - return Err(near_chain::Error::Other("Cancelled".to_owned())); - } - _ = clock.sleep_until(deadline) => {} - } - } + let res = attempt().await; + if let Err(ref err) = res { + handle.set_status(&format!("Error: {}, will retry in {}", err, retry_backoff)); + let deadline = clock.now() + retry_backoff; + tokio::select! { + _ = cancel.cancelled() => {} + _ = clock.sleep_until(deadline) => {} } - i.fetch_add(1, Ordering::Relaxed); } + res } - .instrument(tracing::debug_span!("StateSyncDownloader::ensure_shard_part_downloaded")) + .instrument(tracing::debug_span!( + "StateSyncDownloader::ensure_shard_part_downloaded_single_attempt" + )) .boxed() } } diff --git a/chain/client/src/sync/state/external.rs b/chain/client/src/sync/state/external.rs index 0b0fa128696..454226293b4 100644 --- a/chain/client/src/sync/state/external.rs +++ b/chain/client/src/sync/state/external.rs @@ -22,12 +22,14 @@ pub(super) struct StateSyncDownloadSourceExternal { pub chain_id: String, pub conn: ExternalConnection, pub timeout: Duration, + pub backoff: Duration, } impl StateSyncDownloadSourceExternal { async fn get_file_with_timeout( clock: Clock, timeout: Duration, + backoff: Duration, cancellation: CancellationToken, conn: ExternalConnection, shard_id: ShardId, @@ -46,14 +48,25 @@ impl StateSyncDownloadSourceExternal { Err(near_chain::Error::Other("Timeout".to_owned())) } _ = cancellation.cancelled() => { - increment_download_count(shard_id, typ, "external", "error"); + increment_download_count(shard_id, typ, "external", "cancelled"); Err(near_chain::Error::Other("Cancelled".to_owned())) } result = fut => { - result.map_err(|e| { - increment_download_count(shard_id, typ, "network", "error"); - near_chain::Error::Other(format!("Failed to download: {}", e)) - }) + match result { + Err(err) => { + // A download error typically indicates that the file is not available yet. At the + // start of the epoch it takes a while for dumpers to populate the external storage + // with state files. This backoff period prevents spamming requests during that time. + let deadline = clock.now() + backoff; + tokio::select! { + _ = clock.sleep_until(deadline) => {} + _ = cancellation.cancelled() => {} + } + increment_download_count(shard_id, typ, "external", "download_error"); + Err(near_chain::Error::Other(format!("Failed to download: {}", err))) + } + Ok(res) => Ok(res) + } } } } @@ -69,6 +82,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal { ) -> BoxFuture> { let clock = self.clock.clone(); let timeout = self.timeout; + let backoff = self.backoff; let chain_id = self.chain_id.clone(); let conn = self.conn.clone(); let store = self.store.clone(); @@ -86,6 +100,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal { let data = Self::get_file_with_timeout( clock, timeout, + backoff, cancel, conn, shard_id, @@ -94,7 +109,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal { ) .await?; let header = ShardStateSyncResponseHeader::try_from_slice(&data).map_err(|e| { - increment_download_count(shard_id, "header", "external", "error"); + increment_download_count(shard_id, "header", "external", "parse_error"); near_chain::Error::Other(format!("Failed to parse header: {}", e)) })?; @@ -115,6 +130,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal { ) -> BoxFuture, near_chain::Error>> { let clock = self.clock.clone(); let timeout = self.timeout; + let backoff = self.backoff; let chain_id = self.chain_id.clone(); let conn = self.conn.clone(); let store = self.store.clone(); @@ -137,6 +153,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal { let data = Self::get_file_with_timeout( clock, timeout, + backoff, cancel, conn, shard_id, diff --git a/chain/client/src/sync/state/mod.rs b/chain/client/src/sync/state/mod.rs index fd0ecc4763a..3eb84a0e213 100644 --- a/chain/client/src/sync/state/mod.rs +++ b/chain/client/src/sync/state/mod.rs @@ -91,7 +91,8 @@ impl StateSync { network_adapter: AsyncSender, external_timeout: Duration, p2p_timeout: Duration, - retry_timeout: Duration, + retry_backoff: Duration, + external_backoff: Duration, chain_id: &str, sync_config: &SyncConfig, chain_requests_sender: ChainSenderForStateSync, @@ -147,6 +148,7 @@ impl StateSync { chain_id: chain_id.to_string(), conn: external, timeout: external_timeout, + backoff: external_backoff, }) as Arc; ( Some(fallback_source), @@ -166,7 +168,7 @@ impl StateSync { num_attempts_before_fallback, header_validation_sender: chain_requests_sender.clone().into_sender(), runtime: runtime.clone(), - retry_timeout, + retry_backoff, task_tracker: downloading_task_tracker.clone(), }); diff --git a/chain/client/src/sync/state/network.rs b/chain/client/src/sync/state/network.rs index 36162aa9e14..a68db23423f 100644 --- a/chain/client/src/sync/state/network.rs +++ b/chain/client/src/sync/state/network.rs @@ -189,12 +189,12 @@ impl StateSyncDownloadSourcePeer { match request_sender.send_async(network_request).await { Ok(response) => { if let NetworkResponses::RouteNotFound = response.as_network_response() { - increment_download_count(key.shard_id, typ, "network", "error"); + increment_download_count(key.shard_id, typ, "network", "route_not_found"); return Err(near_chain::Error::Other("Route not found".to_owned())); } } Err(e) => { - increment_download_count(key.shard_id, typ, "network", "error"); + increment_download_count(key.shard_id, typ, "network", "failed_to_send"); return Err(near_chain::Error::Other(format!("Failed to send request: {}", e))); } } @@ -206,7 +206,7 @@ impl StateSyncDownloadSourcePeer { Err(near_chain::Error::Other("Timeout".to_owned())) } _ = cancel.cancelled() => { - increment_download_count(key.shard_id, typ, "network", "error"); + increment_download_count(key.shard_id, typ, "network", "cancelled"); Err(near_chain::Error::Other("Cancelled".to_owned())) } result = receiver => { @@ -216,7 +216,7 @@ impl StateSyncDownloadSourcePeer { Ok(result) } Err(_) => { - increment_download_count(key.shard_id, typ, "network", "error"); + increment_download_count(key.shard_id, typ, "network", "sender_dropped"); Err(near_chain::Error::Other("Sender dropped".to_owned())) }, } diff --git a/chain/client/src/sync/state/shard.rs b/chain/client/src/sync/state/shard.rs index a174c4cd3a4..0ef25a07bd6 100644 --- a/chain/client/src/sync/state/shard.rs +++ b/chain/client/src/sync/state/shard.rs @@ -18,6 +18,8 @@ use near_primitives::types::{EpochId, ShardId}; use near_store::adapter::{StoreAdapter, StoreUpdateAdapter}; use near_store::flat::{FlatStorageReadyStatus, FlatStorageStatus}; use near_store::{DBCol, ShardUId, Store}; +use rand::prelude::SliceRandom; +use rand::thread_rng; use std::sync::{Arc, Mutex}; use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; @@ -82,20 +84,46 @@ pub(super) async fn run_state_sync_for_shard( return_if_cancelled!(cancel); *status.lock().unwrap() = ShardSyncStatus::StateDownloadParts; - tokio_stream::iter(0..num_parts) - .map(|part_id| { - let future = downloader.ensure_shard_part_downloaded( - shard_id, - sync_hash, - part_id, - header.clone(), - cancel.clone(), - ); - respawn_for_parallelism(&*future_spawner, "state sync download part", future) - }) - .buffer_unordered(MAX_PARALLELISM_PER_SHARD_FOR_FAIRNESS) - .try_collect::>() - .await?; + let mut parts_to_download: Vec = (0..num_parts).collect(); + { + // Peer selection is designed such that different nodes downloading the same part will tend + // to send the requests to the same host. It allows the host to benefit from caching the part. + // + // At the start of an epoch, a number of nodes begin state sync at the same time. If we + // don't randomize the order in which the parts are requested, the nodes will request the + // parts in roughly the same order, producing spikes of traffic to the same hosts. + let mut rng = thread_rng(); + parts_to_download.shuffle(&mut rng); + } + let mut attempt_count = 0; + while !parts_to_download.is_empty() { + return_if_cancelled!(cancel); + let results = tokio_stream::iter(parts_to_download.clone()) + .map(|part_id| { + let future = downloader.ensure_shard_part_downloaded_single_attempt( + shard_id, + sync_hash, + state_root, + num_parts, + part_id, + attempt_count, + cancel.clone(), + ); + respawn_for_parallelism(&*future_spawner, "state sync download part", future) + }) + .buffered(MAX_PARALLELISM_PER_SHARD_FOR_FAIRNESS) + .collect::>() + .await; + attempt_count += 1; + // Update the list of parts_to_download retaining only the ones that failed + parts_to_download = results + .iter() + .enumerate() + .filter_map(|(task_index, res)| { + res.as_ref().err().map(|_| parts_to_download[task_index]) + }) + .collect(); + } return_if_cancelled!(cancel); *status.lock().unwrap() = ShardSyncStatus::StateApplyInProgress; diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index ae744b23fd6..b3235914726 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -316,10 +316,14 @@ pub fn default_state_sync_p2p_timeout() -> Duration { Duration::seconds(10) } -pub fn default_state_sync_retry_timeout() -> Duration { +pub fn default_state_sync_retry_backoff() -> Duration { Duration::seconds(1) } +pub fn default_state_sync_external_backoff() -> Duration { + Duration::seconds(60) +} + pub fn default_header_sync_expected_height_per_second() -> u64 { 10 } @@ -453,8 +457,10 @@ pub struct ClientConfig { pub state_sync_external_timeout: Duration, /// How long to wait for a response from p2p state sync pub state_sync_p2p_timeout: Duration, - /// How long to wait between attempts to obtain a state part - pub state_sync_retry_timeout: Duration, + /// How long to wait after a failed state sync request + pub state_sync_retry_backoff: Duration, + /// Additional waiting period after a failed request to external storage + pub state_sync_external_backoff: Duration, /// Minimum number of peers to start syncing. pub min_num_peers: usize, /// Period between logging summary information. @@ -598,7 +604,8 @@ impl ClientConfig { header_sync_stall_ban_timeout: Duration::seconds(30), state_sync_external_timeout: Duration::seconds(TEST_STATE_SYNC_TIMEOUT), state_sync_p2p_timeout: Duration::seconds(TEST_STATE_SYNC_TIMEOUT), - state_sync_retry_timeout: Duration::seconds(TEST_STATE_SYNC_TIMEOUT), + state_sync_retry_backoff: Duration::seconds(TEST_STATE_SYNC_TIMEOUT), + state_sync_external_backoff: Duration::seconds(TEST_STATE_SYNC_TIMEOUT), header_sync_expected_height_per_second: 1, min_num_peers: 1, log_summary_period: Duration::seconds(10), diff --git a/core/chain-configs/src/lib.rs b/core/chain-configs/src/lib.rs index a408960a335..85fbf8a433d 100644 --- a/core/chain-configs/src/lib.rs +++ b/core/chain-configs/src/lib.rs @@ -15,15 +15,16 @@ pub use client_config::{ default_header_sync_progress_timeout, default_header_sync_stall_ban_timeout, default_log_summary_period, default_orphan_state_witness_max_size, default_orphan_state_witness_pool_size, default_produce_chunk_add_transactions_time_limit, - default_state_sync_enabled, default_state_sync_external_timeout, - default_state_sync_p2p_timeout, default_state_sync_retry_timeout, default_sync_check_period, - default_sync_height_threshold, default_sync_max_block_requests, default_sync_step_period, - default_transaction_pool_size_limit, default_trie_viewer_state_size_limit, - default_tx_routing_height_horizon, default_view_client_threads, - default_view_client_throttle_period, ChunkDistributionNetworkConfig, ChunkDistributionUris, - ClientConfig, DumpConfig, EpochSyncConfig, ExternalStorageConfig, ExternalStorageLocation, - GCConfig, LogSummaryStyle, ReshardingConfig, ReshardingHandle, StateSyncConfig, SyncConfig, - DEFAULT_GC_NUM_EPOCHS_TO_KEEP, DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL, + default_state_sync_enabled, default_state_sync_external_backoff, + default_state_sync_external_timeout, default_state_sync_p2p_timeout, + default_state_sync_retry_backoff, default_sync_check_period, default_sync_height_threshold, + default_sync_max_block_requests, default_sync_step_period, default_transaction_pool_size_limit, + default_trie_viewer_state_size_limit, default_tx_routing_height_horizon, + default_view_client_threads, default_view_client_throttle_period, + ChunkDistributionNetworkConfig, ChunkDistributionUris, ClientConfig, DumpConfig, + EpochSyncConfig, ExternalStorageConfig, ExternalStorageLocation, GCConfig, LogSummaryStyle, + ReshardingConfig, ReshardingHandle, StateSyncConfig, SyncConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP, + DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL, DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_ON_CATCHUP_EXTERNAL, MIN_GC_NUM_EPOCHS_TO_KEEP, TEST_STATE_SYNC_TIMEOUT, }; diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs index ff1fdbde509..0d1a69c0820 100644 --- a/integration-tests/src/test_loop/builder.rs +++ b/integration-tests/src/test_loop/builder.rs @@ -475,7 +475,8 @@ impl TestLoopBuilder { client_config.state_sync_enabled = true; client_config.state_sync_external_timeout = Duration::milliseconds(100); client_config.state_sync_p2p_timeout = Duration::milliseconds(100); - client_config.state_sync_retry_timeout = Duration::milliseconds(100); + client_config.state_sync_retry_backoff = Duration::milliseconds(100); + client_config.state_sync_external_backoff = Duration::milliseconds(100); if let Some(num_epochs) = self.gc_num_epochs_to_keep { client_config.gc.gc_num_epochs_to_keep = num_epochs; } diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs index c44b324a418..c6b3d583a90 100644 --- a/nearcore/src/config.rs +++ b/nearcore/src/config.rs @@ -14,20 +14,20 @@ use near_chain_configs::{ default_header_sync_progress_timeout, default_header_sync_stall_ban_timeout, default_log_summary_period, default_orphan_state_witness_max_size, default_orphan_state_witness_pool_size, default_produce_chunk_add_transactions_time_limit, - default_state_sync_enabled, default_state_sync_external_timeout, - default_state_sync_p2p_timeout, default_state_sync_retry_timeout, default_sync_check_period, - default_sync_height_threshold, default_sync_max_block_requests, default_sync_step_period, - default_transaction_pool_size_limit, default_trie_viewer_state_size_limit, - default_tx_routing_height_horizon, default_view_client_threads, - default_view_client_throttle_period, get_initial_supply, ChunkDistributionNetworkConfig, - ClientConfig, EpochSyncConfig, GCConfig, Genesis, GenesisConfig, GenesisValidationMode, - LogSummaryStyle, MutableConfigValue, MutableValidatorSigner, ReshardingConfig, StateSyncConfig, - BLOCK_PRODUCER_KICKOUT_THRESHOLD, CHUNK_PRODUCER_KICKOUT_THRESHOLD, - CHUNK_VALIDATOR_ONLY_KICKOUT_THRESHOLD, EXPECTED_EPOCH_LENGTH, FAST_EPOCH_LENGTH, - FISHERMEN_THRESHOLD, GAS_PRICE_ADJUSTMENT_RATE, GENESIS_CONFIG_FILENAME, INITIAL_GAS_LIMIT, - MAX_INFLATION_RATE, MIN_BLOCK_PRODUCTION_DELAY, MIN_GAS_PRICE, NEAR_BASE, NUM_BLOCKS_PER_YEAR, - NUM_BLOCK_PRODUCER_SEATS, PROTOCOL_REWARD_RATE, PROTOCOL_UPGRADE_STAKE_THRESHOLD, - TRANSACTION_VALIDITY_PERIOD, + default_state_sync_enabled, default_state_sync_external_backoff, + default_state_sync_external_timeout, default_state_sync_p2p_timeout, + default_state_sync_retry_backoff, default_sync_check_period, default_sync_height_threshold, + default_sync_max_block_requests, default_sync_step_period, default_transaction_pool_size_limit, + default_trie_viewer_state_size_limit, default_tx_routing_height_horizon, + default_view_client_threads, default_view_client_throttle_period, get_initial_supply, + ChunkDistributionNetworkConfig, ClientConfig, EpochSyncConfig, GCConfig, Genesis, + GenesisConfig, GenesisValidationMode, LogSummaryStyle, MutableConfigValue, + MutableValidatorSigner, ReshardingConfig, StateSyncConfig, BLOCK_PRODUCER_KICKOUT_THRESHOLD, + CHUNK_PRODUCER_KICKOUT_THRESHOLD, CHUNK_VALIDATOR_ONLY_KICKOUT_THRESHOLD, + EXPECTED_EPOCH_LENGTH, FAST_EPOCH_LENGTH, FISHERMEN_THRESHOLD, GAS_PRICE_ADJUSTMENT_RATE, + GENESIS_CONFIG_FILENAME, INITIAL_GAS_LIMIT, MAX_INFLATION_RATE, MIN_BLOCK_PRODUCTION_DELAY, + MIN_GAS_PRICE, NEAR_BASE, NUM_BLOCKS_PER_YEAR, NUM_BLOCK_PRODUCER_SEATS, PROTOCOL_REWARD_RATE, + PROTOCOL_UPGRADE_STAKE_THRESHOLD, TRANSACTION_VALIDITY_PERIOD, }; use near_config_utils::{DownloadConfigType, ValidationError, ValidationErrors}; use near_crypto::{InMemorySigner, KeyFile, KeyType, PublicKey}; @@ -162,9 +162,12 @@ pub struct Consensus { #[serde(default = "default_state_sync_p2p_timeout")] #[serde(with = "near_async::time::serde_duration_as_std")] pub state_sync_p2p_timeout: Duration, - #[serde(default = "default_state_sync_retry_timeout")] + #[serde(default = "default_state_sync_retry_backoff")] #[serde(with = "near_async::time::serde_duration_as_std")] - pub state_sync_retry_timeout: Duration, + pub state_sync_retry_backoff: Duration, + #[serde(default = "default_state_sync_external_backoff")] + #[serde(with = "near_async::time::serde_duration_as_std")] + pub state_sync_external_backoff: Duration, /// Expected increase of header head weight per second during header sync #[serde(default = "default_header_sync_expected_height_per_second")] pub header_sync_expected_height_per_second: u64, @@ -207,7 +210,8 @@ impl Default for Consensus { header_sync_stall_ban_timeout: default_header_sync_stall_ban_timeout(), state_sync_external_timeout: default_state_sync_external_timeout(), state_sync_p2p_timeout: default_state_sync_p2p_timeout(), - state_sync_retry_timeout: default_state_sync_retry_timeout(), + state_sync_retry_backoff: default_state_sync_retry_backoff(), + state_sync_external_backoff: default_state_sync_external_backoff(), header_sync_expected_height_per_second: default_header_sync_expected_height_per_second( ), sync_check_period: default_sync_check_period(), @@ -562,7 +566,8 @@ impl NearConfig { .header_sync_expected_height_per_second, state_sync_external_timeout: config.consensus.state_sync_external_timeout, state_sync_p2p_timeout: config.consensus.state_sync_p2p_timeout, - state_sync_retry_timeout: config.consensus.state_sync_retry_timeout, + state_sync_retry_backoff: config.consensus.state_sync_retry_backoff, + state_sync_external_backoff: config.consensus.state_sync_external_backoff, min_num_peers: config.consensus.min_num_peers, log_summary_period: config.log_summary_period, produce_empty_blocks: config.consensus.produce_empty_blocks, diff --git a/pytest/lib/cluster.py b/pytest/lib/cluster.py index 5a4f6ea86b7..bf4beebd52f 100644 --- a/pytest/lib/cluster.py +++ b/pytest/lib/cluster.py @@ -1009,6 +1009,7 @@ def apply_config_changes(node_dir: str, 'consensus.max_block_production_delay', 'consensus.max_block_wait_delay', 'consensus.state_sync_external_timeout', + 'consensus.state_sync_external_backoff', 'consensus.state_sync_p2p_timeout', 'expected_shutdown', 'log_summary_period', 'max_gas_burnt_view', 'rosetta_rpc', 'save_trie_changes', 'split_storage', 'state_sync', diff --git a/pytest/lib/state_sync_lib.py b/pytest/lib/state_sync_lib.py index b55d31f0fe6..d8d1b0c923e 100644 --- a/pytest/lib/state_sync_lib.py +++ b/pytest/lib/state_sync_lib.py @@ -40,6 +40,10 @@ def get_state_sync_configs_pair(tracked_shards=[0]): "secs": 0, "nanos": 500000000 }, + "consensus.state_sync_external_backoff": { + "secs": 0, + "nanos": 500000000 + }, "state_sync": { "sync": { "ExternalStorage": { @@ -70,6 +74,10 @@ def get_state_sync_config_combined(): "secs": 0, "nanos": 500000000 }, + "consensus.state_sync_external_backoff": { + "secs": 0, + "nanos": 500000000 + }, "state_sync": { "dump": { "location": { diff --git a/pytest/tests/sanity/epoch_switches.py b/pytest/tests/sanity/epoch_switches.py index 264b472db37..78a5c492526 100755 --- a/pytest/tests/sanity/epoch_switches.py +++ b/pytest/tests/sanity/epoch_switches.py @@ -32,7 +32,11 @@ "state_sync_p2p_timeout": { "secs": 0, "nanos": 500000000 - } + }, + "state_sync_external_backoff": { + "secs": 0, + "nanos": 500000000 + }, } } diff --git a/pytest/tests/sanity/rpc_tx_forwarding.py b/pytest/tests/sanity/rpc_tx_forwarding.py index a79cd20e674..91654b241a2 100755 --- a/pytest/tests/sanity/rpc_tx_forwarding.py +++ b/pytest/tests/sanity/rpc_tx_forwarding.py @@ -27,7 +27,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 1: { @@ -40,7 +44,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 2: { @@ -53,7 +61,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 3: { @@ -66,7 +78,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } } }) diff --git a/pytest/tests/sanity/staking2.py b/pytest/tests/sanity/staking2.py index 729f0ce4e63..11caf87c0b9 100755 --- a/pytest/tests/sanity/staking2.py +++ b/pytest/tests/sanity/staking2.py @@ -108,7 +108,11 @@ def doit(seq=[]): "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 1: { @@ -125,7 +129,11 @@ def doit(seq=[]): "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 2: { @@ -142,7 +150,11 @@ def doit(seq=[]): "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, }, "store.state_snapshot_enabled": True, } diff --git a/pytest/tests/sanity/transactions.py b/pytest/tests/sanity/transactions.py index 7c93498692a..bdc72f68f71 100755 --- a/pytest/tests/sanity/transactions.py +++ b/pytest/tests/sanity/transactions.py @@ -37,7 +37,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 1: { @@ -49,7 +53,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 2: { @@ -61,7 +69,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 3: { @@ -73,7 +85,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 4: { @@ -85,7 +101,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, }, "tracked_shards": [0, 1, 2, 3] }