diff --git a/relays/client-substrate/src/client.rs b/relays/client-substrate/src/client.rs index 80db593acb..6a381c4e97 100644 --- a/relays/client-substrate/src/client.rs +++ b/relays/client-substrate/src/client.rs @@ -26,7 +26,7 @@ use crate::{ Result, SignParam, TransactionTracker, UnsignedTransaction, }; -use async_std::sync::{Arc, Mutex}; +use async_std::sync::{Arc, Mutex, RwLock}; use async_trait::async_trait; use bp_runtime::{HeaderIdProvider, StorageDoubleMapKeyProvider, StorageMapKeyProvider}; use codec::{Decode, Encode}; @@ -111,23 +111,31 @@ pub enum ChainRuntimeVersion { /// Substrate client type. /// -/// Cloning `Client` is a cheap operation. +/// Cloning `Client` is a cheap operation that only clones internal references. Different +/// clones of the same client are guaranteed to use the same references. pub struct Client { - /// Tokio runtime handle. - tokio: Arc, + // Lock order: `submit_signed_extrinsic_lock`, `data` /// Client connection params. params: Arc, - /// Substrate RPC client. - client: Arc, - /// Genesis block hash. - genesis_hash: HashOf, + /// Saved chain runtime version. + chain_runtime_version: ChainRuntimeVersion, /// If several tasks are submitting their transactions simultaneously using /// `submit_signed_extrinsic` method, they may get the same transaction nonce. So one of /// transactions will be rejected from the pool. This lock is here to prevent situations like /// that. submit_signed_extrinsic_lock: Arc>, - /// Saved chain runtime version - chain_runtime_version: ChainRuntimeVersion, + /// Genesis block hash. + genesis_hash: HashOf, + /// Shared dynamic data. + data: Arc>, +} + +/// Client data, shared by all `Client` clones. +struct ClientData { + /// Tokio runtime handle. + tokio: Arc, + /// Substrate RPC client. + client: Arc, } #[async_trait] @@ -135,9 +143,10 @@ impl relay_utils::relay_loop::Client for Client { type Error = Error; async fn reconnect(&mut self) -> Result<()> { + let mut data = self.data.write().await; let (tokio, client) = Self::build_client(&self.params).await?; - self.tokio = tokio; - self.client = client; + data.tokio = tokio; + data.client = client; Ok(()) } } @@ -145,12 +154,11 @@ impl relay_utils::relay_loop::Client for Client { impl Clone for Client { fn clone(&self) -> Self { Client { - tokio: self.tokio.clone(), params: self.params.clone(), - client: self.client.clone(), - genesis_hash: self.genesis_hash, - submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(), chain_runtime_version: self.chain_runtime_version.clone(), + submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(), + genesis_hash: self.genesis_hash, + data: self.data.clone(), } } } @@ -199,12 +207,11 @@ impl Client { let chain_runtime_version = params.chain_runtime_version.clone(); Ok(Self { - tokio, params, - client, - genesis_hash, - submit_signed_extrinsic_lock: Arc::new(Mutex::new(())), chain_runtime_version, + submit_signed_extrinsic_lock: Arc::new(Mutex::new(())), + genesis_hash, + data: Arc::new(RwLock::new(ClientData { tokio, client })), }) } @@ -572,7 +579,7 @@ impl Client { Ok((tracker, subscription)) }) .await?; - self.tokio.spawn(Subscription::background_worker( + self.data.read().await.tokio.spawn(Subscription::background_worker( C::NAME.into(), "extrinsic".into(), subscription, @@ -719,7 +726,7 @@ impl Client { }) .await?; let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY); - self.tokio.spawn(Subscription::background_worker( + self.data.read().await.tokio.spawn(Subscription::background_worker( C::NAME.into(), "justification".into(), subscription, @@ -735,8 +742,9 @@ impl Client { F: Future> + Send, T: Send + 'static, { - let client = self.client.clone(); - self.tokio.spawn(async move { make_jsonrpsee_future(client).await }).await? + let data = self.data.read().await; + let client = data.client.clone(); + data.tokio.spawn(async move { make_jsonrpsee_future(client).await }).await? } /// Returns `true` if version guard can be started.