Skip to content

Commit

Permalink
polkadot-node-subsystems: ChainApiBackend added + polkadot-debug
Browse files Browse the repository at this point in the history
…image version fixed (#2411)

The out-dated version (bad tag) of [polkadot
image](/~https://github.com/paritytech/polkadot-sdk/blob/ede4a362622dfa69eb167eaa876246b1289f4b41/.gitlab/pipeline/zombienet/cumulus.yml#L31)
([docker
info](https://hub.docker.com/layers/paritypr/polkadot-debug/master/images/sha256:adb1658052cf671b50c90d5cece5c7a131efa1a95978249bd5cb85a5ad654f7a?context=explore))
was used. This PR fixes this.

See also:
#2411 (comment)

Also adds an abstraction that allows asynchronous backends to be passed to `ChainApiSubsystem`
---------

Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
  • Loading branch information
michalkucharczyk and skunert authored Nov 22, 2023
1 parent 0956357 commit 4987488
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 56 deletions.
2 changes: 1 addition & 1 deletion .gitlab/pipeline/zombienet/cumulus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
- job: build-push-image-test-parachain
artifacts: true
variables:
POLKADOT_IMAGE: "docker.io/paritypr/polkadot-debug:master"
POLKADOT_IMAGE: "docker.io/paritypr/polkadot-debug:${DOCKER_IMAGES_VERSION}"
GH_DIR: "/~https://github.com/paritytech/cumulus/tree/${CI_COMMIT_SHORT_SHA}/zombienet/tests"
LOCAL_DIR: "/builds/parity/mirrors/polkadot-sdk/cumulus/zombienet/tests"
COL_IMAGE: "docker.io/paritypr/test-parachain:${DOCKER_IMAGES_VERSION}"
Expand Down
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions cumulus/client/relay-chain-minimal-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@ polkadot-collator-protocol = { path = "../../../polkadot/node/network/collator-p
polkadot-network-bridge = { path = "../../../polkadot/node/network/bridge" }
polkadot-node-collation-generation = { path = "../../../polkadot/node/collation-generation" }
polkadot-node-core-runtime-api = { path = "../../../polkadot/node/core/runtime-api" }
polkadot-node-core-chain-api = { path = "../../../polkadot/node/core/chain-api" }
polkadot-node-core-prospective-parachains = { path = "../../../polkadot/node/core/prospective-parachains" }

# substrate deps
sc-authority-discovery = { path = "../../../substrate/client/authority-discovery" }
sc-network = { path = "../../../substrate/client/network" }
sc-network-common = { path = "../../../substrate/client/network/common" }
sc-service = { path = "../../../substrate/client/service" }
sc-client-api = { path = "../../../substrate/client/api" }
substrate-prometheus-endpoint = { path = "../../../substrate/utils/prometheus" }
sc-tracing = { path = "../../../substrate/client/tracing" }
sc-utils = { path = "../../../substrate/client/utils" }
sp-api = { path = "../../../substrate/primitives/api" }
sp-consensus-babe = { path = "../../../substrate/primitives/consensus/babe" }
sp-consensus = { path = "../../../substrate/primitives/consensus/common" }
sp-runtime = { path = "../../../substrate/primitives/runtime" }
sp-blockchain = { path = "../../../substrate/primitives/blockchain" }
tokio = { version = "1.32.0", features = ["macros"] }

# cumulus deps
cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
use cumulus_relay_chain_rpc_interface::RelayChainRpcClient;
use futures::{Stream, StreamExt};
use polkadot_core_primitives::{Block, BlockNumber, Hash, Header};
use polkadot_overseer::RuntimeApiSubsystemClient;
use polkadot_overseer::{ChainApiBackend, RuntimeApiSubsystemClient};
use polkadot_primitives::{
async_backing::{AsyncBackingParams, BackingState},
slashing,
vstaging::NodeFeatures,
};
use sc_authority_discovery::{AuthorityDiscovery, Error as AuthorityDiscoveryError};
use sp_api::{ApiError, RuntimeApiInfo};
use sc_client_api::AuxStore;
use sp_api::{ApiError, BlockT, HeaderT, NumberFor, RuntimeApiInfo};
use sp_blockchain::Info;

#[derive(Clone)]
pub struct BlockChainRpcClient {
Expand All @@ -54,6 +56,64 @@ impl BlockChainRpcClient {
}
}

#[async_trait::async_trait]
impl ChainApiBackend for BlockChainRpcClient {
async fn header(
&self,
hash: <Block as BlockT>::Hash,
) -> sp_blockchain::Result<Option<<Block as BlockT>::Header>> {
Ok(self.rpc_client.chain_get_header(Some(hash)).await?)
}

async fn info(&self) -> sp_blockchain::Result<Info<Block>> {
let (best_header_opt, genesis_hash, finalized_head) = futures::try_join!(
self.rpc_client.chain_get_header(None),
self.rpc_client.chain_get_head(Some(0)),
self.rpc_client.chain_get_finalized_head()
)?;
let best_header = best_header_opt.ok_or_else(|| {
RelayChainError::GenericError(
"Unable to retrieve best header from relay chain.".to_string(),
)
})?;

let finalized_header =
self.rpc_client.chain_get_header(Some(finalized_head)).await?.ok_or_else(|| {
RelayChainError::GenericError(
"Unable to retrieve finalized header from relay chain.".to_string(),
)
})?;
Ok(Info {
best_hash: best_header.hash(),
best_number: best_header.number,
genesis_hash,
finalized_hash: finalized_head,
finalized_number: finalized_header.number,
finalized_state: Some((finalized_header.hash(), finalized_header.number)),
number_leaves: 1,
block_gap: None,
})
}

async fn number(
&self,
hash: <Block as BlockT>::Hash,
) -> sp_blockchain::Result<Option<<<Block as BlockT>::Header as HeaderT>::Number>> {
Ok(self
.rpc_client
.chain_get_header(Some(hash))
.await?
.map(|maybe_header| maybe_header.number))
}

async fn hash(
&self,
number: NumberFor<Block>,
) -> sp_blockchain::Result<Option<<Block as BlockT>::Hash>> {
Ok(self.rpc_client.chain_get_block_hash(number.into()).await?)
}
}

#[async_trait::async_trait]
impl RuntimeApiSubsystemClient for BlockChainRpcClient {
async fn validators(
Expand Down Expand Up @@ -403,3 +463,25 @@ impl BlockChainRpcClient {
Ok(self.rpc_client.get_finalized_heads_stream()?.boxed())
}
}

// Implementation required by ChainApiSubsystem
// but never called in our case.
impl AuxStore for BlockChainRpcClient {
fn insert_aux<
'a,
'b: 'a,
'c: 'a,
I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>,
D: IntoIterator<Item = &'a &'b [u8]>,
>(
&self,
_insert: I,
_delete: D,
) -> sp_blockchain::Result<()> {
unimplemented!("Not supported on the RPC collator")
}

fn get_aux(&self, _key: &[u8]) -> sp_blockchain::Result<Option<Vec<u8>>> {
unimplemented!("Not supported on the RPC collator")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use polkadot_network_bridge::{
NetworkBridgeTx as NetworkBridgeTxSubsystem,
};
use polkadot_node_collation_generation::CollationGenerationSubsystem;
use polkadot_node_core_chain_api::ChainApiSubsystem;
use polkadot_node_core_prospective_parachains::ProspectiveParachainsSubsystem;
use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
use polkadot_node_network_protocol::{
Expand Down Expand Up @@ -112,7 +113,7 @@ fn build_overseer(
.candidate_backing(DummySubsystem)
.candidate_validation(DummySubsystem)
.pvf_checker(DummySubsystem)
.chain_api(DummySubsystem)
.chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
.collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
.collator_protocol({
let side = ProtocolSide::Collator {
Expand Down
5 changes: 3 additions & 2 deletions polkadot/node/core/chain-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ description = "The Chain API subsystem provides access to chain related utility
[dependencies]
futures = "0.3.21"
gum = { package = "tracing-gum", path = "../../gum" }
sp-blockchain = { path = "../../../../substrate/primitives/blockchain" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-metrics = { path = "../../metrics" }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-subsystem-types = { path = "../../subsystem-types" }
sc-client-api = { path = "../../../../substrate/client/api" }
sc-consensus-babe = { path = "../../../../substrate/client/consensus/babe" }

Expand All @@ -21,5 +20,7 @@ futures = { version = "0.3.21", features = ["thread-pool"] }
maplit = "1.0.2"
parity-scale-codec = "3.6.1"
polkadot-node-primitives = { path = "../../primitives" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
sp-core = { path = "../../../../substrate/primitives/core" }
sp-blockchain = { path = "../../../../substrate/primitives/blockchain" }
79 changes: 43 additions & 36 deletions polkadot/node/core/chain-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ use std::sync::Arc;

use futures::prelude::*;
use sc_client_api::AuxStore;
use sp_blockchain::HeaderBackend;

use futures::stream::StreamExt;
use polkadot_node_subsystem::{
messages::ChainApiMessage, overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem,
SubsystemError, SubsystemResult,
};
use polkadot_primitives::Block;
use polkadot_node_subsystem_types::ChainApiBackend;

mod metrics;
use self::metrics::Metrics;
Expand All @@ -67,7 +67,7 @@ impl<Client> ChainApiSubsystem<Client> {
#[overseer::subsystem(ChainApi, error = SubsystemError, prefix = self::overseer)]
impl<Client, Context> ChainApiSubsystem<Client>
where
Client: HeaderBackend<Block> + AuxStore + 'static,
Client: ChainApiBackend + AuxStore + 'static,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run::<Client, Context>(ctx, self)
Expand All @@ -83,7 +83,7 @@ async fn run<Client, Context>(
subsystem: ChainApiSubsystem<Client>,
) -> SubsystemResult<()>
where
Client: HeaderBackend<Block> + AuxStore,
Client: ChainApiBackend + AuxStore,
{
loop {
match ctx.recv().await? {
Expand All @@ -93,13 +93,15 @@ where
FromOrchestra::Communication { msg } => match msg {
ChainApiMessage::BlockNumber(hash, response_channel) => {
let _timer = subsystem.metrics.time_block_number();
let result = subsystem.client.number(hash).map_err(|e| e.to_string().into());
let result =
subsystem.client.number(hash).await.map_err(|e| e.to_string().into());
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
ChainApiMessage::BlockHeader(hash, response_channel) => {
let _timer = subsystem.metrics.time_block_header();
let result = subsystem.client.header(hash).map_err(|e| e.to_string().into());
let result =
subsystem.client.header(hash).await.map_err(|e| e.to_string().into());
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
Expand All @@ -113,46 +115,51 @@ where
ChainApiMessage::FinalizedBlockHash(number, response_channel) => {
let _timer = subsystem.metrics.time_finalized_block_hash();
// Note: we don't verify it's finalized
let result = subsystem.client.hash(number).map_err(|e| e.to_string().into());
let result =
subsystem.client.hash(number).await.map_err(|e| e.to_string().into());
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
ChainApiMessage::FinalizedBlockNumber(response_channel) => {
let _timer = subsystem.metrics.time_finalized_block_number();
let result = subsystem.client.info().finalized_number;
// always succeeds
subsystem.metrics.on_request(true);
let _ = response_channel.send(Ok(result));
let result = subsystem
.client
.info()
.await
.map_err(|e| e.to_string().into())
.map(|info| info.finalized_number);
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
ChainApiMessage::Ancestors { hash, k, response_channel } => {
let _timer = subsystem.metrics.time_ancestors();
gum::trace!(target: LOG_TARGET, hash=%hash, k=k, "ChainApiMessage::Ancestors");

let mut hash = hash;

let next_parent = core::iter::from_fn(|| {
let maybe_header = subsystem.client.header(hash);
match maybe_header {
// propagate the error
Err(e) => {
let e = e.to_string().into();
Some(Err(e))
},
// fewer than `k` ancestors are available
Ok(None) => None,
Ok(Some(header)) => {
// stop at the genesis header.
if header.number == 0 {
None
} else {
hash = header.parent_hash;
Some(Ok(hash))
}
},
}
});

let result = next_parent.take(k).collect::<Result<Vec<_>, _>>();
let next_parent_stream = futures::stream::unfold(
(hash, subsystem.client.clone()),
|(hash, client)| async move {
let maybe_header = client.header(hash).await;
match maybe_header {
// propagate the error
Err(e) => {
let e = e.to_string().into();
Some((Err(e), (hash, client)))
},
// fewer than `k` ancestors are available
Ok(None) => None,
Ok(Some(header)) => {
// stop at the genesis header.
if header.number == 0 {
None
} else {
Some((Ok(header.parent_hash), (header.parent_hash, client)))
}
},
}
},
);

let result = next_parent_stream.take(k).try_collect().await;
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
Expand Down
Loading

0 comments on commit 4987488

Please sign in to comment.