,
{
- let mut finalized_heads = relay_chain.finalized_heads(para_id);
+ let mut finalized_heads = match relay_chain.finalized_heads(para_id).await {
+ Ok(finalized_heads_stream) => finalized_heads_stream,
+ Err(err) => {
+ tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
+ return
+ },
+ };
loop {
let finalized_head = if let Some(h) = finalized_heads.next().await {
@@ -165,7 +175,14 @@ async fn follow_new_best(
R: RelaychainClient,
B: Backend,
{
- let mut new_best_heads = relay_chain.new_best_heads(para_id).fuse();
+ let mut new_best_heads = match relay_chain.new_best_heads(para_id).await {
+ Ok(best_heads_stream) => best_heads_stream.fuse(),
+ Err(err) => {
+ tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream.");
+ return
+ },
+ };
+
let mut imported_blocks = parachain.import_notification_stream().fuse();
// The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain
// block before the parachain block it included. In this case we need to wait for this block to
@@ -368,6 +385,7 @@ where
}
}
+#[async_trait]
impl<RCInterface> RelaychainClient for RCInterface
where
RCInterface: RelayChainInterface + Clone + 'static,
@@ -376,39 +394,53 @@ where
type HeadStream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
- fn new_best_heads(&self, para_id: ParaId) -> Self::HeadStream {
+ async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
let relay_chain = self.clone();
- self.import_notification_stream()
+ let new_best_notification_stream = self
+ .new_best_notification_stream()
+ .await?
.filter_map(move |n| {
- future::ready(if n.is_new_best {
- relay_chain.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().flatten()
- } else {
- None
- })
+ let relay_chain = relay_chain.clone();
+ async move {
+ relay_chain
+ .parachain_head_at(&BlockId::hash(n.hash()), para_id)
+ .await
+ .ok()
+ .flatten()
+ }
})
- .boxed()
+ .boxed();
+ Ok(new_best_notification_stream)
}
- fn finalized_heads(&self, para_id: ParaId) -> Self::HeadStream {
+ async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
let relay_chain = self.clone();
- self.finality_notification_stream()
+ let finality_notification_stream = self
+ .finality_notification_stream()
+ .await?
.filter_map(move |n| {
- future::ready(
- relay_chain.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().flatten(),
- )
+ let relay_chain = relay_chain.clone();
+ async move {
+ relay_chain
+ .parachain_head_at(&BlockId::hash(n.hash()), para_id)
+ .await
+ .ok()
+ .flatten()
+ }
})
- .boxed()
+ .boxed();
+ Ok(finality_notification_stream)
}
- fn parachain_head_at(
+ async fn parachain_head_at(
&self,
at: &BlockId,
para_id: ParaId,
- ) -> ClientResult<Option<Vec<u8>>> {
+ ) -> RelayChainResult<Option<Vec<u8>>> {
self.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
+ .await
.map(|s| s.map(|s| s.parent_head.0))
- .map_err(Into::into)
}
}
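With `new_best_heads` and `finalized_heads` now async and fallible, a caller first awaits the constructor and handles the `RelayChainResult` before polling the stream. A minimal consumption sketch, not part of the patch (function name, logging and the `Unpin` assumption on the returned stream are illustrative only):

async fn log_finalized_heads<R: RelaychainClient>(relay_chain: R, para_id: ParaId) {
	// Building the stream can now fail, e.g. if the relay chain connection is gone.
	let mut finalized_heads = match relay_chain.finalized_heads(para_id).await {
		Ok(stream) => stream,
		Err(err) => {
			tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
			return
		},
	};

	// Polling the stream itself is unchanged.
	while let Some(head) = finalized_heads.next().await {
		tracing::debug!(target: LOG_TARGET, ?head, "New finalized parachain head.");
	}
}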
diff --git a/client/consensus/common/src/tests.rs b/client/consensus/common/src/tests.rs
index 4340b7b681..ceb60aa501 100644
--- a/client/consensus/common/src/tests.rs
+++ b/client/consensus/common/src/tests.rs
@@ -16,7 +16,9 @@
use crate::*;
+use async_trait::async_trait;
use codec::Encode;
+use cumulus_relay_chain_interface::RelayChainResult;
use cumulus_test_client::{
runtime::{Block, Header},
Backend, Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt,
@@ -26,7 +28,7 @@ use futures_timer::Delay;
use polkadot_primitives::v1::{Block as PBlock, Id as ParaId};
use sc_client_api::UsageProvider;
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
-use sp_blockchain::{Error as ClientError, Result as ClientResult};
+use sp_blockchain::Error as ClientError;
use sp_consensus::BlockOrigin;
use sp_runtime::generic::BlockId;
use std::{
@@ -66,12 +68,13 @@ impl Relaychain {
}
}
+#[async_trait]
impl crate::parachain_consensus::RelaychainClient for Relaychain {
type Error = ClientError;
type HeadStream = Box<dyn Stream<Item = Vec<u8>> + Send + Unpin>;
- fn new_best_heads(&self, _: ParaId) -> Self::HeadStream {
+ async fn new_best_heads(&self, _: ParaId) -> RelayChainResult<Self::HeadStream> {
let stream = self
.inner
.lock()
@@ -80,10 +83,10 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
.take()
.expect("Should only be called once");
- Box::new(stream.map(|v| v.encode()))
+ Ok(Box::new(stream.map(|v| v.encode())))
}
- fn finalized_heads(&self, _: ParaId) -> Self::HeadStream {
+ async fn finalized_heads(&self, _: ParaId) -> RelayChainResult<Self::HeadStream> {
let stream = self
.inner
.lock()
@@ -92,10 +95,14 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
.take()
.expect("Should only be called once");
- Box::new(stream.map(|v| v.encode()))
+ Ok(Box::new(stream.map(|v| v.encode())))
}
- fn parachain_head_at(&self, _: &BlockId, _: ParaId) -> ClientResult<Option<Vec<u8>>> {
+ async fn parachain_head_at(
+ &self,
+ _: &BlockId,
+ _: ParaId,
+ ) -> RelayChainResult<Option<Vec<u8>>> {
unimplemented!("Not required for tests")
}
}
diff --git a/client/consensus/relay-chain/src/lib.rs b/client/consensus/relay-chain/src/lib.rs
index 7ab3ef2861..69a92175da 100644
--- a/client/consensus/relay-chain/src/lib.rs
+++ b/client/consensus/relay-chain/src/lib.rs
@@ -176,7 +176,7 @@ where
.propose(
inherent_data,
Default::default(),
- //TODO: Fix this.
+ // TODO: Fix this.
Duration::from_millis(500),
// Set the block limit to 50% of the maximum PoV size.
//
diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs
index 79e4f7c1b7..2010803d38 100644
--- a/client/network/src/lib.rs
+++ b/client/network/src/lib.rs
@@ -38,11 +38,7 @@ use polkadot_primitives::v1::{
};
use codec::{Decode, DecodeAll, Encode};
-use futures::{
- channel::oneshot,
- future::{ready, FutureExt},
- Future,
-};
+use futures::{channel::oneshot, future::FutureExt, Future};
use std::{convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc};
@@ -128,7 +124,7 @@ impl BlockAnnounceData {
/// Check the signature of the statement.
///
/// Returns an `Err(_)` if it failed.
- fn check_signature<RCInterface>(
+ async fn check_signature<RCInterface>(
self,
relay_chain_client: &RCInterface,
) -> Result<Validation, BlockAnnounceError>
@@ -138,16 +134,16 @@ impl BlockAnnounceData {
let validator_index = self.statement.unchecked_validator_index();
let runtime_api_block_id = BlockId::Hash(self.relay_parent);
- let session_index = match relay_chain_client.session_index_for_child(&runtime_api_block_id)
- {
- Ok(r) => r,
- Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
- };
+ let session_index =
+ match relay_chain_client.session_index_for_child(&runtime_api_block_id).await {
+ Ok(r) => r,
+ Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
+ };
let signing_context = SigningContext { parent_hash: self.relay_parent, session_index };
// Check that the signer is a legit validator.
- let authorities = match relay_chain_client.validators(&runtime_api_block_id) {
+ let authorities = match relay_chain_client.validators(&runtime_api_block_id).await {
Ok(r) => r,
Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
};
@@ -222,6 +218,7 @@ impl TryFrom<&'_ CollationSecondedSignal> for BlockAnnounceData {
/// chain. If it is at the tip, it is required to provide a justification or otherwise we reject
/// it. However, if the announcement is for a block below the tip the announcement is accepted
/// as it probably comes from a node that is currently syncing the chain.
+#[derive(Clone)]
pub struct BlockAnnounceValidator<Block, RCInterface> {
phantom: PhantomData<Block>,
relay_chain_interface: RCInterface,
@@ -247,13 +244,14 @@ where
RCInterface: RelayChainInterface + Clone,
{
/// Get the included block of the given parachain in the relay chain.
- fn included_block(
+ async fn included_block(
relay_chain_interface: &RCInterface,
block_id: &BlockId,
para_id: ParaId,
) -> Result<Block::Header, BoxedError> {
let validation_data = relay_chain_interface
.persisted_validation_data(block_id, para_id, OccupiedCoreAssumption::TimedOut)
+ .await
.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?
.ok_or_else(|| {
Box::new(BlockAnnounceError("Could not find parachain head in relay chain".into()))
@@ -269,56 +267,59 @@ where
}
/// Get the backed block hash of the given parachain in the relay chain.
- fn backed_block_hash(
+ async fn backed_block_hash(
relay_chain_interface: &RCInterface,
block_id: &BlockId,
para_id: ParaId,
) -> Result<Option<PHash>, BoxedError> {
let candidate_receipt = relay_chain_interface
.candidate_pending_availability(block_id, para_id)
+ .await
.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
Ok(candidate_receipt.map(|cr| cr.descriptor.para_head))
}
/// Handle a block announcement with empty data (no statement) attached to it.
- fn handle_empty_block_announce_data(
+ async fn handle_empty_block_announce_data(
&self,
header: Block::Header,
- ) -> impl Future<Output = Result<Validation, BoxedError>> {
+ ) -> Result<Validation, BoxedError> {
let relay_chain_interface = self.relay_chain_interface.clone();
let para_id = self.para_id;
- async move {
- // Check if block is equal or higher than best (this requires a justification)
- let relay_chain_best_hash = relay_chain_interface.best_block_hash();
- let runtime_api_block_id = BlockId::Hash(relay_chain_best_hash);
- let block_number = header.number();
-
- let best_head =
- Self::included_block(&relay_chain_interface, &runtime_api_block_id, para_id)?;
- let known_best_number = best_head.number();
- let backed_block =
- || Self::backed_block_hash(&relay_chain_interface, &runtime_api_block_id, para_id);
-
- if best_head == header {
- tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
-
- Ok(Validation::Success { is_new_best: true })
- } else if Some(HeadData(header.encode()).hash()) == backed_block()? {
- tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
-
- Ok(Validation::Success { is_new_best: true })
- } else if block_number >= known_best_number {
- tracing::debug!(
+ // Check if block is equal or higher than best (this requires a justification)
+ let relay_chain_best_hash = relay_chain_interface
+ .best_block_hash()
+ .await
+ .map_err(|e| Box::new(e) as Box<_>)?;
+ let runtime_api_block_id = BlockId::Hash(relay_chain_best_hash);
+ let block_number = header.number();
+
+ let best_head =
+ Self::included_block(&relay_chain_interface, &runtime_api_block_id, para_id).await?;
+ let known_best_number = best_head.number();
+ let backed_block = || async {
+ Self::backed_block_hash(&relay_chain_interface, &runtime_api_block_id, para_id).await
+ };
+
+ if best_head == header {
+ tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
+
+ Ok(Validation::Success { is_new_best: true })
+ } else if Some(HeadData(header.encode()).hash()) == backed_block().await? {
+ tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
+
+ Ok(Validation::Success { is_new_best: true })
+ } else if block_number >= known_best_number {
+ tracing::debug!(
target: LOG_TARGET,
"Validation failed because a justification is needed if the block at the top of the chain."
);
- Ok(Validation::Failure { disconnect: false })
- } else {
- Ok(Validation::Success { is_new_best: false })
- }
+ Ok(Validation::Failure { disconnect: false })
+ } else {
+ Ok(Validation::Success { is_new_best: false })
}
}
}
@@ -331,32 +332,40 @@ where
fn validate(
&mut self,
header: &Block::Header,
- mut data: &[u8],
+ data: &[u8],
) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
- if self.relay_chain_interface.is_major_syncing() {
- return ready(Ok(Validation::Success { is_new_best: false })).boxed()
- }
+ let relay_chain_interface = self.relay_chain_interface.clone();
+ let mut data = data.to_vec();
+ let header = header.clone();
+ let header_encoded = header.encode();
+ let block_announce_validator = self.clone();
- if data.is_empty() {
- return self.handle_empty_block_announce_data(header.clone()).boxed()
- }
+ async move {
+ let relay_chain_is_syncing = relay_chain_interface
+ .is_major_syncing()
+ .await
+ .map_err(|e| {
+ tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e)
+ })
+ .unwrap_or(false);
- let block_announce_data = match BlockAnnounceData::decode_all(&mut data) {
- Ok(r) => r,
- Err(err) =>
- return async move {
- Err(Box::new(BlockAnnounceError(format!(
+ if relay_chain_is_syncing {
+ return Ok(Validation::Success { is_new_best: false })
+ }
+
+ if data.is_empty() {
+ return block_announce_validator.handle_empty_block_announce_data(header).await
+ }
+
+ let block_announce_data = match BlockAnnounceData::decode_all(&mut data) {
+ Ok(r) => r,
+ Err(err) =>
+ return Err(Box::new(BlockAnnounceError(format!(
"Can not decode the `BlockAnnounceData`: {:?}",
err
- ))) as Box<_>)
- }
- .boxed(),
- };
+ ))) as Box<_>),
+ };
- let relay_chain_interface = self.relay_chain_interface.clone();
- let header_encoded = header.encode();
-
- async move {
if let Err(e) = block_announce_data.validate(header_encoded) {
return Ok(e)
}
@@ -370,6 +379,7 @@ where
block_announce_data
.check_signature(&relay_chain_interface)
+ .await
.map_err(|e| Box::new(e) as Box<_>)
}
.boxed()
diff --git a/client/network/src/tests.rs b/client/network/src/tests.rs
index 34584edd69..bd52fc0b93 100644
--- a/client/network/src/tests.rs
+++ b/client/network/src/tests.rs
@@ -16,15 +16,15 @@
use super::*;
use async_trait::async_trait;
-use cumulus_relay_chain_interface::WaitError;
+use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
use cumulus_relay_chain_local::{check_block_in_chain, BlockCheckStatus};
use cumulus_test_service::runtime::{Block, Hash, Header};
-use futures::{executor::block_on, poll, task::Poll, FutureExt, StreamExt};
+use futures::{executor::block_on, poll, task::Poll, FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
use polkadot_node_primitives::{SignedFullStatement, Statement};
use polkadot_primitives::v1::{
- Block as PBlock, CandidateCommitments, CandidateDescriptor, CollatorPair,
- CommittedCandidateReceipt, Hash as PHash, HeadData, Id as ParaId, InboundDownwardMessage,
+ CandidateCommitments, CandidateDescriptor, CollatorPair, CommittedCandidateReceipt,
+ Hash as PHash, HeadData, Header as PHeader, Id as ParaId, InboundDownwardMessage,
InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, SessionIndex,
SigningContext, ValidationCodeHash, ValidatorId,
};
@@ -77,53 +77,60 @@ impl DummyRelayChainInterface {
#[async_trait]
impl RelayChainInterface for DummyRelayChainInterface {
- fn validators(
+ async fn validators(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
- ) -> Result<Vec<ValidatorId>, sp_api::ApiError> {
+ ) -> RelayChainResult<Vec<ValidatorId>> {
Ok(self.data.lock().validators.clone())
}
- fn block_status(
+ async fn block_status(
&self,
block_id: cumulus_primitives_core::relay_chain::BlockId,
- ) -> Result<BlockStatus, sp_blockchain::Error> {
- self.relay_backend.blockchain().status(block_id)
+ ) -> RelayChainResult<BlockStatus> {
+ self.relay_backend
+ .blockchain()
+ .status(block_id)
+ .map_err(RelayChainError::BlockchainError)
}
- fn best_block_hash(&self) -> PHash {
- self.relay_backend.blockchain().info().best_hash
+ async fn best_block_hash(&self) -> RelayChainResult<PHash> {
+ Ok(self.relay_backend.blockchain().info().best_hash)
}
- fn retrieve_dmq_contents(&self, _: ParaId, _: PHash) -> Option<Vec<InboundDownwardMessage>> {
+ async fn retrieve_dmq_contents(
+ &self,
+ _: ParaId,
+ _: PHash,
+ ) -> RelayChainResult<Vec<InboundDownwardMessage>> {
unimplemented!("Not needed for test")
}
- fn retrieve_all_inbound_hrmp_channel_contents(
+ async fn retrieve_all_inbound_hrmp_channel_contents(
&self,
_: ParaId,
_: PHash,
- ) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
- Some(BTreeMap::new())
+ ) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
+ Ok(BTreeMap::new())
}
- fn persisted_validation_data(
+ async fn persisted_validation_data(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
_: ParaId,
_: OccupiedCoreAssumption,
- ) -> Result<Option<PersistedValidationData>, sp_api::ApiError> {
+ ) -> RelayChainResult<Option<PersistedValidationData>> {
Ok(Some(PersistedValidationData {
parent_head: HeadData(default_header().encode()),
..Default::default()
}))
}
- fn candidate_pending_availability(
+ async fn candidate_pending_availability(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
_: ParaId,
- ) -> Result<Option<CommittedCandidateReceipt>, sp_api::ApiError> {
+ ) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
if self.data.lock().has_pending_availability {
Ok(Some(CommittedCandidateReceipt {
descriptor: CandidateDescriptor {
@@ -152,60 +159,58 @@ impl RelayChainInterface for DummyRelayChainInterface {
}
}
- fn session_index_for_child(
+ async fn session_index_for_child(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
- ) -> Result<SessionIndex, sp_api::ApiError> {
+ ) -> RelayChainResult<SessionIndex> {
Ok(0)
}
- fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock> {
- self.relay_client.import_notification_stream()
- }
-
- fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock> {
- self.relay_client.finality_notification_stream()
+ async fn import_notification_stream(
+ &self,
+ ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+ Ok(Box::pin(
+ self.relay_client
+ .import_notification_stream()
+ .map(|notification| notification.header),
+ ))
}
- fn storage_changes_notification_stream(
+ async fn finality_notification_stream(
&self,
- filter_keys: Option<&[sc_client_api::StorageKey]>,
- child_filter_keys: Option<
- &[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
- >,
- ) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>> {
- self.relay_client
- .storage_changes_notification_stream(filter_keys, child_filter_keys)
+ ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+ Ok(Box::pin(
+ self.relay_client
+ .finality_notification_stream()
+ .map(|notification| notification.header),
+ ))
}
- fn is_major_syncing(&self) -> bool {
- false
+ async fn is_major_syncing(&self) -> RelayChainResult<bool> {
+ Ok(false)
}
- fn overseer_handle(&self) -> Option<OverseerHandle> {
+ fn overseer_handle(&self) -> RelayChainResult<Option<OverseerHandle>> {
unimplemented!("Not needed for test")
}
- fn get_storage_by_key(
+ async fn get_storage_by_key(
&self,
_: &polkadot_service::BlockId,
_: &[u8],
- ) -> Result<Option<StorageValue>, sp_blockchain::Error> {
+ ) -> RelayChainResult<Option<StorageValue>> {
unimplemented!("Not needed for test")
}
- fn prove_read(
+ async fn prove_read(
&self,
_: &polkadot_service::BlockId,
- _: &Vec<Vec<u8>>,
- ) -> Result<Option<StorageProof>, Box<dyn sp_state_machine::Error>> {
+ ) -> RelayChainResult<StorageProof> {
unimplemented!("Not needed for test")
}
- async fn wait_for_block(
- &self,
- hash: PHash,
- ) -> Result<(), cumulus_relay_chain_interface::WaitError> {
+ async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
let mut listener = match check_block_in_chain(
self.relay_backend.clone(),
self.relay_client.clone(),
@@ -219,16 +224,32 @@ impl RelayChainInterface for DummyRelayChainInterface {
loop {
futures::select! {
- _ = timeout => return Err(WaitError::Timeout(hash)),
+ _ = timeout => return Err(RelayChainError::WaitTimeout(hash)),
evt = listener.next() => match evt {
Some(evt) if evt.hash == hash => return Ok(()),
// Not the event we waited on.
Some(_) => continue,
- None => return Err(WaitError::ImportListenerClosed(hash)),
+ None => return Err(RelayChainError::ImportListenerClosed(hash)),
}
}
}
}
+
+ async fn new_best_notification_stream(
+ &self,
+ ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+ let notifications_stream =
+ self.relay_client
+ .import_notification_stream()
+ .filter_map(|notification| async move {
+ if notification.is_new_best {
+ Some(notification.header)
+ } else {
+ None
+ }
+ });
+ Ok(Box::pin(notifications_stream))
+ }
}
fn make_validator_and_api(
@@ -274,6 +295,7 @@ async fn make_gossip_message_and_header(
.unwrap();
let session_index = relay_chain_interface
.session_index_for_child(&BlockId::Hash(relay_parent))
+ .await
.unwrap();
let signing_context = SigningContext { parent_hash: relay_parent, session_index };
@@ -442,9 +464,9 @@ fn check_statement_is_correctly_signed() {
assert_eq!(Validation::Failure { disconnect: true }, res.unwrap());
}
-#[test]
-fn check_statement_seconded() {
- let (mut validator, api) = make_validator_and_api();
+#[tokio::test]
+async fn check_statement_seconded() {
+ let (mut validator, relay_chain_interface) = make_validator_and_api();
let header = default_header();
let relay_parent = H256::from_low_u64_be(1);
@@ -455,7 +477,10 @@ fn check_statement_seconded() {
Some(&Sr25519Keyring::Alice.to_seed()),
)
.unwrap();
- let session_index = api.session_index_for_child(&BlockId::Hash(relay_parent)).unwrap();
+ let session_index = relay_chain_interface
+ .session_index_for_child(&BlockId::Hash(relay_parent))
+ .await
+ .unwrap();
let signing_context = SigningContext { parent_hash: relay_parent, session_index };
let statement = Statement::Valid(Default::default());
diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs
index 4d3f67ea06..d5d1a19b1d 100644
--- a/client/pov-recovery/src/lib.rs
+++ b/client/pov-recovery/src/lib.rs
@@ -56,7 +56,7 @@ use polkadot_primitives::v1::{
};
use cumulus_primitives_core::ParachainBlockData;
-use cumulus_relay_chain_interface::RelayChainInterface;
+use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use codec::Decode;
use futures::{select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
@@ -381,7 +381,14 @@ where
let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
let pending_candidates =
- pending_candidates(self.relay_chain_interface.clone(), self.para_id).fuse();
+ match pending_candidates(self.relay_chain_interface.clone(), self.para_id).await {
+ Ok(pending_candidate_stream) => pending_candidate_stream.fuse(),
+ Err(err) => {
+ tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
+ return
+ },
+ };
+
futures::pin_mut!(pending_candidates);
loop {
@@ -435,28 +442,41 @@ where
}
/// Returns a stream over pending candidates for the parachain corresponding to `para_id`.
-fn pending_candidates(
- relay_chain_client: impl RelayChainInterface,
+async fn pending_candidates(
+ relay_chain_client: impl RelayChainInterface + Clone,
para_id: ParaId,
-) -> impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)> {
- relay_chain_client.import_notification_stream().filter_map(move |n| {
- let res = relay_chain_client
- .candidate_pending_availability(&BlockId::hash(n.hash), para_id)
- .and_then(|pa| {
- relay_chain_client
- .session_index_for_child(&BlockId::hash(n.hash))
- .map(|v| pa.map(|pa| (pa, v)))
- })
- .map_err(|e| {
- tracing::error!(
- target: LOG_TARGET,
- error = ?e,
- "Failed fetch pending candidates.",
- )
- })
- .ok()
- .flatten();
-
- async move { res }
- })
+) -> RelayChainResult<impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)>> {
+ let import_notification_stream = relay_chain_client.import_notification_stream().await?;
+
+ let filtered_stream = import_notification_stream.filter_map(move |n| {
+ let client_for_closure = relay_chain_client.clone();
+ async move {
+ let block_id = BlockId::hash(n.hash());
+ let pending_availability_result = client_for_closure
+ .candidate_pending_availability(&block_id, para_id)
+ .await
+ .map_err(|e| {
+ tracing::error!(
+ target: LOG_TARGET,
+ error = ?e,
+ "Failed to fetch pending candidates.",
+ )
+ });
+ let session_index_result =
+ client_for_closure.session_index_for_child(&block_id).await.map_err(|e| {
+ tracing::error!(
+ target: LOG_TARGET,
+ error = ?e,
+ "Failed to fetch session index.",
+ )
+ });
+
+ if let Ok(Some(candidate)) = pending_availability_result {
+ session_index_result.map(|session_index| (candidate, session_index)).ok()
+ } else {
+ None
+ }
+ }
+ });
+ Ok(filtered_stream)
}
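Since `pending_candidates` is now async and returns a `RelayChainResult`, callers await it and bail out on error before pinning the stream, as the PoV-recovery loop above does. A condensed usage sketch, not part of the patch (the function name and log lines are illustrative only):

async fn log_pending_candidates(
	relay_chain_interface: impl RelayChainInterface + Clone,
	para_id: ParaId,
) {
	// Building the stream is fallible; consuming it is unchanged.
	let pending_candidates = match pending_candidates(relay_chain_interface, para_id).await {
		Ok(stream) => stream.fuse(),
		Err(err) => {
			tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
			return
		},
	};
	futures::pin_mut!(pending_candidates);

	while let Some((receipt, session_index)) = pending_candidates.next().await {
		tracing::debug!(target: LOG_TARGET, ?session_index, para_head = ?receipt.descriptor.para_head, "Pending candidate.");
	}
}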
diff --git a/client/relay-chain-interface/Cargo.toml b/client/relay-chain-interface/Cargo.toml
index a962155ed1..b76ebcc313 100644
--- a/client/relay-chain-interface/Cargo.toml
+++ b/client/relay-chain-interface/Cargo.toml
@@ -15,7 +15,10 @@ sp-runtime = { git = "/~https://github.com/paritytech/substrate", branch = "master
sp-blockchain = { git = "/~https://github.com/paritytech/substrate", branch = "master" }
sp-state-machine = { git = "/~https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "/~https://github.com/paritytech/substrate", branch = "master" }
+sc-service = { git = "/~https://github.com/paritytech/substrate", branch = "master" }
+futures = "0.3.1"
parking_lot = "0.11.1"
derive_more = "0.99.2"
async-trait = "0.1.52"
+thiserror = "1.0.30"
diff --git a/client/relay-chain-interface/src/lib.rs b/client/relay-chain-interface/src/lib.rs
index 185e9a6f0a..13b0551b38 100644
--- a/client/relay-chain-interface/src/lib.rs
+++ b/client/relay-chain-interface/src/lib.rs
@@ -14,136 +14,140 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see .
-use std::{collections::BTreeMap, sync::Arc};
+use std::{collections::BTreeMap, pin::Pin, sync::Arc};
use cumulus_primitives_core::{
relay_chain::{
v1::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId},
- Block as PBlock, BlockId, Hash as PHash, InboundHrmpMessage,
+ BlockId, Hash as PHash, Header as PHeader, InboundHrmpMessage,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
use polkadot_overseer::Handle as OverseerHandle;
use sc_client_api::{blockchain::BlockStatus, StorageProof};
+use futures::Stream;
+
+use async_trait::async_trait;
use sp_api::ApiError;
use sp_state_machine::StorageValue;
-use async_trait::async_trait;
+pub type RelayChainResult<T> = Result<T, RelayChainError>;
-#[derive(Debug, derive_more::Display)]
-pub enum WaitError {
- #[display(fmt = "Timeout while waiting for relay-chain block `{}` to be imported.", _0)]
- Timeout(PHash),
- #[display(
- fmt = "Import listener closed while waiting for relay-chain block `{}` to be imported.",
- _0
- )]
+#[derive(thiserror::Error, Debug)]
+pub enum RelayChainError {
+ #[error("Error occured while calling relay chain runtime: {0:?}")]
+ ApiError(#[from] ApiError),
+ #[error("Timeout while waiting for relay-chain block `{0}` to be imported.")]
+ WaitTimeout(PHash),
+ #[error("Import listener closed while waiting for relay-chain block `{0}` to be imported.")]
ImportListenerClosed(PHash),
- #[display(
- fmt = "Blockchain returned an error while waiting for relay-chain block `{}` to be imported: {:?}",
- _0,
- _1
- )]
- BlockchainError(PHash, sp_blockchain::Error),
+ #[error("Blockchain returned an error while waiting for relay-chain block `{0}` to be imported: {1:?}")]
+ WaitBlockchainError(PHash, sp_blockchain::Error),
+ #[error("Blockchain returned an error: {0:?}")]
+ BlockchainError(#[from] sp_blockchain::Error),
+ #[error("State machine error occured: {0:?}")]
+ StateMachineError(Box),
+ #[error("Unspecified error occured: {0:?}")]
+ GenericError(String),
}
/// Trait that provides all necessary methods for interaction between collator and relay chain.
#[async_trait]
pub trait RelayChainInterface: Send + Sync {
/// Fetch a storage item by key.
- fn get_storage_by_key(
+ async fn get_storage_by_key(
&self,
block_id: &BlockId,
key: &[u8],
- ) -> Result<Option<StorageValue>, sp_blockchain::Error>;
+ ) -> RelayChainResult<Option<StorageValue>>;
/// Fetch a vector of current validators.
- fn validators(&self, block_id: &BlockId) -> Result<Vec<ValidatorId>, ApiError>;
+ async fn validators(&self, block_id: &BlockId) -> RelayChainResult<Vec<ValidatorId>>;
/// Get the status of a given block.
- fn block_status(&self, block_id: BlockId) -> Result<BlockStatus, sp_blockchain::Error>;
+ async fn block_status(&self, block_id: BlockId) -> RelayChainResult<BlockStatus>;
/// Get the hash of the current best block.
- fn best_block_hash(&self) -> PHash;
+ async fn best_block_hash(&self) -> RelayChainResult<PHash>;
/// Returns the whole contents of the downward message queue for the parachain we are collating
/// for.
///
/// Returns `None` in case of an error.
- fn retrieve_dmq_contents(
+ async fn retrieve_dmq_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
- ) -> Option<Vec<InboundDownwardMessage>>;
+ ) -> RelayChainResult<Vec<InboundDownwardMessage>>;
/// Returns channels contents for each inbound HRMP channel addressed to the parachain we are
/// collating for.
///
/// Empty channels are also included.
- fn retrieve_all_inbound_hrmp_channel_contents(
+ async fn retrieve_all_inbound_hrmp_channel_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
- ) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>>;
+ ) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>>;
/// Yields the persisted validation data for the given `ParaId` along with an assumption that
/// should be used if the para currently occupies a core.
///
/// Returns `None` if either the para is not registered or the assumption is `Freed`
/// and the para already occupies a core.
- fn persisted_validation_data(
+ async fn persisted_validation_data(
&self,
block_id: &BlockId,
para_id: ParaId,
_: OccupiedCoreAssumption,
- ) -> Result<Option<PersistedValidationData>, ApiError>;
+ ) -> RelayChainResult<Option<PersistedValidationData>>;
/// Get the receipt of a candidate pending availability. This returns `Some` for any paras
/// assigned to occupied cores in `availability_cores` and `None` otherwise.
- fn candidate_pending_availability(
+ async fn candidate_pending_availability(
&self,
block_id: &BlockId,
para_id: ParaId,
- ) -> Result<Option<CommittedCandidateReceipt>, ApiError>;
+ ) -> RelayChainResult<Option<CommittedCandidateReceipt>>;
/// Returns the session index expected at a child of the block.
- fn session_index_for_child(&self, block_id: &BlockId) -> Result<SessionIndex, ApiError>;
+ async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult<SessionIndex>;
/// Get a stream of import block notifications.
- fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock>;
+ async fn import_notification_stream(
+ &self,
+ ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
+
+ /// Get a stream of new best block notifications.
+ async fn new_best_notification_stream(
+ &self,
+ ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
/// Wait for a block with a given hash in the relay chain.
///
/// This method returns immediately on error or if the block is already
/// reported to be in chain. Otherwise, it waits for the block to arrive.
- async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError>;
+ async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()>;
/// Get a stream of finality notifications.
- fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock>;
-
- /// Get a stream of storage change notifications.
- fn storage_changes_notification_stream(
+ async fn finality_notification_stream(
&self,
- filter_keys: Option<&[sc_client_api::StorageKey]>,
- child_filter_keys: Option<
- &[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
- >,
- ) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>>;
+ ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
/// Whether the synchronization service is undergoing major sync.
/// Returns true if so.
- fn is_major_syncing(&self) -> bool;
+ async fn is_major_syncing(&self) -> RelayChainResult<bool>;
/// Get a handle to the overseer.
- fn overseer_handle(&self) -> Option<OverseerHandle>;
+ fn overseer_handle(&self) -> RelayChainResult<Option<OverseerHandle>>;
/// Generate a storage read proof.
- fn prove_read(
+ async fn prove_read(
&self,
block_id: &BlockId,
relevant_keys: &Vec<Vec<u8>>,
- ) -> Result<Option<StorageProof>, Box<dyn sp_state_machine::Error>>;
+ ) -> RelayChainResult<StorageProof>;
}
#[async_trait]
@@ -151,98 +155,100 @@ impl<T> RelayChainInterface for Arc<T>
where
T: RelayChainInterface + ?Sized,
{
- fn retrieve_dmq_contents(
+ async fn retrieve_dmq_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
- ) -> Option<Vec<InboundDownwardMessage>> {
- (**self).retrieve_dmq_contents(para_id, relay_parent)
+ ) -> RelayChainResult<Vec<InboundDownwardMessage>> {
+ (**self).retrieve_dmq_contents(para_id, relay_parent).await
}
- fn retrieve_all_inbound_hrmp_channel_contents(
+ async fn retrieve_all_inbound_hrmp_channel_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
- ) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
- (**self).retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent)
+ ) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
+ (**self).retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent).await
}
- fn persisted_validation_data(
+ async fn persisted_validation_data(
&self,
block_id: &BlockId,
para_id: ParaId,
occupied_core_assumption: OccupiedCoreAssumption,
- ) -> Result<Option<PersistedValidationData>, ApiError> {
- (**self).persisted_validation_data(block_id, para_id, occupied_core_assumption)
+ ) -> RelayChainResult<Option<PersistedValidationData>> {
+ (**self)
+ .persisted_validation_data(block_id, para_id, occupied_core_assumption)
+ .await
}
- fn candidate_pending_availability(
+ async fn candidate_pending_availability(
&self,
block_id: &BlockId,
para_id: ParaId,
- ) -> Result<Option<CommittedCandidateReceipt>, ApiError> {
- (**self).candidate_pending_availability(block_id, para_id)
+ ) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
+ (**self).candidate_pending_availability(block_id, para_id).await
}
- fn session_index_for_child(&self, block_id: &BlockId) -> Result<SessionIndex, ApiError> {
- (**self).session_index_for_child(block_id)
+ async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult<SessionIndex> {
+ (**self).session_index_for_child(block_id).await
}
- fn validators(&self, block_id: &BlockId) -> Result<Vec<ValidatorId>, ApiError> {
- (**self).validators(block_id)
+ async fn validators(&self, block_id: &BlockId) -> RelayChainResult<Vec<ValidatorId>> {
+ (**self).validators(block_id).await
}
- fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock> {
- (**self).import_notification_stream()
- }
-
- fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock> {
- (**self).finality_notification_stream()
+ async fn import_notification_stream(
+ &self,
+ ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+ (**self).import_notification_stream().await
}
- fn storage_changes_notification_stream(
+ async fn finality_notification_stream(
&self,
- filter_keys: Option<&[sc_client_api::StorageKey]>,
- child_filter_keys: Option<
- &[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
- >,
- ) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>> {
- (**self).storage_changes_notification_stream(filter_keys, child_filter_keys)
+ ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+ (**self).finality_notification_stream().await
}
- fn best_block_hash(&self) -> PHash {
- (**self).best_block_hash()
+ async fn best_block_hash(&self) -> RelayChainResult<PHash> {
+ (**self).best_block_hash().await
}
- fn block_status(&self, block_id: BlockId) -> Result<BlockStatus, sp_blockchain::Error> {
- (**self).block_status(block_id)
+ async fn block_status(&self, block_id: BlockId) -> RelayChainResult<BlockStatus> {
+ (**self).block_status(block_id).await
}
- fn is_major_syncing(&self) -> bool {
- (**self).is_major_syncing()
+ async fn is_major_syncing(&self) -> RelayChainResult<bool> {
+ (**self).is_major_syncing().await
}
- fn overseer_handle(&self) -> Option<OverseerHandle> {
+ fn overseer_handle(&self) -> RelayChainResult<Option<OverseerHandle>> {
(**self).overseer_handle()
}
- fn get_storage_by_key(
+ async fn get_storage_by_key(
&self,
block_id: &BlockId,
key: &[u8],
- ) -> Result<Option<StorageValue>, sp_blockchain::Error> {
- (**self).get_storage_by_key(block_id, key)
+ ) -> RelayChainResult<Option<StorageValue>> {
+ (**self).get_storage_by_key(block_id, key).await
}
- fn prove_read(
+ async fn prove_read(
&self,
block_id: &BlockId,
relevant_keys: &Vec<Vec<u8>>,
- ) -> Result<Option<StorageProof>, Box<dyn sp_state_machine::Error>> {
- (**self).prove_read(block_id, relevant_keys)
+ ) -> RelayChainResult<StorageProof> {
+ (**self).prove_read(block_id, relevant_keys).await
}
- async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError> {
+ async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
(**self).wait_for_block(hash).await
}
+
+ async fn new_best_notification_stream(
+ &self,
+ ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+ (**self).new_best_notification_stream().await
+ }
}
diff --git a/client/relay-chain-local/src/lib.rs b/client/relay-chain-local/src/lib.rs
index 5177d1f4af..903a8ff3c6 100644
--- a/client/relay-chain-local/src/lib.rs
+++ b/client/relay-chain-local/src/lib.rs
@@ -14,19 +14,19 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see .
-use std::{sync::Arc, time::Duration};
+use std::{pin::Pin, sync::Arc, time::Duration};
use async_trait::async_trait;
use cumulus_primitives_core::{
relay_chain::{
v1::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId},
v2::ParachainHost,
- Block as PBlock, BlockId, Hash as PHash, InboundHrmpMessage,
+ Block as PBlock, BlockId, Hash as PHash, Header as PHeader, InboundHrmpMessage,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
-use cumulus_relay_chain_interface::{RelayChainInterface, WaitError};
-use futures::{FutureExt, StreamExt};
+use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
+use futures::{FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
use polkadot_client::{ClientHandle, ExecuteWithClient, FullBackend};
use polkadot_service::{
@@ -37,12 +37,11 @@ use sc_client_api::{
StorageProof, UsageProvider,
};
use sc_telemetry::TelemetryWorkerHandle;
-use sp_api::{ApiError, ProvideRuntimeApi};
+use sp_api::ProvideRuntimeApi;
use sp_consensus::SyncOracle;
use sp_core::{sp_std::collections::btree_map::BTreeMap, Pair};
use sp_state_machine::{Backend as StateBackend, StorageValue};
-const LOG_TARGET: &str = "relay-chain-local";
/// The timeout in seconds after that the waiting for a block should be aborted.
const TIMEOUT_IN_SECONDS: u64 = 6;
@@ -88,158 +87,117 @@ where
+ Send,
Client::Api: ParachainHost<PBlock> + BabeApi<PBlock>,
{
- fn retrieve_dmq_contents(
+ async fn retrieve_dmq_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
- ) -> Option<Vec<InboundDownwardMessage>> {
- self.full_client
- .runtime_api()
- .dmq_contents_with_context(
- &BlockId::hash(relay_parent),
- sp_core::ExecutionContext::Importing,
- para_id,
- )
- .map_err(|e| {
- tracing::error!(
- target: LOG_TARGET,
- relay_parent = ?relay_parent,
- error = ?e,
- "An error occured during requesting the downward messages.",
- );
- })
- .ok()
+ ) -> RelayChainResult<Vec<InboundDownwardMessage>> {
+ Ok(self.full_client.runtime_api().dmq_contents_with_context(
+ &BlockId::hash(relay_parent),
+ sp_core::ExecutionContext::Importing,
+ para_id,
+ )?)
}
- fn retrieve_all_inbound_hrmp_channel_contents(
+ async fn retrieve_all_inbound_hrmp_channel_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
- ) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
- self.full_client
- .runtime_api()
- .inbound_hrmp_channels_contents_with_context(
- &BlockId::hash(relay_parent),
- sp_core::ExecutionContext::Importing,
- para_id,
- )
- .map_err(|e| {
- tracing::error!(
- target: LOG_TARGET,
- relay_parent = ?relay_parent,
- error = ?e,
- "An error occured during requesting the inbound HRMP messages.",
- );
- })
- .ok()
+ ) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
+ Ok(self.full_client.runtime_api().inbound_hrmp_channels_contents_with_context(
+ &BlockId::hash(relay_parent),
+ sp_core::ExecutionContext::Importing,
+ para_id,
+ )?)
}
- fn persisted_validation_data(
+ async fn persisted_validation_data(
&self,
block_id: &BlockId,
para_id: ParaId,
occupied_core_assumption: OccupiedCoreAssumption,
- ) -> Result<Option<PersistedValidationData>, ApiError> {
- self.full_client.runtime_api().persisted_validation_data(
+ ) -> RelayChainResult<Option<PersistedValidationData>> {
+ Ok(self.full_client.runtime_api().persisted_validation_data(
block_id,
para_id,
occupied_core_assumption,
- )
+ )?)
}
- fn candidate_pending_availability(
+ async fn candidate_pending_availability(
&self,
block_id: &BlockId,
para_id: ParaId,
- ) -> Result<Option<CommittedCandidateReceipt>, ApiError> {
- self.full_client.runtime_api().candidate_pending_availability(block_id, para_id)
- }
-
- fn session_index_for_child(&self, block_id: &BlockId) -> Result<SessionIndex, ApiError> {
- self.full_client.runtime_api().session_index_for_child(block_id)
+ ) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
+ Ok(self
+ .full_client
+ .runtime_api()
+ .candidate_pending_availability(block_id, para_id)?)
}
- fn validators(&self, block_id: &BlockId) -> Result<Vec<ValidatorId>, ApiError> {
- self.full_client.runtime_api().validators(block_id)
+ async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult<SessionIndex> {
+ Ok(self.full_client.runtime_api().session_index_for_child(block_id)?)
}
- fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock> {
- self.full_client.import_notification_stream()
+ async fn validators(&self, block_id: &BlockId) -> RelayChainResult<Vec<ValidatorId>> {
+ Ok(self.full_client.runtime_api().validators(block_id)?)
}
- fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock> {
- self.full_client.finality_notification_stream()
+ async fn import_notification_stream(
+ &self,
+ ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+ let notification_stream = self
+ .full_client
+ .import_notification_stream()
+ .map(|notification| notification.header);
+ Ok(Box::pin(notification_stream))
}
- fn storage_changes_notification_stream(
+ async fn finality_notification_stream(
&self,
- filter_keys: Option<&[sc_client_api::StorageKey]>,
- child_filter_keys: Option<
- &[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
- >,
- ) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>> {
- self.full_client
- .storage_changes_notification_stream(filter_keys, child_filter_keys)
+ ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+ let notification_stream = self
+ .full_client
+ .finality_notification_stream()
+ .map(|notification| notification.header);
+ Ok(Box::pin(notification_stream))
}
- fn best_block_hash(&self) -> PHash {
- self.backend.blockchain().info().best_hash
+ async fn best_block_hash(&self) -> RelayChainResult<PHash> {
+ Ok(self.backend.blockchain().info().best_hash)
}
- fn block_status(&self, block_id: BlockId) -> Result<BlockStatus, sp_blockchain::Error> {
- self.backend.blockchain().status(block_id)
+ async fn block_status(&self, block_id: BlockId) -> RelayChainResult<BlockStatus> {
+ Ok(self.backend.blockchain().status(block_id)?)
}
- fn is_major_syncing(&self) -> bool {
+ async fn is_major_syncing(&self) -> RelayChainResult<bool> {
let mut network = self.sync_oracle.lock();
- network.is_major_syncing()
+ Ok(network.is_major_syncing())
}
- fn overseer_handle(&self) -> Option<OverseerHandle> {
- self.overseer_handle.clone()
+ fn overseer_handle(&self) -> RelayChainResult<Option<OverseerHandle>> {
+ Ok(self.overseer_handle.clone())
}
- fn get_storage_by_key(
+ async fn get_storage_by_key(
&self,
block_id: &BlockId,
key: &[u8],
- ) -> Result<Option<StorageValue>, sp_blockchain::Error> {
+ ) -> RelayChainResult<Option<StorageValue>> {
let state = self.backend.state_at(*block_id)?;
- state.storage(key).map_err(sp_blockchain::Error::Storage)
+ state.storage(key).map_err(RelayChainError::GenericError)
}
- fn prove_read(
+ async fn prove_read(
&self,
block_id: &BlockId,
relevant_keys: &Vec<Vec<u8>>,
- ) -> Result<Option<StorageProof>, Box<dyn sp_state_machine::Error>> {
- let state_backend = self
- .backend
- .state_at(*block_id)
- .map_err(|e| {
- tracing::error!(
- target: LOG_TARGET,
- relay_parent = ?block_id,
- error = ?e,
- "Cannot obtain the state of the relay chain.",
- );
- })
- .ok();
-
- match state_backend {
- Some(state) => sp_state_machine::prove_read(state, relevant_keys)
- .map_err(|e| {
- tracing::error!(
- target: LOG_TARGET,
- relay_parent = ?block_id,
- error = ?e,
- "Failed to collect required relay chain state storage proof.",
- );
- e
- })
- .map(Some),
- None => Ok(None),
- }
+ ) -> RelayChainResult<StorageProof> {
+ let state_backend = self.backend.state_at(*block_id)?;
+
+ sp_state_machine::prove_read(state_backend, relevant_keys)
+ .map_err(RelayChainError::StateMachineError)
}
/// Wait for a given relay chain block in an async way.
@@ -259,7 +217,7 @@ where
///
/// The timeout is set to 6 seconds. This should be enough time to import the block in the current
/// round and if not, the new round of the relay chain already started anyway.
- async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError> {
+ async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
let mut listener =
match check_block_in_chain(self.backend.clone(), self.full_client.clone(), hash)? {
BlockCheckStatus::InChain => return Ok(()),
@@ -270,16 +228,28 @@ where
loop {
futures::select! {
- _ = timeout => return Err(WaitError::Timeout(hash)),
+ _ = timeout => return Err(RelayChainError::WaitTimeout(hash)),
evt = listener.next() => match evt {
Some(evt) if evt.hash == hash => return Ok(()),
// Not the event we waited on.
Some(_) => continue,
- None => return Err(WaitError::ImportListenerClosed(hash)),
+ None => return Err(RelayChainError::ImportListenerClosed(hash)),
}
}
}
}
+
+ async fn new_best_notification_stream(
+ &self,
+ ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+ let notifications_stream =
+ self.full_client
+ .import_notification_stream()
+ .filter_map(|notification| async move {
+ notification.is_new_best.then(|| notification.header)
+ });
+ Ok(Box::pin(notifications_stream))
+ }
}
pub enum BlockCheckStatus {
@@ -294,16 +264,15 @@ pub fn check_block_in_chain(
backend: Arc,
client: Arc,
hash: PHash,
-) -> Result<BlockCheckStatus, WaitError>
+) -> RelayChainResult<BlockCheckStatus>
where
Client: BlockchainEvents,
{
let _lock = backend.get_import_lock().read();
let block_id = BlockId::Hash(hash);
- match backend.blockchain().status(block_id) {
- Ok(BlockStatus::InChain) => return Ok(BlockCheckStatus::InChain),
- Err(err) => return Err(WaitError::BlockchainError(hash, err)),
+ match backend.blockchain().status(block_id)? {
+ BlockStatus::InChain => return Ok(BlockCheckStatus::InChain),
_ => {},
}
@@ -495,7 +464,7 @@ mod tests {
assert!(matches!(
block_on(relay_chain_interface.wait_for_block(hash)),
- Err(WaitError::Timeout(_))
+ Err(RelayChainError::WaitTimeout(_))
));
}
diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs
index 5b050e75aa..08cd8584f2 100644
--- a/client/service/src/lib.rs
+++ b/client/service/src/lib.rs
@@ -107,10 +107,13 @@ where
.spawn_essential_handle()
.spawn("cumulus-consensus", None, consensus);
+ let overseer_handle = relay_chain_interface
+ .overseer_handle()
+ .map_err(|e| sc_service::Error::Application(Box::new(e)))?
+ .ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?;
+
let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new(
- relay_chain_interface
- .overseer_handle()
- .ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?,
+ overseer_handle.clone(),
// We want that collators wait at maximum the relay chain slot duration before starting
// to recover blocks.
cumulus_client_pov_recovery::RecoveryDelay::WithMax { max: relay_chain_slot_duration },
@@ -128,9 +131,7 @@ where
runtime_api: client.clone(),
block_status,
announce_block,
- overseer_handle: relay_chain_interface
- .overseer_handle()
- .ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?,
+ overseer_handle,
spawner,
para_id,
key: collator_key,
@@ -192,10 +193,13 @@ where
.spawn_essential_handle()
.spawn("cumulus-consensus", None, consensus);
+ let overseer_handle = relay_chain_interface
+ .overseer_handle()
+ .map_err(|e| sc_service::Error::Application(Box::new(e)))?
+ .ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?;
+
let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new(
- relay_chain_interface
- .overseer_handle()
- .ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?,
+ overseer_handle,
// Full nodes should at least wait 2.5 minutes (assuming 6 seconds slot duration) and
// in maximum 5 minutes before starting to recover blocks. Collators should already start
// the recovery way before full nodes try to recover a certain block and then share the
diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs
index b30d077707..7c261e8583 100644
--- a/pallets/parachain-system/src/lib.rs
+++ b/pallets/parachain-system/src/lib.rs
@@ -605,7 +605,7 @@ pub mod pallet {
#[pallet::genesis_build]
impl GenesisBuild for GenesisConfig {
fn build(&self) {
- //TODO: Remove after /~https://github.com/paritytech/cumulus/issues/479
+ // TODO: Remove after /~https://github.com/paritytech/cumulus/issues/479
sp_io::storage::set(b":c", &[]);
}
}
diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs
index d0c0826e32..400daeaa67 100644
--- a/parachain-template/node/src/service.rs
+++ b/parachain-template/node/src/service.rs
@@ -436,14 +436,15 @@ pub async fn start_parachain_node(
BuildAuraConsensusParams {
proposer_factory,
create_inherent_data_providers: move |_, (relay_parent, validation_data)| {
- let parachain_inherent =
+ let relay_chain_interface = relay_chain_interface.clone();
+ async move {
+ let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_interface,
&validation_data,
id,
- );
- async move {
+ ).await;
let time = sp_timestamp::InherentDataProvider::from_system_time();
let slot =
diff --git a/polkadot-parachains/src/service.rs b/polkadot-parachains/src/service.rs
index 9f81b9af0d..a67b20c11e 100644
--- a/polkadot-parachains/src/service.rs
+++ b/polkadot-parachains/src/service.rs
@@ -738,14 +738,15 @@ pub async fn start_rococo_parachain_node(
>(BuildAuraConsensusParams {
proposer_factory,
create_inherent_data_providers: move |_, (relay_parent, validation_data)| {
+ let relay_chain_interface = relay_chain_interface.clone();
+ async move {
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_interface,
&validation_data,
id,
- );
- async move {
+ ).await;
let time = sp_timestamp::InherentDataProvider::from_system_time();
let slot =
@@ -875,14 +876,15 @@ where
block_import: client.clone(),
relay_chain_interface: relay_chain_interface.clone(),
create_inherent_data_providers: move |_, (relay_parent, validation_data)| {
- let parachain_inherent =
+ let relay_chain_interface = relay_chain_interface.clone();
+ async move {
+ let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_interface,
&validation_data,
id,
- );
- async move {
+ ).await;
let parachain_inherent = parachain_inherent.ok_or_else(|| {
Box::<dyn std::error::Error + Send + Sync>::from(
"Failed to create parachain inherent",
@@ -1157,14 +1159,15 @@ where
proposer_factory,
create_inherent_data_providers:
move |_, (relay_parent, validation_data)| {
- let parachain_inherent =
+ let relay_chain_for_aura = relay_chain_for_aura.clone();
+ async move {
+ let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_for_aura,
&validation_data,
id,
- );
- async move {
+ ).await;
let time =
sp_timestamp::InherentDataProvider::from_system_time();
@@ -1216,14 +1219,15 @@ where
relay_chain_interface: relay_chain_interface.clone(),
create_inherent_data_providers:
move |_, (relay_parent, validation_data)| {
- let parachain_inherent =
+ let relay_chain_interface = relay_chain_interface.clone();
+ async move {
+ let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_interface,
&validation_data,
id,
- );
- async move {
+ ).await;
let parachain_inherent =
parachain_inherent.ok_or_else(|| {
Box::<dyn std::error::Error + Send + Sync>::from(
diff --git a/primitives/parachain-inherent/src/client_side.rs b/primitives/parachain-inherent/src/client_side.rs
index dab368dc6c..b14c225765 100644
--- a/primitives/parachain-inherent/src/client_side.rs
+++ b/primitives/parachain-inherent/src/client_side.rs
@@ -29,7 +29,7 @@ const LOG_TARGET: &str = "parachain-inherent";
/// Collect the relevant relay chain state in form of a proof for putting it into the validation
/// data inherent.
-fn collect_relay_storage_proof(
+async fn collect_relay_storage_proof(
relay_chain_interface: &impl RelayChainInterface,
para_id: ParaId,
relay_parent: PHash,
@@ -42,6 +42,7 @@ fn collect_relay_storage_proof(
&relay_parent_block_id,
&relay_well_known_keys::hrmp_ingress_channel_index(para_id),
)
+ .await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
@@ -70,6 +71,7 @@ fn collect_relay_storage_proof(
&relay_parent_block_id,
&relay_well_known_keys::hrmp_egress_channel_index(para_id),
)
+ .await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
@@ -108,26 +110,57 @@ fn collect_relay_storage_proof(
relay_well_known_keys::hrmp_channels(HrmpChannelId { sender: para_id, recipient })
}));
- relay_chain_interface.prove_read(&relay_parent_block_id, &relevant_keys).ok()?
+ relay_chain_interface
+ .prove_read(&relay_parent_block_id, &relevant_keys)
+ .await
+ .map_err(|e| {
+ tracing::error!(
+ target: LOG_TARGET,
+ relay_parent = ?relay_parent_block_id,
+ error = ?e,
+ "Cannot obtain read proof from relay chain.",
+ );
+ })
+ .ok()
}
impl ParachainInherentData {
/// Create the [`ParachainInherentData`] at the given `relay_parent`.
///
/// Returns `None` if the creation failed.
- pub fn create_at(
+ pub async fn create_at(
relay_parent: PHash,
relay_chain_interface: &impl RelayChainInterface,
validation_data: &PersistedValidationData,
para_id: ParaId,
) -> Option<ParachainInherentData> {
let relay_chain_state =
- collect_relay_storage_proof(relay_chain_interface, para_id, relay_parent)?;
-
- let downward_messages =
- relay_chain_interface.retrieve_dmq_contents(para_id, relay_parent)?;
+ collect_relay_storage_proof(relay_chain_interface, para_id, relay_parent).await?;
+
+ let downward_messages = relay_chain_interface
+ .retrieve_dmq_contents(para_id, relay_parent)
+ .await
+ .map_err(|e| {
+ tracing::error!(
+ target: LOG_TARGET,
+ relay_parent = ?relay_parent,
+ error = ?e,
+ "An error occured during requesting the downward messages.",
+ );
+ })
+ .ok()?;
let horizontal_messages = relay_chain_interface
- .retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent)?;
+ .retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent)
+ .await
+ .map_err(|e| {
+ tracing::error!(
+ target: LOG_TARGET,
+ relay_parent = ?relay_parent,
+ error = ?e,
+ "An error occured during requesting the inbound HRMP messages.",
+ );
+ })
+ .ok()?;
Some(ParachainInherentData {
downward_messages,
diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs
index 319076be30..750cb7d881 100644
--- a/test/service/src/lib.rs
+++ b/test/service/src/lib.rs
@@ -31,9 +31,9 @@ use cumulus_client_service::{
use cumulus_primitives_core::ParaId;
use cumulus_relay_chain_local::RelayChainLocal;
use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi};
+use parking_lot::Mutex;
use frame_system_rpc_runtime_api::AccountNonceApi;
-use parking_lot::Mutex;
use polkadot_primitives::v1::{CollatorPair, Hash as PHash, PersistedValidationData};
use polkadot_service::ProvideRuntimeApi;
use sc_client_api::execution_extensions::ExecutionStrategies;
@@ -288,15 +288,16 @@ where
para_id,
proposer_factory,
move |_, (relay_parent, validation_data)| {
- let parachain_inherent =
+ let relay_chain_interface = relay_chain_interface_for_closure.clone();
+ async move {
+ let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
- &relay_chain_interface_for_closure,
+ &relay_chain_interface,
&validation_data,
para_id,
- );
+ ).await;
- async move {
let time = sp_timestamp::InherentDataProvider::from_system_time();
let parachain_inherent = parachain_inherent.ok_or_else(|| {