From 44998291834b3a36a11c8dede63ce60362f1a3a5 Mon Sep 17 00:00:00 2001 From: Aaro Altonen <48052676+altonen@users.noreply.github.com> Date: Thu, 30 Mar 2023 14:59:58 +0300 Subject: [PATCH] Attempt to relieve pressure on `mpsc_network_worker` (#13725) * Attempt to relieve pressure on `mpsc_network_worker` `SyncingEngine` interacting with `NetworkWorker` can put a lot of strain on the channel if the number of inbound connections is high. This is because `SyncingEngine` is notified of each inbound substream which it then can either accept or reject and this causes a lot of message exchange on the already busy channel. Use a direct channel pair between `Protocol` and `SyncingEngine` to exchange notification events. It is a temporary change to alleviate the problems caused by syncing being an independent protocol and the fix will be removed once `NotificationService` is implemented. * Apply review comments * fixes * trigger ci * Fix tests Verify that both peers have a connection now that the validation goes through `SyncingEngine`. Depending on how the tasks are scheduled, one of them might not have the peer registered in `SyncingEngine` at which point the test won't make any progress because block announcement received from an unknown peer is discarded. Move polling of `ChainSync` at the end of the function so that if a block announcement causes a block request to be sent, that can be sent in the same call to `SyncingEngine::poll()`. --------- Co-authored-by: parity-processbot <> --- Cargo.lock | 1 + client/network/src/config.rs | 11 +- client/network/src/event.rs | 47 ++++++- client/network/src/lib.rs | 6 +- client/network/src/protocol.rs | 108 +++++++++++++--- client/network/src/service.rs | 6 +- client/network/sync/src/engine.rs | 199 +++++++++++++---------------- client/network/test/Cargo.toml | 1 + client/network/test/src/lib.rs | 12 +- client/network/test/src/service.rs | 6 +- client/network/test/src/sync.rs | 2 +- client/service/src/builder.rs | 13 +- 12 files changed, 259 insertions(+), 153 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a49f8fc4208a2..7fd0122f3cc0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9128,6 +9128,7 @@ dependencies = [ "sc-network-light", "sc-network-sync", "sc-service", + "sc-utils", "sp-blockchain", "sp-consensus", "sp-consensus-babe", diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 3c6801f22a35c..781ae9c786694 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -22,6 +22,7 @@ //! See the documentation of [`Params`]. pub use crate::{ + protocol::NotificationsSink, request_responses::{ IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig, }, @@ -31,7 +32,12 @@ pub use crate::{ use codec::Encode; use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId}; use prometheus_endpoint::Registry; -pub use sc_network_common::{role::Role, sync::warp::WarpSyncProvider, ExHashT}; +pub use sc_network_common::{ + role::{Role, Roles}, + sync::warp::WarpSyncProvider, + ExHashT, +}; +use sc_utils::mpsc::TracingUnboundedSender; use zeroize::Zeroize; use sp_runtime::traits::Block as BlockT; @@ -714,6 +720,9 @@ pub struct Params { /// Block announce protocol configuration pub block_announce_config: NonDefaultSetConfig, + /// TX channel for direct communication with `SyncingEngine` and `Protocol`. + pub tx: TracingUnboundedSender>, + /// Request response protocol configurations pub request_response_protocol_configs: Vec, } diff --git a/client/network/src/event.rs b/client/network/src/event.rs index 3ecd8f9311429..975fde0e40a28 100644 --- a/client/network/src/event.rs +++ b/client/network/src/event.rs @@ -19,12 +19,14 @@ //! Network event types. These are are not the part of the protocol, but rather //! events that happen on the network like DHT get/put results received. -use crate::types::ProtocolName; +use crate::{types::ProtocolName, NotificationsSink}; use bytes::Bytes; +use futures::channel::oneshot; use libp2p::{core::PeerId, kad::record::Key}; -use sc_network_common::role::ObservedRole; +use sc_network_common::{role::ObservedRole, sync::message::BlockAnnouncesHandshake}; +use sp_runtime::traits::Block as BlockT; /// Events generated by DHT as a response to get_value and put_value requests. #[derive(Debug, Clone)] @@ -90,3 +92,44 @@ pub enum Event { messages: Vec<(ProtocolName, Bytes)>, }, } + +/// Event sent to `SyncingEngine` +// TODO: remove once `NotificationService` is implemented. +pub enum SyncEvent { + /// Opened a substream with the given node with the given notifications protocol. + /// + /// The protocol is always one of the notification protocols that have been registered. + NotificationStreamOpened { + /// Node we opened the substream with. + remote: PeerId, + /// Received handshake. + received_handshake: BlockAnnouncesHandshake, + /// Notification sink. + sink: NotificationsSink, + /// Channel for reporting accept/reject of the substream. + tx: oneshot::Sender, + }, + + /// Closed a substream with the given node. Always matches a corresponding previous + /// `NotificationStreamOpened` message. + NotificationStreamClosed { + /// Node we closed the substream with. + remote: PeerId, + }, + + /// Notification sink was replaced. + NotificationSinkReplaced { + /// Node we closed the substream with. + remote: PeerId, + /// Notification sink. + sink: NotificationsSink, + }, + + /// Received one or more messages from the given node using the given protocol. + NotificationsReceived { + /// Node we received the message from. + remote: PeerId, + /// Concerned protocol and associated message. + messages: Vec, + }, +} diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index c290f4b94db53..5374ac13435be 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -259,7 +259,7 @@ pub mod request_responses; pub mod types; pub mod utils; -pub use event::{DhtEvent, Event}; +pub use event::{DhtEvent, Event, SyncEvent}; #[doc(inline)] pub use libp2p::{multiaddr, Multiaddr, PeerId}; pub use request_responses::{IfDisconnected, RequestFailure, RequestResponseConfig}; @@ -278,8 +278,8 @@ pub use service::{ NetworkStatusProvider, NetworkSyncForkRequest, NotificationSender as NotificationSenderT, NotificationSenderError, NotificationSenderReady, }, - DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, OutboundFailure, - PublicKey, + DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, NotificationsSink, + OutboundFailure, PublicKey, }; pub use types::ProtocolName; diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 06ca02c0ca8d5..a7e6f36ef6215 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -24,6 +24,7 @@ use crate::{ use bytes::Bytes; use codec::{DecodeAll, Encode}; +use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt}; use libp2p::{ core::connection::ConnectionId, swarm::{ @@ -35,11 +36,14 @@ use libp2p::{ use log::{debug, error, warn}; use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake}; +use sc_utils::mpsc::TracingUnboundedSender; use sp_runtime::traits::Block as BlockT; use std::{ collections::{HashMap, HashSet, VecDeque}, + future::Future, iter, + pin::Pin, task::Poll, }; @@ -68,6 +72,9 @@ mod rep { pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message"); } +type PendingSyncSubstreamValidation = + Pin> + Send>>; + // Lock must always be taken in order declared here. pub struct Protocol { /// Pending list of messages to return from `poll` as a priority. @@ -87,6 +94,8 @@ pub struct Protocol { bad_handshake_substreams: HashSet<(PeerId, sc_peerset::SetId)>, /// Connected peers. peers: HashMap, + sync_substream_validations: FuturesUnordered, + tx: TracingUnboundedSender>, _marker: std::marker::PhantomData, } @@ -96,6 +105,7 @@ impl Protocol { roles: Roles, network_config: &config::NetworkConfiguration, block_announces_protocol: config::NonDefaultSetConfig, + tx: TracingUnboundedSender>, ) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { let mut known_addresses = Vec::new(); @@ -179,6 +189,8 @@ impl Protocol { .collect(), bad_handshake_substreams: Default::default(), peers: HashMap::new(), + sync_substream_validations: FuturesUnordered::new(), + tx, // TODO: remove when `BlockAnnouncesHandshake` is moved away from `Protocol` _marker: Default::default(), }; @@ -418,6 +430,23 @@ impl NetworkBehaviour for Protocol { return Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }), }; + while let Poll::Ready(Some(validation_result)) = + self.sync_substream_validations.poll_next_unpin(cx) + { + match validation_result { + Ok((peer, roles)) => { + self.peers.insert(peer, roles); + }, + Err(peer) => { + log::debug!( + target: "sub-libp2p", + "`SyncingEngine` rejected stream" + ); + self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC); + }, + } + } + let outcome = match event { NotificationsOut::CustomProtocolOpen { peer_id, @@ -440,16 +469,29 @@ impl NetworkBehaviour for Protocol { best_hash: handshake.best_hash, genesis_hash: handshake.genesis_hash, }; - self.peers.insert(peer_id, roles); - CustomMessageOutcome::NotificationStreamOpened { - remote: peer_id, - protocol: self.notification_protocols[usize::from(set_id)].clone(), - negotiated_fallback, - received_handshake: handshake.encode(), - roles, - notifications_sink, - } + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send( + crate::SyncEvent::NotificationStreamOpened { + remote: peer_id, + received_handshake: handshake, + sink: notifications_sink, + tx, + }, + ); + self.sync_substream_validations.push(Box::pin(async move { + match rx.await { + Ok(accepted) => + if accepted { + Ok((peer_id, roles)) + } else { + Err(peer_id) + }, + Err(_) => Err(peer_id), + } + })); + + CustomMessageOutcome::None }, Ok(msg) => { debug!( @@ -469,15 +511,27 @@ impl NetworkBehaviour for Protocol { let roles = handshake.roles; self.peers.insert(peer_id, roles); - CustomMessageOutcome::NotificationStreamOpened { - remote: peer_id, - protocol: self.notification_protocols[usize::from(set_id)] - .clone(), - negotiated_fallback, - received_handshake, - roles, - notifications_sink, - } + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send( + crate::SyncEvent::NotificationStreamOpened { + remote: peer_id, + received_handshake: handshake, + sink: notifications_sink, + tx, + }, + ); + self.sync_substream_validations.push(Box::pin(async move { + match rx.await { + Ok(accepted) => + if accepted { + Ok((peer_id, roles)) + } else { + Err(peer_id) + }, + Err(_) => Err(peer_id), + } + })); + CustomMessageOutcome::None }, Err(err2) => { log::debug!( @@ -535,6 +589,12 @@ impl NetworkBehaviour for Protocol { NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } => if self.bad_handshake_substreams.contains(&(peer_id, set_id)) { CustomMessageOutcome::None + } else if set_id == HARDCODED_PEERSETS_SYNC { + let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationSinkReplaced { + remote: peer_id, + sink: notifications_sink, + }); + CustomMessageOutcome::None } else { CustomMessageOutcome::NotificationStreamReplaced { remote: peer_id, @@ -548,6 +608,12 @@ impl NetworkBehaviour for Protocol { // handshake. The outer layers have never received an opening event about this // substream, and consequently shouldn't receive a closing event either. CustomMessageOutcome::None + } else if set_id == HARDCODED_PEERSETS_SYNC { + let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationStreamClosed { + remote: peer_id, + }); + self.peers.remove(&peer_id); + CustomMessageOutcome::None } else { CustomMessageOutcome::NotificationStreamClosed { remote: peer_id, @@ -558,6 +624,12 @@ impl NetworkBehaviour for Protocol { NotificationsOut::Notification { peer_id, set_id, message } => { if self.bad_handshake_substreams.contains(&(peer_id, set_id)) { CustomMessageOutcome::None + } else if set_id == HARDCODED_PEERSETS_SYNC { + let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationsReceived { + remote: peer_id, + messages: vec![message.freeze()], + }); + CustomMessageOutcome::None } else { let protocol_name = self.notification_protocols[usize::from(set_id)].clone(); CustomMessageOutcome::NotificationsReceived { diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 5f60f51321b42..9708b24d29b52 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -36,7 +36,7 @@ use crate::{ network_state::{ NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, }, - protocol::{self, NotificationsSink, NotifsHandlerError, Protocol, Ready}, + protocol::{self, NotifsHandlerError, Protocol, Ready}, request_responses::{IfDisconnected, RequestFailure}, service::{ signature::{Signature, SigningError}, @@ -91,6 +91,7 @@ use std::{ pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure}; pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey}; +pub use protocol::NotificationsSink; mod metrics; mod out_events; @@ -146,7 +147,7 @@ where /// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order /// for the network processing to advance. From it, you can extract a `NetworkService` using /// `worker.service()`. The `NetworkService` can be shared through the codebase. - pub fn new(mut params: Params) -> Result { + pub fn new(mut params: Params) -> Result { // Private and public keys configuration. let local_identity = params.network_config.node_key.clone().into_keypair()?; let local_public = local_identity.public(); @@ -227,6 +228,7 @@ where From::from(¶ms.role), ¶ms.network_config, params.block_announce_config, + params.tx, )?; // List of multiaddresses that we know in the network. diff --git a/client/network/sync/src/engine.rs b/client/network/sync/src/engine.rs index b4c1a2ed05bb0..e3d45a980a0b4 100644 --- a/client/network/sync/src/engine.rs +++ b/client/network/sync/src/engine.rs @@ -24,8 +24,8 @@ use crate::{ ChainSync, ClientError, SyncingService, }; -use codec::{Decode, DecodeAll, Encode}; -use futures::{FutureExt, Stream, StreamExt}; +use codec::{Decode, Encode}; +use futures::{FutureExt, StreamExt}; use futures_timer::Delay; use libp2p::PeerId; use lru::LruCache; @@ -39,9 +39,8 @@ use sc_network::{ config::{ NetworkConfiguration, NonDefaultSetConfig, ProtocolId, SyncMode as SyncOperationMode, }, - event::Event, utils::LruHashSet, - ProtocolName, + NotificationsSink, ProtocolName, }; use sc_network_common::{ role::Roles, @@ -63,7 +62,6 @@ use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero}; use std::{ collections::{HashMap, HashSet}, num::NonZeroUsize, - pin::Pin, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, @@ -79,8 +77,6 @@ const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead mod rep { use sc_peerset::ReputationChange as Rep; - /// We received a message that failed to decode. - pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message"); /// Peer has different genesis. pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch"); /// Peer send us a block announcement that failed at validation. @@ -162,6 +158,8 @@ pub struct Peer { pub info: ExtendedPeerInfo, /// Holds a set of blocks known to this peer. pub known_blocks: LruHashSet, + /// Notification sink. + sink: NotificationsSink, } pub struct SyncingEngine { @@ -184,6 +182,9 @@ pub struct SyncingEngine { /// Channel for receiving service commands service_rx: TracingUnboundedReceiver>, + /// Channel for receiving inbound connections from `Protocol`. + rx: sc_utils::mpsc::TracingUnboundedReceiver>, + /// Assigned roles. roles: Roles, @@ -254,6 +255,7 @@ where block_request_protocol_name: ProtocolName, state_request_protocol_name: ProtocolName, warp_sync_protocol_name: Option, + rx: sc_utils::mpsc::TracingUnboundedReceiver>, ) -> Result<(Self, SyncingService, NonDefaultSetConfig), ClientError> { let mode = match network_config.sync_mode { SyncOperationMode::Full => SyncMode::Full, @@ -347,6 +349,7 @@ where num_connected: num_connected.clone(), is_major_syncing: is_major_syncing.clone(), service_rx, + rx, genesis_hash, important_peers, default_peers_set_no_slot_connected_peers: HashSet::new(), @@ -554,11 +557,7 @@ where data: Some(data.clone()), }; - self.network_service.write_notification( - *who, - self.block_announce_protocol_name.clone(), - message.encode(), - ); + peer.sink.send_sync_notification(message.encode()); } } } @@ -575,17 +574,13 @@ where ) } - pub async fn run(mut self, mut stream: Pin + Send>>) { + pub async fn run(mut self) { loop { - futures::future::poll_fn(|cx| self.poll(cx, &mut stream)).await; + futures::future::poll_fn(|cx| self.poll(cx)).await; } } - pub fn poll( - &mut self, - cx: &mut std::task::Context, - event_stream: &mut Pin + Send>>, - ) -> Poll<()> { + pub fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> { self.num_connected.store(self.peers.len(), Ordering::Relaxed); self.is_major_syncing .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed); @@ -595,84 +590,6 @@ where self.tick_timeout.reset(TICK_TIMEOUT); } - while let Poll::Ready(Some(event)) = event_stream.poll_next_unpin(cx) { - match event { - Event::NotificationStreamOpened { - remote, protocol, received_handshake, .. - } => { - if protocol != self.block_announce_protocol_name { - continue - } - - match as DecodeAll>::decode_all( - &mut &received_handshake[..], - ) { - Ok(handshake) => { - if self.on_sync_peer_connected(remote, handshake).is_err() { - log::debug!( - target: "sync", - "Failed to register peer {remote:?}: {received_handshake:?}", - ); - } - }, - Err(err) => { - log::debug!( - target: "sync", - "Couldn't decode handshake sent by {}: {:?}: {}", - remote, - received_handshake, - err, - ); - self.network_service.report_peer(remote, rep::BAD_MESSAGE); - }, - } - }, - Event::NotificationStreamClosed { remote, protocol } => { - if protocol != self.block_announce_protocol_name { - continue - } - - if self.on_sync_peer_disconnected(remote).is_err() { - log::trace!( - target: "sync", - "Disconnected peer which had earlier been refused by on_sync_peer_connected {}", - remote - ); - } - }, - Event::NotificationsReceived { remote, messages } => { - for (protocol, message) in messages { - if protocol != self.block_announce_protocol_name { - continue - } - - if self.peers.contains_key(&remote) { - if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) { - self.push_block_announce_validation(remote, announce); - - // Make sure that the newly added block announce validation future - // was polled once to be registered in the task. - if let Poll::Ready(res) = - self.chain_sync.poll_block_announce_validation(cx) - { - self.process_block_announce_validation_result(res) - } - } else { - log::warn!(target: "sub-libp2p", "Failed to decode block announce"); - } - } else { - log::trace!( - target: "sync", - "Received sync for peer earlier refused by sync layer: {}", - remote - ); - } - } - }, - _ => {}, - } - } - while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) { match event { ToServiceCommand::SetSyncForkRequest(peers, hash, number) => { @@ -746,6 +663,70 @@ where } } + while let Poll::Ready(Some(event)) = self.rx.poll_next_unpin(cx) { + match event { + sc_network::SyncEvent::NotificationStreamOpened { + remote, + received_handshake, + sink, + tx, + } => match self.on_sync_peer_connected(remote, &received_handshake, sink) { + Ok(()) => { + let _ = tx.send(true); + }, + Err(()) => { + log::debug!( + target: "sync", + "Failed to register peer {remote:?}: {received_handshake:?}", + ); + let _ = tx.send(false); + }, + }, + sc_network::SyncEvent::NotificationStreamClosed { remote } => { + if self.on_sync_peer_disconnected(remote).is_err() { + log::trace!( + target: "sync", + "Disconnected peer which had earlier been refused by on_sync_peer_connected {}", + remote + ); + } + }, + sc_network::SyncEvent::NotificationsReceived { remote, messages } => { + for message in messages { + if self.peers.contains_key(&remote) { + if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) { + self.push_block_announce_validation(remote, announce); + + // Make sure that the newly added block announce validation future + // was polled once to be registered in the task. + if let Poll::Ready(res) = + self.chain_sync.poll_block_announce_validation(cx) + { + self.process_block_announce_validation_result(res) + } + } else { + log::warn!(target: "sub-libp2p", "Failed to decode block announce"); + } + } else { + log::trace!( + target: "sync", + "Received sync for peer earlier refused by sync layer: {}", + remote + ); + } + } + }, + sc_network::SyncEvent::NotificationSinkReplaced { remote, sink } => { + if let Some(peer) = self.peers.get_mut(&remote) { + peer.sink = sink; + } + }, + } + } + + // poll `ChainSync` last because of a block announcement was received through the + // event stream between `SyncingEngine` and `Protocol` and the validation finished + // right after it as queued, the resulting block request (if any) can be sent right away. while let Poll::Ready(result) = self.chain_sync.poll(cx) { self.process_block_announce_validation_result(result); } @@ -757,13 +738,13 @@ where /// /// Returns a result if the handshake of this peer was indeed accepted. pub fn on_sync_peer_disconnected(&mut self, peer: PeerId) -> Result<(), ()> { - if self.important_peers.contains(&peer) { - log::warn!(target: "sync", "Reserved peer {} disconnected", peer); - } else { - log::debug!(target: "sync", "{} disconnected", peer); - } - if self.peers.remove(&peer).is_some() { + if self.important_peers.contains(&peer) { + log::warn!(target: "sync", "Reserved peer {} disconnected", peer); + } else { + log::debug!(target: "sync", "{} disconnected", peer); + } + self.chain_sync.peer_disconnected(&peer); self.default_peers_set_no_slot_connected_peers.remove(&peer); self.event_streams @@ -782,7 +763,8 @@ where pub fn on_sync_peer_connected( &mut self, who: PeerId, - status: BlockAnnouncesHandshake, + status: &BlockAnnouncesHandshake, + sink: NotificationsSink, ) -> Result<(), ()> { log::trace!(target: "sync", "New peer {} {:?}", who, status); @@ -794,8 +776,6 @@ where if status.genesis_hash != self.genesis_hash { self.network_service.report_peer(who, rep::GENESIS_MISMATCH); - self.network_service - .disconnect_peer(who, self.block_announce_protocol_name.clone()); if self.important_peers.contains(&who) { log::error!( @@ -834,8 +814,6 @@ where this_peer_reserved_slot { log::debug!(target: "sync", "Too many full nodes, rejecting {}", who); - self.network_service - .disconnect_peer(who, self.block_announce_protocol_name.clone()); return Err(()) } @@ -844,8 +822,6 @@ where { // Make sure that not all slots are occupied by light clients. log::debug!(target: "sync", "Too many light nodes, rejecting {}", who); - self.network_service - .disconnect_peer(who, self.block_announce_protocol_name.clone()); return Err(()) } @@ -858,14 +834,13 @@ where known_blocks: LruHashSet::new( NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"), ), + sink, }; let req = if peer.info.roles.is_full() { match self.chain_sync.new_peer(who, peer.info.best_hash, peer.info.best_number) { Ok(req) => req, Err(BadPeer(id, repu)) => { - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); self.network_service.report_peer(id, repu); return Err(()) }, diff --git a/client/network/test/Cargo.toml b/client/network/test/Cargo.toml index 8368fa278712a..9763feed5ea54 100644 --- a/client/network/test/Cargo.toml +++ b/client/network/test/Cargo.toml @@ -26,6 +26,7 @@ sc-client-api = { version = "4.0.0-dev", path = "../../api" } sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } sc-network = { version = "0.10.0-dev", path = "../" } sc-network-common = { version = "0.10.0-dev", path = "../common" } +sc-utils = { version = "4.0.0-dev", path = "../../utils" } sc-network-light = { version = "0.10.0-dev", path = "../light" } sc-network-sync = { version = "0.10.0-dev", path = "../sync" } sc-service = { version = "0.10.0-dev", default-features = false, features = ["test-helpers"], path = "../../service" } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 6a99304035ab8..f85d6ed63c247 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -55,8 +55,8 @@ use sc_network::{ }, request_responses::ProtocolConfig as RequestResponseConfig, types::ProtocolName, - Multiaddr, NetworkBlock, NetworkEventStream, NetworkService, NetworkStateInfo, - NetworkSyncForkRequest, NetworkWorker, + Multiaddr, NetworkBlock, NetworkService, NetworkStateInfo, NetworkSyncForkRequest, + NetworkWorker, }; use sc_network_common::{ role::Roles, @@ -896,6 +896,7 @@ where let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); + let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (engine, sync_service, block_announce_config) = sc_network_sync::engine::SyncingEngine::new( Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }), @@ -911,6 +912,7 @@ where block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), Some(warp_protocol_config.name.clone()), + rx, ) .unwrap(); let sync_service_import_queue = Box::new(sync_service.clone()); @@ -918,7 +920,7 @@ where let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed"); - let network = NetworkWorker::new::(sc_network::config::Params { + let network = NetworkWorker::new(sc_network::config::Params { role: if config.is_authority { Role::Authority } else { Role::Full }, executor: Box::new(|f| { tokio::spawn(f); @@ -929,6 +931,7 @@ where fork_id, metrics_registry: None, block_announce_config, + tx, request_response_protocol_configs: [ block_request_protocol_config, state_request_protocol_config, @@ -950,9 +953,8 @@ where import_queue.run(sync_service_import_queue).await; }); - let service = network.service().clone(); tokio::spawn(async move { - engine.run(service.event_stream("syncing")).await; + engine.run().await; }); self.mut_peers(move |peers| { diff --git a/client/network/test/src/service.rs b/client/network/test/src/service.rs index 67915c38637ed..5871860a7c4a6 100644 --- a/client/network/test/src/service.rs +++ b/client/network/test/src/service.rs @@ -177,6 +177,7 @@ impl TestNetworkBuilder { let (chain_sync_network_provider, chain_sync_network_handle) = self.chain_sync_network.unwrap_or(NetworkServiceProvider::new()); + let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (engine, chain_sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config::Role::Full), @@ -192,6 +193,7 @@ impl TestNetworkBuilder { block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), None, + rx, ) .unwrap(); let mut link = self.link.unwrap_or(Box::new(chain_sync_service.clone())); @@ -217,6 +219,7 @@ impl TestNetworkBuilder { light_client_request_protocol_config, ] .to_vec(), + tx, }) .unwrap(); @@ -234,8 +237,7 @@ impl TestNetworkBuilder { tokio::time::sleep(std::time::Duration::from_millis(250)).await; } }); - let stream = worker.service().event_stream("syncing"); - tokio::spawn(engine.run(stream)); + tokio::spawn(engine.run()); TestNetwork::new(worker) } diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index d87b03fb3a78c..af46d15a2bacd 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -414,7 +414,7 @@ async fn can_sync_small_non_best_forks() { // poll until the two nodes connect, otherwise announcing the block will not work futures::future::poll_fn::<(), _>(|cx| { net.poll(cx); - if net.peer(0).num_peers() == 0 { + if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 { Poll::Pending } else { Poll::Ready(()) diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 7cbbf2a4dda0f..b04228e6bfc34 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -38,9 +38,7 @@ use sc_client_db::{Backend, DatabaseSettings}; use sc_consensus::import_queue::ImportQueue; use sc_executor::RuntimeVersionOf; use sc_keystore::LocalKeystore; -use sc_network::{ - config::SyncMode, NetworkEventStream, NetworkService, NetworkStateInfo, NetworkStatusProvider, -}; +use sc_network::{config::SyncMode, NetworkService, NetworkStateInfo, NetworkStatusProvider}; use sc_network_bitswap::BitswapRequestHandler; use sc_network_common::{role::Roles, sync::warp::WarpSyncParams}; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; @@ -825,6 +823,7 @@ where protocol_config }; + let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (engine, sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config.role), @@ -840,6 +839,7 @@ where block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), warp_sync_protocol_config.as_ref().map(|config| config.name.clone()), + rx, )?; let sync_service_import_queue = sync_service.clone(); let sync_service = Arc::new(sync_service); @@ -865,6 +865,7 @@ where fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned), metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_announce_config, + tx, request_response_protocol_configs: request_response_protocol_configs .into_iter() .chain([ @@ -904,15 +905,13 @@ where )?; spawn_handle.spawn("network-transactions-handler", Some("networking"), tx_handler.run()); - spawn_handle.spawn( + spawn_handle.spawn_blocking( "chain-sync-network-service-provider", Some("networking"), chain_sync_network_provider.run(network.clone()), ); spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(sync_service_import_queue))); - - let event_stream = network.event_stream("syncing"); - spawn_handle.spawn("syncing", None, engine.run(event_stream)); + spawn_handle.spawn_blocking("syncing", None, engine.run()); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000); spawn_handle.spawn(