Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Evict inactive peers from SyncingEngine (#13829)
Browse files Browse the repository at this point in the history
* Evict inactive peers from `SyncingEngine`

If both halves of the block announce notification stream have been
inactive for 2 minutes, report the peer and disconnect it, allowing
`SyncingEngine` to free up a slot for some other peer that hopefully
is more active.

This needs to be done because the node may falsely believe it has open
connections to peers because the inbound substream can be closed without
any notification and closed outbound substream is noticed only when node
attempts to write to it which may not happen if the node has nothing to
send.

* zzz

* wip

* Evict peers only when timeout expires

* Use `debug!()`

---------

Co-authored-by: parity-processbot <>
  • Loading branch information
altonen authored Apr 21, 2023
1 parent 9f0d54a commit e44b43e
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ use std::{
Arc,
},
task::Poll,
time::{Duration, Instant},
};

/// Interval at which we perform time based maintenance
Expand All @@ -75,12 +76,19 @@ const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100)
/// Maximum number of known block hashes to keep for a peer.
const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead

/// If the block announces stream to peer has been inactive for two minutes meaning local node
/// has not sent or received block announcements to/from the peer, report the node for inactivity,
/// disconnect it and attempt to establish connection to some other peer.
const INACTIVITY_EVICT_THRESHOLD: Duration = Duration::from_secs(30);

mod rep {
use sc_peerset::ReputationChange as Rep;
/// Peer has different genesis.
pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
/// Peer send us a block announcement that failed at validation.
pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
/// Block announce substream with the peer has been inactive too long
pub const INACTIVE_SUBSTREAM: Rep = Rep::new(-(1 << 10), "Inactive block announce substream");
}

struct Metrics {
Expand Down Expand Up @@ -160,6 +168,10 @@ pub struct Peer<B: BlockT> {
pub known_blocks: LruHashSet<B::Hash>,
/// Notification sink.
sink: NotificationsSink,
/// Instant when the last notification was sent to peer.
last_notification_sent: Instant,
/// Instant when the last notification was received from peer.
last_notification_received: Instant,
}

pub struct SyncingEngine<B: BlockT, Client> {
Expand Down Expand Up @@ -200,6 +212,9 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// All connected peers. Contains both full and light node peers.
peers: HashMap<PeerId, Peer<B>>,

/// Evicted peers
evicted: HashSet<PeerId>,

/// List of nodes for which we perform additional logging because they are important for the
/// user.
important_peers: HashSet<PeerId>,
Expand Down Expand Up @@ -353,6 +368,7 @@ where
chain_sync,
network_service,
peers: HashMap::new(),
evicted: HashSet::new(),
block_announce_data_cache: LruCache::new(cache_capacity),
block_announce_protocol_name,
num_connected: num_connected.clone(),
Expand Down Expand Up @@ -516,6 +532,7 @@ where
},
};
peer.known_blocks.insert(hash);
peer.last_notification_received = Instant::now();

if peer.info.roles.is_full() {
let is_best = match announce.state.unwrap_or(BlockState::Best) {
Expand Down Expand Up @@ -566,6 +583,7 @@ where
data: Some(data.clone()),
};

peer.last_notification_sent = Instant::now();
peer.sink.send_sync_notification(message.encode());
}
}
Expand Down Expand Up @@ -596,6 +614,35 @@ where

while let Poll::Ready(()) = self.tick_timeout.poll_unpin(cx) {
self.report_metrics();

// go over all connected peers and check if any of them have been idle for a while. Idle
// in this case means that we haven't sent or received block announcements to/from this
// peer. If that is the case, because of #5685, it could be that the block announces
// substream is not actually open and and this peer is just wasting a slot and is should
// be replaced with some other node that is willing to send us block announcements.
for (id, peer) in self.peers.iter() {
// because of a delay between disconnecting a peer in `SyncingEngine` and getting
// the response back from `Protocol`, a peer might be reported and disconnect
// multiple times. To prevent this from happening (until the underlying issue is
// fixed), keep track of evicted peers and report and disconnect them only once.
if self.evicted.contains(id) {
continue
}

let last_received_late =
peer.last_notification_received.elapsed() > INACTIVITY_EVICT_THRESHOLD;
let last_sent_late =
peer.last_notification_sent.elapsed() > INACTIVITY_EVICT_THRESHOLD;

if last_received_late && last_sent_late {
log::debug!(target: "sync", "evict peer {id} since it has been idling for too long");
self.network_service.report_peer(*id, rep::INACTIVE_SUBSTREAM);
self.network_service
.disconnect_peer(*id, self.block_announce_protocol_name.clone());
self.evicted.insert(*id);
}
}

self.tick_timeout.reset(TICK_TIMEOUT);
}

Expand Down Expand Up @@ -692,6 +739,7 @@ where
},
},
sc_network::SyncEvent::NotificationStreamClosed { remote } => {
self.evicted.remove(&remote);
if self.on_sync_peer_disconnected(remote).is_err() {
log::trace!(
target: "sync",
Expand Down Expand Up @@ -844,6 +892,8 @@ where
NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
),
sink,
last_notification_sent: Instant::now(),
last_notification_received: Instant::now(),
};

let req = if peer.info.roles.is_full() {
Expand Down

0 comments on commit e44b43e

Please sign in to comment.