Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Switch the client to new futures (#3103)
Browse files Browse the repository at this point in the history
* Switch the client to new futures

* No need for compat in the client

* Fix client tests

* Address review
  • Loading branch information
tomaka authored and bkchr committed Jul 11, 2019
1 parent 98d866d commit 9505713
Show file tree
Hide file tree
Showing 28 changed files with 249 additions and 112 deletions.
90 changes: 88 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ lazy_static = "1.3"
app_dirs = "1.2"
tokio = "0.1.7"
futures = "0.1.17"
futures03 = { package = "futures-preview", version = "0.3.0-alpha.17", features = ["compat"] }
fdlimit = "0.1"
exit-future = "0.1"
serde_json = "1.0"
Expand Down
3 changes: 2 additions & 1 deletion core/cli/src/informant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use ansi_term::Colour;
use std::fmt;
use std::time;
use futures::{Future, Stream};
use futures03::{StreamExt as _, TryStreamExt as _};
use service::{Service, Components};
use tokio::runtime::TaskExecutor;
use network::SyncState;
Expand Down Expand Up @@ -81,7 +82,7 @@ where C: Components {
Some((info.chain.best_number, info.chain.best_hash))
};

let display_block_import = client.import_notification_stream().for_each(move |n| {
let display_block_import = client.import_notification_stream().map(|v| Ok::<_, ()>(v)).compat().for_each(move |n| {
// detect and log reorganizations.
if let Some((ref last_num, ref last_hash)) = last {
if n.header.parent_hash() != last_hash {
Expand Down
4 changes: 2 additions & 2 deletions core/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ fnv = { version = "1.0", optional = true }
log = { version = "0.4", optional = true }
parking_lot = { version = "0.8.0", optional = true }
hex = { package = "hex-literal", version = "0.1", optional = true }
futures = { version = "0.1.17", optional = true }
futures-preview = { version = "0.3.0-alpha.17", optional = true }
consensus = { package = "substrate-consensus-common", path = "../consensus/common", optional = true }
executor = { package = "substrate-executor", path = "../executor", optional = true }
state-machine = { package = "substrate-state-machine", path = "../state-machine", optional = true }
Expand Down Expand Up @@ -47,7 +47,7 @@ std = [
"fnv",
"log",
"hex",
"futures",
"futures-preview",
"executor",
"state-machine",
"keyring",
Expand Down
2 changes: 1 addition & 1 deletion core/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
panic::UnwindSafe, result, cell::RefCell, rc::Rc,
};
use crate::error::Error;
use futures::sync::mpsc;
use futures::channel::mpsc;
use parking_lot::{Mutex, RwLock};
use primitives::NativeOrEncoded;
use runtime_primitives::{
Expand Down
2 changes: 1 addition & 1 deletion core/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub use crate::client::{
new_with_backend,
new_in_mem,
BlockBody, BlockStatus, ImportNotifications, FinalityNotifications, BlockchainEvents,
BlockImportNotification, Client, ClientInfo, ExecutionStrategies,
BlockImportNotification, Client, ClientInfo, ExecutionStrategies, FinalityNotification,
LongestChain,
};
#[cfg(feature = "std")]
Expand Down
18 changes: 9 additions & 9 deletions core/client/src/light/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use futures::{Future, IntoFuture};
use parking_lot::{RwLock, Mutex};

use runtime_primitives::{generic::BlockId, Justification, StorageOverlay, ChildrenStorageOverlay};
Expand Down Expand Up @@ -359,14 +358,15 @@ where
*self.cached_header.write() = Some(cached_header);
}

self.fetcher.upgrade().ok_or(ClientError::NotAvailableOnLightClient)?
.remote_read(RemoteReadRequest {
block: self.block,
header: header.expect("if block above guarantees that header is_some(); qed"),
key: key.to_vec(),
retry_count: None,
})
.into_future().wait()
futures::executor::block_on(
self.fetcher.upgrade().ok_or(ClientError::NotAvailableOnLightClient)?
.remote_read(RemoteReadRequest {
block: self.block,
header: header.expect("if block above guarantees that header is_some(); qed"),
key: key.to_vec(),
retry_count: None,
})
)
}

fn child_storage(&self, _storage_key: &[u8], _key: &[u8]) -> ClientResult<Option<Vec<u8>>> {
Expand Down
30 changes: 15 additions & 15 deletions core/client/src/light/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! blocks. CHT roots are stored for headers of ancient blocks.
use std::{sync::{Weak, Arc}, collections::HashMap};
use futures::{Future, IntoFuture};
use parking_lot::Mutex;

use runtime_primitives::{Justification, generic::BlockId};
Expand Down Expand Up @@ -122,14 +121,15 @@ impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Bloc
return Ok(None);
}

self.fetcher().upgrade().ok_or(ClientError::NotAvailableOnLightClient)?
.remote_header(RemoteHeaderRequest {
cht_root: self.storage.header_cht_root(cht::size(), number)?,
block: number,
retry_count: None,
futures::executor::block_on(
self.fetcher().upgrade()
.ok_or(ClientError::NotAvailableOnLightClient)?
.remote_header(RemoteHeaderRequest {
cht_root: self.storage.header_cht_root(cht::size(), number)?,
block: number,
retry_count: None,
})
.into_future().wait()
.map(Some)
).map(Some)
}
}
}
Expand Down Expand Up @@ -158,13 +158,13 @@ impl<S, F, Block> BlockchainBackend<Block> for Blockchain<S, F> where Block: Blo
None => return Ok(None),
};

self.fetcher().upgrade().ok_or(ClientError::NotAvailableOnLightClient)?
.remote_body(RemoteBodyRequest {
header,
retry_count: None,
})
.into_future().wait()
.map(Some)
futures::executor::block_on(
self.fetcher().upgrade().ok_or(ClientError::NotAvailableOnLightClient)?
.remote_body(RemoteBodyRequest {
header,
retry_count: None,
})
).map(Some)
}

fn justification(&self, _id: BlockId<Block>) -> ClientResult<Option<Justification>> {
Expand Down
5 changes: 2 additions & 3 deletions core/client/src/light/call_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::{
collections::HashSet, sync::Arc, panic::UnwindSafe, result,
marker::PhantomData, cell::RefCell, rc::Rc,
};
use futures::{IntoFuture, Future};

use parity_codec::{Encode, Decode};
use primitives::{offchain, H256, Blake2Hasher, convert_hash, NativeOrEncoded};
Expand Down Expand Up @@ -100,13 +99,13 @@ where
let block_hash = self.blockchain.expect_block_hash_from_id(id)?;
let block_header = self.blockchain.expect_header(id.clone())?;

self.fetcher.remote_call(RemoteCallRequest {
futures::executor::block_on(self.fetcher.remote_call(RemoteCallRequest {
block: block_hash,
header: block_header,
method: method.into(),
call_data: call_data.to_vec(),
retry_count: None,
}).into_future().wait()
}))
}

fn contextual_call<
Expand Down
Loading

0 comments on commit 9505713

Please sign in to comment.