Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chainHead: Ensure reasonable distance between leaf and finalized block #3562

Merged
merged 19 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
f69de7e
chainHead: Ensure reasonable distance between leaf and finalized block
lexnv Mar 1, 2024
1afeaf8
chainHead: Introduce custom error for distance too large
lexnv Mar 1, 2024
6f02a08
chainHead: Temporarily suspend subscriptions
lexnv Mar 4, 2024
831f095
chainHead: Move suspending to subscription management
lexnv Mar 4, 2024
887d380
chainHead/subs/tests: Adjust testing to suspending changes
lexnv Mar 4, 2024
34a28ec
chainHead/subs/tests: Check subspended subscriptions
lexnv Mar 4, 2024
4da5073
chainHead/subs/tests: Simplify block production
lexnv Mar 4, 2024
e51b932
chainHead: Add config for suspended subscriptions
lexnv Mar 4, 2024
9024461
chainHead: Configure the lagging distance
lexnv Mar 4, 2024
e182600
chainHead/tests: Check suspension and lagging distance
lexnv Mar 4, 2024
53b80e8
Merge remote-tracking branch 'origin/master' into lexnv/chainhead-edg…
lexnv Mar 15, 2024
0be419d
Update substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
lexnv Apr 2, 2024
7563d9a
Update substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
lexnv Apr 2, 2024
e9ae914
chainHead: Remove all active subscriptions instead of suspending time
lexnv Apr 2, 2024
e500f75
chainHead/tests: Adjust testing
lexnv Apr 2, 2024
ab5c218
Merge remote-tracking branch 'origin/lexnv/chainhead-edge-case-laggin…
lexnv Apr 2, 2024
5326ada
Merge remote-tracking branch 'origin/master' into lexnv/chainhead-edg…
lexnv Apr 3, 2024
0e276cc
chainHead: Refactor master with reserved subscriptions
lexnv Apr 3, 2024
f2cbb64
chainHead/tests: Adjust testing
lexnv Apr 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 21 additions & 4 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ pub struct ChainHeadConfig {
pub subscription_max_pinned_duration: Duration,
/// The maximum number of ongoing operations per subscription.
pub subscription_max_ongoing_operations: usize,
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
pub max_lagging_distance: usize,
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
pub operation_max_storage_items: usize,
Expand All @@ -88,6 +91,10 @@ const MAX_ONGOING_OPERATIONS: usize = 16;
/// before paginations is required.
const MAX_STORAGE_ITER_ITEMS: usize = 5;

/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
const MAX_LAGGING_DISTANCE: usize = 128;

/// The maximum number of `chainHead_follow` subscriptions per connection.
const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;

Expand All @@ -97,6 +104,7 @@ impl Default for ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
max_lagging_distance: MAX_LAGGING_DISTANCE,
operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
}
Expand All @@ -116,6 +124,9 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
operation_max_storage_items: usize,
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
max_lagging_distance: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}
Expand All @@ -140,6 +151,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
backend,
),
operation_max_storage_items: config.operation_max_storage_items,
max_lagging_distance: config.max_lagging_distance,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -187,6 +199,7 @@ where
let subscriptions = self.subscriptions.clone();
let backend = self.backend.clone();
let client = self.client.clone();
let max_lagging_distance = self.max_lagging_distance;

let fut = async move {
// Ensure the current connection ID has enough space to accept a new subscription.
Expand All @@ -207,8 +220,8 @@ where
let Some(sub_data) =
reserved_subscription.insert_subscription(sub_id.clone(), with_runtime)
else {
// Inserting the subscription can only fail if the JsonRPSee
// generated a duplicate subscription ID.
// Inserting the subscription can only fail if the JsonRPSee generated a duplicate
// subscription ID.
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
Expand All @@ -222,9 +235,13 @@ where
subscriptions,
with_runtime,
sub_id.clone(),
max_lagging_distance,
);

chain_head_follow.generate_events(sink, sub_data).await;
let result = chain_head_follow.generate_events(sink, sub_data).await;
if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id);
reserved_subscription.stop_all_subscriptions();
}

debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
};
Expand Down
71 changes: 62 additions & 9 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ use sp_api::CallApiAt;
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, NumberFor},
SaturatedConversion, Saturating,
};
use std::{
collections::{HashSet, VecDeque},
sync::Arc,
};

/// The maximum number of finalized blocks provided by the
/// `Initialized` event.
const MAX_FINALIZED_BLOCKS: usize = 16;
Expand All @@ -67,6 +69,9 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
sub_id: String,
/// The best reported block by this subscription.
best_block_cache: Option<Block::Hash>,
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
max_lagging_distance: usize,
}

impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
Expand All @@ -77,8 +82,17 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
sub_handle: SubscriptionManagement<Block, BE>,
with_runtime: bool,
sub_id: String,
max_lagging_distance: usize,
) -> Self {
Self { client, backend, sub_handle, with_runtime, sub_id, best_block_cache: None }
Self {
client,
backend,
sub_handle,
with_runtime,
sub_id,
best_block_cache: None,
max_lagging_distance,
}
}
}

Expand Down Expand Up @@ -186,6 +200,35 @@ where
}
}

/// Check the distance between the provided blocks does not exceed a
/// a reasonable range.
///
/// When the blocks are too far apart (potentially millions of blocks):
/// - Tree route is expensive to calculate.
/// - The RPC layer will not be able to generate the `NewBlock` events for all blocks.
///
/// This edge-case can happen for parachains where the relay chain syncs slower to
/// the head of the chain than the parachain node that is synced already.
fn distace_within_reason(
&self,
block: Block::Hash,
finalized: Block::Hash,
) -> Result<(), SubscriptionManagementError> {
let Some(block_num) = self.client.number(block)? else {
return Err(SubscriptionManagementError::BlockHashAbsent)
};
let Some(finalized_num) = self.client.number(finalized)? else {
return Err(SubscriptionManagementError::BlockHashAbsent)
};

let distance: usize = block_num.saturating_sub(finalized_num).saturated_into();
if distance > self.max_lagging_distance {
return Err(SubscriptionManagementError::BlockDistanceTooLarge);
}

Ok(())
}

/// Get the in-memory blocks of the client, starting from the provided finalized hash.
///
/// The reported blocks are pinned by this function.
Expand All @@ -198,6 +241,13 @@ where
let mut pruned_forks = HashSet::new();
let mut finalized_block_descendants = Vec::new();
let mut unique_descendants = HashSet::new();

// Ensure all leaves are within a reasonable distance from the finalized block,
// before traversing the tree.
for leaf in &leaves {
self.distace_within_reason(*leaf, finalized)?;
}

for leaf in leaves {
let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)?;

Expand Down Expand Up @@ -542,7 +592,8 @@ where
mut to_ignore: HashSet<Block::Hash>,
sink: SubscriptionSink,
rx_stop: oneshot::Receiver<()>,
) where
) -> Result<(), SubscriptionManagementError>
where
EventStream: Stream<Item = NotificationType<Block>> + Unpin,
{
let mut stream_item = stream.next();
Expand Down Expand Up @@ -576,7 +627,7 @@ where
);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
return Err(err)
},
};

Expand All @@ -591,7 +642,8 @@ where

let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
// No need to propagate this error further, the client disconnected.
return Ok(())
}
}

Expand All @@ -605,14 +657,15 @@ where
// - the client disconnected.
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
Ok(())
}

/// Generate the block events for the `chainHead_follow` method.
pub async fn generate_events(
&mut self,
sink: SubscriptionSink,
sub_data: InsertedSubscriptionData<Block>,
) {
) -> Result<(), SubscriptionManagementError> {
// Register for the new block and finalized notifications.
let stream_import = self
.client
Expand Down Expand Up @@ -640,7 +693,7 @@ where
);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
return Err(err)
},
};

Expand All @@ -650,6 +703,6 @@ where
let stream = stream::once(futures::future::ready(initial)).chain(merged);

self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop)
.await;
.await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub enum SubscriptionManagementError {
/// The unpin method was called with duplicate hashes.
#[error("Duplicate hashes")]
DuplicateHashes,
/// The distance between the leaves and the current finalized block is too large.
#[error("Distance too large")]
BlockDistanceTooLarge,
/// Custom error.
#[error("Subscription error {0}")]
Custom(String),
Expand All @@ -57,6 +60,7 @@ impl PartialEq for SubscriptionManagementError {
(Self::BlockHeaderAbsent, Self::BlockHeaderAbsent) |
(Self::SubscriptionAbsent, Self::SubscriptionAbsent) |
(Self::DuplicateHashes, Self::DuplicateHashes) => true,
(Self::BlockDistanceTooLarge, Self::BlockDistanceTooLarge) => true,
(Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs,
_ => false,
}
Expand Down
Loading
Loading