Skip to content

Commit

Permalink
Reactive syncing metrics (#5410)
Browse files Browse the repository at this point in the history
This PR untangles syncing metrics and makes them reactive, the way
metrics are supposed to be in general.

Syncing metrics were bundled in a way that caused coupling across
multiple layers: justifications metrics were defined and managed by
`ChainSync`, but only updated periodically on tick in `SyncingEngine`,
while actual values were queried from `ExtraRequests`. This convoluted
architecture was hard to follow when I was looking into
#5333.

Now metrics that correspond to each component are owned by that
component and updated as changes are made instead of on tick every
1100ms.

This does add some annoying boilerplate that is a bit harder to
maintain, but it separates metrics more nicely and if someone queries
them more frequently will give arbitrary resolution. Since metrics
updates are just atomic operations I do not expect any performance
impact of these changes.

Will add prdoc if changes look good otherwise.

P.S. I noticed that importing requests (and corresponding metrics) were
not cleared ever since corresponding code was introduced in
dc41558#r145518721
and I left it as is to not change the behavior, but it might be
something worth fixing.

cc @dmitry-markin

---------

Co-authored-by: Dmitry Markin <dmitry@markin.tech>
  • Loading branch information
nazar-pc and dmitry-markin authored Aug 23, 2024
1 parent e4ffba6 commit 4057ccd
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 105 deletions.
11 changes: 11 additions & 0 deletions prdoc/pr_5410.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
title: Reactive syncing metrics

doc:
- audience: Node Dev
description: |
Syncing metrics are now updated immediate as changes happen rather than every 1100ms as it was happening before.
This resulted in minor, but breaking API changes.

crates:
- name: sc-network-sync
bump: major
28 changes: 13 additions & 15 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,15 +471,6 @@ where
))
}

/// Report Prometheus metrics.
pub fn report_metrics(&self) {
if let Some(metrics) = &self.metrics {
let n = u64::try_from(self.peers.len()).unwrap_or(std::u64::MAX);
metrics.peers.set(n);
}
self.strategy.report_metrics();
}

fn update_peer_info(
&mut self,
peer_id: &PeerId,
Expand Down Expand Up @@ -606,7 +597,11 @@ where
pub async fn run(mut self) {
loop {
tokio::select! {
_ = self.tick_timeout.tick() => self.perform_periodic_actions(),
_ = self.tick_timeout.tick() => {
// TODO: This tick should not be necessary, but
// `self.process_strategy_actions()` is not called in some cases otherwise and
// some tests fail because of this
},
command = self.service_rx.select_next_some() =>
self.process_service_command(command),
notification_event = self.notification_service.next_event() => match notification_event {
Expand Down Expand Up @@ -724,10 +719,6 @@ where
Ok(())
}

fn perform_periodic_actions(&mut self) {
self.report_metrics();
}

fn process_service_command(&mut self, command: ToServiceCommand<B>) {
match command {
ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
Expand Down Expand Up @@ -873,6 +864,9 @@ where
log::debug!(target: LOG_TARGET, "{peer_id} does not exist in `SyncingEngine`");
return
};
if let Some(metrics) = &self.metrics {
metrics.peers.dec();
}

if self.important_peers.contains(&peer_id) {
log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
Expand Down Expand Up @@ -1048,7 +1042,11 @@ where

log::debug!(target: LOG_TARGET, "Connected {peer_id}");

self.peers.insert(peer_id, peer);
if self.peers.insert(peer_id, peer).is_none() {
if let Some(metrics) = &self.metrics {
metrics.peers.inc();
}
}
self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());

if self.default_peers_set_no_slot_peers.contains(&peer_id) {
Expand Down
128 changes: 109 additions & 19 deletions substrate/client/network/sync/src/justification_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
//! that don't make sense after one of the forks is finalized).
use crate::{
request_metrics::Metrics,
strategy::chain_sync::{PeerSync, PeerSyncState},
LOG_TARGET,
};
use fork_tree::ForkTree;
use log::{debug, trace, warn};
use prometheus_endpoint::{
prometheus::core::GenericGauge, register, GaugeVec, Opts, PrometheusError, Registry, U64,
};
use sc_network_types::PeerId;
use sp_blockchain::Error as ClientError;
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
Expand All @@ -41,6 +43,34 @@ const EXTRA_RETRY_WAIT: Duration = Duration::from_secs(10);
/// Pending extra data request for the given block (hash and number).
type ExtraRequest<B> = (<B as BlockT>::Hash, NumberFor<B>);

#[derive(Debug)]
struct Metrics {
pending: GenericGauge<U64>,
active: GenericGauge<U64>,
failed: GenericGauge<U64>,
importing: GenericGauge<U64>,
}

impl Metrics {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
let justifications = GaugeVec::<U64>::new(
Opts::new(
"substrate_sync_extra_justifications",
"Number of extra justifications requests",
),
&["status"],
)?;
let justifications = register(justifications, registry)?;

Ok(Self {
pending: justifications.with_label_values(&["pending"]),
active: justifications.with_label_values(&["active"]),
failed: justifications.with_label_values(&["failed"]),
importing: justifications.with_label_values(&["importing"]),
})
}
}

/// Manages pending block extra data (e.g. justification) requests.
///
/// Multiple extras may be requested for competing forks, or for the same branch
Expand All @@ -62,10 +92,14 @@ pub(crate) struct ExtraRequests<B: BlockT> {
importing_requests: HashSet<ExtraRequest<B>>,
/// the name of this type of extra request (useful for logging.)
request_type_name: &'static str,
metrics: Option<Metrics>,
}

impl<B: BlockT> ExtraRequests<B> {
pub(crate) fn new(request_type_name: &'static str) -> Self {
pub(crate) fn new(
request_type_name: &'static str,
metrics_registry: Option<&Registry>,
) -> Self {
Self {
tree: ForkTree::new(),
best_seen_finalized_number: Zero::zero(),
Expand All @@ -74,6 +108,16 @@ impl<B: BlockT> ExtraRequests<B> {
failed_requests: HashMap::new(),
importing_requests: HashSet::new(),
request_type_name,
metrics: metrics_registry.and_then(|registry| {
Metrics::register(registry)
.inspect_err(|error| {
log::error!(
target: LOG_TARGET,
"Failed to register `ExtraRequests` metrics {error}",
);
})
.ok()
}),
}
}

Expand All @@ -83,6 +127,12 @@ impl<B: BlockT> ExtraRequests<B> {
self.pending_requests.clear();
self.active_requests.clear();
self.failed_requests.clear();

if let Some(metrics) = &self.metrics {
metrics.pending.set(0);
metrics.active.set(0);
metrics.failed.set(0);
}
}

/// Returns an iterator-like struct that yields peers which extra
Expand All @@ -100,6 +150,9 @@ impl<B: BlockT> ExtraRequests<B> {
Ok(true) => {
// this is a new root so we add it to the current `pending_requests`
self.pending_requests.push_back((request.0, request.1));
if let Some(metrics) = &self.metrics {
metrics.pending.inc();
}
},
Err(fork_tree::Error::Revert) => {
// we have finalized further than the given request, presumably
Expand All @@ -117,6 +170,10 @@ impl<B: BlockT> ExtraRequests<B> {
pub(crate) fn peer_disconnected(&mut self, who: &PeerId) {
if let Some(request) = self.active_requests.remove(who) {
self.pending_requests.push_front(request);
if let Some(metrics) = &self.metrics {
metrics.active.dec();
metrics.pending.inc();
}
}
}

Expand All @@ -130,13 +187,21 @@ impl<B: BlockT> ExtraRequests<B> {
// currently enforced by the outer network protocol before passing on
// messages to chain sync.
if let Some(request) = self.active_requests.remove(&who) {
if let Some(metrics) = &self.metrics {
metrics.active.dec();
}

if let Some(r) = resp {
trace!(target: LOG_TARGET,
"Queuing import of {} from {:?} for {:?}",
self.request_type_name, who, request,
);

self.importing_requests.insert(request);
if self.importing_requests.insert(request) {
if let Some(metrics) = &self.metrics {
metrics.importing.inc();
}
}
return Some((who, request.0, request.1, r))
} else {
trace!(target: LOG_TARGET,
Expand All @@ -146,6 +211,10 @@ impl<B: BlockT> ExtraRequests<B> {
}
self.failed_requests.entry(request).or_default().push((who, Instant::now()));
self.pending_requests.push_front(request);
if let Some(metrics) = &self.metrics {
metrics.failed.set(self.failed_requests.len().try_into().unwrap_or(u64::MAX));
metrics.pending.inc();
}
} else {
trace!(target: LOG_TARGET,
"No active {} request to {:?}",
Expand Down Expand Up @@ -194,6 +263,11 @@ impl<B: BlockT> ExtraRequests<B> {
self.pending_requests.retain(|(h, n)| roots.contains(&(h, n, &())));
self.active_requests.retain(|_, (h, n)| roots.contains(&(h, n, &())));
self.failed_requests.retain(|(h, n), _| roots.contains(&(h, n, &())));
if let Some(metrics) = &self.metrics {
metrics.pending.set(self.pending_requests.len().try_into().unwrap_or(u64::MAX));
metrics.active.set(self.active_requests.len().try_into().unwrap_or(u64::MAX));
metrics.failed.set(self.failed_requests.len().try_into().unwrap_or(u64::MAX));
}

Ok(())
}
Expand All @@ -210,12 +284,18 @@ impl<B: BlockT> ExtraRequests<B> {
if !self.importing_requests.remove(&request) {
return false
}
if let Some(metrics) = &self.metrics {
metrics.importing.dec();
}

let (finalized_hash, finalized_number) = match result {
Ok(req) => (req.0, req.1),
Err(_) => {
if reschedule_on_failure {
self.pending_requests.push_front(request);
if let Some(metrics) = &self.metrics {
metrics.pending.inc();
}
}
return true
},
Expand All @@ -233,6 +313,11 @@ impl<B: BlockT> ExtraRequests<B> {
self.active_requests.clear();
self.pending_requests.clear();
self.pending_requests.extend(self.tree.roots().map(|(&h, &n, _)| (h, n)));
if let Some(metrics) = &self.metrics {
metrics.failed.set(0);
metrics.active.set(0);
metrics.pending.set(self.pending_requests.len().try_into().unwrap_or(u64::MAX));
}
self.best_seen_finalized_number = finalized_number;

true
Expand All @@ -249,16 +334,6 @@ impl<B: BlockT> ExtraRequests<B> {
pub(crate) fn pending_requests(&self) -> impl Iterator<Item = &ExtraRequest<B>> {
self.pending_requests.iter()
}

/// Get some key metrics.
pub(crate) fn metrics(&self) -> Metrics {
Metrics {
pending_requests: self.pending_requests.len().try_into().unwrap_or(std::u32::MAX),
active_requests: self.active_requests.len().try_into().unwrap_or(std::u32::MAX),
failed_requests: self.failed_requests.len().try_into().unwrap_or(std::u32::MAX),
importing_requests: self.importing_requests.len().try_into().unwrap_or(std::u32::MAX),
}
}
}

/// Matches peers with pending extra requests.
Expand Down Expand Up @@ -301,8 +376,17 @@ impl<'a, B: BlockT> Matcher<'a, B> {
for requests in self.extras.failed_requests.values_mut() {
requests.retain(|(_, instant)| instant.elapsed() < EXTRA_RETRY_WAIT);
}
if let Some(metrics) = &self.extras.metrics {
metrics
.failed
.set(self.extras.failed_requests.len().try_into().unwrap_or(u64::MAX));
}

while let Some(request) = self.extras.pending_requests.pop_front() {
if let Some(metrics) = &self.extras.metrics {
metrics.pending.dec();
}

for (peer, sync) in
peers.iter().filter(|(_, sync)| sync.state == PeerSyncState::Available)
{
Expand All @@ -326,6 +410,9 @@ impl<'a, B: BlockT> Matcher<'a, B> {
continue
}
self.extras.active_requests.insert(*peer, request);
if let Some(metrics) = &self.extras.metrics {
metrics.active.inc();
}

trace!(target: LOG_TARGET,
"Sending {} request to {:?} for {:?}",
Expand All @@ -336,6 +423,9 @@ impl<'a, B: BlockT> Matcher<'a, B> {
}

self.extras.pending_requests.push_back(request);
if let Some(metrics) = &self.extras.metrics {
metrics.pending.inc();
}
self.remaining -= 1;

if self.remaining == 0 {
Expand All @@ -359,7 +449,7 @@ mod tests {
#[test]
fn requests_are_processed_in_order() {
fn property(mut peers: ArbitraryPeers) {
let mut requests = ExtraRequests::<Block>::new("test");
let mut requests = ExtraRequests::<Block>::new("test", None);

let num_peers_available =
peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
Expand All @@ -385,7 +475,7 @@ mod tests {
#[test]
fn new_roots_schedule_new_request() {
fn property(data: Vec<BlockNumber>) {
let mut requests = ExtraRequests::<Block>::new("test");
let mut requests = ExtraRequests::<Block>::new("test", None);
for (i, number) in data.into_iter().enumerate() {
let hash = [i as u8; 32].into();
let pending = requests.pending_requests.len();
Expand All @@ -402,7 +492,7 @@ mod tests {
#[test]
fn disconnecting_implies_rescheduling() {
fn property(mut peers: ArbitraryPeers) -> bool {
let mut requests = ExtraRequests::<Block>::new("test");
let mut requests = ExtraRequests::<Block>::new("test", None);

let num_peers_available =
peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
Expand Down Expand Up @@ -438,7 +528,7 @@ mod tests {
#[test]
fn no_response_reschedules() {
fn property(mut peers: ArbitraryPeers) {
let mut requests = ExtraRequests::<Block>::new("test");
let mut requests = ExtraRequests::<Block>::new("test", None);

let num_peers_available =
peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
Expand Down Expand Up @@ -480,7 +570,7 @@ mod tests {
fn request_is_rescheduled_when_earlier_block_is_finalized() {
sp_tracing::try_init_simple();

let mut finality_proofs = ExtraRequests::<Block>::new("test");
let mut finality_proofs = ExtraRequests::<Block>::new("test", None);

let hash4 = [4; 32].into();
let hash5 = [5; 32].into();
Expand Down Expand Up @@ -521,7 +611,7 @@ mod tests {

#[test]
fn ancestor_roots_are_finalized_when_finality_notification_is_missed() {
let mut finality_proofs = ExtraRequests::<Block>::new("test");
let mut finality_proofs = ExtraRequests::<Block>::new("test", None);

let hash4 = [4; 32].into();
let hash5 = [5; 32].into();
Expand Down
Loading

0 comments on commit 4057ccd

Please sign in to comment.