From 15244e53edc2ba90ccfc472e12ba69cf5a00b86a Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Fri, 16 Dec 2022 15:04:36 +0300 Subject: [PATCH] Batch transactions in complex relays (#1669) * batch transactions in message relay: API prototype * get rid of Box and actually submit it * test batch transactions * message_lane_loop_works_with_batch_transactions * removed logger * BatchConfirmationTransaction + BatchDeliveryTransaction * more prototyping * fmt * continue with batch calls * impl BatchCallBuilder for () * BatchDeliveryTransaction impl * BundledBatchCallBuilder * proper impl of BundledBatchCallBuilder + use it in RialtoParachain -> Millau * impl prove_header in OnDemandHeadersRelay * impl OnDemandParachainsRelay::prove_header (needs extensive tests) * added a couple of TODOs * return Result> when asking for more headers * prove headers when reauire_* is called && return proper headers from required_header_id * split parachains::prove_header and test select_headers_to_prove * more traces and leave TODOs * use finality stream in SubstrateFinalitySource::prove_block_finality * prove parachain head at block, selected by headers relay * const ANCIENT_BLOCK_THRESHOLD * TODO -> proof * clippy and spelling * BatchCallBuilder::build_batch_call() returns Result * read first proof from two streams * FailedToFindFinalityProof -> FinalityProofNotFound * changed select_headers_to_prove to version from PR review --- Cargo.lock | 2 + ...ub_rococo_messages_to_bridge_hub_wococo.rs | 3 + ...ub_wococo_messages_to_bridge_hub_rococo.rs | 3 + .../src/chains/millau_messages_to_rialto.rs | 3 + .../millau_messages_to_rialto_parachain.rs | 3 + .../src/chains/rialto_messages_to_millau.rs | 3 + .../rialto_parachain_messages_to_millau.rs | 12 +- .../src/cli/relay_headers_and_messages/mod.rs | 10 +- .../parachain_to_parachain.rs | 17 +- .../relay_to_parachain.rs | 13 +- .../relay_to_relay.rs | 9 +- relays/client-substrate/Cargo.toml | 1 + relays/client-substrate/src/client.rs | 12 + relays/client-substrate/src/error.rs | 7 + relays/client-substrate/src/lib.rs | 5 +- relays/lib-substrate-relay/Cargo.toml | 1 + .../src/finality/source.rs | 151 +++++++- relays/lib-substrate-relay/src/lib.rs | 52 +++ .../lib-substrate-relay/src/messages_lane.rs | 15 +- .../src/messages_source.rs | 100 +++++- .../src/messages_target.rs | 123 ++++++- .../src/on_demand/headers.rs | 53 ++- .../lib-substrate-relay/src/on_demand/mod.rs | 14 +- .../src/on_demand/parachains.rs | 332 +++++++++++++++++- relays/messages/src/message_lane_loop.rs | 261 ++++++++++++-- relays/messages/src/message_race_delivery.rs | 6 +- relays/messages/src/message_race_loop.rs | 123 ++++++- relays/messages/src/message_race_receiving.rs | 6 +- relays/utils/src/lib.rs | 2 +- 29 files changed, 1191 insertions(+), 151 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5eff1301f6..bcf1295023 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8580,6 +8580,7 @@ dependencies = [ "async-trait", "bp-header-chain", "bp-messages", + "bp-polkadot-core", "bp-runtime", "finality-relay", "frame-support", @@ -11692,6 +11693,7 @@ dependencies = [ "pallet-bridge-messages", "pallet-bridge-parachains", "pallet-transaction-payment", + "pallet-utility", "parachains-relay", "parity-scale-codec", "relay-rialto-client", diff --git a/relays/bin-substrate/src/chains/bridge_hub_rococo_messages_to_bridge_hub_wococo.rs b/relays/bin-substrate/src/chains/bridge_hub_rococo_messages_to_bridge_hub_wococo.rs index 6f9f05e663..339be92063 100644 --- a/relays/bin-substrate/src/chains/bridge_hub_rococo_messages_to_bridge_hub_wococo.rs +++ b/relays/bin-substrate/src/chains/bridge_hub_rococo_messages_to_bridge_hub_wococo.rs @@ -59,4 +59,7 @@ impl SubstrateMessageLane for BridgeHubRococoMessagesToBridgeHubWococoMessageLan BridgeHubRococoMessagesToBridgeHubWococoMessageLaneReceiveMessagesProofCallBuilder; type ReceiveMessagesDeliveryProofCallBuilder = BridgeHubRococoMessagesToBridgeHubWococoMessageLaneReceiveMessagesDeliveryProofCallBuilder; + + type SourceBatchCallBuilder = (); + type TargetBatchCallBuilder = (); } diff --git a/relays/bin-substrate/src/chains/bridge_hub_wococo_messages_to_bridge_hub_rococo.rs b/relays/bin-substrate/src/chains/bridge_hub_wococo_messages_to_bridge_hub_rococo.rs index 2b38eccbcb..3bb2aabde0 100644 --- a/relays/bin-substrate/src/chains/bridge_hub_wococo_messages_to_bridge_hub_rococo.rs +++ b/relays/bin-substrate/src/chains/bridge_hub_wococo_messages_to_bridge_hub_rococo.rs @@ -59,4 +59,7 @@ impl SubstrateMessageLane for BridgeHubWococoMessagesToBridgeHubRococoMessageLan BridgeHubWococoMessagesToBridgeHubRococoMessageLaneReceiveMessagesProofCallBuilder; type ReceiveMessagesDeliveryProofCallBuilder = BridgeHubWococoMessagesToBridgeHubRococoMessageLaneReceiveMessagesDeliveryProofCallBuilder; + + type SourceBatchCallBuilder = (); + type TargetBatchCallBuilder = (); } diff --git a/relays/bin-substrate/src/chains/millau_messages_to_rialto.rs b/relays/bin-substrate/src/chains/millau_messages_to_rialto.rs index b9920db53d..e6a2ef1a85 100644 --- a/relays/bin-substrate/src/chains/millau_messages_to_rialto.rs +++ b/relays/bin-substrate/src/chains/millau_messages_to_rialto.rs @@ -41,4 +41,7 @@ impl SubstrateMessageLane for MillauMessagesToRialto { millau_runtime::Runtime, millau_runtime::WithRialtoMessagesInstance, >; + + type SourceBatchCallBuilder = (); + type TargetBatchCallBuilder = (); } diff --git a/relays/bin-substrate/src/chains/millau_messages_to_rialto_parachain.rs b/relays/bin-substrate/src/chains/millau_messages_to_rialto_parachain.rs index 70cb887fa3..0b1d3afb79 100644 --- a/relays/bin-substrate/src/chains/millau_messages_to_rialto_parachain.rs +++ b/relays/bin-substrate/src/chains/millau_messages_to_rialto_parachain.rs @@ -41,4 +41,7 @@ impl SubstrateMessageLane for MillauMessagesToRialtoParachain { millau_runtime::Runtime, millau_runtime::WithRialtoParachainMessagesInstance, >; + + type SourceBatchCallBuilder = (); + type TargetBatchCallBuilder = (); } diff --git a/relays/bin-substrate/src/chains/rialto_messages_to_millau.rs b/relays/bin-substrate/src/chains/rialto_messages_to_millau.rs index 80b6b9fdbc..b45239fb9a 100644 --- a/relays/bin-substrate/src/chains/rialto_messages_to_millau.rs +++ b/relays/bin-substrate/src/chains/rialto_messages_to_millau.rs @@ -41,4 +41,7 @@ impl SubstrateMessageLane for RialtoMessagesToMillau { rialto_runtime::Runtime, rialto_runtime::WithMillauMessagesInstance, >; + + type SourceBatchCallBuilder = (); + type TargetBatchCallBuilder = (); } diff --git a/relays/bin-substrate/src/chains/rialto_parachain_messages_to_millau.rs b/relays/bin-substrate/src/chains/rialto_parachain_messages_to_millau.rs index 5cca26105b..8400157b9d 100644 --- a/relays/bin-substrate/src/chains/rialto_parachain_messages_to_millau.rs +++ b/relays/bin-substrate/src/chains/rialto_parachain_messages_to_millau.rs @@ -18,9 +18,12 @@ use relay_millau_client::Millau; use relay_rialto_parachain_client::RialtoParachain; -use substrate_relay_helper::messages_lane::{ - DirectReceiveMessagesDeliveryProofCallBuilder, DirectReceiveMessagesProofCallBuilder, - SubstrateMessageLane, +use substrate_relay_helper::{ + messages_lane::{ + DirectReceiveMessagesDeliveryProofCallBuilder, DirectReceiveMessagesProofCallBuilder, + SubstrateMessageLane, + }, + BundledBatchCallBuilder, }; /// Description of RialtoParachain -> Millau messages bridge. @@ -41,4 +44,7 @@ impl SubstrateMessageLane for RialtoParachainMessagesToMillau { rialto_parachain_runtime::Runtime, rialto_parachain_runtime::WithMillauMessagesInstance, >; + + type SourceBatchCallBuilder = (); + type TargetBatchCallBuilder = BundledBatchCallBuilder; } diff --git a/relays/bin-substrate/src/cli/relay_headers_and_messages/mod.rs b/relays/bin-substrate/src/cli/relay_headers_and_messages/mod.rs index 11cae48d1d..1ee7172228 100644 --- a/relays/bin-substrate/src/cli/relay_headers_and_messages/mod.rs +++ b/relays/bin-substrate/src/cli/relay_headers_and_messages/mod.rs @@ -59,7 +59,7 @@ use crate::{ declare_chain_cli_schema, }; use bp_messages::LaneId; -use bp_runtime::{BalanceOf, BlockNumberOf}; +use bp_runtime::BalanceOf; use relay_substrate_client::{ AccountIdOf, AccountKeyPairOf, Chain, ChainWithBalances, ChainWithTransactions, Client, Parachain, @@ -167,8 +167,8 @@ where /// Returns message relay parameters. fn messages_relay_params( &self, - source_to_target_headers_relay: Arc>>, - target_to_source_headers_relay: Arc>>, + source_to_target_headers_relay: Arc>, + target_to_source_headers_relay: Arc>, lane_id: LaneId, ) -> MessagesRelayParams { MessagesRelayParams { @@ -243,8 +243,8 @@ trait Full2WayBridgeBase: Sized + Send + Sync { async fn start_on_demand_headers_relayers( &mut self, ) -> anyhow::Result<( - Arc>>, - Arc>>, + Arc>, + Arc>, )>; } diff --git a/relays/bin-substrate/src/cli/relay_headers_and_messages/parachain_to_parachain.rs b/relays/bin-substrate/src/cli/relay_headers_and_messages/parachain_to_parachain.rs index eedebe93c3..48fc0fab49 100644 --- a/relays/bin-substrate/src/cli/relay_headers_and_messages/parachain_to_parachain.rs +++ b/relays/bin-substrate/src/cli/relay_headers_and_messages/parachain_to_parachain.rs @@ -23,7 +23,6 @@ use crate::cli::{ CliChain, }; use bp_polkadot_core::parachains::ParaHash; -use bp_runtime::BlockNumberOf; use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber}; use relay_substrate_client::{ AccountIdOf, AccountKeyPairOf, Chain, ChainWithTransactions, Client, Parachain, @@ -215,8 +214,8 @@ where async fn start_on_demand_headers_relayers( &mut self, ) -> anyhow::Result<( - Arc>>, - Arc>>, + Arc>, + Arc>, )> { self.common.left.accounts.push(TaggedAccount::Headers { id: self.right_headers_to_left_transaction_params.signer.public().into(), @@ -249,31 +248,31 @@ where .await?; let left_relay_to_right_on_demand_headers = - OnDemandHeadersRelay::new::<::RelayFinality>( + OnDemandHeadersRelay::<::RelayFinality>::new( self.left_relay.clone(), self.common.right.client.clone(), self.left_headers_to_right_transaction_params.clone(), self.common.shared.only_mandatory_headers, ); let right_relay_to_left_on_demand_headers = - OnDemandHeadersRelay::new::<::RelayFinality>( + OnDemandHeadersRelay::<::RelayFinality>::new( self.right_relay.clone(), self.common.left.client.clone(), self.right_headers_to_left_transaction_params.clone(), self.common.shared.only_mandatory_headers, ); - let left_to_right_on_demand_parachains = OnDemandParachainsRelay::new::< + let left_to_right_on_demand_parachains = OnDemandParachainsRelay::< ::ParachainFinality, - >( + >::new( self.left_relay.clone(), self.common.right.client.clone(), self.left_parachains_to_right_transaction_params.clone(), Arc::new(left_relay_to_right_on_demand_headers), ); - let right_to_left_on_demand_parachains = OnDemandParachainsRelay::new::< + let right_to_left_on_demand_parachains = OnDemandParachainsRelay::< ::ParachainFinality, - >( + >::new( self.right_relay.clone(), self.common.left.client.clone(), self.right_parachains_to_left_transaction_params.clone(), diff --git a/relays/bin-substrate/src/cli/relay_headers_and_messages/relay_to_parachain.rs b/relays/bin-substrate/src/cli/relay_headers_and_messages/relay_to_parachain.rs index 22324c1d6f..aa4aaf16d7 100644 --- a/relays/bin-substrate/src/cli/relay_headers_and_messages/relay_to_parachain.rs +++ b/relays/bin-substrate/src/cli/relay_headers_and_messages/relay_to_parachain.rs @@ -26,7 +26,6 @@ use crate::cli::{ CliChain, }; use bp_polkadot_core::parachains::ParaHash; -use bp_runtime::BlockNumberOf; use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber}; use relay_substrate_client::{ AccountIdOf, AccountKeyPairOf, Chain, ChainWithTransactions, Client, Parachain, @@ -199,8 +198,8 @@ where async fn start_on_demand_headers_relayers( &mut self, ) -> anyhow::Result<( - Arc>>, - Arc>>, + Arc>, + Arc>, )> { self.common.left.accounts.push(TaggedAccount::Headers { id: self.right_headers_to_left_transaction_params.signer.public().into(), @@ -229,22 +228,22 @@ where .await?; let left_to_right_on_demand_headers = - OnDemandHeadersRelay::new::<::Finality>( + OnDemandHeadersRelay::<::Finality>::new( self.common.left.client.clone(), self.common.right.client.clone(), self.left_headers_to_right_transaction_params.clone(), self.common.shared.only_mandatory_headers, ); let right_relay_to_left_on_demand_headers = - OnDemandHeadersRelay::new::<::RelayFinality>( + OnDemandHeadersRelay::<::RelayFinality>::new( self.right_relay.clone(), self.common.left.client.clone(), self.right_headers_to_left_transaction_params.clone(), self.common.shared.only_mandatory_headers, ); - let right_to_left_on_demand_parachains = OnDemandParachainsRelay::new::< + let right_to_left_on_demand_parachains = OnDemandParachainsRelay::< ::ParachainFinality, - >( + >::new( self.right_relay.clone(), self.common.left.client.clone(), self.right_parachains_to_left_transaction_params.clone(), diff --git a/relays/bin-substrate/src/cli/relay_headers_and_messages/relay_to_relay.rs b/relays/bin-substrate/src/cli/relay_headers_and_messages/relay_to_relay.rs index bda532a2af..625f1e6632 100644 --- a/relays/bin-substrate/src/cli/relay_headers_and_messages/relay_to_relay.rs +++ b/relays/bin-substrate/src/cli/relay_headers_and_messages/relay_to_relay.rs @@ -22,7 +22,6 @@ use crate::cli::{ relay_headers_and_messages::{Full2WayBridgeBase, Full2WayBridgeCommonParams}, CliChain, }; -use bp_runtime::BlockNumberOf; use relay_substrate_client::{AccountIdOf, AccountKeyPairOf, ChainWithTransactions}; use sp_core::Pair; use substrate_relay_helper::{ @@ -149,8 +148,8 @@ where async fn start_on_demand_headers_relayers( &mut self, ) -> anyhow::Result<( - Arc>>, - Arc>>, + Arc>, + Arc>, )> { self.common.right.accounts.push(TaggedAccount::Headers { id: self.left_to_right_transaction_params.signer.public().into(), @@ -175,14 +174,14 @@ where .await?; let left_to_right_on_demand_headers = - OnDemandHeadersRelay::new::<::Finality>( + OnDemandHeadersRelay::<::Finality>::new( self.common.left.client.clone(), self.common.right.client.clone(), self.left_to_right_transaction_params.clone(), self.common.shared.only_mandatory_headers, ); let right_to_left_on_demand_headers = - OnDemandHeadersRelay::new::<::Finality>( + OnDemandHeadersRelay::<::Finality>::new( self.common.right.client.clone(), self.common.left.client.clone(), self.right_to_left_transaction_params.clone(), diff --git a/relays/client-substrate/Cargo.toml b/relays/client-substrate/Cargo.toml index 60ef1c67a2..3f5156b765 100644 --- a/relays/client-substrate/Cargo.toml +++ b/relays/client-substrate/Cargo.toml @@ -21,6 +21,7 @@ thiserror = "1.0.26" bp-header-chain = { path = "../../primitives/header-chain" } bp-messages = { path = "../../primitives/messages" } +bp-polkadot-core = { path = "../../primitives/polkadot-core" } bp-runtime = { path = "../../primitives/runtime" } pallet-bridge-messages = { path = "../../modules/messages" } finality-relay = { path = "../finality" } diff --git a/relays/client-substrate/src/client.rs b/relays/client-substrate/src/client.rs index 4f783291ee..07293a7e65 100644 --- a/relays/client-substrate/src/client.rs +++ b/relays/client-substrate/src/client.rs @@ -57,6 +57,18 @@ const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities"; const SUB_API_TXPOOL_VALIDATE_TRANSACTION: &str = "TaggedTransactionQueue_validate_transaction"; const MAX_SUBSCRIPTION_CAPACITY: usize = 4096; +/// The difference between best block number and number of its ancestor, that is enough +/// for us to consider that ancestor an "ancient" block with dropped state. +/// +/// The relay does not assume that it is connected to the archive node, so it always tries +/// to use the best available chain state. But sometimes it still may use state of some +/// old block. If the state of that block is already dropped, relay will see errors when +/// e.g. it tries to prove something. +/// +/// By default Substrate-based nodes are storing state for last 256 blocks. We'll use +/// half of this value. +pub const ANCIENT_BLOCK_THRESHOLD: u32 = 128; + /// Opaque justifications subscription type. pub struct Subscription(pub(crate) Mutex>>); diff --git a/relays/client-substrate/src/error.rs b/relays/client-substrate/src/error.rs index 9323b75722..ddea1819fb 100644 --- a/relays/client-substrate/src/error.rs +++ b/relays/client-substrate/src/error.rs @@ -16,6 +16,7 @@ //! Substrate node RPC errors. +use bp_polkadot_core::parachains::ParaId; use jsonrpsee::core::Error as RpcError; use relay_utils::MaybeConnectionError; use sc_rpc_api::system::Health; @@ -45,6 +46,12 @@ pub enum Error { /// Runtime storage is missing some mandatory value. #[error("Mandatory storage value is missing from the runtime storage.")] MissingMandatoryStorageValue, + /// Required parachain head is not present at the relay chain. + #[error("Parachain {0:?} head {1} is missing from the relay chain storage.")] + MissingRequiredParachainHead(ParaId, u64), + /// Failed to find finality proof for the given header. + #[error("Failed to find finality proof for header {0}.")] + FinalityProofNotFound(u64), /// The client we're connected to is not synced, so we can't rely on its state. #[error("Substrate client is not synced {0}.")] ClientNotSynced(Health), diff --git a/relays/client-substrate/src/lib.rs b/relays/client-substrate/src/lib.rs index c9926855fd..dc90737523 100644 --- a/relays/client-substrate/src/lib.rs +++ b/relays/client-substrate/src/lib.rs @@ -37,7 +37,10 @@ pub use crate::{ ChainWithGrandpa, ChainWithMessages, ChainWithTransactions, Parachain, RelayChain, SignParam, TransactionStatusOf, UnsignedTransaction, }, - client::{ChainRuntimeVersion, Client, OpaqueGrandpaAuthoritiesSet, Subscription}, + client::{ + ChainRuntimeVersion, Client, OpaqueGrandpaAuthoritiesSet, Subscription, + ANCIENT_BLOCK_THRESHOLD, + }, error::{Error, Result}, rpc::{SubstrateBeefyFinalityClient, SubstrateFinalityClient, SubstrateGrandpaFinalityClient}, sync_header::SyncHeader, diff --git a/relays/lib-substrate-relay/Cargo.toml b/relays/lib-substrate-relay/Cargo.toml index bdf49d42ea..1dd7cb7b37 100644 --- a/relays/lib-substrate-relay/Cargo.toml +++ b/relays/lib-substrate-relay/Cargo.toml @@ -41,6 +41,7 @@ bp-messages = { path = "../../primitives/messages" } frame-support = { git = "/~https://github.com/paritytech/substrate", branch = "master" } frame-system = { git = "/~https://github.com/paritytech/substrate", branch = "master" } pallet-balances = { git = "/~https://github.com/paritytech/substrate", branch = "master" } +pallet-utility = { git = "/~https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "/~https://github.com/paritytech/substrate", branch = "master" } sp-finality-grandpa = { git = "/~https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "/~https://github.com/paritytech/substrate", branch = "master" } diff --git a/relays/lib-substrate-relay/src/finality/source.rs b/relays/lib-substrate-relay/src/finality/source.rs index e75862a822..c8f11d9946 100644 --- a/relays/lib-substrate-relay/src/finality/source.rs +++ b/relays/lib-substrate-relay/src/finality/source.rs @@ -20,13 +20,18 @@ use crate::finality::{engine::Engine, FinalitySyncPipelineAdapter, SubstrateFina use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; +use bp_header_chain::FinalityProof; use codec::Decode; use finality_relay::SourceClient; -use futures::stream::{unfold, Stream, StreamExt}; +use futures::{ + select, + stream::{try_unfold, unfold, Stream, StreamExt, TryStreamExt}, +}; +use num_traits::One; use relay_substrate_client::{ BlockNumberOf, BlockWithJustification, Chain, Client, Error, HeaderOf, }; -use relay_utils::relay_loop::Client as RelayClient; +use relay_utils::{relay_loop::Client as RelayClient, UniqueSaturatedInto}; use std::pin::Pin; /// Shared updatable reference to the maximal header number that we want to sync from the source. @@ -70,6 +75,111 @@ impl SubstrateFinalitySource

{ // target node may be missing proofs that are already available at the source self.client.best_finalized_header_number().await } + + /// Return header and its justification of the given block or its descendant that + /// has a GRANDPA justification. + /// + /// This method is optimized for cases when `block_number` is close to the best finalized + /// chain block. + pub async fn prove_block_finality( + &self, + block_number: BlockNumberOf, + ) -> Result< + (relay_substrate_client::SyncHeader>, SubstrateFinalityProof

), + Error, + > { + // first, subscribe to proofs + let next_persistent_proof = + self.persistent_proofs_stream(block_number + One::one()).await?.fuse(); + let next_ephemeral_proof = self.ephemeral_proofs_stream(block_number).await?.fuse(); + + // in perfect world we'll need to return justfication for the requested `block_number` + let (header, maybe_proof) = self.header_and_finality_proof(block_number).await?; + if let Some(proof) = maybe_proof { + return Ok((header, proof)) + } + + // otherwise we don't care which header to return, so let's select first + futures::pin_mut!(next_persistent_proof, next_ephemeral_proof); + loop { + select! { + maybe_header_and_proof = next_persistent_proof.next() => match maybe_header_and_proof { + Some(header_and_proof) => return header_and_proof, + None => continue, + }, + maybe_header_and_proof = next_ephemeral_proof.next() => match maybe_header_and_proof { + Some(header_and_proof) => return header_and_proof, + None => continue, + }, + complete => return Err(Error::FinalityProofNotFound(block_number.unique_saturated_into())) + } + } + } + + /// Returns stream of headers and their persistent proofs, starting from given block. + async fn persistent_proofs_stream( + &self, + block_number: BlockNumberOf, + ) -> Result< + impl Stream< + Item = Result< + ( + relay_substrate_client::SyncHeader>, + SubstrateFinalityProof

, + ), + Error, + >, + >, + Error, + > { + let client = self.client.clone(); + let best_finalized_block_number = self.client.best_finalized_header_number().await?; + Ok(try_unfold((client, block_number), move |(client, current_block_number)| async move { + // if we've passed the `best_finalized_block_number`, we no longer need persistent + // justifications + if current_block_number > best_finalized_block_number { + return Ok(None) + } + + let (header, maybe_proof) = + header_and_finality_proof::

(&client, current_block_number).await?; + let next_block_number = current_block_number + One::one(); + let next_state = (client, next_block_number); + + Ok(Some((maybe_proof.map(|proof| (header, proof)), next_state))) + }) + .try_filter_map(|maybe_result| async { Ok(maybe_result) })) + } + + /// Returns stream of headers and their ephemeral proofs, starting from given block. + async fn ephemeral_proofs_stream( + &self, + block_number: BlockNumberOf, + ) -> Result< + impl Stream< + Item = Result< + ( + relay_substrate_client::SyncHeader>, + SubstrateFinalityProof

, + ), + Error, + >, + >, + Error, + > { + let client = self.client.clone(); + Ok(self.finality_proofs().await?.map(Ok).try_filter_map(move |proof| { + let client = client.clone(); + async move { + if proof.target_header_number() < block_number { + return Ok(None) + } + + let header = client.header_by_number(proof.target_header_number()).await?; + Ok(Some((header.into(), proof))) + } + })) + } } impl Clone for SubstrateFinalitySource

{ @@ -119,18 +229,7 @@ impl SourceClient { - let header_hash = self.client.block_hash_by_number(number).await?; - let signed_block = self.client.get_block(Some(header_hash)).await?; - - let justification = signed_block - .justification(P::FinalityEngine::ID) - .map(|raw_justification| { - SubstrateFinalityProof::

::decode(&mut raw_justification.as_slice()) - }) - .transpose() - .map_err(Error::ResponseParseFailed)?; - - Ok((signed_block.header().into(), justification)) + header_and_finality_proof::

(&self.client, number).await } async fn finality_proofs(&self) -> Result { @@ -173,3 +272,27 @@ impl SourceClient( + client: &Client, + number: BlockNumberOf, +) -> Result< + ( + relay_substrate_client::SyncHeader>, + Option>, + ), + Error, +> { + let header_hash = client.block_hash_by_number(number).await?; + let signed_block = client.get_block(Some(header_hash)).await?; + + let justification = signed_block + .justification(P::FinalityEngine::ID) + .map(|raw_justification| { + SubstrateFinalityProof::

::decode(&mut raw_justification.as_slice()) + }) + .transpose() + .map_err(Error::ResponseParseFailed)?; + + Ok((signed_block.header().into(), justification)) +} diff --git a/relays/lib-substrate-relay/src/lib.rs b/relays/lib-substrate-relay/src/lib.rs index 62ae756e00..2181f09358 100644 --- a/relays/lib-substrate-relay/src/lib.rs +++ b/relays/lib-substrate-relay/src/lib.rs @@ -18,6 +18,9 @@ #![warn(missing_docs)] +use relay_substrate_client::Error as SubstrateError; +use std::marker::PhantomData; + pub mod error; pub mod finality; pub mod messages_lane; @@ -96,3 +99,52 @@ impl TaggedAccount { } } } + +/// Batch call builder. +pub trait BatchCallBuilder { + /// Associated error type. + type Error; + /// If `true`, then batch calls are supported at the chain. + const BATCH_CALL_SUPPORTED: bool; + + /// Create batch call from given calls vector. + fn build_batch_call(_calls: Vec) -> Result; +} + +impl BatchCallBuilder for () { + type Error = SubstrateError; + const BATCH_CALL_SUPPORTED: bool = false; + + fn build_batch_call(_calls: Vec) -> Result { + debug_assert!( + false, + "only called if `BATCH_CALL_SUPPORTED` is true;\ + `BATCH_CALL_SUPPORTED` is false;\ + qed" + ); + + Err(SubstrateError::Custom("<() as BatchCallBuilder>::build_batch_call() is called".into())) + } +} + +/// Batch call builder for bundled runtimes. +pub struct BundledBatchCallBuilder(PhantomData); + +impl BatchCallBuilder<::RuntimeCall> for BundledBatchCallBuilder +where + R: pallet_utility::Config::RuntimeCall>, + ::RuntimeCall: From>, +{ + type Error = SubstrateError; + const BATCH_CALL_SUPPORTED: bool = true; + + fn build_batch_call( + mut calls: Vec<::RuntimeCall>, + ) -> Result<::RuntimeCall, SubstrateError> { + Ok(if calls.len() == 1 { + calls.remove(0) + } else { + pallet_utility::Call::batch_all { calls }.into() + }) + } +} diff --git a/relays/lib-substrate-relay/src/messages_lane.rs b/relays/lib-substrate-relay/src/messages_lane.rs index da138a3d12..8ec78d0ae2 100644 --- a/relays/lib-substrate-relay/src/messages_lane.rs +++ b/relays/lib-substrate-relay/src/messages_lane.rs @@ -20,7 +20,7 @@ use crate::{ messages_source::{SubstrateMessagesProof, SubstrateMessagesSource}, messages_target::{SubstrateMessagesDeliveryProof, SubstrateMessagesTarget}, on_demand::OnDemandRelay, - TransactionParams, + BatchCallBuilder, TransactionParams, }; use async_std::sync::Arc; @@ -35,7 +35,7 @@ use messages_relay::message_lane::MessageLane; use pallet_bridge_messages::{Call as BridgeMessagesCall, Config as BridgeMessagesConfig}; use relay_substrate_client::{ transaction_stall_timeout, AccountKeyPairOf, BalanceOf, BlockNumberOf, CallOf, Chain, - ChainWithMessages, ChainWithTransactions, Client, HashOf, + ChainWithMessages, ChainWithTransactions, Client, Error as SubstrateError, HashOf, }; use relay_utils::{ metrics::{GlobalMetrics, MetricsParams, StandaloneMetric}, @@ -55,11 +55,16 @@ pub trait SubstrateMessageLane: 'static + Clone + Debug + Send + Sync { type ReceiveMessagesProofCallBuilder: ReceiveMessagesProofCallBuilder; /// How receive messages delivery proof call is built? type ReceiveMessagesDeliveryProofCallBuilder: ReceiveMessagesDeliveryProofCallBuilder; + + /// How batch calls are built at the source chain? + type SourceBatchCallBuilder: BatchCallBuilder, Error = SubstrateError>; + /// How batch calls are built at the target chain? + type TargetBatchCallBuilder: BatchCallBuilder, Error = SubstrateError>; } /// Adapter that allows all `SubstrateMessageLane` to act as `MessageLane`. #[derive(Clone, Debug)] -pub(crate) struct MessageLaneAdapter { +pub struct MessageLaneAdapter { _phantom: PhantomData

, } @@ -90,10 +95,10 @@ pub struct MessagesRelayParams { pub target_transaction_params: TransactionParams>, /// Optional on-demand source to target headers relay. pub source_to_target_headers_relay: - Option>>>, + Option>>, /// Optional on-demand target to source headers relay. pub target_to_source_headers_relay: - Option>>>, + Option>>, /// Identifier of lane that needs to be served. pub lane_id: LaneId, /// Metrics parameters. diff --git a/relays/lib-substrate-relay/src/messages_source.rs b/relays/lib-substrate-relay/src/messages_source.rs index bf3779b3b4..e2876b3107 100644 --- a/relays/lib-substrate-relay/src/messages_source.rs +++ b/relays/lib-substrate-relay/src/messages_source.rs @@ -24,7 +24,7 @@ use crate::{ }, messages_target::SubstrateMessagesDeliveryProof, on_demand::OnDemandRelay, - TransactionParams, + BatchCallBuilder, TransactionParams, }; use async_std::sync::Arc; @@ -41,14 +41,14 @@ use frame_support::weights::Weight; use messages_relay::{ message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, message_lane_loop::{ - ClientState, MessageDetails, MessageDetailsMap, MessageProofParameters, SourceClient, - SourceClientState, + BatchTransaction, ClientState, MessageDetails, MessageDetailsMap, MessageProofParameters, + SourceClient, SourceClientState, }, }; use num_traits::Zero; use relay_substrate_client::{ - AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain, ChainWithMessages, Client, - Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra, + AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, CallOf, Chain, ChainWithMessages, + Client, Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra, TransactionTracker, UnsignedTransaction, }; use relay_utils::{relay_loop::Client as RelayClient, HeaderId}; @@ -68,7 +68,7 @@ pub struct SubstrateMessagesSource { target_client: Client, lane_id: LaneId, transaction_params: TransactionParams>, - target_to_source_headers_relay: Option>>>, + target_to_source_headers_relay: Option>>, } impl SubstrateMessagesSource

{ @@ -79,7 +79,7 @@ impl SubstrateMessagesSource

{ lane_id: LaneId, transaction_params: TransactionParams>, target_to_source_headers_relay: Option< - Arc>>, + Arc>, >, ) -> Self { SubstrateMessagesSource { @@ -140,6 +140,7 @@ impl SourceClient> for SubstrateM where AccountIdOf: From< as Pair>::Public>, { + type BatchTransaction = BatchConfirmationTransaction

; type TransactionTracker = TransactionTracker>; async fn state(&self) -> Result>, SubstrateError> { @@ -360,10 +361,93 @@ where .await } - async fn require_target_header_on_source(&self, id: TargetHeaderIdOf>) { + async fn require_target_header_on_source( + &self, + id: TargetHeaderIdOf>, + ) -> Result, SubstrateError> { if let Some(ref target_to_source_headers_relay) = self.target_to_source_headers_relay { + if P::SourceBatchCallBuilder::BATCH_CALL_SUPPORTED { + return BatchConfirmationTransaction::

::new(self.clone(), id).await.map(Some) + } + target_to_source_headers_relay.require_more_headers(id.0).await; } + + Ok(None) + } +} + +/// Batch transaction that brings target headers + and delivery confirmations to the source node. +pub struct BatchConfirmationTransaction { + messages_source: SubstrateMessagesSource

, + proved_header: TargetHeaderIdOf>, + prove_calls: Vec>, +} + +impl BatchConfirmationTransaction

{ + async fn new( + messages_source: SubstrateMessagesSource

, + required_target_header_on_source: TargetHeaderIdOf>, + ) -> Result { + let (proved_header, prove_calls) = messages_source + .target_to_source_headers_relay + .as_ref() + .expect("BatchConfirmationTransaction is only created when target_to_source_headers_relay is Some; qed") + .prove_header(required_target_header_on_source.0) + .await?; + Ok(Self { messages_source, proved_header, prove_calls }) + } +} + +#[async_trait] +impl + BatchTransaction< + TargetHeaderIdOf>, + as MessageLane>::MessagesReceivingProof, + TransactionTracker>, + SubstrateError, + > for BatchConfirmationTransaction

+where + AccountIdOf: From< as Pair>::Public>, +{ + fn required_header_id(&self) -> TargetHeaderIdOf> { + self.proved_header + } + + async fn append_proof_and_send( + self, + proof: as MessageLane>::MessagesReceivingProof, + ) -> Result>, SubstrateError> { + let mut calls = self.prove_calls; + calls.push( + P::ReceiveMessagesDeliveryProofCallBuilder::build_receive_messages_delivery_proof_call( + proof, false, + ), + ); + let batch_call = P::SourceBatchCallBuilder::build_batch_call(calls)?; + + let (spec_version, transaction_version) = + self.messages_source.source_client.simple_runtime_version().await?; + self.messages_source + .source_client + .submit_and_watch_signed_extrinsic( + self.messages_source.transaction_params.signer.public().into(), + SignParam:: { + spec_version, + transaction_version, + genesis_hash: *self.messages_source.source_client.genesis_hash(), + signer: self.messages_source.transaction_params.signer.clone(), + }, + move |best_block_id, transaction_nonce| { + Ok(UnsignedTransaction::new(batch_call.into(), transaction_nonce).era( + TransactionEra::new( + best_block_id, + self.messages_source.transaction_params.mortality, + ), + )) + }, + ) + .await } } diff --git a/relays/lib-substrate-relay/src/messages_target.rs b/relays/lib-substrate-relay/src/messages_target.rs index 22a50acf37..9d80b9166c 100644 --- a/relays/lib-substrate-relay/src/messages_target.rs +++ b/relays/lib-substrate-relay/src/messages_target.rs @@ -22,7 +22,7 @@ use crate::{ messages_lane::{MessageLaneAdapter, ReceiveMessagesProofCallBuilder, SubstrateMessageLane}, messages_source::{ensure_messages_pallet_active, read_client_state, SubstrateMessagesProof}, on_demand::OnDemandRelay, - TransactionParams, + BatchCallBuilder, TransactionParams, }; use async_std::sync::Arc; @@ -34,10 +34,10 @@ use bp_messages::{ use bridge_runtime_common::messages::source::FromBridgedChainMessagesDeliveryProof; use messages_relay::{ message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, - message_lane_loop::{NoncesSubmitArtifacts, TargetClient, TargetClientState}, + message_lane_loop::{BatchTransaction, NoncesSubmitArtifacts, TargetClient, TargetClientState}, }; use relay_substrate_client::{ - AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain, ChainWithMessages, Client, + AccountIdOf, AccountKeyPairOf, BalanceOf, CallOf, Chain, ChainWithMessages, Client, Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra, TransactionTracker, UnsignedTransaction, }; @@ -56,7 +56,7 @@ pub struct SubstrateMessagesTarget { lane_id: LaneId, relayer_id_at_source: AccountIdOf, transaction_params: TransactionParams>, - source_to_target_headers_relay: Option>>>, + source_to_target_headers_relay: Option>>, } impl SubstrateMessagesTarget

{ @@ -68,7 +68,7 @@ impl SubstrateMessagesTarget

{ relayer_id_at_source: AccountIdOf, transaction_params: TransactionParams>, source_to_target_headers_relay: Option< - Arc>>, + Arc>, >, ) -> Self { SubstrateMessagesTarget { @@ -132,6 +132,7 @@ where AccountIdOf: From< as Pair>::Public>, BalanceOf: TryFrom>, { + type BatchTransaction = BatchDeliveryTransaction

; type TransactionTracker = TransactionTracker>; async fn state(&self) -> Result>, SubstrateError> { @@ -267,32 +268,126 @@ where Ok(NoncesSubmitArtifacts { nonces, tx_tracker }) } - async fn require_source_header_on_target(&self, id: SourceHeaderIdOf>) { + async fn require_source_header_on_target( + &self, + id: SourceHeaderIdOf>, + ) -> Result, SubstrateError> { if let Some(ref source_to_target_headers_relay) = self.source_to_target_headers_relay { + if P::TargetBatchCallBuilder::BATCH_CALL_SUPPORTED { + return BatchDeliveryTransaction::

::new(self.clone(), id).await.map(Some) + } + source_to_target_headers_relay.require_more_headers(id.0).await; } + + Ok(None) } } -/// Make messages delivery transaction from given proof. -fn make_messages_delivery_transaction( - target_transaction_params: &TransactionParams>, - target_best_block_id: HeaderIdOf, - transaction_nonce: IndexOf, +/// Batch transaction that brings target headers + and delivery confirmations to the source node. +pub struct BatchDeliveryTransaction { + messages_target: SubstrateMessagesTarget

, + proved_header: SourceHeaderIdOf>, + prove_calls: Vec>, +} + +impl BatchDeliveryTransaction

{ + async fn new( + messages_target: SubstrateMessagesTarget

, + required_source_header_on_target: SourceHeaderIdOf>, + ) -> Result { + let (proved_header, prove_calls) = messages_target + .source_to_target_headers_relay + .as_ref() + .expect("BatchDeliveryTransaction is only created when source_to_target_headers_relay is Some; qed") + .prove_header(required_source_header_on_target.0) + .await?; + Ok(Self { messages_target, proved_header, prove_calls }) + } +} + +#[async_trait] +impl + BatchTransaction< + SourceHeaderIdOf>, + as MessageLane>::MessagesProof, + TransactionTracker>, + SubstrateError, + > for BatchDeliveryTransaction

+where + AccountIdOf: From< as Pair>::Public>, +{ + fn required_header_id(&self) -> SourceHeaderIdOf> { + self.proved_header + } + + async fn append_proof_and_send( + self, + proof: as MessageLane>::MessagesProof, + ) -> Result>, SubstrateError> { + let mut calls = self.prove_calls; + calls.push(make_messages_delivery_call::

( + self.messages_target.relayer_id_at_source, + proof.1.nonces_start..=proof.1.nonces_end, + proof, + false, + )); + let batch_call = P::TargetBatchCallBuilder::build_batch_call(calls)?; + + let (spec_version, transaction_version) = + self.messages_target.target_client.simple_runtime_version().await?; + self.messages_target + .target_client + .submit_and_watch_signed_extrinsic( + self.messages_target.transaction_params.signer.public().into(), + SignParam:: { + spec_version, + transaction_version, + genesis_hash: *self.messages_target.target_client.genesis_hash(), + signer: self.messages_target.transaction_params.signer.clone(), + }, + move |best_block_id, transaction_nonce| { + Ok(UnsignedTransaction::new(batch_call.into(), transaction_nonce).era( + TransactionEra::new( + best_block_id, + self.messages_target.transaction_params.mortality, + ), + )) + }, + ) + .await + } +} + +/// Make messages delivery call from given proof. +fn make_messages_delivery_call( relayer_id_at_source: AccountIdOf, nonces: RangeInclusive, proof: SubstrateMessagesProof, trace_call: bool, -) -> Result, SubstrateError> { +) -> CallOf { let messages_count = nonces.end() - nonces.start() + 1; let dispatch_weight = proof.0; - let call = P::ReceiveMessagesProofCallBuilder::build_receive_messages_proof_call( + P::ReceiveMessagesProofCallBuilder::build_receive_messages_proof_call( relayer_id_at_source, proof, messages_count as _, dispatch_weight, trace_call, - ); + ) +} + +/// Make messages delivery transaction from given proof. +fn make_messages_delivery_transaction( + target_transaction_params: &TransactionParams>, + target_best_block_id: HeaderIdOf, + transaction_nonce: IndexOf, + relayer_id_at_source: AccountIdOf, + nonces: RangeInclusive, + proof: SubstrateMessagesProof, + trace_call: bool, +) -> Result, SubstrateError> { + let call = make_messages_delivery_call::

(relayer_id_at_source, nonces, proof, trace_call); Ok(UnsignedTransaction::new(call.into(), transaction_nonce) .era(TransactionEra::new(target_best_block_id, target_transaction_params.mortality))) } diff --git a/relays/lib-substrate-relay/src/on_demand/headers.rs b/relays/lib-substrate-relay/src/on_demand/headers.rs index c0603cda8c..369520f0e9 100644 --- a/relays/lib-substrate-relay/src/on_demand/headers.rs +++ b/relays/lib-substrate-relay/src/on_demand/headers.rs @@ -16,15 +16,21 @@ //! On-demand Substrate -> Substrate header finality relay. +use crate::finality::SubmitFinalityProofCallBuilder; + use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use bp_header_chain::ConsensusLogReader; +use bp_runtime::HeaderIdProvider; use futures::{select, FutureExt}; use num_traits::{One, Zero}; use sp_runtime::traits::Header; use finality_relay::{FinalitySyncParams, TargetClient as FinalityTargetClient}; -use relay_substrate_client::{AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client}; +use relay_substrate_client::{ + AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, Client, Error as SubstrateError, + HeaderIdOf, +}; use relay_utils::{ metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, MaybeConnectionError, STALL_TIMEOUT, @@ -47,16 +53,18 @@ use crate::{ /// relay) needs it to continue its regular work. When enough headers are relayed, on-demand stops /// syncing headers. #[derive(Clone)] -pub struct OnDemandHeadersRelay { +pub struct OnDemandHeadersRelay { /// Relay task name. relay_task_name: String, /// Shared reference to maximal required finalized header number. - required_header_number: RequiredHeaderNumberRef, + required_header_number: RequiredHeaderNumberRef, + /// Client of the source chain. + source_client: Client, } -impl OnDemandHeadersRelay { +impl OnDemandHeadersRelay

{ /// Create new on-demand headers relay. - pub fn new>( + pub fn new( source_client: Client, target_client: Client, target_transaction_params: TransactionParams>, @@ -70,6 +78,7 @@ impl OnDemandHeadersRelay { let this = OnDemandHeadersRelay { relay_task_name: on_demand_headers_relay_name::(), required_header_number: required_header_number.clone(), + source_client: source_client.clone(), }; async_std::task::spawn(async move { background_task::

( @@ -87,23 +96,49 @@ impl OnDemandHeadersRelay { } #[async_trait] -impl OnDemandRelay> - for OnDemandHeadersRelay +impl OnDemandRelay + for OnDemandHeadersRelay

{ - async fn require_more_headers(&self, required_header: BlockNumberOf) { + async fn require_more_headers(&self, required_header: BlockNumberOf) { let mut required_header_number = self.required_header_number.lock().await; if required_header > *required_header_number { log::trace!( target: "bridge", "[{}] More {} headers required. Going to sync up to the {}", self.relay_task_name, - SourceChain::NAME, + P::SourceChain::NAME, required_header, ); *required_header_number = required_header; } } + + async fn prove_header( + &self, + required_header: BlockNumberOf, + ) -> Result<(HeaderIdOf, Vec>), SubstrateError> { + // first find proper header (either `required_header`) or its descendant + let finality_source = SubstrateFinalitySource::

::new(self.source_client.clone(), None); + let (header, proof) = finality_source.prove_block_finality(required_header).await?; + let header_id = header.id(); + + log::debug!( + target: "bridge", + "[{}] Requested to prove {} head {:?}. Selected to prove {} head {:?}", + self.relay_task_name, + P::SourceChain::NAME, + required_header, + P::SourceChain::NAME, + header_id, + ); + + // and then craft the submit-proof call + let call = + P::SubmitFinalityProofCallBuilder::build_submit_finality_proof_call(header, proof); + + Ok((header_id, vec![call])) + } } /// Background task that is responsible for starting headers relay. diff --git a/relays/lib-substrate-relay/src/on_demand/mod.rs b/relays/lib-substrate-relay/src/on_demand/mod.rs index 7a2dfc9c15..eca7d20163 100644 --- a/relays/lib-substrate-relay/src/on_demand/mod.rs +++ b/relays/lib-substrate-relay/src/on_demand/mod.rs @@ -18,18 +18,28 @@ //! on-demand pipelines. use async_trait::async_trait; +use relay_substrate_client::{BlockNumberOf, CallOf, Chain, Error as SubstrateError, HeaderIdOf}; pub mod headers; pub mod parachains; /// On-demand headers relay that is relaying finalizing headers only when requested. #[async_trait] -pub trait OnDemandRelay: Send + Sync { +pub trait OnDemandRelay: Send + Sync { /// Ask relay to relay source header with given number to the target chain. /// /// Depending on implementation, on-demand relay may also relay `required_header` ancestors /// (e.g. if they're mandatory), or its descendants. The request is considered complete if /// the best avbailable header at the target chain has number that is larger than or equal /// to the `required_header`. - async fn require_more_headers(&self, required_header: SourceHeaderNumber); + async fn require_more_headers(&self, required_header: BlockNumberOf); + + /// Ask relay to prove source `required_header` to the `TargetChain`. + /// + /// Returns number of header that is proved (it may be the `required_header` or one of its + /// descendants) and calls for delivering the proof. + async fn prove_header( + &self, + required_header: BlockNumberOf, + ) -> Result<(HeaderIdOf, Vec>), SubstrateError>; } diff --git a/relays/lib-substrate-relay/src/on_demand/parachains.rs b/relays/lib-substrate-relay/src/on_demand/parachains.rs index 991a84352e..cfb9d9c7a9 100644 --- a/relays/lib-substrate-relay/src/on_demand/parachains.rs +++ b/relays/lib-substrate-relay/src/on_demand/parachains.rs @@ -21,7 +21,7 @@ use crate::{ on_demand::OnDemandRelay, parachains::{ source::ParachainsSource, target::ParachainsTarget, ParachainsPipelineAdapter, - SubstrateParachainsPipeline, + SubmitParachainHeadsCallBuilder, SubstrateParachainsPipeline, }, TransactionParams, }; @@ -31,18 +31,21 @@ use async_std::{ sync::{Arc, Mutex}, }; use async_trait::async_trait; -use bp_polkadot_core::parachains::ParaHash; +use bp_polkadot_core::parachains::{ParaHash, ParaId}; use bp_runtime::HeaderIdProvider; use futures::{select, FutureExt}; use num_traits::Zero; use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber}; -use parachains_relay::parachains_loop::{AvailableHeader, ParachainSyncParams, TargetClient}; +use parachains_relay::parachains_loop::{ + AvailableHeader, ParachainSyncParams, SourceClient, TargetClient, +}; use relay_substrate_client::{ - AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf, - ParachainBase, + AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, Client, Error as SubstrateError, + HashOf, HeaderIdOf, ParachainBase, ANCIENT_BLOCK_THRESHOLD, }; use relay_utils::{ - metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, HeaderId, + metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient, + HeaderId, UniqueSaturatedInto, }; use std::fmt::Debug; @@ -52,25 +55,32 @@ use std::fmt::Debug; /// (e.g. messages relay) needs it to continue its regular work. When enough parachain headers /// are relayed, on-demand stops syncing headers. #[derive(Clone)] -pub struct OnDemandParachainsRelay { +pub struct OnDemandParachainsRelay { /// Relay task name. relay_task_name: String, /// Channel used to communicate with background task and ask for relay of parachain heads. - required_header_number_sender: Sender>, + required_header_number_sender: Sender>, + /// Source relay chain client. + source_relay_client: Client, + /// Target chain client. + target_client: Client, + /// On-demand relay chain relay. + on_demand_source_relay_to_target_headers: + Arc>, } -impl OnDemandParachainsRelay { +impl OnDemandParachainsRelay

{ /// Create new on-demand parachains relay. /// /// Note that the argument is the source relay chain client, not the parachain client. /// That's because parachain finality is determined by the relay chain and we don't /// need to connect to the parachain itself here. - pub fn new>( + pub fn new( source_relay_client: Client, target_client: Client, target_transaction_params: TransactionParams>, on_demand_source_relay_to_target_headers: Arc< - dyn OnDemandRelay>, + dyn OnDemandRelay, >, ) -> Self where @@ -82,8 +92,13 @@ impl OnDemandParachainsRelay { { let (required_header_number_sender, required_header_number_receiver) = unbounded(); let this = OnDemandParachainsRelay { - relay_task_name: on_demand_parachains_relay_name::(), + relay_task_name: on_demand_parachains_relay_name::( + ), required_header_number_sender, + source_relay_client: source_relay_client.clone(), + target_client: target_client.clone(), + on_demand_source_relay_to_target_headers: on_demand_source_relay_to_target_headers + .clone(), }; async_std::task::spawn(async move { background_task::

( @@ -101,23 +116,109 @@ impl OnDemandParachainsRelay { } #[async_trait] -impl OnDemandRelay> - for OnDemandParachainsRelay +impl OnDemandRelay + for OnDemandParachainsRelay

where - SourceParachain: Chain, + P::SourceParachain: Chain, { - async fn require_more_headers(&self, required_header: BlockNumberOf) { + async fn require_more_headers(&self, required_header: BlockNumberOf) { if let Err(e) = self.required_header_number_sender.send(required_header).await { log::trace!( target: "bridge", "[{}] Failed to request {} header {:?}: {:?}", self.relay_task_name, - SourceParachain::NAME, + P::SourceParachain::NAME, required_header, e, ); } } + + /// Ask relay to prove source `required_header` to the `TargetChain`. + async fn prove_header( + &self, + required_parachain_header: BlockNumberOf, + ) -> Result<(HeaderIdOf, Vec>), SubstrateError> { + // select headers to prove + let parachains_source = ParachainsSource::

::new( + self.source_relay_client.clone(), + Arc::new(Mutex::new(AvailableHeader::Missing)), + ); + let env = (self, ¶chains_source); + let (need_to_prove_relay_block, selected_relay_block, selected_parachain_block) = + select_headers_to_prove(env, required_parachain_header).await?; + + log::debug!( + target: "bridge", + "[{}] Requested to prove {} head {:?}. Selected to prove {} head {:?} and {} head {:?}", + self.relay_task_name, + P::SourceParachain::NAME, + required_parachain_header, + P::SourceParachain::NAME, + selected_parachain_block, + P::SourceRelayChain::NAME, + if need_to_prove_relay_block { + Some(selected_relay_block) + } else { + None + }, + ); + + // now let's prove relay chain block (if needed) + let mut calls = Vec::new(); + let mut proved_relay_block = selected_relay_block; + if need_to_prove_relay_block { + let (relay_block, relay_prove_call) = self + .on_demand_source_relay_to_target_headers + .prove_header(selected_relay_block.number()) + .await?; + proved_relay_block = relay_block; + calls.extend(relay_prove_call); + } + + // despite what we've selected before (in `select_headers_to_prove` call), if headers relay + // have chose the different header (e.g. because there's no GRANDPA jusstification for it), + // we need to prove parachain head available at this header + let para_id = ParaId(P::SourceParachain::PARACHAIN_ID); + let mut proved_parachain_block = selected_parachain_block; + if proved_relay_block != selected_relay_block { + proved_parachain_block = parachains_source + .on_chain_para_head_id(proved_relay_block, para_id) + .await? + // this could happen e.g. if parachain has been offboarded? + .ok_or_else(|| { + SubstrateError::MissingRequiredParachainHead( + para_id, + proved_relay_block.number().unique_saturated_into(), + ) + })?; + + log::debug!( + target: "bridge", + "[{}] Selected to prove {} head {:?} and {} head {:?}. Instead proved {} head {:?} and {} head {:?}", + self.relay_task_name, + P::SourceParachain::NAME, + selected_parachain_block, + P::SourceRelayChain::NAME, + selected_relay_block, + P::SourceParachain::NAME, + proved_parachain_block, + P::SourceRelayChain::NAME, + proved_relay_block, + ); + } + + // and finally - prove parachain head + let (para_proof, para_hashes) = + parachains_source.prove_parachain_heads(proved_relay_block, &[para_id]).await?; + calls.push(P::SubmitParachainHeadsCallBuilder::build_submit_parachain_heads_call( + proved_relay_block, + para_hashes.into_iter().map(|h| (para_id, h)).collect(), + para_proof, + )); + + Ok((proved_parachain_block, calls)) + } } /// Background task that is responsible for starting parachain headers relay. @@ -126,7 +227,7 @@ async fn background_task( target_client: Client, target_transaction_params: TransactionParams>, on_demand_source_relay_to_target_headers: Arc< - dyn OnDemandRelay>, + dyn OnDemandRelay, >, required_parachain_header_number_receiver: Receiver>, ) where @@ -487,6 +588,125 @@ where RelayState::RelayingParaHeader(para_header_at_source) } +/// Environment for the `select_headers_to_prove` call. +#[async_trait] +trait SelectHeadersToProveEnvironment { + /// Returns associated parachain id. + fn parachain_id(&self) -> ParaId; + /// Returns best finalized relay block. + async fn best_finalized_relay_block_at_source( + &self, + ) -> Result, SubstrateError>; + /// Returns best finalized relay block that is known at `P::TargetChain`. + async fn best_finalized_relay_block_at_target( + &self, + ) -> Result, SubstrateError>; + /// Returns best finalized parachain block at given source relay chain block. + async fn best_finalized_para_block_at_source( + &self, + at_relay_block: HeaderId, + ) -> Result>, SubstrateError>; +} + +#[async_trait] +impl<'a, P: SubstrateParachainsPipeline> + SelectHeadersToProveEnvironment< + BlockNumberOf, + HashOf, + BlockNumberOf, + HashOf, + > for (&'a OnDemandParachainsRelay

, &'a ParachainsSource

) +{ + fn parachain_id(&self) -> ParaId { + ParaId(P::SourceParachain::PARACHAIN_ID) + } + + async fn best_finalized_relay_block_at_source( + &self, + ) -> Result, SubstrateError> { + Ok(self.0.source_relay_client.best_finalized_header().await?.id()) + } + + async fn best_finalized_relay_block_at_target( + &self, + ) -> Result, SubstrateError> { + Ok(crate::messages_source::read_client_state::( + &self.0.target_client, + None, + P::SourceRelayChain::BEST_FINALIZED_HEADER_ID_METHOD, + ) + .await? + .best_finalized_peer_at_best_self) + } + + async fn best_finalized_para_block_at_source( + &self, + at_relay_block: HeaderIdOf, + ) -> Result>, SubstrateError> { + self.1.on_chain_para_head_id(at_relay_block, self.parachain_id()).await + } +} + +/// Given request to prove `required_parachain_header`, select actual headers that need to be +/// proved. +async fn select_headers_to_prove( + env: impl SelectHeadersToProveEnvironment, + required_parachain_header: PBN, +) -> Result<(bool, HeaderId, HeaderId), SubstrateError> +where + RBH: Copy, + RBN: BlockNumberBase, + PBH: Copy, + PBN: BlockNumberBase, +{ + // parachains proof also requires relay header proof. Let's first select relay block + // number that we'll be dealing with + let best_finalized_relay_block_at_source = env.best_finalized_relay_block_at_source().await?; + let best_finalized_relay_block_at_target = env.best_finalized_relay_block_at_target().await?; + + // if we can't prove `required_header` even using `best_finalized_relay_block_at_source`, we + // can't do anything here + // (this shall not actually happen, given current code, because we only require finalized + // headers) + let best_possible_parachain_block = env + .best_finalized_para_block_at_source(best_finalized_relay_block_at_source) + .await? + .filter(|best_possible_parachain_block| { + best_possible_parachain_block.number() >= required_parachain_header + }) + .ok_or(SubstrateError::MissingRequiredParachainHead( + env.parachain_id(), + required_parachain_header.unique_saturated_into(), + ))?; + + // now let's check if `required_header` may be proved using + // `best_finalized_relay_block_at_target` + let selection = env + .best_finalized_para_block_at_source(best_finalized_relay_block_at_target) + .await? + .filter(|best_finalized_para_block_at_target| { + best_finalized_para_block_at_target.number() >= required_parachain_header + }) + .map(|best_finalized_para_block_at_target| { + (false, best_finalized_relay_block_at_target, best_finalized_para_block_at_target) + }) + // we don't require source node to be archive, so we can't craft storage proofs using + // ancient headers. So if the `best_finalized_relay_block_at_target` is too ancient, we + // can't craft storage proofs using it + .filter(|(_, selected_relay_block, _)| { + let difference = best_finalized_relay_block_at_source + .number() + .saturating_sub(selected_relay_block.number()); + difference <= RBN::from(ANCIENT_BLOCK_THRESHOLD) + }); + + Ok(selection.unwrap_or(( + true, + best_finalized_relay_block_at_source, + best_possible_parachain_block, + ))) +} + #[cfg(test)] mod tests { use super::*; @@ -705,4 +925,80 @@ mod tests { RelayState::RelayingRelayHeader(800), ); } + + // tuple is: + // + // - best_finalized_relay_block_at_source + // - best_finalized_relay_block_at_target + // - best_finalized_para_block_at_source at best_finalized_relay_block_at_source + // - best_finalized_para_block_at_source at best_finalized_relay_block_at_target + #[async_trait] + impl SelectHeadersToProveEnvironment for (u32, u32, u32, u32) { + fn parachain_id(&self) -> ParaId { + ParaId(0) + } + + async fn best_finalized_relay_block_at_source( + &self, + ) -> Result, SubstrateError> { + Ok(HeaderId(self.0, self.0)) + } + + async fn best_finalized_relay_block_at_target( + &self, + ) -> Result, SubstrateError> { + Ok(HeaderId(self.1, self.1)) + } + + async fn best_finalized_para_block_at_source( + &self, + at_relay_block: HeaderId, + ) -> Result>, SubstrateError> { + if at_relay_block.0 == self.0 { + Ok(Some(HeaderId(self.2, self.2))) + } else if at_relay_block.0 == self.1 { + Ok(Some(HeaderId(self.3, self.3))) + } else { + Ok(None) + } + } + } + + #[async_std::test] + async fn select_headers_to_prove_returns_err_if_required_para_block_is_missing_at_source() { + assert!(matches!( + select_headers_to_prove((20_u32, 10_u32, 200_u32, 100_u32), 300_u32,).await, + Err(SubstrateError::MissingRequiredParachainHead(ParaId(0), 300_u64)), + )); + } + + #[async_std::test] + async fn select_headers_to_prove_fails_to_use_existing_ancient_relay_block() { + assert_eq!( + select_headers_to_prove((220_u32, 10_u32, 200_u32, 100_u32), 100_u32,) + .await + .map_err(drop), + Ok((true, HeaderId(220, 220), HeaderId(200, 200))), + ); + } + + #[async_std::test] + async fn select_headers_to_prove_is_able_to_use_existing_recent_relay_block() { + assert_eq!( + select_headers_to_prove((40_u32, 10_u32, 200_u32, 100_u32), 100_u32,) + .await + .map_err(drop), + Ok((false, HeaderId(10, 10), HeaderId(100, 100))), + ); + } + + #[async_std::test] + async fn select_headers_to_prove_uses_new_relay_block() { + assert_eq!( + select_headers_to_prove((20_u32, 10_u32, 200_u32, 100_u32), 200_u32,) + .await + .map_err(drop), + Ok((true, HeaderId(20, 20), HeaderId(200, 200))), + ); + } } diff --git a/relays/messages/src/message_lane_loop.rs b/relays/messages/src/message_lane_loop.rs index 6b28dcbaa6..f4f91d50dd 100644 --- a/relays/messages/src/message_lane_loop.rs +++ b/relays/messages/src/message_lane_loop.rs @@ -109,9 +109,28 @@ pub struct NoncesSubmitArtifacts { pub tx_tracker: T, } +/// Batch transaction that already submit some headers and needs to be extended with +/// messages/delivery proof before sending. +#[async_trait] +pub trait BatchTransaction: Send { + /// Header that was required in the original call and which is bundled within this + /// batch transaction. + fn required_header_id(&self) -> HeaderId; + + /// Append proof and send transaction to the connected node. + async fn append_proof_and_send(self, proof: Proof) -> Result; +} + /// Source client trait. #[async_trait] pub trait SourceClient: RelayClient { + /// Type of batch transaction that submits finality and message receiving proof. + type BatchTransaction: BatchTransaction< + TargetHeaderIdOf

, + P::MessagesReceivingProof, + Self::TransactionTracker, + Self::Error, + >; /// Transaction tracker to track submitted transactions. type TransactionTracker: TransactionTracker>; @@ -156,12 +175,31 @@ pub trait SourceClient: RelayClient { ) -> Result; /// We need given finalized target header on source to continue synchronization. - async fn require_target_header_on_source(&self, id: TargetHeaderIdOf

); + /// + /// We assume that the absence of header `id` has already been checked by caller. + /// + /// The client may return `Some(_)`, which means that nothing has happened yet and + /// the caller must generate and append message receiving proof to the batch transaction + /// to actually send it (along with required header) to the node. + /// + /// If function has returned `None`, it means that the caller now must wait for the + /// appearance of the target header `id` at the source client. + async fn require_target_header_on_source( + &self, + id: TargetHeaderIdOf

, + ) -> Result, Self::Error>; } /// Target client trait. #[async_trait] pub trait TargetClient: RelayClient { + /// Type of batch transaction that submits finality and messages proof. + type BatchTransaction: BatchTransaction< + SourceHeaderIdOf

, + P::MessagesProof, + Self::TransactionTracker, + Self::Error, + >; /// Transaction tracker to track submitted transactions. type TransactionTracker: TransactionTracker>; @@ -201,7 +239,17 @@ pub trait TargetClient: RelayClient { ) -> Result, Self::Error>; /// We need given finalized source header on target to continue synchronization. - async fn require_source_header_on_target(&self, id: SourceHeaderIdOf

); + /// + /// The client may return `Some(_)`, which means that nothing has happened yet and + /// the caller must generate and append messages proof to the batch transaction + /// to actually send it (along with required header) to the node. + /// + /// If function has returned `None`, it means that the caller now must wait for the + /// appearance of the source header `id` at the target client. + async fn require_source_header_on_target( + &self, + id: SourceHeaderIdOf

, + ) -> Result, Self::Error>; } /// State of the client. @@ -483,6 +531,61 @@ pub(crate) mod tests { type TargetHeaderHash = TestTargetHeaderHash; } + #[derive(Clone, Debug)] + pub struct TestMessagesBatchTransaction { + data: Arc>, + required_header_id: TestSourceHeaderId, + tx_tracker: TestTransactionTracker, + } + + #[async_trait] + impl BatchTransaction + for TestMessagesBatchTransaction + { + fn required_header_id(&self) -> TestSourceHeaderId { + self.required_header_id + } + + async fn append_proof_and_send( + self, + proof: TestMessagesProof, + ) -> Result { + let mut data = self.data.lock(); + data.receive_messages(proof); + Ok(self.tx_tracker) + } + } + + #[derive(Clone, Debug)] + pub struct TestConfirmationBatchTransaction { + data: Arc>, + required_header_id: TestTargetHeaderId, + tx_tracker: TestTransactionTracker, + } + + #[async_trait] + impl + BatchTransaction< + TestTargetHeaderId, + TestMessagesReceivingProof, + TestTransactionTracker, + TestError, + > for TestConfirmationBatchTransaction + { + fn required_header_id(&self) -> TestTargetHeaderId { + self.required_header_id + } + + async fn append_proof_and_send( + self, + proof: TestMessagesReceivingProof, + ) -> Result { + let mut data = self.data.lock(); + data.receive_messages_delivery_proof(proof); + Ok(self.tx_tracker) + } + } + #[derive(Clone, Debug)] pub struct TestTransactionTracker(TrackedTransactionStatus); @@ -517,8 +620,10 @@ pub(crate) mod tests { target_latest_confirmed_received_nonce: MessageNonce, target_tracked_transaction_status: TrackedTransactionStatus, submitted_messages_proofs: Vec, + target_to_source_batch_transaction: Option, target_to_source_header_required: Option, target_to_source_header_requirements: Vec, + source_to_target_batch_transaction: Option, source_to_target_header_required: Option, source_to_target_header_requirements: Vec, } @@ -546,14 +651,38 @@ pub(crate) mod tests { Default::default(), )), submitted_messages_proofs: Vec::new(), + target_to_source_batch_transaction: None, target_to_source_header_required: None, target_to_source_header_requirements: Vec::new(), + source_to_target_batch_transaction: None, source_to_target_header_required: None, source_to_target_header_requirements: Vec::new(), } } } + impl TestClientData { + fn receive_messages(&mut self, proof: TestMessagesProof) { + self.target_state.best_self = + HeaderId(self.target_state.best_self.0 + 1, self.target_state.best_self.1 + 1); + self.target_state.best_finalized_self = self.target_state.best_self; + self.target_latest_received_nonce = *proof.0.end(); + if let Some(target_latest_confirmed_received_nonce) = proof.1 { + self.target_latest_confirmed_received_nonce = + target_latest_confirmed_received_nonce; + } + self.submitted_messages_proofs.push(proof); + } + + fn receive_messages_delivery_proof(&mut self, proof: TestMessagesReceivingProof) { + self.source_state.best_self = + HeaderId(self.source_state.best_self.0 + 1, self.source_state.best_self.1 + 1); + self.source_state.best_finalized_self = self.source_state.best_self; + self.submitted_messages_receiving_proofs.push(proof); + self.source_latest_confirmed_received_nonce = proof; + } + } + #[derive(Clone)] pub struct TestSourceClient { data: Arc>, @@ -588,6 +717,7 @@ pub(crate) mod tests { #[async_trait] impl SourceClient for TestSourceClient { + type BatchTransaction = TestConfirmationBatchTransaction; type TransactionTracker = TestTransactionTracker; async fn state(&self) -> Result, TestError> { @@ -675,21 +805,25 @@ pub(crate) mod tests { ) -> Result { let mut data = self.data.lock(); (self.tick)(&mut data); - data.source_state.best_self = - HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1); - data.source_state.best_finalized_self = data.source_state.best_self; - data.submitted_messages_receiving_proofs.push(proof); - data.source_latest_confirmed_received_nonce = proof; + data.receive_messages_delivery_proof(proof); (self.post_tick)(&mut data); Ok(TestTransactionTracker(data.source_tracked_transaction_status)) } - async fn require_target_header_on_source(&self, id: TargetHeaderIdOf) { + async fn require_target_header_on_source( + &self, + id: TargetHeaderIdOf, + ) -> Result, Self::Error> { let mut data = self.data.lock(); data.target_to_source_header_required = Some(id); data.target_to_source_header_requirements.push(id); (self.tick)(&mut data); (self.post_tick)(&mut data); + + Ok(data.target_to_source_batch_transaction.take().map(|mut tx| { + tx.required_header_id = id; + tx + })) } } @@ -727,6 +861,7 @@ pub(crate) mod tests { #[async_trait] impl TargetClient for TestTargetClient { + type BatchTransaction = TestMessagesBatchTransaction; type TransactionTracker = TestTransactionTracker; async fn state(&self) -> Result, TestError> { @@ -798,15 +933,7 @@ pub(crate) mod tests { if data.is_target_fails { return Err(TestError) } - data.target_state.best_self = - HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1); - data.target_state.best_finalized_self = data.target_state.best_self; - data.target_latest_received_nonce = *proof.0.end(); - if let Some(target_latest_confirmed_received_nonce) = proof.1 { - data.target_latest_confirmed_received_nonce = - target_latest_confirmed_received_nonce; - } - data.submitted_messages_proofs.push(proof); + data.receive_messages(proof); (self.post_tick)(&mut data); Ok(NoncesSubmitArtifacts { nonces, @@ -814,17 +941,25 @@ pub(crate) mod tests { }) } - async fn require_source_header_on_target(&self, id: SourceHeaderIdOf) { + async fn require_source_header_on_target( + &self, + id: SourceHeaderIdOf, + ) -> Result, Self::Error> { let mut data = self.data.lock(); data.source_to_target_header_required = Some(id); data.source_to_target_header_requirements.push(id); (self.tick)(&mut data); (self.post_tick)(&mut data); + + Ok(data.source_to_target_batch_transaction.take().map(|mut tx| { + tx.required_header_id = id; + tx + })) } } fn run_loop_test( - data: TestClientData, + data: Arc>, source_tick: Arc, source_post_tick: Arc, target_tick: Arc, @@ -832,8 +967,6 @@ pub(crate) mod tests { exit_signal: impl Future + 'static + Send, ) -> TestClientData { async_std::task::block_on(async { - let data = Arc::new(Mutex::new(data)); - let source_client = TestSourceClient { data: data.clone(), tick: source_tick, @@ -876,7 +1009,7 @@ pub(crate) mod tests { // able to deliver messages. let (exit_sender, exit_receiver) = unbounded(); let result = run_loop_test( - TestClientData { + Arc::new(Mutex::new(TestClientData { is_source_fails: true, source_state: ClientState { best_self: HeaderId(0, 0), @@ -893,7 +1026,7 @@ pub(crate) mod tests { }, target_latest_received_nonce: 0, ..Default::default() - }, + })), Arc::new(|data: &mut TestClientData| { if data.is_source_reconnected { data.is_source_fails = false; @@ -929,7 +1062,7 @@ pub(crate) mod tests { let (source_exit_sender, exit_receiver) = unbounded(); let target_exit_sender = source_exit_sender.clone(); let result = run_loop_test( - TestClientData { + Arc::new(Mutex::new(TestClientData { source_state: ClientState { best_self: HeaderId(0, 0), best_finalized_self: HeaderId(0, 0), @@ -947,7 +1080,7 @@ pub(crate) mod tests { target_latest_received_nonce: 0, target_tracked_transaction_status: TrackedTransactionStatus::Lost, ..Default::default() - }, + })), Arc::new(move |data: &mut TestClientData| { if data.is_source_reconnected { data.source_tracked_transaction_status = @@ -980,7 +1113,7 @@ pub(crate) mod tests { // their corresponding nonce won't be udpated => reconnect will happen let (exit_sender, exit_receiver) = unbounded(); let result = run_loop_test( - TestClientData { + Arc::new(Mutex::new(TestClientData { source_state: ClientState { best_self: HeaderId(0, 0), best_finalized_self: HeaderId(0, 0), @@ -996,7 +1129,7 @@ pub(crate) mod tests { }, target_latest_received_nonce: 0, ..Default::default() - }, + })), Arc::new(move |data: &mut TestClientData| { // blocks are produced on every tick data.source_state.best_self = @@ -1054,7 +1187,7 @@ pub(crate) mod tests { fn message_lane_loop_works() { let (exit_sender, exit_receiver) = unbounded(); let result = run_loop_test( - TestClientData { + Arc::new(Mutex::new(TestClientData { source_state: ClientState { best_self: HeaderId(10, 10), best_finalized_self: HeaderId(10, 10), @@ -1070,7 +1203,7 @@ pub(crate) mod tests { }, target_latest_received_nonce: 0, ..Default::default() - }, + })), Arc::new(|data: &mut TestClientData| { // blocks are produced on every tick data.source_state.best_self = @@ -1133,4 +1266,74 @@ pub(crate) mod tests { assert!(!result.target_to_source_header_requirements.is_empty()); assert!(!result.source_to_target_header_requirements.is_empty()); } + + #[test] + fn message_lane_loop_works_with_batch_transactions() { + let (exit_sender, exit_receiver) = unbounded(); + let original_data = Arc::new(Mutex::new(TestClientData { + source_state: ClientState { + best_self: HeaderId(10, 10), + best_finalized_self: HeaderId(10, 10), + best_finalized_peer_at_best_self: HeaderId(0, 0), + actual_best_finalized_peer_at_best_self: HeaderId(0, 0), + }, + source_latest_generated_nonce: 10, + target_state: ClientState { + best_self: HeaderId(0, 0), + best_finalized_self: HeaderId(0, 0), + best_finalized_peer_at_best_self: HeaderId(0, 0), + actual_best_finalized_peer_at_best_self: HeaderId(0, 0), + }, + target_latest_received_nonce: 0, + ..Default::default() + })); + let target_original_data = original_data.clone(); + let source_original_data = original_data.clone(); + let result = run_loop_test( + original_data, + Arc::new(|_| {}), + Arc::new(move |data: &mut TestClientData| { + if let Some(target_to_source_header_required) = + data.target_to_source_header_required.take() + { + data.target_to_source_batch_transaction = + Some(TestConfirmationBatchTransaction { + data: source_original_data.clone(), + required_header_id: target_to_source_header_required, + tx_tracker: TestTransactionTracker::default(), + }) + } + }), + Arc::new(|_| {}), + Arc::new(move |data: &mut TestClientData| { + if let Some(source_to_target_header_required) = + data.source_to_target_header_required.take() + { + data.source_to_target_batch_transaction = Some(TestMessagesBatchTransaction { + data: target_original_data.clone(), + required_header_id: source_to_target_header_required, + tx_tracker: TestTransactionTracker::default(), + }) + } + + if data.source_latest_confirmed_received_nonce == 10 { + exit_sender.unbounded_send(()).unwrap(); + } + }), + exit_receiver.into_future().map(|(_, _)| ()), + ); + + // there are no strict restrictions on when reward confirmation should come + // (because `max_unconfirmed_nonces_at_target` is `100` in tests and this confirmation + // depends on the state of both clients) + // => we do not check it here + assert_eq!(result.submitted_messages_proofs[0].0, 1..=4); + assert_eq!(result.submitted_messages_proofs[1].0, 5..=8); + assert_eq!(result.submitted_messages_proofs[2].0, 9..=10); + assert!(!result.submitted_messages_receiving_proofs.is_empty()); + + // check that we have at least once required new source->target or target->source headers + assert!(!result.target_to_source_header_requirements.is_empty()); + assert!(!result.source_to_target_header_requirements.is_empty()); + } } diff --git a/relays/messages/src/message_race_delivery.rs b/relays/messages/src/message_race_delivery.rs index b49a05dac5..660eb333dc 100644 --- a/relays/messages/src/message_race_delivery.rs +++ b/relays/messages/src/message_race_delivery.rs @@ -171,9 +171,13 @@ where { type Error = C::Error; type TargetNoncesData = DeliveryRaceTargetNoncesData; + type BatchTransaction = C::BatchTransaction; type TransactionTracker = C::TransactionTracker; - async fn require_source_header(&self, id: SourceHeaderIdOf

) { + async fn require_source_header( + &self, + id: SourceHeaderIdOf

, + ) -> Result, Self::Error> { self.client.require_source_header_on_target(id).await } diff --git a/relays/messages/src/message_race_loop.rs b/relays/messages/src/message_race_loop.rs index 4f59b635ae..0da2d3baf7 100644 --- a/relays/messages/src/message_race_loop.rs +++ b/relays/messages/src/message_race_loop.rs @@ -20,7 +20,7 @@ //! associated data - like messages, lane state, etc) to the target node by //! generating and submitting proof. -use crate::message_lane_loop::{ClientState, NoncesSubmitArtifacts}; +use crate::message_lane_loop::{BatchTransaction, ClientState, NoncesSubmitArtifacts}; use async_trait::async_trait; use bp_messages::MessageNonce; @@ -127,12 +127,29 @@ pub trait TargetClient { type Error: std::fmt::Debug + MaybeConnectionError; /// Type of the additional data from the target client, used by the race. type TargetNoncesData: std::fmt::Debug; + /// Type of batch transaction that submits finality and proof to the target node. + type BatchTransaction: BatchTransaction< + P::SourceHeaderId, + P::Proof, + Self::TransactionTracker, + Self::Error, + >; /// Transaction tracker to track submitted transactions. type TransactionTracker: TransactionTracker; /// Ask headers relay to relay finalized headers up to (and including) given header /// from race source to race target. - async fn require_source_header(&self, id: P::SourceHeaderId); + /// + /// The client may return `Some(_)`, which means that nothing has happened yet and + /// the caller must generate and append proof to the batch transaction + /// to actually send it (along with required header) to the node. + /// + /// If function has returned `None`, it means that the caller now must wait for the + /// appearance of the required header `id` at the target client. + async fn require_source_header( + &self, + id: P::SourceHeaderId, + ) -> Result, Self::Error>; /// Return nonces that are known to the target client. async fn nonces( @@ -242,6 +259,7 @@ pub async fn run, TC: TargetClient

>( let mut source_retry_backoff = retry_backoff(); let mut source_client_is_online = true; let mut source_nonces_required = false; + let mut source_required_header = None; let source_nonces = futures::future::Fuse::terminated(); let source_generate_proof = futures::future::Fuse::terminated(); let source_go_offline_future = futures::future::Fuse::terminated(); @@ -250,6 +268,8 @@ pub async fn run, TC: TargetClient

>( let mut target_client_is_online = true; let mut target_best_nonces_required = false; let mut target_finalized_nonces_required = false; + let mut target_batch_transaction = None; + let target_require_source_header = futures::future::Fuse::terminated(); let target_best_nonces = futures::future::Fuse::terminated(); let target_finalized_nonces = futures::future::Fuse::terminated(); let target_submit_proof = futures::future::Fuse::terminated(); @@ -262,6 +282,7 @@ pub async fn run, TC: TargetClient

>( source_generate_proof, source_go_offline_future, race_target_updated, + target_require_source_header, target_best_nonces, target_finalized_nonces, target_submit_proof, @@ -326,13 +347,10 @@ pub async fn run, TC: TargetClient

>( ).fail_if_connection_error(FailedClient::Source)?; // ask for more headers if we have nonces to deliver and required headers are missing - let required_source_header_id = race_state + source_required_header = race_state .best_finalized_source_header_id_at_best_target .as_ref() - .and_then(|best|strategy.required_source_header_at_target(best)); - if let Some(required_source_header_id) = required_source_header_id { - race_target.require_source_header(required_source_header_id).await; - } + .and_then(|best| strategy.required_source_header_at_target(best)); }, nonces = target_best_nonces => { target_best_nonces_required = false; @@ -378,6 +396,28 @@ pub async fn run, TC: TargetClient

>( }, // proof generation and submission + maybe_batch_transaction = target_require_source_header => { + source_required_header = None; + + target_client_is_online = process_future_result( + maybe_batch_transaction, + &mut target_retry_backoff, + |maybe_batch_transaction: Option| { + log::debug!( + target: "bridge", + "Target {} client has been asked for more {} headers. Batch tx: {:?}", + P::target_name(), + P::source_name(), + maybe_batch_transaction.is_some(), + ); + + target_batch_transaction = maybe_batch_transaction; + }, + &mut target_go_offline_future, + async_std::task::sleep, + || format!("Error asking for source headers at {}", P::target_name()), + ).fail_if_connection_error(FailedClient::Target)?; + }, proof = source_generate_proof => { source_client_is_online = process_future_result( proof, @@ -409,6 +449,7 @@ pub async fn run, TC: TargetClient

>( P::target_name(), ); + target_batch_transaction = None; race_state.nonces_to_submit = None; race_state.nonces_submitted = Some(artifacts.nonces); target_tx_tracker.set(artifacts.tx_tracker.wait().fuse()); @@ -479,8 +520,23 @@ pub async fn run, TC: TargetClient

>( if source_client_is_online { source_client_is_online = false; + // if we've started to submit batch transaction, let's prioritize it + let expected_race_state = + if let Some(ref target_batch_transaction) = target_batch_transaction { + // when selecting nonces for the batch transaction, we assume that the required + // source header is already at the target chain + let required_source_header_at_target = + target_batch_transaction.required_header_id(); + let mut expected_race_state = race_state.clone(); + expected_race_state.best_finalized_source_header_id_at_best_target = + Some(required_source_header_at_target); + expected_race_state + } else { + race_state.clone() + }; + let nonces_to_deliver = - select_nonces_to_deliver(race_state.clone(), &mut strategy).await; + select_nonces_to_deliver(expected_race_state, &mut strategy).await; let best_at_source = strategy.best_at_source(); if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver { @@ -491,6 +547,7 @@ pub async fn run, TC: TargetClient

>( nonces_range, at_block, ); + source_generate_proof.set( race_source.generate_proof(at_block, nonces_range, proof_parameters).fuse(), ); @@ -518,17 +575,45 @@ pub async fn run, TC: TargetClient

>( target_client_is_online = false; if let Some((at_block, nonces_range, proof)) = race_state.nonces_to_submit.as_ref() { - log::debug!( - target: "bridge", - "Going to submit proof of messages in range {:?} to {} node", - nonces_range, - P::target_name(), - ); - target_submit_proof.set( - race_target - .submit_proof(at_block.clone(), nonces_range.clone(), proof.clone()) - .fuse(), - ); + if let Some(target_batch_transaction) = target_batch_transaction.take() { + log::debug!( + target: "bridge", + "Going to submit batch transaction with header {:?} and proof of messages in range {:?} to {} node", + target_batch_transaction.required_header_id(), + nonces_range, + P::target_name(), + ); + + let nonces = nonces_range.clone(); + target_submit_proof.set( + target_batch_transaction + .append_proof_and_send(proof.clone()) + .map(|result| { + result + .map(|tx_tracker| NoncesSubmitArtifacts { nonces, tx_tracker }) + }) + .left_future() + .fuse(), + ); + } else { + log::debug!( + target: "bridge", + "Going to submit proof of messages in range {:?} to {} node", + nonces_range, + P::target_name(), + ); + + target_submit_proof.set( + race_target + .submit_proof(at_block.clone(), nonces_range.clone(), proof.clone()) + .right_future() + .fuse(), + ); + } + } else if let Some(source_required_header) = source_required_header.clone() { + log::debug!(target: "bridge", "Going to require {} header {:?} at {}", P::source_name(), source_required_header, P::target_name()); + target_require_source_header + .set(race_target.require_source_header(source_required_header).fuse()); } else if target_best_nonces_required { log::debug!(target: "bridge", "Asking {} about best message nonces", P::target_name()); let at_block = race_state diff --git a/relays/messages/src/message_race_receiving.rs b/relays/messages/src/message_race_receiving.rs index c3d65d0e86..c807d76fe5 100644 --- a/relays/messages/src/message_race_receiving.rs +++ b/relays/messages/src/message_race_receiving.rs @@ -155,9 +155,13 @@ where { type Error = C::Error; type TargetNoncesData = (); + type BatchTransaction = C::BatchTransaction; type TransactionTracker = C::TransactionTracker; - async fn require_source_header(&self, id: TargetHeaderIdOf

) { + async fn require_source_header( + &self, + id: TargetHeaderIdOf

, + ) -> Result, Self::Error> { self.client.require_target_header_on_source(id).await } diff --git a/relays/utils/src/lib.rs b/relays/utils/src/lib.rs index 16e1a18bab..428ee33494 100644 --- a/relays/utils/src/lib.rs +++ b/relays/utils/src/lib.rs @@ -19,7 +19,7 @@ pub use bp_runtime::HeaderId; pub use error::Error; pub use relay_loop::{relay_loop, relay_metrics}; -pub use sp_runtime::traits::UniqueSaturatedInto; +pub use sp_runtime::traits::{UniqueSaturatedFrom, UniqueSaturatedInto}; use async_trait::async_trait; use backoff::{backoff::Backoff, ExponentialBackoff};