Skip to content

Commit

Permalink
Initial: Offchain Workers (paritytech#1942)
Browse files Browse the repository at this point in the history
* Refactor state-machine stuff.

* Fix tests.

* WiP

* WiP2

* Service support for offchain workers.

* Service support for offchain workers.

* Testing offchain worker.

* Initial version working.

* Pass side effects in call.

* Pass OffchainExt in context.

* Submit extrinsics to the pool.

* Support inherents.

* Insert to inherents pool.

* Inserting to the pool asynchronously.

* Add test to offchain worker.

* Implement convenience syntax for modules.

* Dispatching offchain worker through executive.

* Fix offchain test.

* Remove offchain worker from timestamp.

* Update Cargo.lock.

* Address review comments.

* Use latest patch version for futures.

* Add CLI parameter for offchain worker.

* Fix compilation.

* Fix test.

* Fix extrinsics format for tests.

* Fix RPC test.

* Bump spec version.

* Fix executive.

* Fix support macro.

* Address grumbles.

* Bump runtime
  • Loading branch information
tomusdrw authored and MTDK1 committed Apr 12, 2019
1 parent 4a73126 commit d60b533
Show file tree
Hide file tree
Showing 58 changed files with 1,154 additions and 174 deletions.
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 48 additions & 4 deletions core/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,22 @@
//
use std::{self, time, sync::Arc};

use log::{info, debug};
use log::{info, debug, warn};

use client::{
self, error, Client as SubstrateClient, CallExecutor,
block_builder::api::BlockBuilder as BlockBuilderApi, runtime_api::Core,
};
use codec::Decode;
use consensus_common::{self, evaluation};
use primitives::{H256, Blake2Hasher};
use primitives::{H256, Blake2Hasher, ExecutionContext};
use runtime_primitives::traits::{
Block as BlockT, Hash as HashT, Header as HeaderT, ProvideRuntimeApi, AuthorityIdFor
};
use runtime_primitives::ExecutionContext;
use runtime_primitives::generic::BlockId;
use runtime_primitives::ApplyError;
use transaction_pool::txpool::{self, Pool as TransactionPool};
use inherents::InherentData;
use inherents::{InherentData, pool::InherentsPool};
use substrate_telemetry::{telemetry, CONSENSUS_INFO};

/// Build new blocks.
Expand Down Expand Up @@ -115,6 +114,8 @@ pub struct ProposerFactory<C, A> where A: txpool::ChainApi {
pub client: Arc<C>,
/// The transaction pool.
pub transaction_pool: Arc<TransactionPool<A>>,
/// The inherents pool
pub inherents_pool: Arc<InherentsPool<<A::Block as BlockT>::Extrinsic>>,
}

impl<C, A> consensus_common::Environment<<C as AuthoringApi>::Block> for ProposerFactory<C, A> where
Expand Down Expand Up @@ -144,6 +145,7 @@ impl<C, A> consensus_common::Environment<<C as AuthoringApi>::Block> for Propose
parent_id: id,
parent_number: *parent_header.number(),
transaction_pool: self.transaction_pool.clone(),
inherents_pool: self.inherents_pool.clone(),
now: Box::new(time::Instant::now),
};

Expand All @@ -158,6 +160,7 @@ pub struct Proposer<Block: BlockT, C, A: txpool::ChainApi> {
parent_id: BlockId<Block>,
parent_number: <<Block as BlockT>::Header as HeaderT>::Number,
transaction_pool: Arc<TransactionPool<A>>,
inherents_pool: Arc<InherentsPool<<Block as BlockT>::Extrinsic>>,
now: Box<Fn() -> time::Instant>,
}

Expand Down Expand Up @@ -201,11 +204,23 @@ impl<Block, C, A> Proposer<Block, C, A> where
&self.parent_id,
inherent_data,
|block_builder| {
// Add inherents from the internal pool

let inherents = self.inherents_pool.drain();
debug!("Pushing {} queued inherents.", inherents.len());
for i in inherents {
if let Err(e) = block_builder.push_extrinsic(i) {
warn!("Error while pushing inherent extrinsic from the pool: {:?}", e);
}
}

// proceed with transactions
let mut is_first = true;
let mut skipped = 0;
let mut unqueue_invalid = Vec::new();
let pending_iterator = self.transaction_pool.ready();

debug!("Attempting to push transactions from the pool.");
for pending in pending_iterator {
if (self.now)() > deadline {
debug!("Consensus deadline reached when pushing block transactions, proceeding with proposing.");
Expand Down Expand Up @@ -303,6 +318,7 @@ mod tests {
let proposer_factory = ProposerFactory {
client: client.clone(),
transaction_pool: txpool.clone(),
inherents_pool: Default::default(),
};

let mut proposer = proposer_factory.init(
Expand All @@ -325,4 +341,32 @@ mod tests {
assert_eq!(txpool.ready().count(), 2);
}

#[test]
fn should_include_inherents_from_the_pool() {
// given
let client = Arc::new(test_client::new());
let chain_api = transaction_pool::ChainApi::new(client.clone());
let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api));
let inpool = Arc::new(InherentsPool::default());

let proposer_factory = ProposerFactory {
client: client.clone(),
transaction_pool: txpool.clone(),
inherents_pool: inpool.clone(),
};

inpool.add(extrinsic(0));

let proposer = proposer_factory.init(
&client.header(&BlockId::number(0)).unwrap().unwrap(),
&[]
).unwrap();

// when
let deadline = time::Duration::from_secs(3);
let block = proposer.propose(Default::default(), deadline).unwrap();

// then
assert_eq!(block.extrinsics().len(), 1);
}
}
17 changes: 13 additions & 4 deletions core/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,20 @@ where
service::Roles::FULL
};

let exec = cli.execution_strategies;
config.execution_strategies = ExecutionStrategies {
syncing: cli.syncing_execution.into(),
importing: cli.importing_execution.into(),
block_construction: cli.block_construction_execution.into(),
other: cli.other_execution.into(),
syncing: exec.syncing_execution.into(),
importing: exec.importing_execution.into(),
block_construction: exec.block_construction_execution.into(),
offchain_worker: exec.offchain_worker_execution.into(),
other: exec.other_execution.into(),
};

config.offchain_worker = match (cli.offchain_worker, role) {
(params::OffchainWorkerEnabled::WhenValidating, service::Roles::AUTHORITY) => true,
(params::OffchainWorkerEnabled::Always, _) => true,
(params::OffchainWorkerEnabled::Never, _) => false,
(params::OffchainWorkerEnabled::WhenValidating, _) => false,
};

config.roles = role;
Expand Down
125 changes: 84 additions & 41 deletions core/cli/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ impl Into<client::ExecutionStrategy> for ExecutionStrategy {
}
}

arg_enum! {
/// How to execute blocks
#[derive(Debug, Clone)]
pub enum OffchainWorkerEnabled {
Always,
Never,
WhenValidating,
}
}

/// Shared parameters used by all `CoreParams`.
#[derive(Debug, StructOpt, Clone)]
pub struct SharedParams {
Expand Down Expand Up @@ -205,6 +215,70 @@ pub struct TransactionPoolParams {
pub pool_kbytes: usize,
}

/// Execution strategies parameters.
#[derive(Debug, StructOpt, Clone)]
pub struct ExecutionStrategies {
/// The means of execution used when calling into the runtime while syncing blocks.
#[structopt(
long = "syncing-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""NativeElseWasm""#
)
)]
pub syncing_execution: ExecutionStrategy,

/// The means of execution used when calling into the runtime while importing blocks.
#[structopt(
long = "importing-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""NativeElseWasm""#
)
)]
pub importing_execution: ExecutionStrategy,

/// The means of execution used when calling into the runtime while constructing blocks.
#[structopt(
long = "block-construction-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""Wasm""#
)
)]
pub block_construction_execution: ExecutionStrategy,

/// The means of execution used when calling into the runtime while constructing blocks.
#[structopt(
long = "offchain-worker-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""NativeWhenPossible""#
)
)]
pub offchain_worker_execution: ExecutionStrategy,

/// The means of execution used when calling into the runtime while not syncing, importing or constructing blocks.
#[structopt(
long = "other-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""Wasm""#
)
)]
pub other_execution: ExecutionStrategy,
}

/// The `run` command used to run a node.
#[derive(Debug, StructOpt, Clone)]
pub struct RunCmd {
Expand Down Expand Up @@ -266,53 +340,22 @@ pub struct RunCmd {
#[structopt(long = "telemetry-url", value_name = "URL VERBOSITY", parse(try_from_str = "parse_telemetry_endpoints"))]
pub telemetry_endpoints: Vec<(String, u8)>,

/// The means of execution used when calling into the runtime while syncing blocks.
#[structopt(
long = "syncing-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""NativeElseWasm""#
)
)]
pub syncing_execution: ExecutionStrategy,

/// The means of execution used when calling into the runtime while importing blocks.
#[structopt(
long = "importing-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""NativeElseWasm""#
)
)]
pub importing_execution: ExecutionStrategy,

/// The means of execution used when calling into the runtime while constructing blocks.
/// Should execute offchain workers on every block. By default it's only enabled for nodes that are authoring new
/// blocks.
#[structopt(
long = "block-construction-execution",
value_name = "STRATEGY",
long = "offchain-worker",
value_name = "ENABLED",
raw(
possible_values = "&ExecutionStrategy::variants()",
possible_values = "&OffchainWorkerEnabled::variants()",
case_insensitive = "true",
default_value = r#""Wasm""#
default_value = r#""WhenValidating""#
)
)]
pub block_construction_execution: ExecutionStrategy,
pub offchain_worker: OffchainWorkerEnabled,

/// The means of execution used when calling into the runtime while not syncing, importing or constructing blocks.
#[structopt(
long = "other-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""Wasm""#
)
)]
pub other_execution: ExecutionStrategy,
#[allow(missing_docs)]
#[structopt(flatten)]
pub execution_strategies: ExecutionStrategies,

#[allow(missing_docs)]
#[structopt(flatten)]
Expand Down
8 changes: 4 additions & 4 deletions core/client/src/block_builder/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
use super::api::BlockBuilder as BlockBuilderApi;
use std::vec::Vec;
use parity_codec::Encode;
use crate::blockchain::HeaderBackend;
use runtime_primitives::ApplyOutcome;
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{
Header as HeaderT, Hash, Block as BlockT, One, HashFor, ProvideRuntimeApi, ApiRef
};
use primitives::H256;
use runtime_primitives::generic::BlockId;
use primitives::{H256, ExecutionContext};
use crate::blockchain::HeaderBackend;
use crate::runtime_api::Core;
use crate::error;
use runtime_primitives::{ApplyOutcome, ExecutionContext};


/// Utility for building new (valid) blocks from a stream of extrinsics.
Expand Down
Loading

0 comments on commit d60b533

Please sign in to comment.