Skip to content

Commit

Permalink
Add async_trait to SourceClient and TargetClient (paritytech#123)
Browse files Browse the repository at this point in the history
* Use async_trait for SourceClient

* Use aync_trait for TargetClient

* Revert async_trait usage for Source/Target client

This reverts commit f636ffaffd60197e90e887362b4a0c35a0dc5a6c.
This reverts commit 2c15755e8c93318f8e0a605852efe87d72edb769.

I'm having a very hard time finding out what is causing compilation
issues, and I think it's best to start over again.

* Use async_trait for TargetClient

* Use async_trait for SourceClient

* Move where non-async methods are

* RustFmt

* QueuedHeader holds Arc to actual data

* Clean up async return type

Co-authored-by: Svyatoslav Nikolsky <svyatonik@gmail.com>

* Clean up async return type

Co-authored-by: Svyatoslav Nikolsky <svyatonik@gmail.com>

* Clean up async return type

Co-authored-by: Svyatoslav Nikolsky <svyatonik@gmail.com>

* Remove unused import

Co-authored-by: Svyatoslav Nikolsky <svyatonik@gmail.com>
  • Loading branch information
2 people authored and bkchr committed Apr 10, 2024
1 parent b701c2b commit 650f323
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 158 deletions.
2 changes: 1 addition & 1 deletion bridges/relays/ethereum/src/ethereum_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ pub async fn submit_substrate_headers(
Some(contract_address),
Some(nonce),
false,
bridge_contract::functions::import_header::encode_input(header.extract().0.encode(),),
bridge_contract::functions::import_header::encode_input(header.header().encode(),),
)
.await
)
Expand Down
91 changes: 44 additions & 47 deletions bridges/relays/ethereum/src/ethereum_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Heade
use crate::substrate_client::{self, SubstrateConnectionParams, SubstrateSigningParams};
use crate::sync::{HeadersSyncParams, TargetTransactionMode};
use crate::sync_loop::{OwnedSourceFutureOutput, OwnedTargetFutureOutput, SourceClient, TargetClient};
use futures::future::{ready, FutureExt, Ready};
use std::{collections::HashSet, future::Future, pin::Pin, time::Duration};

use async_trait::async_trait;
use futures::future::FutureExt;
use std::{collections::HashSet, time::Duration};
use web3::types::H256;

/// Interval at which we check new Ethereum headers when we are synced/almost synced.
Expand Down Expand Up @@ -80,40 +82,40 @@ struct EthereumHeadersSource {

type EthereumFutureOutput<T> = OwnedSourceFutureOutput<EthereumHeadersSource, EthereumHeadersSyncPipeline, T>;

#[async_trait]
impl SourceClient<EthereumHeadersSyncPipeline> for EthereumHeadersSource {
type Error = ethereum_client::Error;
type BestBlockNumberFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<u64>>>>;
type HeaderByHashFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<Header>>>>;
type HeaderByNumberFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<Header>>>>;
type HeaderExtraFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<(EthereumHeaderId, Vec<Receipt>)>>>>;
type HeaderCompletionFuture = Ready<EthereumFutureOutput<(EthereumHeaderId, Option<()>)>>;

fn best_block_number(self) -> Self::BestBlockNumberFuture {
async fn best_block_number(self) -> EthereumFutureOutput<u64> {
ethereum_client::best_block_number(self.client)
.map(|(client, result)| (EthereumHeadersSource { client }, result))
.boxed()
.await
}

fn header_by_hash(self, hash: H256) -> Self::HeaderByHashFuture {
async fn header_by_hash(self, hash: H256) -> EthereumFutureOutput<Header> {
ethereum_client::header_by_hash(self.client, hash)
.map(|(client, result)| (EthereumHeadersSource { client }, result))
.boxed()
.await
}

fn header_by_number(self, number: u64) -> Self::HeaderByNumberFuture {
async fn header_by_number(self, number: u64) -> EthereumFutureOutput<Header> {
ethereum_client::header_by_number(self.client, number)
.map(|(client, result)| (EthereumHeadersSource { client }, result))
.boxed()
.await
}

fn header_extra(self, id: EthereumHeaderId, header: &Header) -> Self::HeaderExtraFuture {
ethereum_client::transactions_receipts(self.client, id, header.transactions.clone())
.map(|(client, result)| (EthereumHeadersSource { client }, result))
.boxed()
async fn header_completion(self, id: EthereumHeaderId) -> EthereumFutureOutput<(EthereumHeaderId, Option<()>)> {
(self, Ok((id, None)))
}

fn header_completion(self, id: EthereumHeaderId) -> Self::HeaderCompletionFuture {
ready((self, Ok((id, None))))
async fn header_extra(
self,
id: EthereumHeaderId,
header: QueuedEthereumHeader,
) -> EthereumFutureOutput<(EthereumHeaderId, Vec<Receipt>)> {
ethereum_client::transactions_receipts(self.client, id, header.header().transactions.clone())
.map(|(client, result)| (EthereumHeadersSource { client }, result))
.await
}
}

Expand All @@ -129,16 +131,11 @@ struct SubstrateHeadersTarget {

type SubstrateFutureOutput<T> = OwnedTargetFutureOutput<SubstrateHeadersTarget, EthereumHeadersSyncPipeline, T>;

#[async_trait]
impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
type Error = substrate_client::Error;
type BestHeaderIdFuture = Pin<Box<dyn Future<Output = SubstrateFutureOutput<EthereumHeaderId>>>>;
type IsKnownHeaderFuture = Pin<Box<dyn Future<Output = SubstrateFutureOutput<(EthereumHeaderId, bool)>>>>;
type RequiresExtraFuture = Pin<Box<dyn Future<Output = SubstrateFutureOutput<(EthereumHeaderId, bool)>>>>;
type SubmitHeadersFuture = Pin<Box<dyn Future<Output = SubstrateFutureOutput<Vec<EthereumHeaderId>>>>>;
type IncompleteHeadersFuture = Ready<SubstrateFutureOutput<HashSet<EthereumHeaderId>>>;
type CompleteHeadersFuture = Ready<SubstrateFutureOutput<EthereumHeaderId>>;

fn best_header_id(self) -> Self::BestHeaderIdFuture {

async fn best_header_id(self) -> SubstrateFutureOutput<EthereumHeaderId> {
let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params);
substrate_client::best_ethereum_block(self.client)
.map(move |(client, result)| {
Expand All @@ -151,10 +148,10 @@ impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
result,
)
})
.boxed()
.await
}

fn is_known_header(self, id: EthereumHeaderId) -> Self::IsKnownHeaderFuture {
async fn is_known_header(self, id: EthereumHeaderId) -> SubstrateFutureOutput<(EthereumHeaderId, bool)> {
let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params);
substrate_client::ethereum_header_known(self.client, id)
.map(move |(client, result)| {
Expand All @@ -167,15 +164,12 @@ impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
result,
)
})
.boxed()
.await
}

fn requires_extra(self, header: &QueuedEthereumHeader) -> Self::RequiresExtraFuture {
// we can minimize number of receipts_check calls by checking header
// logs bloom here, but it may give us false positives (when authorities
// source is contract, we never need any logs)
async fn submit_headers(self, headers: Vec<QueuedEthereumHeader>) -> SubstrateFutureOutput<Vec<EthereumHeaderId>> {
let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params);
substrate_client::ethereum_receipts_required(self.client, header.clone())
substrate_client::submit_ethereum_headers(self.client, sign_params.clone(), headers, sign_transactions)
.map(move |(client, result)| {
(
SubstrateHeadersTarget {
Expand All @@ -186,12 +180,23 @@ impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
result,
)
})
.boxed()
.await
}

async fn incomplete_headers_ids(self) -> SubstrateFutureOutput<HashSet<EthereumHeaderId>> {
(self, Ok(HashSet::new()))
}

fn submit_headers(self, headers: Vec<QueuedEthereumHeader>) -> Self::SubmitHeadersFuture {
async fn complete_header(self, id: EthereumHeaderId, _completion: ()) -> SubstrateFutureOutput<EthereumHeaderId> {
(self, Ok(id))
}

async fn requires_extra(self, header: QueuedEthereumHeader) -> SubstrateFutureOutput<(EthereumHeaderId, bool)> {
// we can minimize number of receipts_check calls by checking header
// logs bloom here, but it may give us false positives (when authorities
// source is contract, we never need any logs)
let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params);
substrate_client::submit_ethereum_headers(self.client, sign_params.clone(), headers, sign_transactions)
substrate_client::ethereum_receipts_required(self.client, header)
.map(move |(client, result)| {
(
SubstrateHeadersTarget {
Expand All @@ -202,15 +207,7 @@ impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
result,
)
})
.boxed()
}

fn incomplete_headers_ids(self) -> Self::IncompleteHeadersFuture {
ready((self, Ok(HashSet::new())))
}

fn complete_header(self, id: EthereumHeaderId, _completion: ()) -> Self::CompleteHeadersFuture {
ready((self, Ok(id)))
.await
}
}

Expand Down
10 changes: 4 additions & 6 deletions bridges/relays/ethereum/src/substrate_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,9 @@ fn create_signed_submit_transaction(
headers
.into_iter()
.map(|header| {
let (header, receipts) = header.extract();
(
into_substrate_ethereum_header(&header),
into_substrate_ethereum_receipts(&receipts),
into_substrate_ethereum_header(header.header()),
into_substrate_ethereum_receipts(header.extra()),
)
})
.collect(),
Expand Down Expand Up @@ -422,11 +421,10 @@ fn create_signed_submit_transaction(

/// Create unsigned Substrate transaction for submitting Ethereum header.
fn create_unsigned_submit_transaction(header: QueuedEthereumHeader) -> bridge_node_runtime::UncheckedExtrinsic {
let (header, receipts) = header.extract();
let function =
bridge_node_runtime::Call::BridgeEthPoA(bridge_node_runtime::BridgeEthPoACall::import_unsigned_header(
into_substrate_ethereum_header(&header),
into_substrate_ethereum_receipts(&receipts),
into_substrate_ethereum_header(header.header()),
into_substrate_ethereum_receipts(header.extra()),
));

bridge_node_runtime::UncheckedExtrinsic::new_unsigned(function)
Expand Down
87 changes: 45 additions & 42 deletions bridges/relays/ethereum/src/substrate_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ use crate::substrate_types::{
use crate::sync::{HeadersSyncParams, TargetTransactionMode};
use crate::sync_loop::{OwnedSourceFutureOutput, OwnedTargetFutureOutput, SourceClient, TargetClient};
use crate::sync_types::SourceHeader;
use futures::future::{ready, FutureExt, Ready};
use std::{collections::HashSet, future::Future, pin::Pin, time::Duration};

use async_trait::async_trait;
use futures::future::FutureExt;
use std::{collections::HashSet, time::Duration};

/// Interval at which we check new Substrate headers when we are synced/almost synced.
const SUBSTRATE_TICK_INTERVAL: Duration = Duration::from_secs(10);
Expand Down Expand Up @@ -87,41 +89,43 @@ struct SubstrateHeadersSource {

type SubstrateFutureOutput<T> = OwnedSourceFutureOutput<SubstrateHeadersSource, SubstrateHeadersSyncPipeline, T>;

#[async_trait]
impl SourceClient<SubstrateHeadersSyncPipeline> for SubstrateHeadersSource {
type Error = substrate_client::Error;
type BestBlockNumberFuture = Pin<Box<dyn Future<Output = SubstrateFutureOutput<Number>>>>;
type HeaderByHashFuture = Pin<Box<dyn Future<Output = SubstrateFutureOutput<Header>>>>;
type HeaderByNumberFuture = Pin<Box<dyn Future<Output = SubstrateFutureOutput<Header>>>>;
type HeaderExtraFuture = Ready<SubstrateFutureOutput<(SubstrateHeaderId, ())>>;
type HeaderCompletionFuture =
Pin<Box<dyn Future<Output = SubstrateFutureOutput<(SubstrateHeaderId, Option<GrandpaJustification>)>>>>;

fn best_block_number(self) -> Self::BestBlockNumberFuture {

async fn best_block_number(self) -> SubstrateFutureOutput<Number> {
substrate_client::best_header(self.client)
.map(|(client, result)| (SubstrateHeadersSource { client }, result.map(|header| header.number)))
.boxed()
.await
}

fn header_by_hash(self, hash: Hash) -> Self::HeaderByHashFuture {
async fn header_by_hash(self, hash: Hash) -> SubstrateFutureOutput<Header> {
substrate_client::header_by_hash(self.client, hash)
.map(|(client, result)| (SubstrateHeadersSource { client }, result))
.boxed()
.await
}

fn header_by_number(self, number: Number) -> Self::HeaderByNumberFuture {
async fn header_by_number(self, number: Number) -> SubstrateFutureOutput<Header> {
substrate_client::header_by_number(self.client, number)
.map(|(client, result)| (SubstrateHeadersSource { client }, result))
.boxed()
}

fn header_extra(self, id: SubstrateHeaderId, _header: &Header) -> Self::HeaderExtraFuture {
ready((self, Ok((id, ()))))
.await
}

fn header_completion(self, id: SubstrateHeaderId) -> Self::HeaderCompletionFuture {
async fn header_completion(
self,
id: SubstrateHeaderId,
) -> SubstrateFutureOutput<(SubstrateHeaderId, Option<GrandpaJustification>)> {
substrate_client::grandpa_justification(self.client, id)
.map(|(client, result)| (SubstrateHeadersSource { client }, result))
.boxed()
.await
}

async fn header_extra(
self,
id: SubstrateHeaderId,
_header: QueuedSubstrateHeader,
) -> SubstrateFutureOutput<(SubstrateHeaderId, ())> {
(self, Ok((id, ())))
}
}

Expand All @@ -137,16 +141,11 @@ struct EthereumHeadersTarget {

type EthereumFutureOutput<T> = OwnedTargetFutureOutput<EthereumHeadersTarget, SubstrateHeadersSyncPipeline, T>;

#[async_trait]
impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
type Error = ethereum_client::Error;
type BestHeaderIdFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<SubstrateHeaderId>>>>;
type IsKnownHeaderFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<(SubstrateHeaderId, bool)>>>>;
type RequiresExtraFuture = Ready<EthereumFutureOutput<(SubstrateHeaderId, bool)>>;
type SubmitHeadersFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<Vec<SubstrateHeaderId>>>>>;
type IncompleteHeadersFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<HashSet<SubstrateHeaderId>>>>>;
type CompleteHeadersFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<SubstrateHeaderId>>>>;

fn best_header_id(self) -> Self::BestHeaderIdFuture {

async fn best_header_id(self) -> EthereumFutureOutput<SubstrateHeaderId> {
let (contract, sign_params) = (self.contract, self.sign_params);
ethereum_client::best_substrate_block(self.client, contract)
.map(move |(client, result)| {
Expand All @@ -159,10 +158,10 @@ impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
result,
)
})
.boxed()
.await
}

fn is_known_header(self, id: SubstrateHeaderId) -> Self::IsKnownHeaderFuture {
async fn is_known_header(self, id: SubstrateHeaderId) -> EthereumFutureOutput<(SubstrateHeaderId, bool)> {
let (contract, sign_params) = (self.contract, self.sign_params);
ethereum_client::substrate_header_known(self.client, contract, id)
.map(move |(client, result)| {
Expand All @@ -175,14 +174,10 @@ impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
result,
)
})
.boxed()
}

fn requires_extra(self, header: &QueuedSubstrateHeader) -> Self::RequiresExtraFuture {
ready((self, Ok((header.header().id(), false))))
.await
}

fn submit_headers(self, headers: Vec<QueuedSubstrateHeader>) -> Self::SubmitHeadersFuture {
async fn submit_headers(self, headers: Vec<QueuedSubstrateHeader>) -> EthereumFutureOutput<Vec<SubstrateHeaderId>> {
let (contract, sign_params) = (self.contract, self.sign_params);
ethereum_client::submit_substrate_headers(self.client, sign_params.clone(), contract, headers)
.map(move |(client, result)| {
Expand All @@ -195,10 +190,10 @@ impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
result,
)
})
.boxed()
.await
}

fn incomplete_headers_ids(self) -> Self::IncompleteHeadersFuture {
async fn incomplete_headers_ids(self) -> EthereumFutureOutput<HashSet<SubstrateHeaderId>> {
let (contract, sign_params) = (self.contract, self.sign_params);
ethereum_client::incomplete_substrate_headers(self.client, contract)
.map(move |(client, result)| {
Expand All @@ -211,10 +206,14 @@ impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
result,
)
})
.boxed()
.await
}

fn complete_header(self, id: SubstrateHeaderId, completion: GrandpaJustification) -> Self::CompleteHeadersFuture {
async fn complete_header(
self,
id: SubstrateHeaderId,
completion: GrandpaJustification,
) -> EthereumFutureOutput<SubstrateHeaderId> {
let (contract, sign_params) = (self.contract, self.sign_params);
ethereum_client::complete_substrate_header(self.client, sign_params.clone(), contract, id, completion)
.map(move |(client, result)| {
Expand All @@ -227,7 +226,11 @@ impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
result,
)
})
.boxed()
.await
}

async fn requires_extra(self, header: QueuedSubstrateHeader) -> EthereumFutureOutput<(SubstrateHeaderId, bool)> {
(self, Ok((header.header().id(), false)))
}
}

Expand Down
Loading

0 comments on commit 650f323

Please sign in to comment.