diff --git a/bridges/relays/ethereum/src/ethereum_client.rs b/bridges/relays/ethereum/src/ethereum_client.rs index 110c8733c03b..5dba31344b0a 100644 --- a/bridges/relays/ethereum/src/ethereum_client.rs +++ b/bridges/relays/ethereum/src/ethereum_client.rs @@ -284,7 +284,7 @@ pub async fn submit_substrate_headers( Some(contract_address), Some(nonce), false, - bridge_contract::functions::import_header::encode_input(header.extract().0.encode(),), + bridge_contract::functions::import_header::encode_input(header.header().encode(),), ) .await ) diff --git a/bridges/relays/ethereum/src/ethereum_sync_loop.rs b/bridges/relays/ethereum/src/ethereum_sync_loop.rs index c9e8a987f018..95f415a75322 100644 --- a/bridges/relays/ethereum/src/ethereum_sync_loop.rs +++ b/bridges/relays/ethereum/src/ethereum_sync_loop.rs @@ -21,8 +21,10 @@ use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Heade use crate::substrate_client::{self, SubstrateConnectionParams, SubstrateSigningParams}; use crate::sync::{HeadersSyncParams, TargetTransactionMode}; use crate::sync_loop::{OwnedSourceFutureOutput, OwnedTargetFutureOutput, SourceClient, TargetClient}; -use futures::future::{ready, FutureExt, Ready}; -use std::{collections::HashSet, future::Future, pin::Pin, time::Duration}; + +use async_trait::async_trait; +use futures::future::FutureExt; +use std::{collections::HashSet, time::Duration}; use web3::types::H256; /// Interval at which we check new Ethereum headers when we are synced/almost synced. @@ -80,40 +82,40 @@ struct EthereumHeadersSource { type EthereumFutureOutput = OwnedSourceFutureOutput; +#[async_trait] impl SourceClient for EthereumHeadersSource { type Error = ethereum_client::Error; - type BestBlockNumberFuture = Pin>>>; - type HeaderByHashFuture = Pin>>>; - type HeaderByNumberFuture = Pin>>>; - type HeaderExtraFuture = Pin)>>>>; - type HeaderCompletionFuture = Ready)>>; - fn best_block_number(self) -> Self::BestBlockNumberFuture { + async fn best_block_number(self) -> EthereumFutureOutput { ethereum_client::best_block_number(self.client) .map(|(client, result)| (EthereumHeadersSource { client }, result)) - .boxed() + .await } - fn header_by_hash(self, hash: H256) -> Self::HeaderByHashFuture { + async fn header_by_hash(self, hash: H256) -> EthereumFutureOutput
{ ethereum_client::header_by_hash(self.client, hash) .map(|(client, result)| (EthereumHeadersSource { client }, result)) - .boxed() + .await } - fn header_by_number(self, number: u64) -> Self::HeaderByNumberFuture { + async fn header_by_number(self, number: u64) -> EthereumFutureOutput
{ ethereum_client::header_by_number(self.client, number) .map(|(client, result)| (EthereumHeadersSource { client }, result)) - .boxed() + .await } - fn header_extra(self, id: EthereumHeaderId, header: &Header) -> Self::HeaderExtraFuture { - ethereum_client::transactions_receipts(self.client, id, header.transactions.clone()) - .map(|(client, result)| (EthereumHeadersSource { client }, result)) - .boxed() + async fn header_completion(self, id: EthereumHeaderId) -> EthereumFutureOutput<(EthereumHeaderId, Option<()>)> { + (self, Ok((id, None))) } - fn header_completion(self, id: EthereumHeaderId) -> Self::HeaderCompletionFuture { - ready((self, Ok((id, None)))) + async fn header_extra( + self, + id: EthereumHeaderId, + header: QueuedEthereumHeader, + ) -> EthereumFutureOutput<(EthereumHeaderId, Vec)> { + ethereum_client::transactions_receipts(self.client, id, header.header().transactions.clone()) + .map(|(client, result)| (EthereumHeadersSource { client }, result)) + .await } } @@ -129,16 +131,11 @@ struct SubstrateHeadersTarget { type SubstrateFutureOutput = OwnedTargetFutureOutput; +#[async_trait] impl TargetClient for SubstrateHeadersTarget { type Error = substrate_client::Error; - type BestHeaderIdFuture = Pin>>>; - type IsKnownHeaderFuture = Pin>>>; - type RequiresExtraFuture = Pin>>>; - type SubmitHeadersFuture = Pin>>>>; - type IncompleteHeadersFuture = Ready>>; - type CompleteHeadersFuture = Ready>; - - fn best_header_id(self) -> Self::BestHeaderIdFuture { + + async fn best_header_id(self) -> SubstrateFutureOutput { let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params); substrate_client::best_ethereum_block(self.client) .map(move |(client, result)| { @@ -151,10 +148,10 @@ impl TargetClient for SubstrateHeadersTarget { result, ) }) - .boxed() + .await } - fn is_known_header(self, id: EthereumHeaderId) -> Self::IsKnownHeaderFuture { + async fn is_known_header(self, id: EthereumHeaderId) -> SubstrateFutureOutput<(EthereumHeaderId, bool)> { let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params); substrate_client::ethereum_header_known(self.client, id) .map(move |(client, result)| { @@ -167,15 +164,12 @@ impl TargetClient for SubstrateHeadersTarget { result, ) }) - .boxed() + .await } - fn requires_extra(self, header: &QueuedEthereumHeader) -> Self::RequiresExtraFuture { - // we can minimize number of receipts_check calls by checking header - // logs bloom here, but it may give us false positives (when authorities - // source is contract, we never need any logs) + async fn submit_headers(self, headers: Vec) -> SubstrateFutureOutput> { let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params); - substrate_client::ethereum_receipts_required(self.client, header.clone()) + substrate_client::submit_ethereum_headers(self.client, sign_params.clone(), headers, sign_transactions) .map(move |(client, result)| { ( SubstrateHeadersTarget { @@ -186,12 +180,23 @@ impl TargetClient for SubstrateHeadersTarget { result, ) }) - .boxed() + .await + } + + async fn incomplete_headers_ids(self) -> SubstrateFutureOutput> { + (self, Ok(HashSet::new())) } - fn submit_headers(self, headers: Vec) -> Self::SubmitHeadersFuture { + async fn complete_header(self, id: EthereumHeaderId, _completion: ()) -> SubstrateFutureOutput { + (self, Ok(id)) + } + + async fn requires_extra(self, header: QueuedEthereumHeader) -> SubstrateFutureOutput<(EthereumHeaderId, bool)> { + // we can minimize number of receipts_check calls by checking header + // logs bloom here, but it may give us false positives (when authorities + // source is contract, we never need any logs) let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params); - substrate_client::submit_ethereum_headers(self.client, sign_params.clone(), headers, sign_transactions) + substrate_client::ethereum_receipts_required(self.client, header) .map(move |(client, result)| { ( SubstrateHeadersTarget { @@ -202,15 +207,7 @@ impl TargetClient for SubstrateHeadersTarget { result, ) }) - .boxed() - } - - fn incomplete_headers_ids(self) -> Self::IncompleteHeadersFuture { - ready((self, Ok(HashSet::new()))) - } - - fn complete_header(self, id: EthereumHeaderId, _completion: ()) -> Self::CompleteHeadersFuture { - ready((self, Ok(id))) + .await } } diff --git a/bridges/relays/ethereum/src/substrate_client.rs b/bridges/relays/ethereum/src/substrate_client.rs index 0319c0b5ac82..e7067cbda3a0 100644 --- a/bridges/relays/ethereum/src/substrate_client.rs +++ b/bridges/relays/ethereum/src/substrate_client.rs @@ -380,10 +380,9 @@ fn create_signed_submit_transaction( headers .into_iter() .map(|header| { - let (header, receipts) = header.extract(); ( - into_substrate_ethereum_header(&header), - into_substrate_ethereum_receipts(&receipts), + into_substrate_ethereum_header(header.header()), + into_substrate_ethereum_receipts(header.extra()), ) }) .collect(), @@ -422,11 +421,10 @@ fn create_signed_submit_transaction( /// Create unsigned Substrate transaction for submitting Ethereum header. fn create_unsigned_submit_transaction(header: QueuedEthereumHeader) -> bridge_node_runtime::UncheckedExtrinsic { - let (header, receipts) = header.extract(); let function = bridge_node_runtime::Call::BridgeEthPoA(bridge_node_runtime::BridgeEthPoACall::import_unsigned_header( - into_substrate_ethereum_header(&header), - into_substrate_ethereum_receipts(&receipts), + into_substrate_ethereum_header(header.header()), + into_substrate_ethereum_receipts(header.extra()), )); bridge_node_runtime::UncheckedExtrinsic::new_unsigned(function) diff --git a/bridges/relays/ethereum/src/substrate_sync_loop.rs b/bridges/relays/ethereum/src/substrate_sync_loop.rs index 88dc4b6276a4..7ca80f9672ad 100644 --- a/bridges/relays/ethereum/src/substrate_sync_loop.rs +++ b/bridges/relays/ethereum/src/substrate_sync_loop.rs @@ -25,8 +25,10 @@ use crate::substrate_types::{ use crate::sync::{HeadersSyncParams, TargetTransactionMode}; use crate::sync_loop::{OwnedSourceFutureOutput, OwnedTargetFutureOutput, SourceClient, TargetClient}; use crate::sync_types::SourceHeader; -use futures::future::{ready, FutureExt, Ready}; -use std::{collections::HashSet, future::Future, pin::Pin, time::Duration}; + +use async_trait::async_trait; +use futures::future::FutureExt; +use std::{collections::HashSet, time::Duration}; /// Interval at which we check new Substrate headers when we are synced/almost synced. const SUBSTRATE_TICK_INTERVAL: Duration = Duration::from_secs(10); @@ -87,41 +89,43 @@ struct SubstrateHeadersSource { type SubstrateFutureOutput = OwnedSourceFutureOutput; +#[async_trait] impl SourceClient for SubstrateHeadersSource { type Error = substrate_client::Error; - type BestBlockNumberFuture = Pin>>>; - type HeaderByHashFuture = Pin>>>; - type HeaderByNumberFuture = Pin>>>; - type HeaderExtraFuture = Ready>; - type HeaderCompletionFuture = - Pin)>>>>; - - fn best_block_number(self) -> Self::BestBlockNumberFuture { + + async fn best_block_number(self) -> SubstrateFutureOutput { substrate_client::best_header(self.client) .map(|(client, result)| (SubstrateHeadersSource { client }, result.map(|header| header.number))) - .boxed() + .await } - fn header_by_hash(self, hash: Hash) -> Self::HeaderByHashFuture { + async fn header_by_hash(self, hash: Hash) -> SubstrateFutureOutput
{ substrate_client::header_by_hash(self.client, hash) .map(|(client, result)| (SubstrateHeadersSource { client }, result)) - .boxed() + .await } - fn header_by_number(self, number: Number) -> Self::HeaderByNumberFuture { + async fn header_by_number(self, number: Number) -> SubstrateFutureOutput
{ substrate_client::header_by_number(self.client, number) .map(|(client, result)| (SubstrateHeadersSource { client }, result)) - .boxed() - } - - fn header_extra(self, id: SubstrateHeaderId, _header: &Header) -> Self::HeaderExtraFuture { - ready((self, Ok((id, ())))) + .await } - fn header_completion(self, id: SubstrateHeaderId) -> Self::HeaderCompletionFuture { + async fn header_completion( + self, + id: SubstrateHeaderId, + ) -> SubstrateFutureOutput<(SubstrateHeaderId, Option)> { substrate_client::grandpa_justification(self.client, id) .map(|(client, result)| (SubstrateHeadersSource { client }, result)) - .boxed() + .await + } + + async fn header_extra( + self, + id: SubstrateHeaderId, + _header: QueuedSubstrateHeader, + ) -> SubstrateFutureOutput<(SubstrateHeaderId, ())> { + (self, Ok((id, ()))) } } @@ -137,16 +141,11 @@ struct EthereumHeadersTarget { type EthereumFutureOutput = OwnedTargetFutureOutput; +#[async_trait] impl TargetClient for EthereumHeadersTarget { type Error = ethereum_client::Error; - type BestHeaderIdFuture = Pin>>>; - type IsKnownHeaderFuture = Pin>>>; - type RequiresExtraFuture = Ready>; - type SubmitHeadersFuture = Pin>>>>; - type IncompleteHeadersFuture = Pin>>>>; - type CompleteHeadersFuture = Pin>>>; - - fn best_header_id(self) -> Self::BestHeaderIdFuture { + + async fn best_header_id(self) -> EthereumFutureOutput { let (contract, sign_params) = (self.contract, self.sign_params); ethereum_client::best_substrate_block(self.client, contract) .map(move |(client, result)| { @@ -159,10 +158,10 @@ impl TargetClient for EthereumHeadersTarget { result, ) }) - .boxed() + .await } - fn is_known_header(self, id: SubstrateHeaderId) -> Self::IsKnownHeaderFuture { + async fn is_known_header(self, id: SubstrateHeaderId) -> EthereumFutureOutput<(SubstrateHeaderId, bool)> { let (contract, sign_params) = (self.contract, self.sign_params); ethereum_client::substrate_header_known(self.client, contract, id) .map(move |(client, result)| { @@ -175,14 +174,10 @@ impl TargetClient for EthereumHeadersTarget { result, ) }) - .boxed() - } - - fn requires_extra(self, header: &QueuedSubstrateHeader) -> Self::RequiresExtraFuture { - ready((self, Ok((header.header().id(), false)))) + .await } - fn submit_headers(self, headers: Vec) -> Self::SubmitHeadersFuture { + async fn submit_headers(self, headers: Vec) -> EthereumFutureOutput> { let (contract, sign_params) = (self.contract, self.sign_params); ethereum_client::submit_substrate_headers(self.client, sign_params.clone(), contract, headers) .map(move |(client, result)| { @@ -195,10 +190,10 @@ impl TargetClient for EthereumHeadersTarget { result, ) }) - .boxed() + .await } - fn incomplete_headers_ids(self) -> Self::IncompleteHeadersFuture { + async fn incomplete_headers_ids(self) -> EthereumFutureOutput> { let (contract, sign_params) = (self.contract, self.sign_params); ethereum_client::incomplete_substrate_headers(self.client, contract) .map(move |(client, result)| { @@ -211,10 +206,14 @@ impl TargetClient for EthereumHeadersTarget { result, ) }) - .boxed() + .await } - fn complete_header(self, id: SubstrateHeaderId, completion: GrandpaJustification) -> Self::CompleteHeadersFuture { + async fn complete_header( + self, + id: SubstrateHeaderId, + completion: GrandpaJustification, + ) -> EthereumFutureOutput { let (contract, sign_params) = (self.contract, self.sign_params); ethereum_client::complete_substrate_header(self.client, sign_params.clone(), contract, id, completion) .map(move |(client, result)| { @@ -227,7 +226,11 @@ impl TargetClient for EthereumHeadersTarget { result, ) }) - .boxed() + .await + } + + async fn requires_extra(self, header: QueuedSubstrateHeader) -> EthereumFutureOutput<(SubstrateHeaderId, bool)> { + (self, Ok((header.header().id(), false))) } } diff --git a/bridges/relays/ethereum/src/sync_loop.rs b/bridges/relays/ethereum/src/sync_loop.rs index ce89a846a0fb..978db603591d 100644 --- a/bridges/relays/ethereum/src/sync_loop.rs +++ b/bridges/relays/ethereum/src/sync_loop.rs @@ -16,11 +16,12 @@ use crate::sync::HeadersSyncParams; use crate::sync_types::{HeaderId, HeaderStatus, HeadersSyncPipeline, MaybeConnectionError, QueuedHeader}; + +use async_trait::async_trait; use futures::{future::FutureExt, stream::StreamExt}; use num_traits::{Saturating, Zero}; use std::{ collections::HashSet, - future::Future, time::{Duration, Instant}, }; @@ -48,69 +49,70 @@ pub type OwnedSourceFutureOutput = (Client, Result = (Client, Result>::Error>); /// Source client trait. +#[async_trait] pub trait SourceClient: Sized { /// Type of error this clients returns. type Error: std::fmt::Debug + MaybeConnectionError; - /// Future that returns best block number. - type BestBlockNumberFuture: Future>; - /// Future that returns header by hash. - type HeaderByHashFuture: Future>; - /// Future that returns header by number. - type HeaderByNumberFuture: Future>; - /// Future that returns extra data associated with header. - type HeaderExtraFuture: Future, P::Extra)>>; - /// Future that returns data required to 'complete' header. - type HeaderCompletionFuture: Future< - Output = OwnedSourceFutureOutput, Option)>, - >; /// Get best block number. - fn best_block_number(self) -> Self::BestBlockNumberFuture; + async fn best_block_number(self) -> OwnedSourceFutureOutput; + /// Get header by hash. - fn header_by_hash(self, hash: P::Hash) -> Self::HeaderByHashFuture; + async fn header_by_hash(self, hash: P::Hash) -> OwnedSourceFutureOutput; + /// Get canonical header by number. - fn header_by_number(self, number: P::Number) -> Self::HeaderByNumberFuture; - /// Get extra data by header hash. - fn header_extra(self, id: HeaderId, header: &P::Header) -> Self::HeaderExtraFuture; + async fn header_by_number(self, number: P::Number) -> OwnedSourceFutureOutput; + /// Get completion data by header hash. - fn header_completion(self, id: HeaderId) -> Self::HeaderCompletionFuture; + async fn header_completion( + self, + id: HeaderId, + ) -> OwnedSourceFutureOutput, Option)>; + + /// Get extra data by header hash. + async fn header_extra( + self, + id: HeaderId, + header: QueuedHeader

, + ) -> OwnedSourceFutureOutput, P::Extra)>; } /// Target client trait. +#[async_trait] pub trait TargetClient: Sized { /// Type of error this clients returns. type Error: std::fmt::Debug + MaybeConnectionError; - /// Future that returns best header id. - type BestHeaderIdFuture: Future>>; - /// Future that returns known header check result. - type IsKnownHeaderFuture: Future, bool)>>; - /// Future that returns extra check result. - type RequiresExtraFuture: Future, bool)>>; - /// Future that returns header submission result. - type SubmitHeadersFuture: Future>>>; - /// Future that returns incomplete headers ids. - type IncompleteHeadersFuture: Future< - Output = OwnedTargetFutureOutput>>, - >; - /// Future that returns header completion result. - type CompleteHeadersFuture: Future>>; /// Returns ID of best header known to the target node. - fn best_header_id(self) -> Self::BestHeaderIdFuture; + async fn best_header_id(self) -> OwnedTargetFutureOutput>; + /// Returns true if header is known to the target node. - fn is_known_header(self, id: HeaderId) -> Self::IsKnownHeaderFuture; - /// Returns true if header requires extra data to be submitted. - fn requires_extra(self, header: &QueuedHeader

) -> Self::RequiresExtraFuture; + async fn is_known_header( + self, + id: HeaderId, + ) -> OwnedTargetFutureOutput, bool)>; + /// Submit headers. - fn submit_headers(self, headers: Vec>) -> Self::SubmitHeadersFuture; + async fn submit_headers( + self, + headers: Vec>, + ) -> OwnedTargetFutureOutput>>; + /// Returns ID of headers that require to be 'completed' before children can be submitted. - fn incomplete_headers_ids(self) -> Self::IncompleteHeadersFuture; + async fn incomplete_headers_ids(self) -> OwnedTargetFutureOutput>>; + /// Submit completion data for header. - fn complete_header( + async fn complete_header( self, id: HeaderId, completion: P::Completion, - ) -> Self::CompleteHeadersFuture; + ) -> OwnedTargetFutureOutput>; + + /// Returns true if header requires extra data to be submitted. + async fn requires_extra( + self, + header: QueuedHeader

, + ) -> OwnedTargetFutureOutput, bool)>; } /// Run headers synchronization. @@ -385,7 +387,7 @@ pub fn run( header.id(), ); - target_extra_check_future.set(target_client.requires_extra(header).fuse()); + target_extra_check_future.set(target_client.requires_extra(header.clone()).fuse()); } else if let Some(header) = sync.headers().header(HeaderStatus::MaybeOrphan) { // for MaybeOrphan we actually ask for parent' header existence let parent_id = header.parent_id(); @@ -452,7 +454,7 @@ pub fn run( "Retrieving extra data for header: {:?}", id, ); - source_extra_future.set(source_client.header_extra(id, header.header()).fuse()); + source_extra_future.set(source_client.header_extra(id, header.clone()).fuse()); } else if let Some(header) = sync.headers().header(HeaderStatus::Orphan) { // for Orphan we actually ask for parent' header let parent_id = header.parent_id(); diff --git a/bridges/relays/ethereum/src/sync_types.rs b/bridges/relays/ethereum/src/sync_types.rs index 8d68bd8759c3..391d18afab39 100644 --- a/bridges/relays/ethereum/src/sync_types.rs +++ b/bridges/relays/ethereum/src/sync_types.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . +use std::{ops::Deref, sync::Arc}; + /// Ethereum header Id. #[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)] pub struct HeaderId(pub Number, pub Hash); @@ -70,7 +72,7 @@ pub trait HeadersSyncPipeline: Clone + Copy { + num_traits::Zero + num_traits::One; /// Type of header that we're syncing. - type Header: Clone + std::fmt::Debug + SourceHeader; + type Header: Clone + std::fmt::Debug + PartialEq + SourceHeader; /// Type of extra data for the header that we're receiving from the source node: /// 1) extra data is required for some headers; /// 2) target node may answer if it'll require extra data before header is submitted; @@ -78,7 +80,7 @@ pub trait HeadersSyncPipeline: Clone + Copy { /// 4) header and extra data are submitted in single transaction. /// /// Example: Ethereum transactions receipts. - type Extra: Clone + std::fmt::Debug; + type Extra: Clone + PartialEq + std::fmt::Debug; /// Type of data required to 'complete' header that we're receiving from the source node: /// 1) completion data is required for some headers; /// 2) target node can't answer if it'll require completion data before header is accepted; @@ -101,19 +103,44 @@ pub trait SourceHeader { } /// Header how it's stored in the synchronization queue. -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] #[cfg_attr(test, derive(PartialEq))] -pub struct QueuedHeader { - header: P::Header, - extra: Option, -} +pub struct QueuedHeader(Arc>); impl QueuedHeader

{ /// Creates new queued header. pub fn new(header: P::Header) -> Self { - QueuedHeader { header, extra: None } + QueuedHeader(Arc::new(QueuedHeaderData { header, extra: None })) } + /// Set associated extra data. + pub fn set_extra(self, extra: P::Extra) -> Self { + QueuedHeader(Arc::new(QueuedHeaderData { + header: Arc::try_unwrap(self.0) + .map(|data| data.header) + .unwrap_or_else(|data| data.header.clone()), + extra: Some(extra), + })) + } +} + +impl Deref for QueuedHeader

{ + type Target = QueuedHeaderData

; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// Header how it's stored in the synchronization queue. +#[derive(Clone, Debug, Default)] +#[cfg_attr(test, derive(PartialEq))] +pub struct QueuedHeaderData { + header: P::Header, + extra: Option, +} + +impl QueuedHeader

{ /// Returns ID of header. pub fn id(&self) -> HeaderId { self.header.id() @@ -133,15 +160,4 @@ impl QueuedHeader

{ pub fn extra(&self) -> &Option { &self.extra } - - /// Extract header and extra from self. - pub fn extract(self) -> (P::Header, Option) { - (self.header, self.extra) - } - - /// Set associated extra data. - pub fn set_extra(mut self, extra: P::Extra) -> Self { - self.extra = Some(extra); - self - } }