Skip to content

Commit

Permalink
Collator overseer builder unification (#3335)
Browse files Browse the repository at this point in the history
resolve #3116

a follow-up on
#3061 (review):

- [x] reuse collator overseer builder for polkadot-node and collator
- [x] run zombienet test (0001-parachains-smoke-test.toml)
- [x] make wasm build errors more user-friendly for an easier problem
detection when using different toolchains in Rust

---------

Co-authored-by: ordian <write@reusable.software>
Co-authored-by: s0me0ne-unkn0wn <48632512+s0me0ne-unkn0wn@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 28, 2024
1 parent 95da658 commit 7ec0b87
Show file tree
Hide file tree
Showing 15 changed files with 162 additions and 278 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cumulus/client/relay-chain-minimal-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ polkadot-node-collation-generation = { path = "../../../polkadot/node/collation-
polkadot-node-core-runtime-api = { path = "../../../polkadot/node/core/runtime-api" }
polkadot-node-core-chain-api = { path = "../../../polkadot/node/core/chain-api" }
polkadot-node-core-prospective-parachains = { path = "../../../polkadot/node/core/prospective-parachains" }
polkadot-service = { path = "../../../polkadot/node/service" }

# substrate deps
sc-authority-discovery = { path = "../../../substrate/client/authority-discovery" }
Expand Down
144 changes: 7 additions & 137 deletions cumulus/client/relay-chain-minimal-node/src/collator_overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,167 +15,37 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use futures::{select, StreamExt};
use parking_lot::Mutex;
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide};
use polkadot_network_bridge::{
Metrics as NetworkBridgeMetrics, NetworkBridgeRx as NetworkBridgeRxSubsystem,
NetworkBridgeTx as NetworkBridgeTxSubsystem,
};
use polkadot_node_collation_generation::CollationGenerationSubsystem;
use polkadot_node_core_chain_api::ChainApiSubsystem;
use polkadot_node_core_prospective_parachains::ProspectiveParachainsSubsystem;
use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
use polkadot_node_network_protocol::{
peer_set::{PeerSet, PeerSetProtocolNames},
request_response::{
v1::{self, AvailableDataFetchingRequest},
v2, IncomingRequestReceiver, ReqProtocolNames,
},
};
use polkadot_node_subsystem_util::metrics::{prometheus::Registry, Metrics};
use polkadot_overseer::{
BlockInfo, DummySubsystem, Handle, Overseer, OverseerConnector, OverseerHandle, SpawnGlue,
UnpinHandle,
BlockInfo, Handle, Overseer, OverseerConnector, OverseerHandle, SpawnGlue, UnpinHandle,
};
use polkadot_primitives::CollatorPair;
use polkadot_service::overseer::{collator_overseer_builder, OverseerGenArgs};

use sc_authority_discovery::Service as AuthorityDiscoveryService;
use sc_network::{NetworkStateInfo, NotificationService};
use sc_service::TaskManager;
use sc_utils::mpsc::tracing_unbounded;

use cumulus_primitives_core::relay_chain::{Block, Hash as PHash};
use cumulus_relay_chain_interface::RelayChainError;

use crate::BlockChainRpcClient;

/// Arguments passed for overseer construction.
pub(crate) struct CollatorOverseerGenArgs<'a> {
/// Runtime client generic, providing the `ProvieRuntimeApi` trait besides others.
pub runtime_client: Arc<BlockChainRpcClient>,
/// Underlying network service implementation.
pub network_service: Arc<sc_network::NetworkService<Block, PHash>>,
/// Syncing oracle.
pub sync_oracle: Box<dyn sp_consensus::SyncOracle + Send>,
/// Underlying authority discovery service.
pub authority_discovery_service: AuthorityDiscoveryService,
/// Receiver for collation request protocol v1.
pub collation_req_receiver_v1: IncomingRequestReceiver<v1::CollationFetchingRequest>,
/// Receiver for collation request protocol v2.
pub collation_req_receiver_v2: IncomingRequestReceiver<v2::CollationFetchingRequest>,
/// Receiver for availability request protocol
pub available_data_req_receiver: IncomingRequestReceiver<AvailableDataFetchingRequest>,
/// Prometheus registry, commonly used for production systems, less so for test.
pub registry: Option<&'a Registry>,
/// Task spawner to be used throughout the overseer and the APIs it provides.
pub spawner: sc_service::SpawnTaskHandle,
/// Determines the behavior of the collator.
pub collator_pair: CollatorPair,
/// Request response protocols
pub req_protocol_names: ReqProtocolNames,
/// Peerset protocols name mapping
pub peer_set_protocol_names: PeerSetProtocolNames,
/// Notification services for validation/collation protocols.
pub notification_services: HashMap<PeerSet, Box<dyn NotificationService>>,
}

fn build_overseer(
connector: OverseerConnector,
CollatorOverseerGenArgs {
runtime_client,
network_service,
sync_oracle,
authority_discovery_service,
collation_req_receiver_v1,
collation_req_receiver_v2,
available_data_req_receiver,
registry,
spawner,
collator_pair,
req_protocol_names,
peer_set_protocol_names,
notification_services,
}: CollatorOverseerGenArgs<'_>,
args: OverseerGenArgs<sc_service::SpawnTaskHandle, BlockChainRpcClient>,
) -> Result<
(Overseer<SpawnGlue<sc_service::SpawnTaskHandle>, Arc<BlockChainRpcClient>>, OverseerHandle),
RelayChainError,
> {
let spawner = SpawnGlue(spawner);
let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
let notification_sinks = Arc::new(Mutex::new(HashMap::new()));

let builder = Overseer::builder()
.availability_distribution(DummySubsystem)
.availability_recovery(AvailabilityRecoverySubsystem::for_collator(
available_data_req_receiver,
Metrics::register(registry)?,
))
.availability_store(DummySubsystem)
.bitfield_distribution(DummySubsystem)
.bitfield_signing(DummySubsystem)
.candidate_backing(DummySubsystem)
.candidate_validation(DummySubsystem)
.pvf_checker(DummySubsystem)
.chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
.collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
.collator_protocol({
let side = ProtocolSide::Collator {
peer_id: network_service.local_peer_id(),
collator_pair,
request_receiver_v1: collation_req_receiver_v1,
request_receiver_v2: collation_req_receiver_v2,
metrics: Metrics::register(registry)?,
};
CollatorProtocolSubsystem::new(side)
})
.network_bridge_rx(NetworkBridgeRxSubsystem::new(
network_service.clone(),
authority_discovery_service.clone(),
sync_oracle,
network_bridge_metrics.clone(),
peer_set_protocol_names.clone(),
notification_services,
notification_sinks.clone(),
))
.network_bridge_tx(NetworkBridgeTxSubsystem::new(
network_service,
authority_discovery_service,
network_bridge_metrics,
req_protocol_names,
peer_set_protocol_names,
notification_sinks,
))
.provisioner(DummySubsystem)
.runtime_api(RuntimeApiSubsystem::new(
runtime_client.clone(),
Metrics::register(registry)?,
spawner.clone(),
))
.statement_distribution(DummySubsystem)
.prospective_parachains(ProspectiveParachainsSubsystem::new(Metrics::register(registry)?))
.approval_distribution(DummySubsystem)
.approval_voting(DummySubsystem)
.gossip_support(DummySubsystem)
.dispute_coordinator(DummySubsystem)
.dispute_distribution(DummySubsystem)
.chain_selection(DummySubsystem)
.activation_external_listeners(Default::default())
.span_per_active_leaf(Default::default())
.active_leaves(Default::default())
.supports_parachains(runtime_client)
.metrics(Metrics::register(registry)?)
.spawner(spawner);
let builder =
collator_overseer_builder(args).map_err(|e| RelayChainError::Application(e.into()))?;

builder
.build_with_connector(connector)
.map_err(|e| RelayChainError::Application(e.into()))
}

pub(crate) fn spawn_overseer(
overseer_args: CollatorOverseerGenArgs,
overseer_args: OverseerGenArgs<sc_service::SpawnTaskHandle, BlockChainRpcClient>,
task_manager: &TaskManager,
relay_chain_rpc_client: Arc<BlockChainRpcClient>,
) -> Result<polkadot_overseer::Handle, RelayChainError> {
Expand Down
30 changes: 16 additions & 14 deletions cumulus/client/relay-chain-minimal-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use collator_overseer::{CollatorOverseerGenArgs, NewMinimalNode};
use collator_overseer::NewMinimalNode;

use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
use cumulus_relay_chain_rpc_interface::{RelayChainRpcClient, RelayChainRpcInterface, Url};
Expand All @@ -29,6 +29,7 @@ use polkadot_node_network_protocol::{

use polkadot_node_subsystem_util::metrics::prometheus::Registry;
use polkadot_primitives::CollatorPair;
use polkadot_service::{overseer::OverseerGenArgs, IsParachainNode};

use sc_authority_discovery::Service as AuthorityDiscoveryService;
use sc_network::{config::FullNetworkConfiguration, Event, NetworkEventStream, NetworkService};
Expand Down Expand Up @@ -172,10 +173,10 @@ async fn new_minimal_relay_chain(
}

let genesis_hash = relay_chain_rpc_client.block_get_hash(Some(0)).await?.unwrap_or_default();
let peer_set_protocol_names =
let peerset_protocol_names =
PeerSetProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
let is_authority = if role.is_authority() { IsAuthority::Yes } else { IsAuthority::No };
let notification_services = peer_sets_info(is_authority, &peer_set_protocol_names)
let notification_services = peer_sets_info(is_authority, &peerset_protocol_names)
.into_iter()
.map(|(config, (peerset, service))| {
net_config.add_notification_protocol(config);
Expand All @@ -184,14 +185,14 @@ async fn new_minimal_relay_chain(
.collect::<std::collections::HashMap<PeerSet, Box<dyn sc_network::NotificationService>>>();

let request_protocol_names = ReqProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
let (collation_req_receiver_v1, collation_req_receiver_v2, available_data_req_receiver) =
let (collation_req_v1_receiver, collation_req_v2_receiver, available_data_req_receiver) =
build_request_response_protocol_receivers(&request_protocol_names, &mut net_config);

let best_header = relay_chain_rpc_client
.chain_get_header(None)
.await?
.ok_or_else(|| RelayChainError::RpcCallError("Unable to fetch best header".to_string()))?;
let (network, network_starter, sync_oracle) = build_collator_network(
let (network, network_starter, sync_service) = build_collator_network(
&config,
net_config,
task_manager.spawn_handle(),
Expand All @@ -208,19 +209,20 @@ async fn new_minimal_relay_chain(
prometheus_registry.cloned(),
);

let overseer_args = CollatorOverseerGenArgs {
let overseer_args = OverseerGenArgs {
runtime_client: relay_chain_rpc_client.clone(),
network_service: network,
sync_oracle,
sync_service,
authority_discovery_service,
collation_req_receiver_v1,
collation_req_receiver_v2,
collation_req_v1_receiver,
collation_req_v2_receiver,
available_data_req_receiver,
registry: prometheus_registry,
spawner: task_manager.spawn_handle(),
collator_pair,
is_parachain_node: IsParachainNode::Collator(collator_pair),
overseer_message_channel_capacity_override: None,
req_protocol_names: request_protocol_names,
peer_set_protocol_names,
peerset_protocol_names,
notification_services,
};

Expand All @@ -240,16 +242,16 @@ fn build_request_response_protocol_receivers(
IncomingRequestReceiver<v2::CollationFetchingRequest>,
IncomingRequestReceiver<v1::AvailableDataFetchingRequest>,
) {
let (collation_req_receiver_v1, cfg) =
let (collation_req_v1_receiver, cfg) =
IncomingRequest::get_config_receiver(request_protocol_names);
config.add_request_response_protocol(cfg);
let (collation_req_receiver_v2, cfg) =
let (collation_req_v2_receiver, cfg) =
IncomingRequest::get_config_receiver(request_protocol_names);
config.add_request_response_protocol(cfg);
let (available_data_req_receiver, cfg) =
IncomingRequest::get_config_receiver(request_protocol_names);
config.add_request_response_protocol(cfg);
let cfg = Protocol::ChunkFetchingV1.get_outbound_only_config(request_protocol_names);
config.add_request_response_protocol(cfg);
(collation_req_receiver_v1, collation_req_receiver_v2, available_data_req_receiver)
(collation_req_v1_receiver, collation_req_v2_receiver, available_data_req_receiver)
}
8 changes: 6 additions & 2 deletions cumulus/client/relay-chain-minimal-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ pub(crate) fn build_collator_network(
genesis_hash: Hash,
best_header: Header,
) -> Result<
(Arc<NetworkService<Block, Hash>>, NetworkStarter, Box<dyn sp_consensus::SyncOracle + Send>),
(
Arc<NetworkService<Block, Hash>>,
NetworkStarter,
Arc<dyn sp_consensus::SyncOracle + Send + Sync>,
),
Error,
> {
let protocol_id = config.protocol_id();
Expand Down Expand Up @@ -112,7 +116,7 @@ pub(crate) fn build_collator_network(

let network_starter = NetworkStarter::new(network_start_tx);

Ok((network_service, network_starter, Box::new(SyncOracle {})))
Ok((network_service, network_starter, Arc::new(SyncOracle {})))
}

fn adjust_network_config_light_in_peers(config: &mut NetworkConfiguration) {
Expand Down
15 changes: 5 additions & 10 deletions polkadot/node/malus/src/variants/back_garbage_candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
use polkadot_cli::{
service::{
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, ExtendedOverseerGenArgs,
HeaderBackend, Overseer, OverseerConnector, OverseerGen, OverseerGenArgs, OverseerHandle,
ParachainHost, ProvideRuntimeApi,
AuxStore, Error, ExtendedOverseerGenArgs, Overseer, OverseerConnector, OverseerGen,
OverseerGenArgs, OverseerHandle,
},
validator_overseer_builder, Cli,
};
use polkadot_node_subsystem::SpawnGlue;
use polkadot_node_subsystem_types::DefaultSubsystemClient;
use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
use sp_core::traits::SpawnNamed;

use crate::{
Expand Down Expand Up @@ -63,13 +62,9 @@ impl OverseerGen for BackGarbageCandidates {
connector: OverseerConnector,
args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
ext_args: Option<ExtendedOverseerGenArgs>,
) -> Result<
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<RuntimeClient>>>, OverseerHandle),
Error,
>
) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
let spawner = args.spawner.clone();
Expand Down
4 changes: 0 additions & 4 deletions polkadot/node/malus/src/variants/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ use crate::{

use polkadot_node_core_candidate_validation::find_validation_data;
use polkadot_node_primitives::{InvalidCandidate, ValidationResult};
use polkadot_node_subsystem::{
messages::{CandidateValidationMessage, ValidationFailed},
overseer,
};

use polkadot_primitives::{
CandidateCommitments, CandidateDescriptor, CandidateReceipt, PersistedValidationData,
Expand Down
17 changes: 6 additions & 11 deletions polkadot/node/malus/src/variants/dispute_finalized_candidates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@
use futures::channel::oneshot;
use polkadot_cli::{
service::{
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, ExtendedOverseerGenArgs,
HeaderBackend, Overseer, OverseerConnector, OverseerGen, OverseerGenArgs, OverseerHandle,
ParachainHost, ProvideRuntimeApi,
AuxStore, Error, ExtendedOverseerGenArgs, Overseer, OverseerConnector, OverseerGen,
OverseerGenArgs, OverseerHandle,
},
validator_overseer_builder, Cli,
};
use polkadot_node_subsystem::{messages::ApprovalVotingMessage, SpawnGlue};
use polkadot_node_subsystem_types::{DefaultSubsystemClient, OverseerSignal};
use polkadot_node_subsystem::SpawnGlue;
use polkadot_node_subsystem_types::{ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient};
use polkadot_node_subsystem_util::request_candidate_events;
use polkadot_primitives::CandidateEvent;
use sp_core::traits::SpawnNamed;
Expand Down Expand Up @@ -237,13 +236,9 @@ impl OverseerGen for DisputeFinalizedCandidates {
connector: OverseerConnector,
args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
ext_args: Option<ExtendedOverseerGenArgs>,
) -> Result<
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<RuntimeClient>>>, OverseerHandle),
Error,
>
) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
gum::info!(
Expand Down
Loading

0 comments on commit 7ec0b87

Please sign in to comment.