diff --git a/crates/worker/src/metric_definitions.rs b/crates/worker/src/metric_definitions.rs index 8cfe3bdc3..42276aac1 100644 --- a/crates/worker/src/metric_definitions.rs +++ b/crates/worker/src/metric_definitions.rs @@ -10,7 +10,7 @@ /// Optional to have but adds description/help message to the metrics emitted to /// the metrics' sink. -use metrics::{describe_counter, Unit}; +use metrics::{describe_counter, describe_histogram, Unit}; pub const PARTITION_APPLY_COMMAND: &str = "restate.partition.apply_command.total"; pub const PARTITION_ACTUATOR_HANDLED: &str = "restate.partition.actuator_handled.total"; @@ -18,6 +18,15 @@ pub const PARTITION_TIMER_DUE_HANDLED: &str = "restate.partition.timer_due_handl pub const PARTITION_STORAGE_TX_CREATED: &str = "restate.partition.storage_tx_created.total"; pub const PARTITION_STORAGE_TX_COMMITTED: &str = "restate.partition.storage_tx_committed.total"; +pub const PP_LOG_READ_NEXT_DURATION: &str = "restate.partition.log_read_next_duration.seconds"; + +pub const PP_APPLY_RECORD_DURATION: &str = "restate.partition.apply_record_duration.seconds"; +pub const PP_WAIT_OR_IDLE_DURATION: &str = "restate.partition.wait_or_idle_duration.seconds"; +pub const PP_APPLY_EFFECTS_DURATION: &str = "restate.partition.apply_effects_duration.seconds"; +pub const PP_APPLY_TIMERS_DURATION: &str = "restate.partition.apply_timers_duration.seconds"; + +pub const PARTITION_LABEL: &str = "partition"; + pub(crate) fn describe_metrics() { describe_counter!( PARTITION_APPLY_COMMAND, @@ -44,4 +53,28 @@ pub(crate) fn describe_metrics() { Unit::Count, "Storage transactions committed by applying partition state machine commands" ); + describe_histogram!( + PP_LOG_READ_NEXT_DURATION, + Unit::Seconds, + "Time spent attempting to read the next record off bifrost, this is inclusive of wait time if not records are available to read"); + describe_histogram!( + PP_APPLY_RECORD_DURATION, + Unit::Seconds, + "Time spent processing a single bifrost message" + ); + describe_histogram!( + PP_WAIT_OR_IDLE_DURATION, + Unit::Seconds, + "Time spent since last activity on this partition processor" + ); + describe_histogram!( + PP_APPLY_EFFECTS_DURATION, + Unit::Seconds, + "Time spent applying effects in a single iteration" + ); + describe_histogram!( + PP_APPLY_TIMERS_DURATION, + Unit::Seconds, + "Time spent applying effects in a single iteration" + ); } diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 905a19c0b..16dd73206 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -8,13 +8,17 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::metric_definitions::{PARTITION_ACTUATOR_HANDLED, PARTITION_TIMER_DUE_HANDLED}; +use crate::metric_definitions::{ + PARTITION_ACTUATOR_HANDLED, PARTITION_LABEL, PARTITION_TIMER_DUE_HANDLED, + PP_APPLY_EFFECTS_DURATION, PP_APPLY_RECORD_DURATION, PP_APPLY_TIMERS_DURATION, + PP_LOG_READ_NEXT_DURATION, PP_WAIT_OR_IDLE_DURATION, +}; use crate::partition::leadership::{ActionEffect, LeadershipState}; use crate::partition::state_machine::{ActionCollector, Effects, StateMachine}; use crate::partition::storage::{DedupSequenceNumberResolver, PartitionStorage, Transaction}; use assert2::let_assert; use futures::StreamExt; -use metrics::counter; +use metrics::{counter, histogram}; use restate_core::metadata; use restate_network::Networking; use restate_partition_store::{PartitionStore, RocksDBTransaction}; @@ -22,6 +26,7 @@ use restate_types::identifiers::{PartitionId, PartitionKey}; use std::fmt::Debug; use std::marker::PhantomData; use std::ops::RangeInclusive; +use std::time::Instant; use tracing::{debug, instrument, trace, Span}; mod action_effect_handler; @@ -132,10 +137,15 @@ where networking, ); + let mut cancellation = std::pin::pin!(cancellation_watcher()); + let partition_id_str: &'static str = Box::leak(Box::new(self.partition_id.to_string())); loop { + let iteration_start = Instant::now(); tokio::select! { - _ = cancellation_watcher() => break, + _ = &mut cancellation => break, record = log_reader.read_next() => { + let command_start = Instant::now(); + histogram!(PP_WAIT_OR_IDLE_DURATION, PARTITION_LABEL => partition_id_str).record(iteration_start.elapsed()); let record = record?; trace!(lsn = %record.0, "Processing bifrost record for '{}': {:?}", record.1.command.name(), record.1.header); @@ -187,15 +197,22 @@ where transaction.commit().await?; state.handle_actions(action_collector.drain(..)).await?; } + histogram!(PP_APPLY_RECORD_DURATION, PARTITION_LABEL => partition_id_str).record(command_start.elapsed()); }, action_effect = action_effect_stream.next() => { + histogram!(PP_WAIT_OR_IDLE_DURATION, PARTITION_LABEL => partition_id_str).record(iteration_start.elapsed()); + let effect_start = Instant::now(); counter!(PARTITION_ACTUATOR_HANDLED).increment(1); let action_effect = action_effect.ok_or_else(|| anyhow::anyhow!("action effect stream is closed"))?; state.handle_action_effect(action_effect).await?; + histogram!(PP_APPLY_EFFECTS_DURATION).record(effect_start.elapsed()); }, timer = state.run_timer() => { + histogram!(PP_WAIT_OR_IDLE_DURATION, PARTITION_LABEL => partition_id_str).record(iteration_start.elapsed()); + let timer_start = Instant::now(); counter!(PARTITION_TIMER_DUE_HANDLED).increment(1); state.handle_action_effect(ActionEffect::Timer(timer)).await?; + histogram!(PP_APPLY_TIMERS_DURATION, PARTITION_LABEL => partition_id_str).record(timer_start.elapsed()); }, } } @@ -342,18 +359,24 @@ async fn is_outdated_or_duplicate( struct LogReader { log_reader: LogReadStream, + log_id: LogId, } impl LogReader { fn new(bifrost: &Bifrost, log_id: LogId, lsn: Lsn) -> Self { Self { log_reader: bifrost.create_reader(log_id, lsn), + log_id, } } async fn read_next(&mut self) -> anyhow::Result<(Lsn, Envelope)> { + let start = Instant::now(); let LogRecord { record, offset } = self.log_reader.read_next().await?; - Self::deserialize_record(record).map(|envelope| (offset, envelope)) + let res = Self::deserialize_record(record).map(|envelope| (offset, envelope)); + histogram!(PP_LOG_READ_NEXT_DURATION, "log_id" => self.log_id.to_string()) + .record(start.elapsed()); + res } #[allow(dead_code)]