From 1dd1435203272f0b7e144c2ed52a319bf03b668c Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Wed, 20 Nov 2024 16:36:08 -0500 Subject: [PATCH 1/3] state sync: improvements from mocknet testing --- chain/chain/src/chain.rs | 5 +- chain/chain/src/metrics.rs | 14 +++++ chain/client/src/client.rs | 6 ++- chain/client/src/metrics.rs | 2 +- chain/client/src/sync/state/downloader.rs | 59 ++++++++++------------ chain/client/src/sync/state/external.rs | 29 ++++++++--- chain/client/src/sync/state/mod.rs | 6 ++- chain/client/src/sync/state/network.rs | 8 +-- chain/client/src/sync/state/shard.rs | 56 +++++++++++++++----- core/chain-configs/src/client_config.rs | 15 ++++-- core/chain-configs/src/lib.rs | 19 +++---- integration-tests/src/test_loop/builder.rs | 3 +- nearcore/src/config.rs | 41 ++++++++------- 13 files changed, 167 insertions(+), 96 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 50d8030a4ef..c467f11fe35 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -2776,8 +2776,10 @@ impl Chain { // Check cache let key = borsh::to_vec(&StatePartKey(sync_hash, shard_id, part_id))?; if let Ok(Some(state_part)) = self.chain_store.store().get(DBCol::StateParts, &key) { + metrics::STATE_PART_CACHE_HIT.inc(); return Ok(state_part.into()); } + metrics::STATE_PART_CACHE_MISS.inc(); let block = self .get_block(&sync_hash) @@ -2823,9 +2825,6 @@ impl Chain { self.requested_state_parts .save_state_part_elapsed(&sync_hash, &shard_id, &part_id, elapsed_ms); - // Before saving State Part data, we need to make sure we can calculate and save State Header - self.get_state_response_header(shard_id, sync_hash)?; - // Saving the part data let mut store_update = self.chain_store.store().store_update(); store_update.set(DBCol::StateParts, &key, &state_part); diff --git a/chain/chain/src/metrics.rs b/chain/chain/src/metrics.rs index f46cfb44a02..ba4ccdbbafc 100644 --- a/chain/chain/src/metrics.rs +++ b/chain/chain/src/metrics.rs @@ -103,6 +103,20 @@ pub static STATE_PART_ELAPSED: LazyLock = LazyLock::new(|| { ) .unwrap() }); +pub static STATE_PART_CACHE_HIT: LazyLock = LazyLock::new(|| { + try_create_int_counter( + "near_state_part_cache_hit", + "Total number of state parts served from db cache", + ) + .unwrap() +}); +pub static STATE_PART_CACHE_MISS: LazyLock = LazyLock::new(|| { + try_create_int_counter( + "near_state_part_cache_miss", + "Total number of state parts built on-demand", + ) + .unwrap() +}); pub static NUM_INVALID_BLOCKS: LazyLock = LazyLock::new(|| { try_create_int_gauge_vec("near_num_invalid_blocks", "Number of invalid blocks", &["error"]) .unwrap() diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index d7710bf5b96..0412774f2aa 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -324,7 +324,8 @@ impl Client { network_adapter.clone().into_sender(), config.state_sync_external_timeout, config.state_sync_p2p_timeout, - config.state_sync_retry_timeout, + config.state_sync_retry_backoff, + config.state_sync_external_backoff, &config.chain_id, &config.state_sync.sync, chain_sender_for_state_sync.clone(), @@ -2578,7 +2579,8 @@ impl Client { self.network_adapter.clone().into_sender(), self.config.state_sync_external_timeout, self.config.state_sync_p2p_timeout, - self.config.state_sync_retry_timeout, + self.config.state_sync_retry_backoff, + self.config.state_sync_external_backoff, &self.config.chain_id, &self.config.state_sync.sync, self.chain_sender_for_state_sync.clone(), diff --git a/chain/client/src/metrics.rs b/chain/client/src/metrics.rs index b38d0efc434..d40d3422ad2 100644 --- a/chain/client/src/metrics.rs +++ b/chain/client/src/metrics.rs @@ -473,7 +473,7 @@ pub(crate) static STATE_SYNC_STAGE: LazyLock = LazyLock::new(|| { pub(crate) static STATE_SYNC_DOWNLOAD_RESULT: LazyLock = LazyLock::new(|| { try_create_int_counter_vec( - "near_state_sync_header_download_result", + "near_state_sync_download_result", "Count of number of state sync downloads by type (header, part), source (network, external), and result (timeout, error, success)", &["shard_id", "type", "source", "result"], diff --git a/chain/client/src/sync/state/downloader.rs b/chain/client/src/sync/state/downloader.rs index 59fb714c4cf..f505d0455e3 100644 --- a/chain/client/src/sync/state/downloader.rs +++ b/chain/client/src/sync/state/downloader.rs @@ -33,7 +33,7 @@ pub(super) struct StateSyncDownloader { pub header_validation_sender: AsyncSender>, pub runtime: Arc, - pub retry_timeout: Duration, + pub retry_backoff: Duration, pub task_tracker: TaskTracker, } @@ -56,7 +56,7 @@ impl StateSyncDownloader { let num_attempts_before_fallback = self.num_attempts_before_fallback; let task_tracker = self.task_tracker.clone(); let clock = self.clock.clone(); - let retry_timeout = self.retry_timeout; + let retry_backoff = self.retry_backoff; async move { let handle = task_tracker.get_handle(&format!("shard {} header", shard_id)).await; handle.set_status("Reading existing header"); @@ -104,9 +104,9 @@ impl StateSyncDownloader { Err(err) => { handle.set_status(&format!( "Error: {}, will retry in {}", - err, retry_timeout + err, retry_backoff )); - let deadline = clock.now() + retry_timeout; + let deadline = clock.now() + retry_backoff; tokio::select! { _ = cancel.cancelled() => { return Err(near_chain::Error::Other("Cancelled".to_owned())); @@ -122,17 +122,19 @@ impl StateSyncDownloader { .boxed() } - /// Ensures that the shard part is downloaded and validated. If the part exists on disk, - /// just returns. Otherwise, downloads the part, validates it, and retries if needed. + /// Attempts once to ensure that the shard part is downloaded and validated. + /// If the part exists on disk, just returns. Otherwise, makes one attempt + /// to download the part and validate it. /// - /// This method will only return an error if the download cannot be completed even - /// with retries, or if the download is cancelled. - pub fn ensure_shard_part_downloaded( + /// This method will return an error if the download fails or is cancelled. + pub fn ensure_shard_part_downloaded_single_attempt( &self, shard_id: ShardId, sync_hash: CryptoHash, + state_root: CryptoHash, + num_state_parts: u64, part_id: u64, - header: ShardStateSyncResponseHeader, + num_prior_attempts: usize, cancel: CancellationToken, ) -> BoxFuture<'static, Result<(), near_chain::Error>> { let store = self.store.clone(); @@ -142,8 +144,11 @@ impl StateSyncDownloader { let num_attempts_before_fallback = self.num_attempts_before_fallback; let clock = self.clock.clone(); let task_tracker = self.task_tracker.clone(); - let retry_timeout = self.retry_timeout; + let retry_backoff = self.retry_backoff; async move { + if cancel.is_cancelled() { + return Err(near_chain::Error::Other("Cancelled".to_owned())); + } let handle = task_tracker.get_handle(&format!("shard {} part {}", shard_id, part_id)).await; handle.set_status("Reading existing part"); @@ -151,15 +156,15 @@ impl StateSyncDownloader { return Ok(()); } - let i = AtomicUsize::new(0); // for easier Rust async capture let attempt = || async { let source = if fallback_source.is_some() - && i.load(Ordering::Relaxed) >= num_attempts_before_fallback + && num_prior_attempts >= num_attempts_before_fallback { fallback_source.as_ref().unwrap().as_ref() } else { preferred_source.as_ref() }; + let part = source .download_shard_part( shard_id, @@ -169,10 +174,9 @@ impl StateSyncDownloader { cancel.clone(), ) .await?; - let state_root = header.chunk_prev_state_root(); if runtime_adapter.validate_state_part( &state_root, - PartId { idx: part_id, total: header.num_state_parts() }, + PartId { idx: part_id, total: num_state_parts }, &part, ) { let mut store_update = store.store_update(); @@ -187,25 +191,16 @@ impl StateSyncDownloader { Ok(()) }; - loop { - match attempt().await { - Ok(()) => return Ok(()), - Err(err) => { - handle.set_status(&format!( - "Error: {}, will retry in {}", - err, retry_timeout - )); - let deadline = clock.now() + retry_timeout; - tokio::select! { - _ = cancel.cancelled() => { - return Err(near_chain::Error::Other("Cancelled".to_owned())); - } - _ = clock.sleep_until(deadline) => {} - } - } + let res = attempt().await; + if let Err(ref err) = res { + handle.set_status(&format!("Error: {}, will retry in {}", err, retry_backoff)); + let deadline = clock.now() + retry_backoff; + tokio::select! { + _ = cancel.cancelled() => {} + _ = clock.sleep_until(deadline) => {} } - i.fetch_add(1, Ordering::Relaxed); } + res } .instrument(tracing::debug_span!("StateSyncDownloader::ensure_shard_part_downloaded")) .boxed() diff --git a/chain/client/src/sync/state/external.rs b/chain/client/src/sync/state/external.rs index 0b0fa128696..454226293b4 100644 --- a/chain/client/src/sync/state/external.rs +++ b/chain/client/src/sync/state/external.rs @@ -22,12 +22,14 @@ pub(super) struct StateSyncDownloadSourceExternal { pub chain_id: String, pub conn: ExternalConnection, pub timeout: Duration, + pub backoff: Duration, } impl StateSyncDownloadSourceExternal { async fn get_file_with_timeout( clock: Clock, timeout: Duration, + backoff: Duration, cancellation: CancellationToken, conn: ExternalConnection, shard_id: ShardId, @@ -46,14 +48,25 @@ impl StateSyncDownloadSourceExternal { Err(near_chain::Error::Other("Timeout".to_owned())) } _ = cancellation.cancelled() => { - increment_download_count(shard_id, typ, "external", "error"); + increment_download_count(shard_id, typ, "external", "cancelled"); Err(near_chain::Error::Other("Cancelled".to_owned())) } result = fut => { - result.map_err(|e| { - increment_download_count(shard_id, typ, "network", "error"); - near_chain::Error::Other(format!("Failed to download: {}", e)) - }) + match result { + Err(err) => { + // A download error typically indicates that the file is not available yet. At the + // start of the epoch it takes a while for dumpers to populate the external storage + // with state files. This backoff period prevents spamming requests during that time. + let deadline = clock.now() + backoff; + tokio::select! { + _ = clock.sleep_until(deadline) => {} + _ = cancellation.cancelled() => {} + } + increment_download_count(shard_id, typ, "external", "download_error"); + Err(near_chain::Error::Other(format!("Failed to download: {}", err))) + } + Ok(res) => Ok(res) + } } } } @@ -69,6 +82,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal { ) -> BoxFuture> { let clock = self.clock.clone(); let timeout = self.timeout; + let backoff = self.backoff; let chain_id = self.chain_id.clone(); let conn = self.conn.clone(); let store = self.store.clone(); @@ -86,6 +100,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal { let data = Self::get_file_with_timeout( clock, timeout, + backoff, cancel, conn, shard_id, @@ -94,7 +109,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal { ) .await?; let header = ShardStateSyncResponseHeader::try_from_slice(&data).map_err(|e| { - increment_download_count(shard_id, "header", "external", "error"); + increment_download_count(shard_id, "header", "external", "parse_error"); near_chain::Error::Other(format!("Failed to parse header: {}", e)) })?; @@ -115,6 +130,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal { ) -> BoxFuture, near_chain::Error>> { let clock = self.clock.clone(); let timeout = self.timeout; + let backoff = self.backoff; let chain_id = self.chain_id.clone(); let conn = self.conn.clone(); let store = self.store.clone(); @@ -137,6 +153,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal { let data = Self::get_file_with_timeout( clock, timeout, + backoff, cancel, conn, shard_id, diff --git a/chain/client/src/sync/state/mod.rs b/chain/client/src/sync/state/mod.rs index fd0ecc4763a..3eb84a0e213 100644 --- a/chain/client/src/sync/state/mod.rs +++ b/chain/client/src/sync/state/mod.rs @@ -91,7 +91,8 @@ impl StateSync { network_adapter: AsyncSender, external_timeout: Duration, p2p_timeout: Duration, - retry_timeout: Duration, + retry_backoff: Duration, + external_backoff: Duration, chain_id: &str, sync_config: &SyncConfig, chain_requests_sender: ChainSenderForStateSync, @@ -147,6 +148,7 @@ impl StateSync { chain_id: chain_id.to_string(), conn: external, timeout: external_timeout, + backoff: external_backoff, }) as Arc; ( Some(fallback_source), @@ -166,7 +168,7 @@ impl StateSync { num_attempts_before_fallback, header_validation_sender: chain_requests_sender.clone().into_sender(), runtime: runtime.clone(), - retry_timeout, + retry_backoff, task_tracker: downloading_task_tracker.clone(), }); diff --git a/chain/client/src/sync/state/network.rs b/chain/client/src/sync/state/network.rs index 36162aa9e14..a68db23423f 100644 --- a/chain/client/src/sync/state/network.rs +++ b/chain/client/src/sync/state/network.rs @@ -189,12 +189,12 @@ impl StateSyncDownloadSourcePeer { match request_sender.send_async(network_request).await { Ok(response) => { if let NetworkResponses::RouteNotFound = response.as_network_response() { - increment_download_count(key.shard_id, typ, "network", "error"); + increment_download_count(key.shard_id, typ, "network", "route_not_found"); return Err(near_chain::Error::Other("Route not found".to_owned())); } } Err(e) => { - increment_download_count(key.shard_id, typ, "network", "error"); + increment_download_count(key.shard_id, typ, "network", "failed_to_send"); return Err(near_chain::Error::Other(format!("Failed to send request: {}", e))); } } @@ -206,7 +206,7 @@ impl StateSyncDownloadSourcePeer { Err(near_chain::Error::Other("Timeout".to_owned())) } _ = cancel.cancelled() => { - increment_download_count(key.shard_id, typ, "network", "error"); + increment_download_count(key.shard_id, typ, "network", "cancelled"); Err(near_chain::Error::Other("Cancelled".to_owned())) } result = receiver => { @@ -216,7 +216,7 @@ impl StateSyncDownloadSourcePeer { Ok(result) } Err(_) => { - increment_download_count(key.shard_id, typ, "network", "error"); + increment_download_count(key.shard_id, typ, "network", "sender_dropped"); Err(near_chain::Error::Other("Sender dropped".to_owned())) }, } diff --git a/chain/client/src/sync/state/shard.rs b/chain/client/src/sync/state/shard.rs index a174c4cd3a4..0ef25a07bd6 100644 --- a/chain/client/src/sync/state/shard.rs +++ b/chain/client/src/sync/state/shard.rs @@ -18,6 +18,8 @@ use near_primitives::types::{EpochId, ShardId}; use near_store::adapter::{StoreAdapter, StoreUpdateAdapter}; use near_store::flat::{FlatStorageReadyStatus, FlatStorageStatus}; use near_store::{DBCol, ShardUId, Store}; +use rand::prelude::SliceRandom; +use rand::thread_rng; use std::sync::{Arc, Mutex}; use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; @@ -82,20 +84,46 @@ pub(super) async fn run_state_sync_for_shard( return_if_cancelled!(cancel); *status.lock().unwrap() = ShardSyncStatus::StateDownloadParts; - tokio_stream::iter(0..num_parts) - .map(|part_id| { - let future = downloader.ensure_shard_part_downloaded( - shard_id, - sync_hash, - part_id, - header.clone(), - cancel.clone(), - ); - respawn_for_parallelism(&*future_spawner, "state sync download part", future) - }) - .buffer_unordered(MAX_PARALLELISM_PER_SHARD_FOR_FAIRNESS) - .try_collect::>() - .await?; + let mut parts_to_download: Vec = (0..num_parts).collect(); + { + // Peer selection is designed such that different nodes downloading the same part will tend + // to send the requests to the same host. It allows the host to benefit from caching the part. + // + // At the start of an epoch, a number of nodes begin state sync at the same time. If we + // don't randomize the order in which the parts are requested, the nodes will request the + // parts in roughly the same order, producing spikes of traffic to the same hosts. + let mut rng = thread_rng(); + parts_to_download.shuffle(&mut rng); + } + let mut attempt_count = 0; + while !parts_to_download.is_empty() { + return_if_cancelled!(cancel); + let results = tokio_stream::iter(parts_to_download.clone()) + .map(|part_id| { + let future = downloader.ensure_shard_part_downloaded_single_attempt( + shard_id, + sync_hash, + state_root, + num_parts, + part_id, + attempt_count, + cancel.clone(), + ); + respawn_for_parallelism(&*future_spawner, "state sync download part", future) + }) + .buffered(MAX_PARALLELISM_PER_SHARD_FOR_FAIRNESS) + .collect::>() + .await; + attempt_count += 1; + // Update the list of parts_to_download retaining only the ones that failed + parts_to_download = results + .iter() + .enumerate() + .filter_map(|(task_index, res)| { + res.as_ref().err().map(|_| parts_to_download[task_index]) + }) + .collect(); + } return_if_cancelled!(cancel); *status.lock().unwrap() = ShardSyncStatus::StateApplyInProgress; diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index ae744b23fd6..b3235914726 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -316,10 +316,14 @@ pub fn default_state_sync_p2p_timeout() -> Duration { Duration::seconds(10) } -pub fn default_state_sync_retry_timeout() -> Duration { +pub fn default_state_sync_retry_backoff() -> Duration { Duration::seconds(1) } +pub fn default_state_sync_external_backoff() -> Duration { + Duration::seconds(60) +} + pub fn default_header_sync_expected_height_per_second() -> u64 { 10 } @@ -453,8 +457,10 @@ pub struct ClientConfig { pub state_sync_external_timeout: Duration, /// How long to wait for a response from p2p state sync pub state_sync_p2p_timeout: Duration, - /// How long to wait between attempts to obtain a state part - pub state_sync_retry_timeout: Duration, + /// How long to wait after a failed state sync request + pub state_sync_retry_backoff: Duration, + /// Additional waiting period after a failed request to external storage + pub state_sync_external_backoff: Duration, /// Minimum number of peers to start syncing. pub min_num_peers: usize, /// Period between logging summary information. @@ -598,7 +604,8 @@ impl ClientConfig { header_sync_stall_ban_timeout: Duration::seconds(30), state_sync_external_timeout: Duration::seconds(TEST_STATE_SYNC_TIMEOUT), state_sync_p2p_timeout: Duration::seconds(TEST_STATE_SYNC_TIMEOUT), - state_sync_retry_timeout: Duration::seconds(TEST_STATE_SYNC_TIMEOUT), + state_sync_retry_backoff: Duration::seconds(TEST_STATE_SYNC_TIMEOUT), + state_sync_external_backoff: Duration::seconds(TEST_STATE_SYNC_TIMEOUT), header_sync_expected_height_per_second: 1, min_num_peers: 1, log_summary_period: Duration::seconds(10), diff --git a/core/chain-configs/src/lib.rs b/core/chain-configs/src/lib.rs index a408960a335..85fbf8a433d 100644 --- a/core/chain-configs/src/lib.rs +++ b/core/chain-configs/src/lib.rs @@ -15,15 +15,16 @@ pub use client_config::{ default_header_sync_progress_timeout, default_header_sync_stall_ban_timeout, default_log_summary_period, default_orphan_state_witness_max_size, default_orphan_state_witness_pool_size, default_produce_chunk_add_transactions_time_limit, - default_state_sync_enabled, default_state_sync_external_timeout, - default_state_sync_p2p_timeout, default_state_sync_retry_timeout, default_sync_check_period, - default_sync_height_threshold, default_sync_max_block_requests, default_sync_step_period, - default_transaction_pool_size_limit, default_trie_viewer_state_size_limit, - default_tx_routing_height_horizon, default_view_client_threads, - default_view_client_throttle_period, ChunkDistributionNetworkConfig, ChunkDistributionUris, - ClientConfig, DumpConfig, EpochSyncConfig, ExternalStorageConfig, ExternalStorageLocation, - GCConfig, LogSummaryStyle, ReshardingConfig, ReshardingHandle, StateSyncConfig, SyncConfig, - DEFAULT_GC_NUM_EPOCHS_TO_KEEP, DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL, + default_state_sync_enabled, default_state_sync_external_backoff, + default_state_sync_external_timeout, default_state_sync_p2p_timeout, + default_state_sync_retry_backoff, default_sync_check_period, default_sync_height_threshold, + default_sync_max_block_requests, default_sync_step_period, default_transaction_pool_size_limit, + default_trie_viewer_state_size_limit, default_tx_routing_height_horizon, + default_view_client_threads, default_view_client_throttle_period, + ChunkDistributionNetworkConfig, ChunkDistributionUris, ClientConfig, DumpConfig, + EpochSyncConfig, ExternalStorageConfig, ExternalStorageLocation, GCConfig, LogSummaryStyle, + ReshardingConfig, ReshardingHandle, StateSyncConfig, SyncConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP, + DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL, DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_ON_CATCHUP_EXTERNAL, MIN_GC_NUM_EPOCHS_TO_KEEP, TEST_STATE_SYNC_TIMEOUT, }; diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs index 4e2a32479fb..59657d335d7 100644 --- a/integration-tests/src/test_loop/builder.rs +++ b/integration-tests/src/test_loop/builder.rs @@ -488,7 +488,8 @@ impl TestLoopBuilder { client_config.state_sync_enabled = true; client_config.state_sync_external_timeout = Duration::milliseconds(100); client_config.state_sync_p2p_timeout = Duration::milliseconds(100); - client_config.state_sync_retry_timeout = Duration::milliseconds(100); + client_config.state_sync_retry_backoff = Duration::milliseconds(100); + client_config.state_sync_external_backoff = Duration::milliseconds(100); if let Some(num_epochs) = self.gc_num_epochs_to_keep { client_config.gc.gc_num_epochs_to_keep = num_epochs; } diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs index 102725ace16..1d354e03736 100644 --- a/nearcore/src/config.rs +++ b/nearcore/src/config.rs @@ -14,20 +14,20 @@ use near_chain_configs::{ default_header_sync_progress_timeout, default_header_sync_stall_ban_timeout, default_log_summary_period, default_orphan_state_witness_max_size, default_orphan_state_witness_pool_size, default_produce_chunk_add_transactions_time_limit, - default_state_sync_enabled, default_state_sync_external_timeout, - default_state_sync_p2p_timeout, default_state_sync_retry_timeout, default_sync_check_period, - default_sync_height_threshold, default_sync_max_block_requests, default_sync_step_period, - default_transaction_pool_size_limit, default_trie_viewer_state_size_limit, - default_tx_routing_height_horizon, default_view_client_threads, - default_view_client_throttle_period, get_initial_supply, ChunkDistributionNetworkConfig, - ClientConfig, EpochSyncConfig, GCConfig, Genesis, GenesisConfig, GenesisValidationMode, - LogSummaryStyle, MutableConfigValue, MutableValidatorSigner, ReshardingConfig, StateSyncConfig, - BLOCK_PRODUCER_KICKOUT_THRESHOLD, CHUNK_PRODUCER_KICKOUT_THRESHOLD, - CHUNK_VALIDATOR_ONLY_KICKOUT_THRESHOLD, EXPECTED_EPOCH_LENGTH, FAST_EPOCH_LENGTH, - FISHERMEN_THRESHOLD, GAS_PRICE_ADJUSTMENT_RATE, GENESIS_CONFIG_FILENAME, INITIAL_GAS_LIMIT, - MAX_INFLATION_RATE, MIN_BLOCK_PRODUCTION_DELAY, MIN_GAS_PRICE, NEAR_BASE, NUM_BLOCKS_PER_YEAR, - NUM_BLOCK_PRODUCER_SEATS, PROTOCOL_REWARD_RATE, PROTOCOL_UPGRADE_STAKE_THRESHOLD, - TRANSACTION_VALIDITY_PERIOD, + default_state_sync_enabled, default_state_sync_external_backoff, + default_state_sync_external_timeout, default_state_sync_p2p_timeout, + default_state_sync_retry_backoff, default_sync_check_period, default_sync_height_threshold, + default_sync_max_block_requests, default_sync_step_period, default_transaction_pool_size_limit, + default_trie_viewer_state_size_limit, default_tx_routing_height_horizon, + default_view_client_threads, default_view_client_throttle_period, get_initial_supply, + ChunkDistributionNetworkConfig, ClientConfig, EpochSyncConfig, GCConfig, Genesis, + GenesisConfig, GenesisValidationMode, LogSummaryStyle, MutableConfigValue, + MutableValidatorSigner, ReshardingConfig, StateSyncConfig, BLOCK_PRODUCER_KICKOUT_THRESHOLD, + CHUNK_PRODUCER_KICKOUT_THRESHOLD, CHUNK_VALIDATOR_ONLY_KICKOUT_THRESHOLD, + EXPECTED_EPOCH_LENGTH, FAST_EPOCH_LENGTH, FISHERMEN_THRESHOLD, GAS_PRICE_ADJUSTMENT_RATE, + GENESIS_CONFIG_FILENAME, INITIAL_GAS_LIMIT, MAX_INFLATION_RATE, MIN_BLOCK_PRODUCTION_DELAY, + MIN_GAS_PRICE, NEAR_BASE, NUM_BLOCKS_PER_YEAR, NUM_BLOCK_PRODUCER_SEATS, PROTOCOL_REWARD_RATE, + PROTOCOL_UPGRADE_STAKE_THRESHOLD, TRANSACTION_VALIDITY_PERIOD, }; use near_config_utils::{DownloadConfigType, ValidationError, ValidationErrors}; use near_crypto::{InMemorySigner, KeyFile, KeyType, PublicKey}; @@ -162,9 +162,12 @@ pub struct Consensus { #[serde(default = "default_state_sync_p2p_timeout")] #[serde(with = "near_async::time::serde_duration_as_std")] pub state_sync_p2p_timeout: Duration, - #[serde(default = "default_state_sync_retry_timeout")] + #[serde(default = "default_state_sync_retry_backoff")] #[serde(with = "near_async::time::serde_duration_as_std")] - pub state_sync_retry_timeout: Duration, + pub state_sync_retry_backoff: Duration, + #[serde(default = "default_state_sync_external_backoff")] + #[serde(with = "near_async::time::serde_duration_as_std")] + pub state_sync_external_backoff: Duration, /// Expected increase of header head weight per second during header sync #[serde(default = "default_header_sync_expected_height_per_second")] pub header_sync_expected_height_per_second: u64, @@ -207,7 +210,8 @@ impl Default for Consensus { header_sync_stall_ban_timeout: default_header_sync_stall_ban_timeout(), state_sync_external_timeout: default_state_sync_external_timeout(), state_sync_p2p_timeout: default_state_sync_p2p_timeout(), - state_sync_retry_timeout: default_state_sync_retry_timeout(), + state_sync_retry_backoff: default_state_sync_retry_backoff(), + state_sync_external_backoff: default_state_sync_external_backoff(), header_sync_expected_height_per_second: default_header_sync_expected_height_per_second( ), sync_check_period: default_sync_check_period(), @@ -562,7 +566,8 @@ impl NearConfig { .header_sync_expected_height_per_second, state_sync_external_timeout: config.consensus.state_sync_external_timeout, state_sync_p2p_timeout: config.consensus.state_sync_p2p_timeout, - state_sync_retry_timeout: config.consensus.state_sync_retry_timeout, + state_sync_retry_backoff: config.consensus.state_sync_retry_backoff, + state_sync_external_backoff: config.consensus.state_sync_external_backoff, min_num_peers: config.consensus.min_num_peers, log_summary_period: config.log_summary_period, produce_empty_blocks: config.consensus.produce_empty_blocks, From f2d4496d892dabf43d3a4b5b2b5b84595f2575c7 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Sun, 1 Dec 2024 15:53:47 -0500 Subject: [PATCH 2/3] fix debug span name --- chain/client/src/sync/state/downloader.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/chain/client/src/sync/state/downloader.rs b/chain/client/src/sync/state/downloader.rs index f505d0455e3..17ba4638ff9 100644 --- a/chain/client/src/sync/state/downloader.rs +++ b/chain/client/src/sync/state/downloader.rs @@ -202,7 +202,9 @@ impl StateSyncDownloader { } res } - .instrument(tracing::debug_span!("StateSyncDownloader::ensure_shard_part_downloaded")) + .instrument(tracing::debug_span!( + "StateSyncDownloader::ensure_shard_part_downloaded_single_attempt" + )) .boxed() } } From 1dd5e089a0beab93c30b2ee22a11e66ff54e91a6 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Wed, 4 Dec 2024 09:08:38 -0500 Subject: [PATCH 3/3] set reasonable external_backoff in tests --- pytest/lib/cluster.py | 1 + pytest/lib/state_sync_lib.py | 8 +++++++ pytest/tests/sanity/epoch_switches.py | 6 ++++- pytest/tests/sanity/rpc_tx_forwarding.py | 24 +++++++++++++++---- pytest/tests/sanity/staking2.py | 18 +++++++++++--- pytest/tests/sanity/transactions.py | 30 ++++++++++++++++++++---- 6 files changed, 74 insertions(+), 13 deletions(-) diff --git a/pytest/lib/cluster.py b/pytest/lib/cluster.py index 5a4f6ea86b7..bf4beebd52f 100644 --- a/pytest/lib/cluster.py +++ b/pytest/lib/cluster.py @@ -1009,6 +1009,7 @@ def apply_config_changes(node_dir: str, 'consensus.max_block_production_delay', 'consensus.max_block_wait_delay', 'consensus.state_sync_external_timeout', + 'consensus.state_sync_external_backoff', 'consensus.state_sync_p2p_timeout', 'expected_shutdown', 'log_summary_period', 'max_gas_burnt_view', 'rosetta_rpc', 'save_trie_changes', 'split_storage', 'state_sync', diff --git a/pytest/lib/state_sync_lib.py b/pytest/lib/state_sync_lib.py index b55d31f0fe6..d8d1b0c923e 100644 --- a/pytest/lib/state_sync_lib.py +++ b/pytest/lib/state_sync_lib.py @@ -40,6 +40,10 @@ def get_state_sync_configs_pair(tracked_shards=[0]): "secs": 0, "nanos": 500000000 }, + "consensus.state_sync_external_backoff": { + "secs": 0, + "nanos": 500000000 + }, "state_sync": { "sync": { "ExternalStorage": { @@ -70,6 +74,10 @@ def get_state_sync_config_combined(): "secs": 0, "nanos": 500000000 }, + "consensus.state_sync_external_backoff": { + "secs": 0, + "nanos": 500000000 + }, "state_sync": { "dump": { "location": { diff --git a/pytest/tests/sanity/epoch_switches.py b/pytest/tests/sanity/epoch_switches.py index 264b472db37..78a5c492526 100755 --- a/pytest/tests/sanity/epoch_switches.py +++ b/pytest/tests/sanity/epoch_switches.py @@ -32,7 +32,11 @@ "state_sync_p2p_timeout": { "secs": 0, "nanos": 500000000 - } + }, + "state_sync_external_backoff": { + "secs": 0, + "nanos": 500000000 + }, } } diff --git a/pytest/tests/sanity/rpc_tx_forwarding.py b/pytest/tests/sanity/rpc_tx_forwarding.py index a79cd20e674..91654b241a2 100755 --- a/pytest/tests/sanity/rpc_tx_forwarding.py +++ b/pytest/tests/sanity/rpc_tx_forwarding.py @@ -27,7 +27,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 1: { @@ -40,7 +44,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 2: { @@ -53,7 +61,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 3: { @@ -66,7 +78,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } } }) diff --git a/pytest/tests/sanity/staking2.py b/pytest/tests/sanity/staking2.py index 729f0ce4e63..11caf87c0b9 100755 --- a/pytest/tests/sanity/staking2.py +++ b/pytest/tests/sanity/staking2.py @@ -108,7 +108,11 @@ def doit(seq=[]): "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 1: { @@ -125,7 +129,11 @@ def doit(seq=[]): "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 2: { @@ -142,7 +150,11 @@ def doit(seq=[]): "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, }, "store.state_snapshot_enabled": True, } diff --git a/pytest/tests/sanity/transactions.py b/pytest/tests/sanity/transactions.py index 7c93498692a..bdc72f68f71 100755 --- a/pytest/tests/sanity/transactions.py +++ b/pytest/tests/sanity/transactions.py @@ -37,7 +37,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 1: { @@ -49,7 +53,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 2: { @@ -61,7 +69,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 3: { @@ -73,7 +85,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, } }, 4: { @@ -85,7 +101,11 @@ "state_sync_p2p_timeout": { "secs": 2, "nanos": 0 - } + }, + "state_sync_external_backoff": { + "secs": 2, + "nanos": 0 + }, }, "tracked_shards": [0, 1, 2, 3] }