Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chainHead/follow: Provide multiple block hashes to the initialized event #3445

Merged
merged 14 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 45 additions & 11 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use std::{collections::HashSet, sync::Arc};
use std::{
collections::{HashSet, VecDeque},
sync::Arc,
};

/// The maximum number of finalized blocks provided by the
/// `Initialized` event.
const MAX_FINALIZED_BLOCKS: usize = 16;

use super::subscription::InsertedSubscriptionData;

Expand Down Expand Up @@ -95,6 +102,8 @@ struct InitialBlocks<Block: BlockT> {
///
/// It is a tuple of (block hash, parent hash).
finalized_block_descendants: Vec<(Block::Hash, Block::Hash)>,
/// Hashes of the last finalized blocks
finalized_block_hashes: VecDeque<Block::Hash>,
/// Blocks that should not be reported as pruned by the `Finalized` event.
///
/// Substrate database will perform the pruning of height N at
Expand Down Expand Up @@ -178,13 +187,14 @@ where
}

/// Get the in-memory blocks of the client, starting from the provided finalized hash.
///
/// The reported blocks are pinned by this function.
fn get_init_blocks_with_forks(
&self,
startup_point: &StartupPoint<Block>,
finalized: Block::Hash,
) -> Result<InitialBlocks<Block>, SubscriptionManagementError> {
let blockchain = self.backend.blockchain();
let leaves = blockchain.leaves()?;
let finalized = startup_point.finalized_hash;
let mut pruned_forks = HashSet::new();
let mut finalized_block_descendants = Vec::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a comment related to this PR, but could we do some kind of sanity check of the distance between leaf and finalized block? This code assumes that they are close together, as they should be. However, on parachains for example, the finalization of para blocks is dependent on the relay chain.

If the embedded relay chain node syncs slower while the parachain node is already at the tip, between a leaf and the known finalized block could be millions of blocks (until relay chain has caught up).

This would lead to:

  1. The tree route being very costly
  2. Us delivering 1 million blocks via RPC

Maybe we should add some kind of safeguard against such situations.
Certainly an edge case however.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have create this PR to handle that edge-case: #3562 🙏

let mut unique_descendants = HashSet::new();
Expand All @@ -202,13 +212,40 @@ where

for pair in blocks.zip(parents) {
Copy link
Member

@niklasad1 niklasad1 Mar 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR but I don't understand the logic with zip here.

Parents = blocks.len() + 1

Then the zip will ignore the last item in blocks is that intended?

Example what I mean:

    let mut a = vec![1, 2];
    let mut tmp = vec![3, 4];
    
    let b = std::iter::once(99).chain(tmp);
    
    let res: Vec<(usize, usize)> = a.into_iter().zip(b).collect();
    assert_eq!(res, vec![(1, 99), (2, 3)]);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that makes sense! Indeed we are ignoring the last element of the parents, but that's on purpose since parents = [finalized, blocks[..]]

We are interested in grouping every block reported by blocks with its parent.

For example:

    let blocks = vec!["A", "B", "C"];
    let parents = vec!["gensiss", "A", "B", "C"];

    for pair in blocks.iter().zip(parents.iter()) {
        println!("block={} parent={}", pair.0, pair.1);
    }

This should produce the following result:

block=A parent=gensiss
block=B parent=A
block=C parent=B

Let me have a go at this to remove the zip to make the code easier to read 🙏

if unique_descendants.insert(pair) {
// The finalized block is pinned below.
self.sub_handle.pin_block(&self.sub_id, pair.0)?;
finalized_block_descendants.push(pair);
}
}
}
}

Ok(InitialBlocks { finalized_block_descendants, pruned_forks })
let mut current_block = finalized;
// The header of the finalized block must not be pruned.
let Some(header) = blockchain.header(current_block)? else {
return Err(SubscriptionManagementError::BlockHeaderAbsent);
};

// Report at most `MAX_FINALIZED_BLOCKS`. Note: The node might not have that many blocks.
let mut finalized_block_hashes = VecDeque::with_capacity(MAX_FINALIZED_BLOCKS);

// Pin the finalized block.
self.sub_handle.pin_block(&self.sub_id, current_block)?;
finalized_block_hashes.push_front(current_block);
current_block = *header.parent_hash();

for _ in 0..MAX_FINALIZDED_BLOCKS - 1 {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
let Ok(Some(header)) = blockchain.header(current_block) else { break };
// Block cannot be reported if pinning fails.
if self.sub_handle.pin_block(&self.sub_id, current_block).is_err() {
break
};

finalized_block_hashes.push_front(current_block);
current_block = *header.parent_hash();
}

Ok(InitialBlocks { finalized_block_descendants, finalized_block_hashes, pruned_forks })
}

/// Generate the initial events reported by the RPC `follow` method.
Expand All @@ -220,18 +257,17 @@ where
startup_point: &StartupPoint<Block>,
) -> Result<(Vec<FollowEvent<Block::Hash>>, HashSet<Block::Hash>), SubscriptionManagementError>
{
let init = self.get_init_blocks_with_forks(startup_point)?;
let init = self.get_init_blocks_with_forks(startup_point.finalized_hash)?;

// The initialized event is the first one sent.
let initial_blocks = init.finalized_block_descendants;
let finalized_block_hashes = init.finalized_block_hashes;

// The initialized event is the first one sent.
let finalized_block_hash = startup_point.finalized_hash;
self.sub_handle.pin_block(&self.sub_id, finalized_block_hash)?;

let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);

let initialized_event = FollowEvent::Initialized(Initialized {
finalized_block_hash,
finalized_block_hashes: finalized_block_hashes.into(),
finalized_block_runtime,
with_runtime: self.with_runtime,
});
Expand All @@ -240,8 +276,6 @@ where

finalized_block_descendants.push(initialized_event);
for (child, parent) in initial_blocks.into_iter() {
self.sub_handle.pin_block(&self.sub_id, child)?;

let new_runtime = self.generate_runtime_event(child, Some(parent));

let event = FollowEvent::NewBlock(NewBlock {
Expand Down
16 changes: 8 additions & 8 deletions substrate/client/rpc-spec-v2/src/chain_head/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ impl From<ApiError> for RuntimeEvent {
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Initialized<Hash> {
/// The hash of the latest finalized block.
pub finalized_block_hash: Hash,
/// The hash of the lastest finalized blocks.
pub finalized_block_hashes: Vec<Hash>,
/// The runtime version of the finalized block.
///
/// # Note
Expand All @@ -135,12 +135,12 @@ impl<Hash: Serialize> Serialize for Initialized<Hash> {
{
if self.with_runtime {
let mut state = serializer.serialize_struct("Initialized", 2)?;
state.serialize_field("finalizedBlockHash", &self.finalized_block_hash)?;
state.serialize_field("finalizedBlockHashes", &self.finalized_block_hashes)?;
state.serialize_field("finalizedBlockRuntime", &self.finalized_block_runtime)?;
state.end()
} else {
let mut state = serializer.serialize_struct("Initialized", 1)?;
state.serialize_field("finalizedBlockHash", &self.finalized_block_hash)?;
state.serialize_field("finalizedBlockHashes", &self.finalized_block_hashes)?;
state.end()
}
}
Expand Down Expand Up @@ -348,13 +348,13 @@ mod tests {
fn follow_initialized_event_no_updates() {
// Runtime flag is false.
let event: FollowEvent<String> = FollowEvent::Initialized(Initialized {
finalized_block_hash: "0x1".into(),
finalized_block_hashes: vec!["0x1".into()],
finalized_block_runtime: None,
with_runtime: false,
});

let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"initialized","finalizedBlockHash":"0x1"}"#;
let exp = r#"{"event":"initialized","finalizedBlockHashes":["0x1"]}"#;
assert_eq!(ser, exp);

let event_dec: FollowEvent<String> = serde_json::from_str(exp).unwrap();
Expand All @@ -373,15 +373,15 @@ mod tests {

let runtime_event = RuntimeEvent::Valid(RuntimeVersionEvent { spec: runtime.into() });
let mut initialized = Initialized {
finalized_block_hash: "0x1".into(),
finalized_block_hashes: vec!["0x1".into()],
finalized_block_runtime: Some(runtime_event),
with_runtime: true,
};
let event: FollowEvent<String> = FollowEvent::Initialized(initialized.clone());

let ser = serde_json::to_string(&event).unwrap();
let exp = concat!(
r#"{"event":"initialized","finalizedBlockHash":"0x1","#,
r#"{"event":"initialized","finalizedBlockHashes":["0x1"],"#,
r#""finalizedBlockRuntime":{"type":"valid","spec":{"specName":"ABC","implName":"Impl","#,
r#""specVersion":1,"implVersion":0,"apis":{},"transactionVersion":0}}}"#,
);
Expand Down
22 changes: 15 additions & 7 deletions substrate/client/rpc-spec-v2/src/chain_head/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ async fn follow_subscription_produces_blocks() {
// Initialized must always be reported first.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::Initialized(Initialized {
finalized_block_hash: format!("{:?}", finalized_hash),
finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
finalized_block_runtime: None,
with_runtime: false,
});
Expand Down Expand Up @@ -255,7 +255,7 @@ async fn follow_with_runtime() {
Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: runtime.clone().into() }));
// Runtime must always be reported with the first event.
let expected = FollowEvent::Initialized(Initialized {
finalized_block_hash: format!("{:?}", finalized_hash),
finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
finalized_block_runtime,
with_runtime: false,
});
Expand Down Expand Up @@ -1344,7 +1344,7 @@ async fn follow_generates_initial_blocks() {
// Initialized must always be reported first.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::Initialized(Initialized {
finalized_block_hash: format!("{:?}", finalized_hash),
finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
finalized_block_runtime: None,
with_runtime: false,
});
Expand Down Expand Up @@ -1896,7 +1896,7 @@ async fn follow_prune_best_block() {
// Initialized must always be reported first.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::Initialized(Initialized {
finalized_block_hash: format!("{:?}", finalized_hash),
finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
finalized_block_runtime: None,
with_runtime: false,
});
Expand Down Expand Up @@ -2081,6 +2081,7 @@ async fn follow_forks_pruned_block() {
// ^^^ finalized
// -> block 1 -> block 2_f -> block 3_f
//
let finalized_hash = client.info().finalized_hash;

let block_1 = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().genesis_hash)
Expand All @@ -2090,6 +2091,7 @@ async fn follow_forks_pruned_block() {
.build()
.unwrap()
.block;
let block_1_hash = block_1.header.hash();
client.import(BlockOrigin::Own, block_1.clone()).await.unwrap();

let block_2 = BlockBuilderBuilder::new(&*client)
Expand All @@ -2100,6 +2102,7 @@ async fn follow_forks_pruned_block() {
.build()
.unwrap()
.block;
let block_2_hash = block_2.header.hash();
client.import(BlockOrigin::Own, block_2.clone()).await.unwrap();

let block_3 = BlockBuilderBuilder::new(&*client)
Expand Down Expand Up @@ -2156,7 +2159,12 @@ async fn follow_forks_pruned_block() {
// Initialized must always be reported first.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::Initialized(Initialized {
finalized_block_hash: format!("{:?}", block_3_hash),
finalized_block_hashes: vec![
format!("{:?}", finalized_hash),
format!("{:?}", block_1_hash),
format!("{:?}", block_2_hash),
format!("{:?}", block_3_hash),
],
finalized_block_runtime: None,
with_runtime: false,
});
Expand Down Expand Up @@ -2310,7 +2318,7 @@ async fn follow_report_multiple_pruned_block() {
// Initialized must always be reported first.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::Initialized(Initialized {
finalized_block_hash: format!("{:?}", finalized_hash),
finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
finalized_block_runtime: None,
with_runtime: false,
});
Expand Down Expand Up @@ -2632,7 +2640,7 @@ async fn follow_finalized_before_new_block() {
let finalized_hash = client.info().finalized_hash;
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::Initialized(Initialized {
finalized_block_hash: format!("{:?}", finalized_hash),
finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
finalized_block_runtime: None,
with_runtime: false,
});
Expand Down
Loading