From f69de7eb7c7b0ded80c24b344c75a236cfab4d19 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 1 Mar 2024 15:40:08 +0200 Subject: [PATCH 01/16] chainHead: Ensure reasonable distance between leaf and finalized block Signed-off-by: Alexandru Vasile --- .../src/chain_head/chain_head_follow.rs | 44 ++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index e94374aebd91..9b85c0a00234 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -41,7 +41,10 @@ use sp_api::CallApiAt; use sp_blockchain::{ Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info, }; -use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; +use sp_runtime::{ + traits::{Block as BlockT, Header as HeaderT, NumberFor}, + SaturatedConversion, Saturating, +}; use std::{collections::HashSet, sync::Arc}; use super::subscription::InsertedSubscriptionData; @@ -177,6 +180,38 @@ where } } + /// Check the distance between the provided blocks does not exceed a + /// a reasonable range. + /// + /// When the blocks are too far apart (potentially millions of blocks): + /// - Tree route is expensive to calculate. + /// - The RPC layer will not be able to generate the `NewBlock` events for all blocks. + /// + /// This edge-case can happen for parachains where the relay chain syncs slower to + /// the head of the chain than the parachain node that is synched already. + fn distace_within_reason( + &self, + block: Block::Hash, + finalized: Block::Hash, + ) -> Result<(), SubscriptionManagementError> { + let Some(block_num) = self.client.number(block)? else { + return Err(SubscriptionManagementError::BlockHashAbsent) + }; + let Some(finalized_num) = self.client.number(finalized)? else { + return Err(SubscriptionManagementError::BlockHashAbsent) + }; + + // Note: u32 would be enough here. + let distance: u64 = block_num.saturating_sub(finalized_num).saturated_into(); + if distance > 128 { + return Err(SubscriptionManagementError::Custom( + "Distance between the blocks is too large".into(), + )); + } + + Ok(()) + } + /// Get the in-memory blocks of the client, starting from the provided finalized hash. fn get_init_blocks_with_forks( &self, @@ -188,6 +223,13 @@ where let mut pruned_forks = HashSet::new(); let mut finalized_block_descendants = Vec::new(); let mut unique_descendants = HashSet::new(); + + // Ensure all leaves are within a reasonable distance from the finalized block, + // before traversing the tree. + for leaf in &leaves { + self.distace_within_reason(*leaf, finalized)?; + } + for leaf in leaves { let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)?; From 1afeaf8e7a38c947cd8ac6d3a5b843b14c87d51a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 1 Mar 2024 16:41:37 +0200 Subject: [PATCH 02/16] chainHead: Introduce custom error for distance too large Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 2 +- .../src/chain_head/chain_head_follow.rs | 19 ++++++++++--------- .../src/chain_head/subscription/error.rs | 4 ++++ 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index 2bda22b45239..38406e375605 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -206,7 +206,7 @@ where sub_id.clone(), ); - chain_head_follow.generate_events(sink, sub_data).await; + let _result = chain_head_follow.generate_events(sink, sub_data).await; subscriptions.remove_subscription(&sub_id); debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id); diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index 9b85c0a00234..5e80e9857677 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -204,9 +204,7 @@ where // Note: u32 would be enough here. let distance: u64 = block_num.saturating_sub(finalized_num).saturated_into(); if distance > 128 { - return Err(SubscriptionManagementError::Custom( - "Distance between the blocks is too large".into(), - )); + return Err(SubscriptionManagementError::BlockDistanceTooLarge); } Ok(()) @@ -547,7 +545,8 @@ where mut to_ignore: HashSet, sink: SubscriptionSink, rx_stop: oneshot::Receiver<()>, - ) where + ) -> Result<(), SubscriptionManagementError> + where EventStream: Stream> + Unpin, { let mut stream_item = stream.next(); @@ -576,7 +575,7 @@ where ); let msg = to_sub_message(&sink, &FollowEvent::::Stop); let _ = sink.send(msg).await; - return + return Err(err) }, }; @@ -591,7 +590,8 @@ where let msg = to_sub_message(&sink, &FollowEvent::::Stop); let _ = sink.send(msg).await; - return + // No need to propagate this error further, the client disconnected. + return Ok(()) } } @@ -603,6 +603,7 @@ where // or the `Stop` receiver was triggered. let msg = to_sub_message(&sink, &FollowEvent::::Stop); let _ = sink.send(msg).await; + Ok(()) } /// Generate the block events for the `chainHead_follow` method. @@ -610,7 +611,7 @@ where &mut self, sink: SubscriptionSink, sub_data: InsertedSubscriptionData, - ) { + ) -> Result<(), SubscriptionManagementError> { // Register for the new block and finalized notifications. let stream_import = self .client @@ -638,7 +639,7 @@ where ); let msg = to_sub_message(&sink, &FollowEvent::::Stop); let _ = sink.send(msg).await; - return + return Err(err) }, }; @@ -648,6 +649,6 @@ where let stream = stream::once(futures::future::ready(initial)).chain(merged); self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop) - .await; + .await } } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs index 2c22e51ca4dc..91ce26db22a5 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs @@ -41,6 +41,9 @@ pub enum SubscriptionManagementError { /// The unpin method was called with duplicate hashes. #[error("Duplicate hashes")] DuplicateHashes, + /// The distance between the leaves and the current finalized block is too large. + #[error("Distance too large")] + BlockDistanceTooLarge, /// Custom error. #[error("Subscription error {0}")] Custom(String), @@ -57,6 +60,7 @@ impl PartialEq for SubscriptionManagementError { (Self::BlockHeaderAbsent, Self::BlockHeaderAbsent) | (Self::SubscriptionAbsent, Self::SubscriptionAbsent) | (Self::DuplicateHashes, Self::DuplicateHashes) => true, + (Self::BlockDistanceTooLarge, Self::BlockDistanceTooLarge) => true, (Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs, _ => false, } From 6f02a089f2470185f81cbac657d390b9823bcdc3 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 4 Mar 2024 13:36:30 +0200 Subject: [PATCH 03/16] chainHead: Temporarily suspend subscriptions Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 20 ++++- .../client/rpc-spec-v2/src/chain_head/mod.rs | 1 + .../src/chain_head/suspend_subscriptions.rs | 90 +++++++++++++++++++ 3 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 substrate/client/rpc-spec-v2/src/chain_head/suspend_subscriptions.rs diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index 38406e375605..033d5dbf3df5 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -29,6 +29,7 @@ use crate::{ error::Error as ChainHeadRpcError, event::{FollowEvent, MethodResponse, OperationError}, subscription::{SubscriptionManagement, SubscriptionManagementError}, + suspend_subscriptions::SuspendSubscriptions, }, common::events::StorageQuery, hex_string, SubscriptionTaskExecutor, @@ -110,6 +111,10 @@ pub struct ChainHead, Block: BlockT, Client> { /// The maximum number of items reported by the `chainHead_storage` before /// pagination is required. operation_max_storage_items: usize, + /// The distance between the leaves and the finalized block is too large. + /// + /// Suspends the subscriptions for a given amount of time. + suspend_subscriptions: SuspendSubscriptions, /// Phantom member to pin the block type. _phantom: PhantomData, } @@ -133,6 +138,7 @@ impl, Block: BlockT, Client> ChainHead { backend, )), operation_max_storage_items: config.operation_max_storage_items, + suspend_subscriptions: SuspendSubscriptions::new(Duration::from_secs(30)), _phantom: PhantomData, } } @@ -180,12 +186,20 @@ where let subscriptions = self.subscriptions.clone(); let backend = self.backend.clone(); let client = self.client.clone(); + let suspend_subscriptions = self.suspend_subscriptions.clone(); let fut = async move { let Ok(sink) = pending.accept().await else { return }; let sub_id = read_subscription_id_as_string(&sink); + if suspend_subscriptions.is_suspended() { + debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription suspended", sub_id); + let msg = to_sub_message(&sink, &FollowEvent::::Stop); + let _ = sink.send(msg).await; + return + } + // Keep track of the subscription. let Some(sub_data) = subscriptions.insert_subscription(sub_id.clone(), with_runtime) else { @@ -205,8 +219,12 @@ where with_runtime, sub_id.clone(), ); + let result = chain_head_follow.generate_events(sink, sub_data).await; - let _result = chain_head_follow.generate_events(sink, sub_data).await; + if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result { + debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription suspended", sub_id); + suspend_subscriptions.suspend_subscriptions(); + } subscriptions.remove_subscription(&sub_id); debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id); diff --git a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs index c9fe19aca2b1..bd4c9343e66b 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs @@ -32,6 +32,7 @@ pub mod chain_head; pub mod error; pub mod event; +mod suspend_subscriptions; mod chain_head_follow; mod chain_head_storage; mod subscription; diff --git a/substrate/client/rpc-spec-v2/src/chain_head/suspend_subscriptions.rs b/substrate/client/rpc-spec-v2/src/chain_head/suspend_subscriptions.rs new file mode 100644 index 000000000000..9c34b963112f --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/chain_head/suspend_subscriptions.rs @@ -0,0 +1,90 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Temporarily ban subscriptions if the distance between the leaves +//! and the current finalized block is too large. + +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use parking_lot::Mutex; + +#[derive(Debug)] +struct SuspendSubscriptionsInner { + /// The time at which the subscriptions where banned. + instant: Option, + /// The amount of time the subscriptions are banned for. + duration: Duration, +} + +/// Suspend the subscriptions for a given amount of time. +#[derive(Debug, Clone)] +pub struct SuspendSubscriptions { + inner: Arc>, +} + +impl SuspendSubscriptions { + /// Construct a new [`SuspendSubscriptions`]. + /// + /// The given parameter is the duration for which the subscriptions are banned for. + pub fn new(duration: Duration) -> Self { + Self { inner: Arc::new(Mutex::new(SuspendSubscriptionsInner { instant: None, duration })) } + } + + /// Suspend all subscriptions for the given duration. + pub fn suspend_subscriptions(&self) { + let mut inner = self.inner.lock(); + + inner.instant = Some(Instant::now()); + } + + /// Check if the subscriptions are banned. + pub fn is_suspended(&self) -> bool { + let mut inner = self.inner.lock(); + + match inner.instant { + Some(time) => { + if time.elapsed() > inner.duration { + inner.instant = None; + return false + } + true + }, + None => false, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn suspend_subscriptions() { + let mut suspend = SuspendSubscriptions::new(Duration::from_secs(1)); + assert!(!suspend.is_suspended()); + + suspend.suspend_subscriptions(); + assert!(suspend.is_suspended()); + + std::thread::sleep(Duration::from_secs(2)); + assert!(!suspend.is_suspended()); + } +} From 831f0955a812ab54c3062ddbd1e4d226e56e2940 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 4 Mar 2024 14:11:59 +0200 Subject: [PATCH 04/16] chainHead: Move suspending to subscription management Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 25 ++++---------- .../client/rpc-spec-v2/src/chain_head/mod.rs | 1 - .../src/chain_head/subscription/inner.rs | 29 +++++++++++++++- .../src/chain_head/subscription/mod.rs | 14 ++++++++ .../suspend.rs} | 34 ++++++------------- 5 files changed, 58 insertions(+), 45 deletions(-) rename substrate/client/rpc-spec-v2/src/chain_head/{suspend_subscriptions.rs => subscription/suspend.rs} (77%) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index 033d5dbf3df5..0f1c859c4e66 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -29,7 +29,6 @@ use crate::{ error::Error as ChainHeadRpcError, event::{FollowEvent, MethodResponse, OperationError}, subscription::{SubscriptionManagement, SubscriptionManagementError}, - suspend_subscriptions::SuspendSubscriptions, }, common::events::StorageQuery, hex_string, SubscriptionTaskExecutor, @@ -111,10 +110,6 @@ pub struct ChainHead, Block: BlockT, Client> { /// The maximum number of items reported by the `chainHead_storage` before /// pagination is required. operation_max_storage_items: usize, - /// The distance between the leaves and the finalized block is too large. - /// - /// Suspends the subscriptions for a given amount of time. - suspend_subscriptions: SuspendSubscriptions, /// Phantom member to pin the block type. _phantom: PhantomData, } @@ -135,10 +130,10 @@ impl, Block: BlockT, Client> ChainHead { config.global_max_pinned_blocks, config.subscription_max_pinned_duration, config.subscription_max_ongoing_operations, + Duration::from_secs(30), backend, )), operation_max_storage_items: config.operation_max_storage_items, - suspend_subscriptions: SuspendSubscriptions::new(Duration::from_secs(30)), _phantom: PhantomData, } } @@ -186,26 +181,18 @@ where let subscriptions = self.subscriptions.clone(); let backend = self.backend.clone(); let client = self.client.clone(); - let suspend_subscriptions = self.suspend_subscriptions.clone(); let fut = async move { let Ok(sink) = pending.accept().await else { return }; let sub_id = read_subscription_id_as_string(&sink); - if suspend_subscriptions.is_suspended() { - debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription suspended", sub_id); - let msg = to_sub_message(&sink, &FollowEvent::::Stop); - let _ = sink.send(msg).await; - return - } - // Keep track of the subscription. let Some(sub_data) = subscriptions.insert_subscription(sub_id.clone(), with_runtime) else { - // Inserting the subscription can only fail if the JsonRPSee - // generated a duplicate subscription ID. - debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id); + // Inserting the subscription can only fail if the JsonRPSee generated a duplicate + // subscription ID; or subscriptions are suspended. + debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted or suspended", sub_id); let msg = to_sub_message(&sink, &FollowEvent::::Stop); let _ = sink.send(msg).await; return @@ -222,8 +209,8 @@ where let result = chain_head_follow.generate_events(sink, sub_data).await; if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result { - debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription suspended", sub_id); - suspend_subscriptions.suspend_subscriptions(); + debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are suspended", sub_id); + subscriptions.suspend_subscriptions(); } subscriptions.remove_subscription(&sub_id); diff --git a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs index bd4c9343e66b..c9fe19aca2b1 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs @@ -32,7 +32,6 @@ pub mod chain_head; pub mod error; pub mod event; -mod suspend_subscriptions; mod chain_head_follow; mod chain_head_storage; mod subscription; diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index d2879679501f..c3f3a87b70b2 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -27,7 +27,11 @@ use std::{ time::{Duration, Instant}, }; -use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent}; +use crate::chain_head::{ + chain_head::LOG_TARGET, + subscription::{suspend::SuspendSubscriptions, SubscriptionManagementError}, + FollowEvent, +}; /// The queue size after which the `sc_utils::mpsc::tracing_unbounded` would produce warnings. const QUEUE_SIZE_WARNING: usize = 512; @@ -560,6 +564,9 @@ pub struct SubscriptionsInner> { max_ongoing_operations: usize, /// Map the subscription ID to internal details of the subscription. subs: HashMap>, + /// Suspend subscriptions for a given amount of time. + suspend: SuspendSubscriptions, + /// Backend pinning / unpinning blocks. /// /// The `Arc` is handled one level-above, but substrate exposes the backend as Arc. @@ -572,6 +579,7 @@ impl> SubscriptionsInner { global_max_pinned_blocks: usize, local_max_pin_duration: Duration, max_ongoing_operations: usize, + suspend_duration: Duration, backend: Arc, ) -> Self { SubscriptionsInner { @@ -580,6 +588,7 @@ impl> SubscriptionsInner { local_max_pin_duration, max_ongoing_operations, subs: Default::default(), + suspend: SuspendSubscriptions::new(suspend_duration), backend, } } @@ -590,6 +599,11 @@ impl> SubscriptionsInner { sub_id: String, with_runtime: bool, ) -> Option> { + if self.suspend.is_suspended() { + log::trace!(target: LOG_TARGET, "[id={:?}] Subscription already suspended", sub_id); + return None + } + if let Entry::Vacant(entry) = self.subs.entry(sub_id) { let (tx_stop, rx_stop) = oneshot::channel(); let (response_sender, response_receiver) = @@ -623,6 +637,19 @@ impl> SubscriptionsInner { } } + /// Suspends all subscriptions for the given duration. + /// + /// All active subscriptions are removed. + pub fn suspend_subscriptions(&mut self) { + self.suspend.suspend_subscriptions(); + + let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect(); + + for sub_id in to_remove { + self.remove_subscription(&sub_id); + } + } + /// Ensure that a new block could be pinned. /// /// If the global number of blocks has been reached this method diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index c830e662da2e..968e49b49cc0 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -23,6 +23,7 @@ use std::{sync::Arc, time::Duration}; mod error; mod inner; +mod suspend; use self::inner::SubscriptionsInner; @@ -43,6 +44,7 @@ impl> SubscriptionManagement { global_max_pinned_blocks: usize, local_max_pin_duration: Duration, max_ongoing_operations: usize, + suspend_duration: Duration, backend: Arc, ) -> Self { SubscriptionManagement { @@ -50,6 +52,7 @@ impl> SubscriptionManagement { global_max_pinned_blocks, local_max_pin_duration, max_ongoing_operations, + suspend_duration, backend, )), } @@ -75,6 +78,17 @@ impl> SubscriptionManagement { inner.remove_subscription(sub_id) } + /// Suspends all subscriptions for the given duration. + /// + /// For all active subscriptions, the internal data is discarded, blocks are unpinned and the + /// `Stop` event will be generated. + /// + /// For incoming subscriptions, only the `Stop` event is delivered. + pub fn suspend_subscriptions(&self) { + let mut inner = self.inner.write(); + inner.suspend_subscriptions() + } + /// The block is pinned in the backend only once when the block's hash is first encountered. /// /// Each subscription is expected to call this method twice: diff --git a/substrate/client/rpc-spec-v2/src/chain_head/suspend_subscriptions.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/suspend.rs similarity index 77% rename from substrate/client/rpc-spec-v2/src/chain_head/suspend_subscriptions.rs rename to substrate/client/rpc-spec-v2/src/chain_head/subscription/suspend.rs index 9c34b963112f..02ff2bfc6ea2 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/suspend_subscriptions.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/suspend.rs @@ -19,50 +19,36 @@ //! Temporarily ban subscriptions if the distance between the leaves //! and the current finalized block is too large. -use std::{ - sync::Arc, - time::{Duration, Instant}, -}; - -use parking_lot::Mutex; +use std::time::{Duration, Instant}; +/// Suspend the subscriptions for a given amount of time. #[derive(Debug)] -struct SuspendSubscriptionsInner { +pub struct SuspendSubscriptions { /// The time at which the subscriptions where banned. instant: Option, /// The amount of time the subscriptions are banned for. duration: Duration, } -/// Suspend the subscriptions for a given amount of time. -#[derive(Debug, Clone)] -pub struct SuspendSubscriptions { - inner: Arc>, -} - impl SuspendSubscriptions { /// Construct a new [`SuspendSubscriptions`]. /// /// The given parameter is the duration for which the subscriptions are banned for. pub fn new(duration: Duration) -> Self { - Self { inner: Arc::new(Mutex::new(SuspendSubscriptionsInner { instant: None, duration })) } + Self { instant: None, duration } } /// Suspend all subscriptions for the given duration. - pub fn suspend_subscriptions(&self) { - let mut inner = self.inner.lock(); - - inner.instant = Some(Instant::now()); + pub fn suspend_subscriptions(&mut self) { + self.instant = Some(Instant::now()); } /// Check if the subscriptions are banned. - pub fn is_suspended(&self) -> bool { - let mut inner = self.inner.lock(); - - match inner.instant { + pub fn is_suspended(&mut self) -> bool { + match self.instant { Some(time) => { - if time.elapsed() > inner.duration { - inner.instant = None; + if time.elapsed() > self.duration { + self.instant = None; return false } true From 887d38061c5461283808603e4bdecc86fe4f43cf Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 4 Mar 2024 14:25:09 +0200 Subject: [PATCH 05/16] chainHead/subs/tests: Adjust testing to suspending changes Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription/inner.rs | 72 ++++++++++++++----- 1 file changed, 56 insertions(+), 16 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index c3f3a87b70b2..99ac12a49f43 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -1062,8 +1062,13 @@ mod tests { let hash_3 = block.header.hash(); futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let mut subs = - SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); + let mut subs = SubscriptionsInner::new( + 10, + Duration::from_secs(10), + MAX_OPERATIONS_PER_SUB, + Duration::from_secs(10), + backend, + ); let id_1 = "abc".to_string(); let id_2 = "abcd".to_string(); @@ -1102,8 +1107,13 @@ mod tests { fn subscription_lock_block() { let builder = TestClientBuilder::new(); let backend = builder.backend(); - let mut subs = - SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); + let mut subs = SubscriptionsInner::new( + 10, + Duration::from_secs(10), + MAX_OPERATIONS_PER_SUB, + Duration::from_secs(10), + backend, + ); let id = "abc".to_string(); let hash = H256::random(); @@ -1142,8 +1152,13 @@ mod tests { let hash = block.hash(); futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let mut subs = - SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); + let mut subs = SubscriptionsInner::new( + 10, + Duration::from_secs(10), + MAX_OPERATIONS_PER_SUB, + Duration::from_secs(10), + backend, + ); let id = "abc".to_string(); let _stop = subs.insert_subscription(id.clone(), true).unwrap(); @@ -1179,8 +1194,13 @@ mod tests { let hash = block.header.hash(); futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let mut subs = - SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); + let mut subs = SubscriptionsInner::new( + 10, + Duration::from_secs(10), + MAX_OPERATIONS_PER_SUB, + Duration::from_secs(10), + backend, + ); let id = "abc".to_string(); let _stop = subs.insert_subscription(id.clone(), true).unwrap(); @@ -1249,8 +1269,13 @@ mod tests { let hash_3 = block.header.hash(); futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let mut subs = - SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); + let mut subs = SubscriptionsInner::new( + 10, + Duration::from_secs(10), + MAX_OPERATIONS_PER_SUB, + Duration::from_secs(10), + backend, + ); let id_1 = "abc".to_string(); let id_2 = "abcd".to_string(); @@ -1316,8 +1341,13 @@ mod tests { futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); // Maximum number of pinned blocks is 2. - let mut subs = - SubscriptionsInner::new(2, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); + let mut subs = SubscriptionsInner::new( + 2, + Duration::from_secs(10), + MAX_OPERATIONS_PER_SUB, + Duration::from_secs(10), + backend, + ); let id_1 = "abc".to_string(); let id_2 = "abcd".to_string(); @@ -1388,8 +1418,13 @@ mod tests { futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); // Maximum number of pinned blocks is 2 and maximum pin duration is 5 second. - let mut subs = - SubscriptionsInner::new(2, Duration::from_secs(5), MAX_OPERATIONS_PER_SUB, backend); + let mut subs = SubscriptionsInner::new( + 2, + Duration::from_secs(5), + MAX_OPERATIONS_PER_SUB, + Duration::from_secs(10), + backend, + ); let id_1 = "abc".to_string(); let id_2 = "abcd".to_string(); @@ -1438,8 +1473,13 @@ mod tests { fn subscription_check_stop_event() { let builder = TestClientBuilder::new(); let backend = builder.backend(); - let mut subs = - SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); + let mut subs = SubscriptionsInner::new( + 10, + Duration::from_secs(10), + MAX_OPERATIONS_PER_SUB, + Duration::from_secs(10), + backend, + ); let id = "abc".to_string(); From 34a28ec82a00d448500d536339b7143ee270c443 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 4 Mar 2024 14:30:47 +0200 Subject: [PATCH 06/16] chainHead/subs/tests: Check subspended subscriptions Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription/inner.rs | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 99ac12a49f43..d170d06b0954 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -1522,4 +1522,89 @@ mod tests { let permit_three = ops.reserve_at_most(1).unwrap(); assert_eq!(permit_three.num_ops, 1); } + + #[test] + fn suspend_subscriptions() { + let (backend, mut client) = init_backend(); + + let block = BlockBuilderBuilder::new(&*client) + .on_parent_block(client.chain_info().genesis_hash) + .with_parent_block_number(0) + .build() + .unwrap() + .build() + .unwrap() + .block; + let hash_1 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let block = BlockBuilderBuilder::new(&*client) + .on_parent_block(hash_1) + .with_parent_block_number(1) + .build() + .unwrap() + .build() + .unwrap() + .block; + let hash_2 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let block = BlockBuilderBuilder::new(&*client) + .on_parent_block(hash_2) + .with_parent_block_number(2) + .build() + .unwrap() + .build() + .unwrap() + .block; + let hash_3 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + + let mut subs = SubscriptionsInner::new( + 10, + Duration::from_secs(10), + MAX_OPERATIONS_PER_SUB, + Duration::from_secs(3), + backend, + ); + let id_1 = "abc".to_string(); + let id_2 = "abcd".to_string(); + let id_3 = "abcde".to_string(); + + // Pin all blocks for the first subscription. + let _stop = subs.insert_subscription(id_1.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true); + assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true); + assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true); + + // Pin only block 2 for the second subscription. + let _stop = subs.insert_subscription(id_2.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true); + + // Check reference count. + assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1); + assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2); + assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1); + assert_eq!(subs.global_blocks.len(), 3); + + // Suspend all subscriptions. + assert!(!subs.suspend.is_suspended()); + subs.suspend_subscriptions(); + assert!(subs.suspend.is_suspended()); + + // A new subscription cannot be inserted while suspended. + let result = subs.insert_subscription(id_3.clone(), true); + assert!(result.is_none()); + + // Check reference count. + assert_eq!(subs.global_blocks.len(), 0); + + // Sleep 5 seconds. + std::thread::sleep(std::time::Duration::from_secs(5)); + + assert!(!subs.suspend.is_suspended()); + + // Subscriptions can be inserted again. + let _stop = subs.insert_subscription(id_1.clone(), true).unwrap(); + let _stop = subs.insert_subscription(id_2.clone(), true).unwrap(); + let _stop = subs.insert_subscription(id_3.clone(), true).unwrap(); + } } From 4da5073faa460c17a99e1f101c57a59fc46d7c60 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 4 Mar 2024 14:42:28 +0200 Subject: [PATCH 07/16] chainHead/subs/tests: Simplify block production Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription/inner.rs | 231 ++++-------------- 1 file changed, 52 insertions(+), 179 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index d170d06b0954..e1b9cf182cf1 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -905,6 +905,30 @@ mod tests { (backend, client) } + fn produce_blocks( + mut client: Arc>>, + num_blocks: usize, + ) -> Vec<::Hash> { + let mut blocks = Vec::with_capacity(num_blocks); + let mut parent_hash = client.chain_info().genesis_hash; + + for i in 0..num_blocks { + let block = BlockBuilderBuilder::new(&*client) + .on_parent_block(parent_hash) + .with_parent_block_number(i as u64) + .build() + .unwrap() + .build() + .unwrap() + .block; + parent_hash = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + blocks.push(block.header.hash()); + } + + blocks + } + #[test] fn block_state_machine_register_unpin() { let mut state = BlockStateMachine::new(); @@ -1030,37 +1054,10 @@ mod tests { #[test] fn unpin_duplicate_hashes() { - let (backend, mut client) = init_backend(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().genesis_hash) - .with_parent_block_number(0) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_1 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_1) - .with_parent_block_number(1) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_2 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_2) - .with_parent_block_number(2) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_3 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let (backend, client) = init_backend(); + + let hashes = produce_blocks(client, 3); + let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); let mut subs = SubscriptionsInner::new( 10, @@ -1139,18 +1136,10 @@ mod tests { #[test] fn subscription_check_block() { - let (backend, mut client) = init_backend(); - - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().genesis_hash) - .with_parent_block_number(0) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash = block.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let (backend, client) = init_backend(); + + let hashes = produce_blocks(client, 1); + let hash = hashes[0]; let mut subs = SubscriptionsInner::new( 10, @@ -1182,17 +1171,10 @@ mod tests { #[test] fn subscription_ref_count() { - let (backend, mut client) = init_backend(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().genesis_hash) - .with_parent_block_number(0) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let (backend, client) = init_backend(); + + let hashes = produce_blocks(client, 1); + let hash = hashes[0]; let mut subs = SubscriptionsInner::new( 10, @@ -1237,37 +1219,10 @@ mod tests { #[test] fn subscription_remove_subscription() { - let (backend, mut client) = init_backend(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().genesis_hash) - .with_parent_block_number(0) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_1 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_1) - .with_parent_block_number(1) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_2 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_2) - .with_parent_block_number(2) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_3 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let (backend, client) = init_backend(); + + let hashes = produce_blocks(client, 3); + let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); let mut subs = SubscriptionsInner::new( 10, @@ -1308,37 +1263,10 @@ mod tests { #[test] fn subscription_check_limits() { - let (backend, mut client) = init_backend(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().genesis_hash) - .with_parent_block_number(0) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_1 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_1) - .with_parent_block_number(1) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_2 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_2) - .with_parent_block_number(2) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_3 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let (backend, client) = init_backend(); + + let hashes = produce_blocks(client, 3); + let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); // Maximum number of pinned blocks is 2. let mut subs = SubscriptionsInner::new( @@ -1385,37 +1313,10 @@ mod tests { #[test] fn subscription_check_limits_with_duration() { - let (backend, mut client) = init_backend(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().genesis_hash) - .with_parent_block_number(0) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_1 = block.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_1) - .with_parent_block_number(1) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_2 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_2) - .with_parent_block_number(2) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_3 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let (backend, client) = init_backend(); + + let hashes = produce_blocks(client, 3); + let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); // Maximum number of pinned blocks is 2 and maximum pin duration is 5 second. let mut subs = SubscriptionsInner::new( @@ -1525,38 +1426,10 @@ mod tests { #[test] fn suspend_subscriptions() { - let (backend, mut client) = init_backend(); - - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().genesis_hash) - .with_parent_block_number(0) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_1 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_1) - .with_parent_block_number(1) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_2 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_2) - .with_parent_block_number(2) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_3 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let (backend, client) = init_backend(); + + let hashes = produce_blocks(client, 3); + let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); let mut subs = SubscriptionsInner::new( 10, From e51b932159998e3ce8516b6caeff87d405333379 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 4 Mar 2024 14:49:54 +0200 Subject: [PATCH 08/16] chainHead: Add config for suspended subscriptions Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 14 +++++++++++++- .../src/chain_head/chain_head_follow.rs | 1 - .../rpc-spec-v2/src/chain_head/tests.rs | 19 +++++++++++++++++++ 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index 0f1c859c4e66..d62c42a8bde2 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -62,6 +62,11 @@ pub struct ChainHeadConfig { pub subscription_max_pinned_duration: Duration, /// The maximum number of ongoing operations per subscription. pub subscription_max_ongoing_operations: usize, + /// The amount of time for which the subscriptions are suspended. + /// + /// Subscriptions are suspended when the distance between any leaf + /// and the finalized block is too large. + pub suspended_duration: Duration, /// The maximum number of items reported by the `chainHead_storage` before /// pagination is required. pub operation_max_storage_items: usize, @@ -86,12 +91,19 @@ const MAX_ONGOING_OPERATIONS: usize = 16; /// before paginations is required. const MAX_STORAGE_ITER_ITEMS: usize = 5; +/// The amount of time for which the subscriptions are suspended. +/// +/// Subscriptions are suspended when the distance between any leaf +/// and the finalized block is too large. +const SUSPENDED_DURATION: Duration = Duration::from_secs(30); + impl Default for ChainHeadConfig { fn default() -> Self { ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: MAX_PINNED_DURATION, subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS, + suspended_duration: SUSPENDED_DURATION, operation_max_storage_items: MAX_STORAGE_ITER_ITEMS, } } @@ -130,7 +142,7 @@ impl, Block: BlockT, Client> ChainHead { config.global_max_pinned_blocks, config.subscription_max_pinned_duration, config.subscription_max_ongoing_operations, - Duration::from_secs(30), + config.suspended_duration, backend, )), operation_max_storage_items: config.operation_max_storage_items, diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index 5e80e9857677..6ae2857579c3 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -201,7 +201,6 @@ where return Err(SubscriptionManagementError::BlockHashAbsent) }; - // Note: u32 would be enough here. let distance: u64 = block_num.saturating_sub(finalized_num).saturated_into(); if distance > 128 { return Err(SubscriptionManagementError::BlockDistanceTooLarge); diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs index 89d8c4ce2713..aa6e0531c9a3 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -58,6 +58,7 @@ type Block = substrate_test_runtime_client::runtime::Block; const MAX_PINNED_BLOCKS: usize = 32; const MAX_PINNED_SECS: u64 = 60; const MAX_OPERATIONS: usize = 16; +const MAX_SUSPENDED_SECS: u64 = 30; const MAX_PAGINATION_LIMIT: usize = 5; const INVALID_HASH: [u8; 32] = [1; 32]; const KEY: &[u8] = b":mock"; @@ -113,6 +114,7 @@ async fn setup_api() -> ( subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -163,6 +165,7 @@ async fn follow_subscription_produces_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -231,6 +234,7 @@ async fn follow_with_runtime() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -543,6 +547,7 @@ async fn call_runtime_without_flag() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -1201,6 +1206,7 @@ async fn separate_operation_ids_for_subscriptions() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -1289,6 +1295,7 @@ async fn follow_generates_initial_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -1444,6 +1451,7 @@ async fn follow_exceeding_pinned_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -1520,6 +1528,7 @@ async fn follow_with_unpin() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -1631,6 +1640,7 @@ async fn unpin_duplicate_hashes() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -1733,6 +1743,7 @@ async fn follow_with_multiple_unpin_hashes() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -1886,6 +1897,7 @@ async fn follow_prune_best_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -2071,6 +2083,7 @@ async fn follow_forks_pruned_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -2222,6 +2235,7 @@ async fn follow_report_multiple_pruned_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -2467,6 +2481,7 @@ async fn pin_block_references() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -2604,6 +2619,7 @@ async fn follow_finalized_before_new_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -2718,6 +2734,7 @@ async fn ensure_operation_limits_works() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: 1, operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -2822,6 +2839,7 @@ async fn check_continue_operation() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: 1, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); @@ -3004,6 +3022,7 @@ async fn stop_storage_operation() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: 1, + suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), }, ) .into_rpc(); From 9024461c9be88e1e577a1b56554db3addaba525a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 4 Mar 2024 15:17:08 +0200 Subject: [PATCH 09/16] chainHead: Configure the lagging distance Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 18 ++++++++++++++++++ .../src/chain_head/chain_head_follow.rs | 18 +++++++++++++++--- .../rpc-spec-v2/src/chain_head/tests.rs | 19 +++++++++++++++++++ 3 files changed, 52 insertions(+), 3 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index d62c42a8bde2..637794d1713d 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -62,6 +62,11 @@ pub struct ChainHeadConfig { pub subscription_max_pinned_duration: Duration, /// The maximum number of ongoing operations per subscription. pub subscription_max_ongoing_operations: usize, + /// Suspend the subscriptions if the distance between the leaves and the current finalized + /// block is larger than this value. + /// + /// Subscriptions are suspended for the `suspended_duration`. + pub suspend_on_lagging_distance: usize, /// The amount of time for which the subscriptions are suspended. /// /// Subscriptions are suspended when the distance between any leaf @@ -91,6 +96,12 @@ const MAX_ONGOING_OPERATIONS: usize = 16; /// before paginations is required. const MAX_STORAGE_ITER_ITEMS: usize = 5; +/// Suspend the subscriptions if the distance between the leaves and the current finalized +/// block is larger than this value. +/// +/// Subscriptions are suspended for the `suspended_duration`. +const SUSPEND_ON_LAGGING_DISTANCE: usize = 128; + /// The amount of time for which the subscriptions are suspended. /// /// Subscriptions are suspended when the distance between any leaf @@ -104,6 +115,7 @@ impl Default for ChainHeadConfig { subscription_max_pinned_duration: MAX_PINNED_DURATION, subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS, suspended_duration: SUSPENDED_DURATION, + suspend_on_lagging_distance: SUSPEND_ON_LAGGING_DISTANCE, operation_max_storage_items: MAX_STORAGE_ITER_ITEMS, } } @@ -122,6 +134,9 @@ pub struct ChainHead, Block: BlockT, Client> { /// The maximum number of items reported by the `chainHead_storage` before /// pagination is required. operation_max_storage_items: usize, + /// Suspend the subscriptions if the distance between the leaves and the current finalized + /// block is larger than this value. + suspend_on_lagging_distance: usize, /// Phantom member to pin the block type. _phantom: PhantomData, } @@ -146,6 +161,7 @@ impl, Block: BlockT, Client> ChainHead { backend, )), operation_max_storage_items: config.operation_max_storage_items, + suspend_on_lagging_distance: config.suspend_on_lagging_distance, _phantom: PhantomData, } } @@ -193,6 +209,7 @@ where let subscriptions = self.subscriptions.clone(); let backend = self.backend.clone(); let client = self.client.clone(); + let suspend_on_lagging_distance = self.suspend_on_lagging_distance; let fut = async move { let Ok(sink) = pending.accept().await else { return }; @@ -217,6 +234,7 @@ where subscriptions.clone(), with_runtime, sub_id.clone(), + suspend_on_lagging_distance, ); let result = chain_head_follow.generate_events(sink, sub_data).await; diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index 6ae2857579c3..5383bbe517ee 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -63,6 +63,9 @@ pub struct ChainHeadFollower, Block: BlockT, Client> { sub_id: String, /// The best reported block by this subscription. best_block_cache: Option, + /// Suspend the subscriptions if the distance between the leaves and the current finalized + /// block is larger than this value. + suspend_on_lagging_distance: usize, } impl, Block: BlockT, Client> ChainHeadFollower { @@ -73,8 +76,17 @@ impl, Block: BlockT, Client> ChainHeadFollower>, with_runtime: bool, sub_id: String, + suspend_on_lagging_distance: usize, ) -> Self { - Self { client, backend, sub_handle, with_runtime, sub_id, best_block_cache: None } + Self { + client, + backend, + sub_handle, + with_runtime, + sub_id, + best_block_cache: None, + suspend_on_lagging_distance, + } } } @@ -201,8 +213,8 @@ where return Err(SubscriptionManagementError::BlockHashAbsent) }; - let distance: u64 = block_num.saturating_sub(finalized_num).saturated_into(); - if distance > 128 { + let distance: usize = block_num.saturating_sub(finalized_num).saturated_into(); + if distance > self.suspend_on_lagging_distance { return Err(SubscriptionManagementError::BlockDistanceTooLarge); } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs index aa6e0531c9a3..03bf597537b3 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -60,6 +60,7 @@ const MAX_PINNED_SECS: u64 = 60; const MAX_OPERATIONS: usize = 16; const MAX_SUSPENDED_SECS: u64 = 30; const MAX_PAGINATION_LIMIT: usize = 5; +const MAX_LAGGING_DISTANCE: usize = 128; const INVALID_HASH: [u8; 32] = [1; 32]; const KEY: &[u8] = b":mock"; const VALUE: &[u8] = b"hello world"; @@ -115,6 +116,7 @@ async fn setup_api() -> ( subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -166,6 +168,7 @@ async fn follow_subscription_produces_blocks() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -235,6 +238,7 @@ async fn follow_with_runtime() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -548,6 +552,7 @@ async fn call_runtime_without_flag() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -1207,6 +1212,7 @@ async fn separate_operation_ids_for_subscriptions() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -1296,6 +1302,7 @@ async fn follow_generates_initial_blocks() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -1452,6 +1459,7 @@ async fn follow_exceeding_pinned_blocks() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -1529,6 +1537,7 @@ async fn follow_with_unpin() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -1641,6 +1650,7 @@ async fn unpin_duplicate_hashes() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -1744,6 +1754,7 @@ async fn follow_with_multiple_unpin_hashes() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -1898,6 +1909,7 @@ async fn follow_prune_best_block() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -2084,6 +2096,7 @@ async fn follow_forks_pruned_block() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -2236,6 +2249,7 @@ async fn follow_report_multiple_pruned_block() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -2482,6 +2496,7 @@ async fn pin_block_references() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -2620,6 +2635,7 @@ async fn follow_finalized_before_new_block() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -2735,6 +2751,7 @@ async fn ensure_operation_limits_works() { subscription_max_ongoing_operations: 1, operation_max_storage_items: MAX_PAGINATION_LIMIT, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -2840,6 +2857,7 @@ async fn check_continue_operation() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: 1, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -3023,6 +3041,7 @@ async fn stop_storage_operation() { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: 1, suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), + suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); From e1826004175d8a1552f76908015640d2c77d12db Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 4 Mar 2024 16:09:57 +0200 Subject: [PATCH 10/16] chainHead/tests: Check suspension and lagging distance Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/tests.rs | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs index 03bf597537b3..346c8f079cbc 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -3327,3 +3327,90 @@ async fn storage_closest_merkle_value() { merkle_values_rhs.get(&hex_string(b":AAAA")).unwrap() ); } + +#[tokio::test] +async fn chain_head_suspend_subscriptions() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + // Configure the chainHead to suspend subscriptions for 5 seconds + // and to suspend on lagging distance of 5 blocks. + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, + suspended_duration: Duration::from_secs(5), + suspend_on_lagging_distance: 5, + }, + ) + .into_rpc(); + + let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap(); + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + + // Import 6 blocks in total to trigger the suspension distance. + let mut parent_hash = client.chain_info().genesis_hash; + for i in 0..6 { + let block = BlockBuilderBuilder::new(&*client) + .on_parent_block(parent_hash) + .with_parent_block_number(i) + .build() + .unwrap() + .build() + .unwrap() + .block; + + let hash = block.hash(); + parent_hash = hash; + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + } + + let mut second_sub = + api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap(); + // Lagging detected, the stop event is delivered immediately. + assert_matches!( + get_next_event::>(&mut second_sub).await, + FollowEvent::Stop + ); + + // Ensure that all subscriptions are stopped. + assert_matches!(get_next_event::>(&mut sub).await, FollowEvent::Stop); + + // Other subscriptions cannot be started until the suspension period is over. + let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap(); + // Should receive the stop event immediately. + assert_matches!(get_next_event::>(&mut sub).await, FollowEvent::Stop); + + // Wait for the suspension period to be over. + tokio::time::sleep(tokio::time::Duration::from_secs(6)).await; + + // For the next subscription: + // - duration must be over + // - lagging distance must be smaller. + client.finalize_block(parent_hash, None).unwrap(); + + let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap(); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); +} From 0be419d1df5fa0d1b6d7cff48294335aa34a6ae8 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Tue, 2 Apr 2024 12:42:39 +0300 Subject: [PATCH 11/16] Update substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs Co-authored-by: Sebastian Kunert --- .../client/rpc-spec-v2/src/chain_head/subscription/inner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index e1b9cf182cf1..5f62d8d2495e 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -637,7 +637,7 @@ impl> SubscriptionsInner { } } - /// Suspends all subscriptions for the given duration. + /// Suspends all subscriptions for the configured duration. /// /// All active subscriptions are removed. pub fn suspend_subscriptions(&mut self) { From 7563d9a8384b259efed99f87c07ba43edc2e4bcb Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Tue, 2 Apr 2024 12:43:16 +0300 Subject: [PATCH 12/16] Update substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs Co-authored-by: Sebastian Kunert --- .../client/rpc-spec-v2/src/chain_head/chain_head_follow.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index 828cd54efd5f..dab5c767c08e 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -208,7 +208,7 @@ where /// - The RPC layer will not be able to generate the `NewBlock` events for all blocks. /// /// This edge-case can happen for parachains where the relay chain syncs slower to - /// the head of the chain than the parachain node that is synched already. + /// the head of the chain than the parachain node that is synced already. fn distace_within_reason( &self, block: Block::Hash, From e9ae91480ab9d4fb3baf67f7c0822fe8af7ba264 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 2 Apr 2024 19:41:37 +0300 Subject: [PATCH 13/16] chainHead: Remove all active subscriptions instead of suspending time Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 15 +--- .../src/chain_head/subscription/inner.rs | 21 +---- .../src/chain_head/subscription/mod.rs | 11 +-- .../src/chain_head/subscription/suspend.rs | 76 ------------------- 4 files changed, 6 insertions(+), 117 deletions(-) delete mode 100644 substrate/client/rpc-spec-v2/src/chain_head/subscription/suspend.rs diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index 637794d1713d..810512dccc30 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -67,11 +67,6 @@ pub struct ChainHeadConfig { /// /// Subscriptions are suspended for the `suspended_duration`. pub suspend_on_lagging_distance: usize, - /// The amount of time for which the subscriptions are suspended. - /// - /// Subscriptions are suspended when the distance between any leaf - /// and the finalized block is too large. - pub suspended_duration: Duration, /// The maximum number of items reported by the `chainHead_storage` before /// pagination is required. pub operation_max_storage_items: usize, @@ -102,19 +97,12 @@ const MAX_STORAGE_ITER_ITEMS: usize = 5; /// Subscriptions are suspended for the `suspended_duration`. const SUSPEND_ON_LAGGING_DISTANCE: usize = 128; -/// The amount of time for which the subscriptions are suspended. -/// -/// Subscriptions are suspended when the distance between any leaf -/// and the finalized block is too large. -const SUSPENDED_DURATION: Duration = Duration::from_secs(30); - impl Default for ChainHeadConfig { fn default() -> Self { ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: MAX_PINNED_DURATION, subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS, - suspended_duration: SUSPENDED_DURATION, suspend_on_lagging_distance: SUSPEND_ON_LAGGING_DISTANCE, operation_max_storage_items: MAX_STORAGE_ITER_ITEMS, } @@ -157,7 +145,6 @@ impl, Block: BlockT, Client> ChainHead { config.global_max_pinned_blocks, config.subscription_max_pinned_duration, config.subscription_max_ongoing_operations, - config.suspended_duration, backend, )), operation_max_storage_items: config.operation_max_storage_items, @@ -240,7 +227,7 @@ where if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result { debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are suspended", sub_id); - subscriptions.suspend_subscriptions(); + subscriptions.stop_all_subscriptions(); } subscriptions.remove_subscription(&sub_id); diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index e1b9cf182cf1..9b2b0a2c69f9 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -27,11 +27,7 @@ use std::{ time::{Duration, Instant}, }; -use crate::chain_head::{ - chain_head::LOG_TARGET, - subscription::{suspend::SuspendSubscriptions, SubscriptionManagementError}, - FollowEvent, -}; +use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent}; /// The queue size after which the `sc_utils::mpsc::tracing_unbounded` would produce warnings. const QUEUE_SIZE_WARNING: usize = 512; @@ -564,8 +560,6 @@ pub struct SubscriptionsInner> { max_ongoing_operations: usize, /// Map the subscription ID to internal details of the subscription. subs: HashMap>, - /// Suspend subscriptions for a given amount of time. - suspend: SuspendSubscriptions, /// Backend pinning / unpinning blocks. /// @@ -579,7 +573,6 @@ impl> SubscriptionsInner { global_max_pinned_blocks: usize, local_max_pin_duration: Duration, max_ongoing_operations: usize, - suspend_duration: Duration, backend: Arc, ) -> Self { SubscriptionsInner { @@ -588,7 +581,6 @@ impl> SubscriptionsInner { local_max_pin_duration, max_ongoing_operations, subs: Default::default(), - suspend: SuspendSubscriptions::new(suspend_duration), backend, } } @@ -599,11 +591,6 @@ impl> SubscriptionsInner { sub_id: String, with_runtime: bool, ) -> Option> { - if self.suspend.is_suspended() { - log::trace!(target: LOG_TARGET, "[id={:?}] Subscription already suspended", sub_id); - return None - } - if let Entry::Vacant(entry) = self.subs.entry(sub_id) { let (tx_stop, rx_stop) = oneshot::channel(); let (response_sender, response_receiver) = @@ -637,12 +624,8 @@ impl> SubscriptionsInner { } } - /// Suspends all subscriptions for the given duration. - /// /// All active subscriptions are removed. - pub fn suspend_subscriptions(&mut self) { - self.suspend.suspend_subscriptions(); - + pub fn stop_all_subscriptions(&mut self) { let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect(); for sub_id in to_remove { diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index 968e49b49cc0..0c219c5e3455 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -23,7 +23,6 @@ use std::{sync::Arc, time::Duration}; mod error; mod inner; -mod suspend; use self::inner::SubscriptionsInner; @@ -44,7 +43,6 @@ impl> SubscriptionManagement { global_max_pinned_blocks: usize, local_max_pin_duration: Duration, max_ongoing_operations: usize, - suspend_duration: Duration, backend: Arc, ) -> Self { SubscriptionManagement { @@ -52,7 +50,6 @@ impl> SubscriptionManagement { global_max_pinned_blocks, local_max_pin_duration, max_ongoing_operations, - suspend_duration, backend, )), } @@ -78,15 +75,13 @@ impl> SubscriptionManagement { inner.remove_subscription(sub_id) } - /// Suspends all subscriptions for the given duration. + /// Stop all active subscriptions. /// /// For all active subscriptions, the internal data is discarded, blocks are unpinned and the /// `Stop` event will be generated. - /// - /// For incoming subscriptions, only the `Stop` event is delivered. - pub fn suspend_subscriptions(&self) { + pub fn stop_all_subscriptions(&self) { let mut inner = self.inner.write(); - inner.suspend_subscriptions() + inner.stop_all_subscriptions() } /// The block is pinned in the backend only once when the block's hash is first encountered. diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/suspend.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/suspend.rs deleted file mode 100644 index 02ff2bfc6ea2..000000000000 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/suspend.rs +++ /dev/null @@ -1,76 +0,0 @@ -// This file is part of Substrate. - -// Copyright (C) Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -//! Temporarily ban subscriptions if the distance between the leaves -//! and the current finalized block is too large. - -use std::time::{Duration, Instant}; - -/// Suspend the subscriptions for a given amount of time. -#[derive(Debug)] -pub struct SuspendSubscriptions { - /// The time at which the subscriptions where banned. - instant: Option, - /// The amount of time the subscriptions are banned for. - duration: Duration, -} - -impl SuspendSubscriptions { - /// Construct a new [`SuspendSubscriptions`]. - /// - /// The given parameter is the duration for which the subscriptions are banned for. - pub fn new(duration: Duration) -> Self { - Self { instant: None, duration } - } - - /// Suspend all subscriptions for the given duration. - pub fn suspend_subscriptions(&mut self) { - self.instant = Some(Instant::now()); - } - - /// Check if the subscriptions are banned. - pub fn is_suspended(&mut self) -> bool { - match self.instant { - Some(time) => { - if time.elapsed() > self.duration { - self.instant = None; - return false - } - true - }, - None => false, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn suspend_subscriptions() { - let mut suspend = SuspendSubscriptions::new(Duration::from_secs(1)); - assert!(!suspend.is_suspended()); - - suspend.suspend_subscriptions(); - assert!(suspend.is_suspended()); - - std::thread::sleep(Duration::from_secs(2)); - assert!(!suspend.is_suspended()); - } -} From e500f753809845e38c6e7935b81d9fe5efcde35f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 2 Apr 2024 20:00:28 +0300 Subject: [PATCH 14/16] chainHead/tests: Adjust testing Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 30 +++-- .../src/chain_head/chain_head_follow.rs | 10 +- .../src/chain_head/subscription/inner.rs | 108 ++++-------------- .../rpc-spec-v2/src/chain_head/tests.rs | 87 +++++++------- 4 files changed, 79 insertions(+), 156 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index 810512dccc30..8a317290c3f1 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -62,11 +62,9 @@ pub struct ChainHeadConfig { pub subscription_max_pinned_duration: Duration, /// The maximum number of ongoing operations per subscription. pub subscription_max_ongoing_operations: usize, - /// Suspend the subscriptions if the distance between the leaves and the current finalized + /// Stop all subscriptions if the distance between the leaves and the current finalized /// block is larger than this value. - /// - /// Subscriptions are suspended for the `suspended_duration`. - pub suspend_on_lagging_distance: usize, + pub max_lagging_distance: usize, /// The maximum number of items reported by the `chainHead_storage` before /// pagination is required. pub operation_max_storage_items: usize, @@ -91,11 +89,9 @@ const MAX_ONGOING_OPERATIONS: usize = 16; /// before paginations is required. const MAX_STORAGE_ITER_ITEMS: usize = 5; -/// Suspend the subscriptions if the distance between the leaves and the current finalized +/// Stop all subscriptions if the distance between the leaves and the current finalized /// block is larger than this value. -/// -/// Subscriptions are suspended for the `suspended_duration`. -const SUSPEND_ON_LAGGING_DISTANCE: usize = 128; +const MAX_LAGGING_DISTANCE: usize = 128; impl Default for ChainHeadConfig { fn default() -> Self { @@ -103,7 +99,7 @@ impl Default for ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: MAX_PINNED_DURATION, subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS, - suspend_on_lagging_distance: SUSPEND_ON_LAGGING_DISTANCE, + max_lagging_distance: MAX_LAGGING_DISTANCE, operation_max_storage_items: MAX_STORAGE_ITER_ITEMS, } } @@ -122,9 +118,9 @@ pub struct ChainHead, Block: BlockT, Client> { /// The maximum number of items reported by the `chainHead_storage` before /// pagination is required. operation_max_storage_items: usize, - /// Suspend the subscriptions if the distance between the leaves and the current finalized + /// Stop all subscriptions if the distance between the leaves and the current finalized /// block is larger than this value. - suspend_on_lagging_distance: usize, + max_lagging_distance: usize, /// Phantom member to pin the block type. _phantom: PhantomData, } @@ -148,7 +144,7 @@ impl, Block: BlockT, Client> ChainHead { backend, )), operation_max_storage_items: config.operation_max_storage_items, - suspend_on_lagging_distance: config.suspend_on_lagging_distance, + max_lagging_distance: config.max_lagging_distance, _phantom: PhantomData, } } @@ -196,7 +192,7 @@ where let subscriptions = self.subscriptions.clone(); let backend = self.backend.clone(); let client = self.client.clone(); - let suspend_on_lagging_distance = self.suspend_on_lagging_distance; + let max_lagging_distance = self.max_lagging_distance; let fut = async move { let Ok(sink) = pending.accept().await else { return }; @@ -207,8 +203,8 @@ where let Some(sub_data) = subscriptions.insert_subscription(sub_id.clone(), with_runtime) else { // Inserting the subscription can only fail if the JsonRPSee generated a duplicate - // subscription ID; or subscriptions are suspended. - debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted or suspended", sub_id); + // subscription ID. + debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id); let msg = to_sub_message(&sink, &FollowEvent::::Stop); let _ = sink.send(msg).await; return @@ -221,12 +217,12 @@ where subscriptions.clone(), with_runtime, sub_id.clone(), - suspend_on_lagging_distance, + max_lagging_distance, ); let result = chain_head_follow.generate_events(sink, sub_data).await; if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result { - debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are suspended", sub_id); + debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id); subscriptions.stop_all_subscriptions(); } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index 828cd54efd5f..ed746fa9bae6 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -69,9 +69,9 @@ pub struct ChainHeadFollower, Block: BlockT, Client> { sub_id: String, /// The best reported block by this subscription. best_block_cache: Option, - /// Suspend the subscriptions if the distance between the leaves and the current finalized + /// Stop all subscriptions if the distance between the leaves and the current finalized /// block is larger than this value. - suspend_on_lagging_distance: usize, + max_lagging_distance: usize, } impl, Block: BlockT, Client> ChainHeadFollower { @@ -82,7 +82,7 @@ impl, Block: BlockT, Client> ChainHeadFollower>, with_runtime: bool, sub_id: String, - suspend_on_lagging_distance: usize, + max_lagging_distance: usize, ) -> Self { Self { client, @@ -91,7 +91,7 @@ impl, Block: BlockT, Client> ChainHeadFollower self.suspend_on_lagging_distance { + if distance > self.max_lagging_distance { return Err(SubscriptionManagementError::BlockDistanceTooLarge); } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 9b2b0a2c69f9..8ac4d96954f3 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -1042,13 +1042,8 @@ mod tests { let hashes = produce_blocks(client, 3); let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); - let mut subs = SubscriptionsInner::new( - 10, - Duration::from_secs(10), - MAX_OPERATIONS_PER_SUB, - Duration::from_secs(10), - backend, - ); + let mut subs = + SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); let id_1 = "abc".to_string(); let id_2 = "abcd".to_string(); @@ -1087,13 +1082,8 @@ mod tests { fn subscription_lock_block() { let builder = TestClientBuilder::new(); let backend = builder.backend(); - let mut subs = SubscriptionsInner::new( - 10, - Duration::from_secs(10), - MAX_OPERATIONS_PER_SUB, - Duration::from_secs(10), - backend, - ); + let mut subs = + SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); let id = "abc".to_string(); let hash = H256::random(); @@ -1124,13 +1114,8 @@ mod tests { let hashes = produce_blocks(client, 1); let hash = hashes[0]; - let mut subs = SubscriptionsInner::new( - 10, - Duration::from_secs(10), - MAX_OPERATIONS_PER_SUB, - Duration::from_secs(10), - backend, - ); + let mut subs = + SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); let id = "abc".to_string(); let _stop = subs.insert_subscription(id.clone(), true).unwrap(); @@ -1159,13 +1144,8 @@ mod tests { let hashes = produce_blocks(client, 1); let hash = hashes[0]; - let mut subs = SubscriptionsInner::new( - 10, - Duration::from_secs(10), - MAX_OPERATIONS_PER_SUB, - Duration::from_secs(10), - backend, - ); + let mut subs = + SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); let id = "abc".to_string(); let _stop = subs.insert_subscription(id.clone(), true).unwrap(); @@ -1207,13 +1187,8 @@ mod tests { let hashes = produce_blocks(client, 3); let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); - let mut subs = SubscriptionsInner::new( - 10, - Duration::from_secs(10), - MAX_OPERATIONS_PER_SUB, - Duration::from_secs(10), - backend, - ); + let mut subs = + SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); let id_1 = "abc".to_string(); let id_2 = "abcd".to_string(); @@ -1252,13 +1227,8 @@ mod tests { let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); // Maximum number of pinned blocks is 2. - let mut subs = SubscriptionsInner::new( - 2, - Duration::from_secs(10), - MAX_OPERATIONS_PER_SUB, - Duration::from_secs(10), - backend, - ); + let mut subs = + SubscriptionsInner::new(2, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); let id_1 = "abc".to_string(); let id_2 = "abcd".to_string(); @@ -1302,13 +1272,8 @@ mod tests { let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); // Maximum number of pinned blocks is 2 and maximum pin duration is 5 second. - let mut subs = SubscriptionsInner::new( - 2, - Duration::from_secs(5), - MAX_OPERATIONS_PER_SUB, - Duration::from_secs(10), - backend, - ); + let mut subs = + SubscriptionsInner::new(2, Duration::from_secs(5), MAX_OPERATIONS_PER_SUB, backend); let id_1 = "abc".to_string(); let id_2 = "abcd".to_string(); @@ -1357,13 +1322,8 @@ mod tests { fn subscription_check_stop_event() { let builder = TestClientBuilder::new(); let backend = builder.backend(); - let mut subs = SubscriptionsInner::new( - 10, - Duration::from_secs(10), - MAX_OPERATIONS_PER_SUB, - Duration::from_secs(10), - backend, - ); + let mut subs = + SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); let id = "abc".to_string(); @@ -1408,22 +1368,16 @@ mod tests { } #[test] - fn suspend_subscriptions() { + fn stop_all_subscriptions() { let (backend, client) = init_backend(); let hashes = produce_blocks(client, 3); let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); - let mut subs = SubscriptionsInner::new( - 10, - Duration::from_secs(10), - MAX_OPERATIONS_PER_SUB, - Duration::from_secs(3), - backend, - ); + let mut subs = + SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); let id_1 = "abc".to_string(); let id_2 = "abcd".to_string(); - let id_3 = "abcde".to_string(); // Pin all blocks for the first subscription. let _stop = subs.insert_subscription(id_1.clone(), true).unwrap(); @@ -1441,26 +1395,8 @@ mod tests { assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1); assert_eq!(subs.global_blocks.len(), 3); - // Suspend all subscriptions. - assert!(!subs.suspend.is_suspended()); - subs.suspend_subscriptions(); - assert!(subs.suspend.is_suspended()); - - // A new subscription cannot be inserted while suspended. - let result = subs.insert_subscription(id_3.clone(), true); - assert!(result.is_none()); - - // Check reference count. - assert_eq!(subs.global_blocks.len(), 0); - - // Sleep 5 seconds. - std::thread::sleep(std::time::Duration::from_secs(5)); - - assert!(!subs.suspend.is_suspended()); - - // Subscriptions can be inserted again. - let _stop = subs.insert_subscription(id_1.clone(), true).unwrap(); - let _stop = subs.insert_subscription(id_2.clone(), true).unwrap(); - let _stop = subs.insert_subscription(id_3.clone(), true).unwrap(); + // Stop all active subscriptions. + subs.stop_all_subscriptions(); + assert!(subs.global_blocks.is_empty()); } } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs index cb8c3f3ffcb2..187e22ee0000 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -58,7 +58,6 @@ type Block = substrate_test_runtime_client::runtime::Block; const MAX_PINNED_BLOCKS: usize = 32; const MAX_PINNED_SECS: u64 = 60; const MAX_OPERATIONS: usize = 16; -const MAX_SUSPENDED_SECS: u64 = 30; const MAX_PAGINATION_LIMIT: usize = 5; const MAX_LAGGING_DISTANCE: usize = 128; const INVALID_HASH: [u8; 32] = [1; 32]; @@ -115,8 +114,7 @@ async fn setup_api() -> ( subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -167,8 +165,8 @@ async fn follow_subscription_produces_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -237,8 +235,8 @@ async fn follow_with_runtime() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -551,8 +549,8 @@ async fn call_runtime_without_flag() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -1211,8 +1209,8 @@ async fn separate_operation_ids_for_subscriptions() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -1301,8 +1299,8 @@ async fn follow_generates_initial_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -1458,8 +1456,8 @@ async fn follow_exceeding_pinned_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -1536,8 +1534,8 @@ async fn follow_with_unpin() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -1649,8 +1647,8 @@ async fn unpin_duplicate_hashes() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -1753,8 +1751,8 @@ async fn follow_with_multiple_unpin_hashes() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -1908,8 +1906,8 @@ async fn follow_prune_best_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -2095,8 +2093,8 @@ async fn follow_forks_pruned_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -2256,8 +2254,8 @@ async fn follow_report_multiple_pruned_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -2503,8 +2501,8 @@ async fn pin_block_references() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -2642,8 +2640,8 @@ async fn follow_finalized_before_new_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -2758,8 +2756,8 @@ async fn ensure_operation_limits_works() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: 1, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -2864,8 +2862,8 @@ async fn check_continue_operation() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: 1, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -3048,8 +3046,8 @@ async fn stop_storage_operation() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: 1, - suspended_duration: Duration::from_secs(MAX_SUSPENDED_SECS), - suspend_on_lagging_distance: MAX_LAGGING_DISTANCE, + + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -3337,13 +3335,12 @@ async fn storage_closest_merkle_value() { } #[tokio::test] -async fn chain_head_suspend_subscriptions() { +async fn chain_head_stop_all_subscriptions() { let builder = TestClientBuilder::new(); let backend = builder.backend(); let mut client = Arc::new(builder.build()); - // Configure the chainHead to suspend subscriptions for 5 seconds - // and to suspend on lagging distance of 5 blocks. + // Configure the chainHead to stop all subscriptions on lagging distance of 5 blocks. let api = ChainHead::new( client.clone(), backend, @@ -3353,8 +3350,7 @@ async fn chain_head_suspend_subscriptions() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, - suspended_duration: Duration::from_secs(5), - suspend_on_lagging_distance: 5, + max_lagging_distance: 5, }, ) .into_rpc(); @@ -3408,12 +3404,7 @@ async fn chain_head_suspend_subscriptions() { // Should receive the stop event immediately. assert_matches!(get_next_event::>(&mut sub).await, FollowEvent::Stop); - // Wait for the suspension period to be over. - tokio::time::sleep(tokio::time::Duration::from_secs(6)).await; - - // For the next subscription: - // - duration must be over - // - lagging distance must be smaller. + // For the next subscription, lagging distance must be smaller. client.finalize_block(parent_hash, None).unwrap(); let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap(); From 0e276cccfe014328e8d844cdaed2adb7549fbbb7 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 3 Apr 2024 14:17:19 +0300 Subject: [PATCH 15/16] chainHead: Refactor master with reserved subscriptions Signed-off-by: Alexandru Vasile --- Cargo.lock | 4 ++-- .../rpc-spec-v2/src/chain_head/chain_head.rs | 2 +- .../src/chain_head/subscription/mod.rs | 18 +++++++++--------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e66b2a017df8..55ef63fa2bf5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8182,9 +8182,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.11" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "wasi 0.11.0+wasi-snapshot-preview1", diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index df0ae147bfd7..86d9a726d7be 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -240,7 +240,7 @@ where let result = chain_head_follow.generate_events(sink, sub_data).await; if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result { debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id); - subscriptions.stop_all_subscriptions(); + reserved_subscription.stop_all_subscriptions(); } debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id); diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index 0e4b2bbb8c37..f266c9d8b34f 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -121,15 +121,6 @@ impl> SubscriptionManagement { inner.remove_subscription(sub_id) } - /// Stop all active subscriptions. - /// - /// For all active subscriptions, the internal data is discarded, blocks are unpinned and the - /// `Stop` event will be generated. - pub fn stop_all_subscriptions(&self) { - let mut inner = self.inner.write(); - inner.stop_all_subscriptions() - } - /// The block is pinned in the backend only once when the block's hash is first encountered. /// /// Each subscription is expected to call this method twice: @@ -242,6 +233,15 @@ impl> ReservedSubscription { }, } } + + /// Stop all active subscriptions. + /// + /// For all active subscriptions, the internal data is discarded, blocks are unpinned and the + /// `Stop` event will be generated. + pub fn stop_all_subscriptions(&self) { + let mut inner = self.inner.write(); + inner.stop_all_subscriptions() + } } impl> Drop for ReservedSubscription { From f2cbb647203e03a6204669b9b2f24ba039b13981 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 3 Apr 2024 14:24:03 +0300 Subject: [PATCH 16/16] chainHead/tests: Adjust testing Signed-off-by: Alexandru Vasile --- substrate/client/rpc-spec-v2/src/chain_head/tests.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs index cd44e2016711..c2bff7c50d5e 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -89,6 +89,7 @@ pub async fn run_server() -> std::net::SocketAddr { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, max_follow_subscriptions_per_connection: 1, + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc();