chainHead: Ensure reasonable distance between leaf and finalized block #3562

Merged
merged 19 commits into from
Apr 3, 2024
Changes from 13 commits
Commits
f69de7e
chainHead: Ensure reasonable distance between leaf and finalized block
lexnv Mar 1, 2024
1afeaf8
chainHead: Introduce custom error for distance too large
lexnv Mar 1, 2024
6f02a08
chainHead: Temporarily suspend subscriptions
lexnv Mar 4, 2024
831f095
chainHead: Move suspending to subscription management
lexnv Mar 4, 2024
887d380
chainHead/subs/tests: Adjust testing to suspending changes
lexnv Mar 4, 2024
34a28ec
chainHead/subs/tests: Check subspended subscriptions
lexnv Mar 4, 2024
4da5073
chainHead/subs/tests: Simplify block production
lexnv Mar 4, 2024
e51b932
chainHead: Add config for suspended subscriptions
lexnv Mar 4, 2024
9024461
chainHead: Configure the lagging distance
lexnv Mar 4, 2024
e182600
chainHead/tests: Check suspension and lagging distance
lexnv Mar 4, 2024
53b80e8
Merge remote-tracking branch 'origin/master' into lexnv/chainhead-edg…
lexnv Mar 15, 2024
0be419d
Update substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
lexnv Apr 2, 2024
7563d9a
Update substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
lexnv Apr 2, 2024
e9ae914
chainHead: Remove all active subscriptions instead of suspending time
lexnv Apr 2, 2024
e500f75
chainHead/tests: Adjust testing
lexnv Apr 2, 2024
ab5c218
Merge remote-tracking branch 'origin/lexnv/chainhead-edge-case-laggin…
lexnv Apr 2, 2024
5326ada
Merge remote-tracking branch 'origin/master' into lexnv/chainhead-edg…
lexnv Apr 3, 2024
0e276cc
chainHead: Refactor master with reserved subscriptions
lexnv Apr 3, 2024
f2cbb64
chainHead/tests: Adjust testing
lexnv Apr 3, 2024
13 changes: 7 additions & 6 deletions Cargo.lock


43 changes: 39 additions & 4 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
@@ -62,6 +62,16 @@ pub struct ChainHeadConfig {
pub subscription_max_pinned_duration: Duration,
/// The maximum number of ongoing operations per subscription.
pub subscription_max_ongoing_operations: usize,
/// Suspend the subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
///
/// Subscriptions are suspended for the `suspended_duration`.
pub suspend_on_lagging_distance: usize,
/// The amount of time for which the subscriptions are suspended.
///
/// Subscriptions are suspended when the distance between any leaf
/// and the finalized block is too large.
pub suspended_duration: Duration,
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
pub operation_max_storage_items: usize,
@@ -86,12 +96,26 @@ const MAX_ONGOING_OPERATIONS: usize = 16;
/// before pagination is required.
const MAX_STORAGE_ITER_ITEMS: usize = 5;

/// Suspend the subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
///
/// Subscriptions are suspended for the `suspended_duration`.
const SUSPEND_ON_LAGGING_DISTANCE: usize = 128;

/// The amount of time for which the subscriptions are suspended.
///
/// Subscriptions are suspended when the distance between any leaf
/// and the finalized block is too large.
const SUSPENDED_DURATION: Duration = Duration::from_secs(30);
Contributor Author

An alternative to this would be to simply recalculate the distance:

leaves = blockchain.leaves()?;
if leaf - finalized > 128 { return Err(); }

Contributor

Is there much of a computation cost for doing the above? Personally I think fewer constants is a good thing, but if it's expensive to compute then having a delay makes sense :)

Contributor Author

Yep, that makes sense. I was also wondering whether the suspended duration is overkill for this.

Reading the block numbers of the blockchain leaves might cost slightly more if they are stored on disk. However, I think we can live with that, since this is only an edge case of an edge case.

I have removed that constant; subscriptions should now resume normal work as soon as the distance is reasonable.
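
The alternative discussed in this thread (recomputing the leaf-to-finalized distance on demand instead of waiting out a fixed suspension window) could look roughly like the sketch below. It is a standalone illustration, not the PR's code: block numbers are plain `u64` values rather than `NumberFor<Block>`, and `max_leaf_distance` and `can_accept_subscription` are hypothetical names.

```rust
/// Largest distance between any leaf and the finalized block.
/// Leaves at or below the finalized height contribute a distance of 0.
/// An empty leaf set also yields 0.
fn max_leaf_distance(leaf_numbers: &[u64], finalized_number: u64) -> u64 {
    leaf_numbers
        .iter()
        .map(|leaf| leaf.saturating_sub(finalized_number))
        .max()
        .unwrap_or(0)
}

/// Hypothetical gate: accept a new subscription only while every leaf
/// is within `max_distance` blocks of the finalized block.
fn can_accept_subscription(
    leaf_numbers: &[u64],
    finalized_number: u64,
    max_distance: u64,
) -> bool {
    max_leaf_distance(leaf_numbers, finalized_number) <= max_distance
}
```

Because the check is recomputed on every call, no timer or extra constant is needed: once finalization catches up, the same call starts returning `true` again.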


impl Default for ChainHeadConfig {
fn default() -> Self {
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
suspended_duration: SUSPENDED_DURATION,
suspend_on_lagging_distance: SUSPEND_ON_LAGGING_DISTANCE,
operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
}
}
@@ -110,6 +134,9 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
operation_max_storage_items: usize,
/// Suspend the subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
suspend_on_lagging_distance: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}
@@ -130,9 +157,11 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
config.global_max_pinned_blocks,
config.subscription_max_pinned_duration,
config.subscription_max_ongoing_operations,
config.suspended_duration,
backend,
)),
operation_max_storage_items: config.operation_max_storage_items,
suspend_on_lagging_distance: config.suspend_on_lagging_distance,
_phantom: PhantomData,
}
}
@@ -180,6 +209,7 @@ where
let subscriptions = self.subscriptions.clone();
let backend = self.backend.clone();
let client = self.client.clone();
let suspend_on_lagging_distance = self.suspend_on_lagging_distance;

let fut = async move {
let Ok(sink) = pending.accept().await else { return };
@@ -189,9 +219,9 @@ where
// Keep track of the subscription.
let Some(sub_data) = subscriptions.insert_subscription(sub_id.clone(), with_runtime)
else {
// Inserting the subscription can only fail if the JsonRPSee
// generated a duplicate subscription ID.
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
// Inserting the subscription can only fail if the JsonRPSee generated a duplicate
// subscription ID; or subscriptions are suspended.
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted or suspended", sub_id);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
@@ -204,9 +234,14 @@ where
subscriptions.clone(),
with_runtime,
sub_id.clone(),
suspend_on_lagging_distance,
);
let result = chain_head_follow.generate_events(sink, sub_data).await;

chain_head_follow.generate_events(sink, sub_data).await;
if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are suspended", sub_id);
subscriptions.suspend_subscriptions();
}

subscriptions.remove_subscription(&sub_id);
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
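
The control flow in this hunk (the follow task returns a `Result`, and only the distance error triggers a suspension before the subscription is removed) can be sketched in isolation as follows. This is a simplified, synchronous illustration under stated assumptions: `FollowError`, `Subscriptions`, and `on_follow_finished` are hypothetical stand-ins, not the types used in `rpc-spec-v2`.

```rust
/// Hypothetical stand-in for the error returned by the follow task.
#[derive(Debug)]
enum FollowError {
    BlockDistanceTooLarge,
    Other(String),
}

/// Hypothetical stand-in for the subscription manager.
#[derive(Default)]
struct Subscriptions {
    suspended: bool,
    active: Vec<String>,
}

impl Subscriptions {
    /// Mark all subscriptions as suspended.
    fn suspend_subscriptions(&mut self) {
        self.suspended = true;
    }

    /// Drop a single subscription by ID.
    fn remove_subscription(&mut self, id: &str) {
        self.active.retain(|s| s != id);
    }
}

/// Only the distance error suspends everything; any outcome removes the
/// finished subscription itself.
fn on_follow_finished(
    subs: &mut Subscriptions,
    sub_id: &str,
    result: Result<(), FollowError>,
) {
    if let Err(FollowError::BlockDistanceTooLarge) = result {
        subs.suspend_subscriptions();
    }
    subs.remove_subscription(sub_id);
}
```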
71 changes: 62 additions & 9 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
@@ -41,12 +41,14 @@ use sp_api::CallApiAt;
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, NumberFor},
SaturatedConversion, Saturating,
};
use std::{
collections::{HashSet, VecDeque},
sync::Arc,
};

/// The maximum number of finalized blocks provided by the
/// `Initialized` event.
const MAX_FINALIZED_BLOCKS: usize = 16;
@@ -67,6 +69,9 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
sub_id: String,
/// The best reported block by this subscription.
best_block_cache: Option<Block::Hash>,
/// Suspend the subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
suspend_on_lagging_distance: usize,
}

impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
@@ -77,8 +82,17 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
sub_handle: Arc<SubscriptionManagement<Block, BE>>,
with_runtime: bool,
sub_id: String,
suspend_on_lagging_distance: usize,
) -> Self {
Self { client, backend, sub_handle, with_runtime, sub_id, best_block_cache: None }
Self {
client,
backend,
sub_handle,
with_runtime,
sub_id,
best_block_cache: None,
suspend_on_lagging_distance,
}
}
}

@@ -186,6 +200,35 @@ where
}
}

/// Check that the distance between the provided blocks does not exceed
/// a reasonable range.
///
/// When the blocks are too far apart (potentially millions of blocks):
/// - Tree route is expensive to calculate.
/// - The RPC layer will not be able to generate the `NewBlock` events for all blocks.
///
/// This edge-case can happen for parachains where the relay chain syncs slower to
/// the head of the chain than the parachain node that is synced already.
fn distace_within_reason(
&self,
block: Block::Hash,
finalized: Block::Hash,
) -> Result<(), SubscriptionManagementError> {
let Some(block_num) = self.client.number(block)? else {
return Err(SubscriptionManagementError::BlockHashAbsent)
};
let Some(finalized_num) = self.client.number(finalized)? else {
return Err(SubscriptionManagementError::BlockHashAbsent)
};

let distance: usize = block_num.saturating_sub(finalized_num).saturated_into();
if distance > self.suspend_on_lagging_distance {
return Err(SubscriptionManagementError::BlockDistanceTooLarge);
}

Ok(())
}

/// Get the in-memory blocks of the client, starting from the provided finalized hash.
///
/// The reported blocks are pinned by this function.
@@ -198,6 +241,13 @@ where
let mut pruned_forks = HashSet::new();
let mut finalized_block_descendants = Vec::new();
let mut unique_descendants = HashSet::new();

// Ensure all leaves are within a reasonable distance from the finalized block,
// before traversing the tree.
for leaf in &leaves {
self.distace_within_reason(*leaf, finalized)?;
}

for leaf in leaves {
let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)?;

@@ -542,7 +592,8 @@ where
mut to_ignore: HashSet<Block::Hash>,
sink: SubscriptionSink,
rx_stop: oneshot::Receiver<()>,
) where
) -> Result<(), SubscriptionManagementError>
where
EventStream: Stream<Item = NotificationType<Block>> + Unpin,
{
let mut stream_item = stream.next();
@@ -571,7 +622,7 @@ where
);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
return Err(err)
},
};

@@ -586,7 +637,8 @@ where

let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
// No need to propagate this error further, the client disconnected.
return Ok(())
}
}

@@ -598,14 +650,15 @@ where
// or the `Stop` receiver was triggered.
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
Ok(())
}

/// Generate the block events for the `chainHead_follow` method.
pub async fn generate_events(
&mut self,
sink: SubscriptionSink,
sub_data: InsertedSubscriptionData<Block>,
) {
) -> Result<(), SubscriptionManagementError> {
// Register for the new block and finalized notifications.
let stream_import = self
.client
@@ -633,7 +686,7 @@ where
);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
return Err(err)
},
};

@@ -643,6 +696,6 @@ where
let stream = stream::once(futures::future::ready(initial)).chain(merged);

self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop)
.await;
.await
}
}
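
The distance check added in this file relies on saturating arithmetic, so a leaf that sits below the finalized height produces a distance of 0 rather than an underflow. A minimal standalone sketch of that behaviour, using only the standard library: `block_distance` is a hypothetical helper, block numbers are assumed to be `u64`, and the `String` error stands in for `SubscriptionManagementError::BlockDistanceTooLarge`.

```rust
use std::convert::TryFrom;

/// Distance from `finalized_num` up to `block_num`, clamped instead of wrapping.
/// `usize::try_from(..).unwrap_or(usize::MAX)` approximates what
/// `saturated_into` does in the diff above.
fn block_distance(block_num: u64, finalized_num: u64) -> usize {
    let diff = block_num.saturating_sub(finalized_num);
    usize::try_from(diff).unwrap_or(usize::MAX)
}

/// Reject blocks that are more than `max_distance` ahead of the finalized block.
fn distance_within_reason(
    block_num: u64,
    finalized_num: u64,
    max_distance: usize,
) -> Result<(), String> {
    let distance = block_distance(block_num, finalized_num);
    if distance > max_distance {
        return Err(format!("distance {} exceeds limit {}", distance, max_distance));
    }
    Ok(())
}
```

With the default limit of 128 from this PR, a leaf at height 228 over a finalized block at height 100 is still accepted, while a leaf at height 229 is rejected.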
@@ -41,6 +41,9 @@ pub enum SubscriptionManagementError {
/// The unpin method was called with duplicate hashes.
#[error("Duplicate hashes")]
DuplicateHashes,
/// The distance between the leaves and the current finalized block is too large.
#[error("Distance too large")]
BlockDistanceTooLarge,
/// Custom error.
#[error("Subscription error {0}")]
Custom(String),
@@ -57,6 +60,7 @@ impl PartialEq for SubscriptionManagementError {
(Self::BlockHeaderAbsent, Self::BlockHeaderAbsent) |
(Self::SubscriptionAbsent, Self::SubscriptionAbsent) |
(Self::DuplicateHashes, Self::DuplicateHashes) => true,
(Self::BlockDistanceTooLarge, Self::BlockDistanceTooLarge) => true,
(Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs,
_ => false,
}
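
For readers unfamiliar with the pattern in this hunk, the sketch below shows a hand-written `PartialEq` in which each unit variant equals only itself and the payload variant compares its contents. It is a reduced, standard-library-only illustration: `SketchError` is a hypothetical enum with a subset of the variants, and the manual `Display` impl stands in for the `#[error(...)]` attributes shown above.

```rust
use std::fmt;

/// Hypothetical, reduced version of the error enum.
#[derive(Debug)]
enum SketchError {
    DuplicateHashes,
    BlockDistanceTooLarge,
    Custom(String),
}

impl PartialEq for SketchError {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            // Unit variants are equal only to themselves.
            (Self::DuplicateHashes, Self::DuplicateHashes) |
            (Self::BlockDistanceTooLarge, Self::BlockDistanceTooLarge) => true,
            // The payload variant compares its message.
            (Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs,
            _ => false,
        }
    }
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::DuplicateHashes => write!(f, "Duplicate hashes"),
            Self::BlockDistanceTooLarge => write!(f, "Distance too large"),
            Self::Custom(msg) => write!(f, "Subscription error {}", msg),
        }
    }
}
```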