Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

consensus: handle justification sync for blocks authored locally #8698

Merged
16 commits merged into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
let can_author_with =
sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone());

let aura = sc_consensus_aura::start_aura::<AuraPair, _, _, _, _, _, _, _, _,_>(
let aura = sc_consensus_aura::start_aura::<AuraPair, _, _, _, _, _, _, _, _, _, _>(
StartAuraParams {
slot_duration: sc_consensus_aura::slot_duration(&*client)?,
client: client.clone(),
Expand All @@ -233,6 +233,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
keystore: keystore_container.sync_keystore(),
can_author_with,
sync_oracle: network.clone(),
justification_sync_link: network.clone(),
block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32),
telemetry: telemetry.as_ref().map(|x| x.handle()),
},
Expand Down
1 change: 1 addition & 0 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ pub fn new_full_base(
env: proposer,
block_import,
sync_oracle: network.clone(),
justification_sync_link: network.clone(),
inherent_data_providers: inherent_data_providers.clone(),
force_authoring,
backoff_authoring_blocks,
Expand Down
22 changes: 15 additions & 7 deletions client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl SlotCompatible for AuraSlotCompatible {
}

/// Parameters of [`start_aura`].
pub struct StartAuraParams<C, SC, I, PF, SO, BS, CAW> {
pub struct StartAuraParams<C, SC, I, PF, SO, BS, CAW, L> {
/// The duration of a slot.
pub slot_duration: SlotDuration,
/// The client to interact with the chain.
Expand All @@ -143,6 +143,8 @@ pub struct StartAuraParams<C, SC, I, PF, SO, BS, CAW> {
pub keystore: SyncCryptoStorePtr,
/// Can we author a block with this node?
pub can_author_with: CAW,
/// Hook into the sync module to control the justification sync process.
pub justification_sync_link: L,
/// The proportion of the slot dedicated to proposing.
///
/// The block proposing will be limited to this proportion of the slot from the starting of the
Expand All @@ -154,7 +156,7 @@ pub struct StartAuraParams<C, SC, I, PF, SO, BS, CAW> {
}

/// Start the aura worker. The returned future should be run in a futures executor.
pub fn start_aura<P, B, C, SC, PF, I, SO, CAW, BS, Error>(
pub fn start_aura<P, B, C, SC, PF, I, SO, BS, CAW, L, Error>(
StartAuraParams {
slot_duration,
client,
Expand All @@ -167,10 +169,12 @@ pub fn start_aura<P, B, C, SC, PF, I, SO, CAW, BS, Error>(
backoff_authoring_blocks,
keystore,
can_author_with,
justification_sync_link,
block_proposal_slot_portion,
telemetry,
}: StartAuraParams<C, SC, I, PF, SO, BS, CAW>,
) -> Result<impl Future<Output = ()>, sp_consensus::Error> where
}: StartAuraParams<C, SC, I, PF, SO, BS, CAW, L>,
) -> Result<impl Future<Output = ()>, sp_consensus::Error>
where
B: BlockT,
C: ProvideRuntimeApi<B> + BlockOf + ProvideCache<B> + AuxStore + HeaderBackend<B> + Send + Sync,
C::Api: AuraApi<B, AuthorityId<P>>,
Expand All @@ -183,8 +187,9 @@ pub fn start_aura<P, B, C, SC, PF, I, SO, CAW, BS, Error>(
I: BlockImport<B, Transaction = sp_api::TransactionFor<C, B>> + Send + Sync + 'static,
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
SO: SyncOracle + Send + Sync + Clone,
CAW: CanAuthorWith<B> + Send,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + 'static,
CAW: CanAuthorWith<B> + Send,
L: sp_consensus::JustificationSyncLink<B> + 'static,
{
let worker = build_aura_worker::<P, _, _, _, _, _, _, _>(BuildAuraWorkerParams {
client: client.clone(),
Expand All @@ -203,11 +208,12 @@ pub fn start_aura<P, B, C, SC, PF, I, SO, CAW, BS, Error>(
slot_duration.slot_duration()
)?;

Ok(sc_consensus_slots::start_slot_worker::<_, _, _, _, _, AuraSlotCompatible, _, _>(
Ok(sc_consensus_slots::start_slot_worker(
slot_duration,
select_chain,
worker,
sync_oracle,
justification_sync_link,
inherent_data_providers,
AuraSlotCompatible,
can_author_with,
Expand Down Expand Up @@ -751,7 +757,7 @@ mod tests {
&inherent_data_providers, slot_duration.slot_duration()
).expect("Registers aura inherent data provider");

aura_futures.push(start_aura::<AuthorityPair, _, _, _, _, _, _, _, _, _>(StartAuraParams {
aura_futures.push(start_aura::<AuthorityPair, _, _, _, _, _, _, _, _, _, _>(StartAuraParams {
slot_duration,
block_import: client.clone(),
select_chain,
Expand All @@ -763,6 +769,7 @@ mod tests {
backoff_authoring_blocks: Some(BackoffAuthoringOnFinalizedHeadLagging::default()),
keystore,
can_author_with: sp_consensus::AlwaysCanAuthor,
justification_sync_link: (),
block_proposal_slot_portion: SlotProportion::new(0.5),
telemetry: None,
}).expect("Starts aura"));
Expand Down Expand Up @@ -890,6 +897,7 @@ mod tests {
duration: Duration::from_millis(1000),
block_size_limit: None,
},
&mut (),
)).unwrap();

// The returned block should be imported and we should be able to get its header by now.
Expand Down
64 changes: 39 additions & 25 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ impl std::ops::Deref for Config {
}

/// Parameters for BABE.
pub struct BabeParams<B: BlockT, C, E, I, SO, SC, CAW, BS> {
pub struct BabeParams<B: BlockT, C, E, I, SO, SC, BS, CAW, L> {
/// The keystore that manages the keys of the node.
pub keystore: SyncCryptoStorePtr,

Expand Down Expand Up @@ -396,6 +396,9 @@ pub struct BabeParams<B: BlockT, C, E, I, SO, SC, CAW, BS> {
/// Checks if the current native implementation can author with a runtime at a given block.
pub can_author_with: CAW,

/// Hook into the sync module to control the justification sync process.
pub justification_sync_link: L,
andresilva marked this conversation as resolved.
Show resolved Hide resolved

/// The proportion of the slot dedicated to proposing.
///
/// The block proposing will be limited to this proportion of the slot from the starting of the
Expand All @@ -408,38 +411,48 @@ pub struct BabeParams<B: BlockT, C, E, I, SO, SC, CAW, BS> {
}

/// Start the babe worker.
pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error>(BabeParams {
keystore,
client,
select_chain,
env,
block_import,
sync_oracle,
inherent_data_providers,
force_authoring,
backoff_authoring_blocks,
babe_link,
can_author_with,
block_proposal_slot_portion,
telemetry,
}: BabeParams<B, C, E, I, SO, SC, CAW, BS>) -> Result<
BabeWorker<B>,
sp_consensus::Error,
> where
pub fn start_babe<B, C, SC, E, I, SO, BS, CAW, L, Error>(
BabeParams {
keystore,
client,
select_chain,
env,
block_import,
sync_oracle,
inherent_data_providers,
force_authoring,
backoff_authoring_blocks,
babe_link,
can_author_with,
justification_sync_link,
block_proposal_slot_portion,
telemetry,
}: BabeParams<B, C, E, I, SO, SC, BS, CAW, L>,
) -> Result<BabeWorker<B>, sp_consensus::Error>
where
B: BlockT,
C: ProvideRuntimeApi<B> + ProvideCache<B> + ProvideUncles<B> + BlockchainEvents<B>
+ HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>
+ Send + Sync + 'static,
C: ProvideRuntimeApi<B>
+ ProvideCache<B>
+ ProvideUncles<B>
+ BlockchainEvents<B>
+ HeaderBackend<B>
+ HeaderMetadata<B, Error = ClientError>
+ Send
+ Sync
+ 'static,
C::Api: BabeApi<B>,
SC: SelectChain<B> + 'static,
E: Environment<B, Error = Error> + Send + Sync + 'static,
E::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
I: BlockImport<B, Error = ConsensusError, Transaction = sp_api::TransactionFor<C, B>> + Send
+ Sync + 'static,
I: BlockImport<B, Error = ConsensusError, Transaction = sp_api::TransactionFor<C, B>>
+ Send
+ Sync
+ 'static,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
SO: SyncOracle + Send + Sync + Clone + 'static,
CAW: CanAuthorWith<B> + Send + Sync + 'static,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + 'static,
CAW: CanAuthorWith<B> + Send + Sync + 'static,
L: sp_consensus::JustificationSyncLink<B> + 'static,
{
const HANDLE_BUFFER_SIZE: usize = 1024;

Expand Down Expand Up @@ -474,6 +487,7 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error>(BabeParams {
select_chain,
worker,
sync_oracle,
justification_sync_link,
inherent_data_providers,
babe_link.time_source,
can_author_with,
Expand Down
5 changes: 2 additions & 3 deletions client/consensus/babe/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,7 @@ fn rejects_empty_block() {
})
}

fn run_one_test(
mutator: impl Fn(&mut TestHeader, Stage) + Send + Sync + 'static,
) {
fn run_one_test(mutator: impl Fn(&mut TestHeader, Stage) + Send + Sync + 'static) {
sp_tracing::try_init_simple();
let mutator = Arc::new(mutator) as Mutator;

Expand Down Expand Up @@ -446,6 +444,7 @@ fn run_one_test(
babe_link: data.link.clone(),
keystore,
can_author_with: sp_consensus::AlwaysCanAuthor,
justification_sync_link: (),
block_proposal_slot_portion: SlotProportion::new(0.5),
telemetry: None,
}).expect("Starts babe"));
Expand Down
9 changes: 6 additions & 3 deletions client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ pub fn import_queue<B, Transaction, Algorithm>(
///
/// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted
/// for blocks being built. This can encode authorship information, or just be a graffiti.
pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW>(
pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW, L>(
block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
client: Arc<C>,
select_chain: S,
Expand All @@ -551,8 +551,9 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW>(
timeout: Duration,
build_time: Duration,
can_author_with: CAW,
justification_sync_link: L,
) -> (
Arc<Mutex<MiningWorker<Block, Algorithm, C, <E::Proposer as Proposer<Block>>::Proof>>>,
Arc<Mutex<MiningWorker<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>>>,
impl Future<Output = ()>,
) where
Block: BlockT,
Expand All @@ -565,16 +566,18 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW>(
E::Proposer: Proposer<Block, Transaction = sp_api::TransactionFor<C, Block>>,
SO: SyncOracle + Clone + Send + Sync + 'static,
CAW: CanAuthorWith<Block> + Clone + Send + 'static,
L: sp_consensus::JustificationSyncLink<Block>,
{
if let Err(_) = register_pow_inherent_data_provider(&inherent_data_providers) {
warn!("Registering inherent data provider for timestamp failed");
}

let timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
let worker = Arc::new(Mutex::new(MiningWorker::<Block, Algorithm, C, _> {
let worker = Arc::new(Mutex::new(MiningWorker {
build: None,
algorithm: algorithm.clone(),
block_import,
justification_sync_link,
}));
let worker_ret = worker.clone();

Expand Down
19 changes: 15 additions & 4 deletions client/consensus/pow/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@

use std::{pin::Pin, time::Duration, collections::HashMap, borrow::Cow};
use sc_client_api::ImportNotifications;
use sp_runtime::{DigestItem, traits::Block as BlockT, generic::BlockId};
use sp_consensus::{Proposal, BlockOrigin, BlockImportParams, import_queue::BoxBlockImport};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT},
DigestItem,
};
use futures::{prelude::*, task::{Context, Poll}};
use futures_timer::Delay;
use log::*;
Expand Down Expand Up @@ -57,18 +61,22 @@ pub struct MiningWorker<
Block: BlockT,
Algorithm: PowAlgorithm<Block>,
C: sp_api::ProvideRuntimeApi<Block>,
Proof
L: sp_consensus::JustificationSyncLink<Block>,
Proof,
> {
pub(crate) build: Option<MiningBuild<Block, Algorithm, C, Proof>>,
pub(crate) algorithm: Algorithm,
pub(crate) block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
pub(crate) justification_sync_link: L,
}

impl<Block, Algorithm, C, Proof> MiningWorker<Block, Algorithm, C, Proof> where
impl<Block, Algorithm, C, L, Proof> MiningWorker<Block, Algorithm, C, L, Proof>
where
Block: BlockT,
C: sp_api::ProvideRuntimeApi<Block>,
Algorithm: PowAlgorithm<Block>,
Algorithm::Difficulty: 'static + Send,
L: sp_consensus::JustificationSyncLink<Block>,
sp_api::TransactionFor<C, Block>: Send + 'static,
{
/// Get the current best hash. `None` if the worker has just started or the client is doing
Expand Down Expand Up @@ -139,8 +147,11 @@ impl<Block, Algorithm, C, Proof> MiningWorker<Block, Algorithm, C, Proof> where
Box::new(intermediate) as Box<_>,
);

let header = import_block.post_header();
match self.block_import.import_block(import_block, HashMap::default()).await {
Ok(_) => {
Ok(res) => {
res.handle_justification(&header.hash(), *header.number(), &mut self.justification_sync_link);

info!(
target: "pow",
"✅ Successfully mined block on top of: {}",
Expand Down
Loading