diff --git a/crates/core/src/task_center.rs b/crates/core/src/task_center.rs index af7cefb97..fe08aaac1 100644 --- a/crates/core/src/task_center.rs +++ b/crates/core/src/task_center.rs @@ -18,6 +18,7 @@ use std::time::{Duration, Instant}; use futures::{Future, FutureExt}; use metrics::counter; use restate_types::config::CommonOptions; +use tokio::runtime::RuntimeMetrics; use tokio::task::JoinHandle; use tokio::task_local; use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned}; @@ -142,6 +143,10 @@ pub struct TaskCenter { static_assertions::assert_impl_all!(TaskCenter: Send, Sync, Clone); impl TaskCenter { + pub fn default_runtime_metrics(&self) -> RuntimeMetrics { + self.inner.default_runtime_handle.metrics() + } + /// Use to monitor an on-going shutdown when requested pub fn watch_shutdown(&self) -> WaitForCancellationFutureOwned { self.inner.global_cancel_token.clone().cancelled_owned() diff --git a/crates/node/src/network_server/handler/mod.rs b/crates/node/src/network_server/handler/mod.rs index ce3086a2d..02e375b0d 100644 --- a/crates/node/src/network_server/handler/mod.rs +++ b/crates/node/src/network_server/handler/mod.rs @@ -25,6 +25,8 @@ use crate::network_server::prometheus_helpers::{ }; use crate::network_server::state::NodeCtrlHandlerState; +use super::prometheus_helpers::submit_tokio_metrics; + const ROCKSDB_TICKERS: &[Ticker] = &[ Ticker::BlockCacheBytesRead, Ticker::BlockCacheBytesWrite, @@ -176,6 +178,9 @@ pub async fn render_metrics(State(state): State<NodeCtrlHandlerState>) -> String let default_cf = CfName::new("default"); let mut out = String::new(); + // Default tokio runtime metrics + submit_tokio_metrics("default", state.task_center.default_runtime_metrics()); + // Response content type is plain/text and that's expected. 
if let Some(prometheus_handle) = state.prometheus_handle { // Internal system metrics diff --git a/crates/node/src/network_server/prometheus_helpers.rs b/crates/node/src/network_server/prometheus_helpers.rs index 11f249929..d81e9588a 100644 --- a/crates/node/src/network_server/prometheus_helpers.rs +++ b/crates/node/src/network_server/prometheus_helpers.rs @@ -10,9 +10,11 @@ use std::fmt::Write; +use metrics::gauge; use metrics_exporter_prometheus::formatting; use restate_rocksdb::RocksDb; use rocksdb::statistics::{HistogramData, Ticker}; +use tokio::runtime::RuntimeMetrics; static PREFIX: &str = "restate"; @@ -167,3 +169,35 @@ pub fn format_rocksdb_histogram_for_prometheus( ); let _ = writeln!(out); } + +pub fn submit_tokio_metrics(runtime: &'static str, stats: RuntimeMetrics) { + gauge!("restate.tokio.num_workers", "runtime" => runtime).set(stats.num_workers() as f64); + gauge!("restate.tokio.blocking_threads", "runtime" => runtime) + .set(stats.num_blocking_threads() as f64); + gauge!("restate.tokio.blocking_queue_depth", "runtime" => runtime) + .set(stats.blocking_queue_depth() as f64); + gauge!("restate.tokio.active_tasks_count", "runtime" => runtime) + .set(stats.active_tasks_count() as f64); + gauge!("restate.tokio.io_driver_ready_count", "runtime" => runtime) + .set(stats.io_driver_ready_count() as f64); + gauge!("restate.tokio.remote_schedule_count", "runtime" => runtime) + .set(stats.remote_schedule_count() as f64); + // per worker stats + for idx in 0..stats.num_workers() { + gauge!("restate.tokio.worker_overflow_count", "runtime" => runtime, "worker" => + idx.to_string()) + .set(stats.worker_overflow_count(idx) as f64); + gauge!("restate.tokio.worker_poll_count", "runtime" => runtime, "worker" => idx.to_string()) + .set(stats.worker_poll_count(idx) as f64); + gauge!("restate.tokio.worker_park_count", "runtime" => runtime, "worker" => idx.to_string()) + .set(stats.worker_park_count(idx) as f64); + gauge!("restate.tokio.worker_noop_count", "runtime" => 
runtime, "worker" => idx.to_string()) + .set(stats.worker_noop_count(idx) as f64); + gauge!("restate.tokio.worker_steal_count", "runtime" => runtime, "worker" => idx.to_string()) + .set(stats.worker_steal_count(idx) as f64); + gauge!("restate.tokio.worker_total_busy_duration_seconds", "runtime" => runtime, "worker" => idx.to_string()) + .set(stats.worker_total_busy_duration(idx).as_secs_f64()); + gauge!("restate.tokio.worker_mean_poll_time", "runtime" => runtime, "worker" => idx.to_string()) + .set(stats.worker_mean_poll_time(idx).as_secs_f64()); + } +} diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs index c45991f1b..87465a51b 100644 --- a/crates/node/src/network_server/service.rs +++ b/crates/node/src/network_server/service.rs @@ -51,8 +51,10 @@ impl NetworkServer { } pub async fn run(self, options: CommonOptions) -> Result<(), anyhow::Error> { + let tc = task_center(); // Configure Metric Exporter let mut state_builder = NodeCtrlHandlerStateBuilder::default(); + state_builder.task_center(tc.clone()); if !options.disable_prometheus { state_builder.prometheus_handle(Some(install_global_prometheus_recorder(&options))); @@ -89,7 +91,7 @@ impl NetworkServer { let server_builder = tonic::transport::Server::builder() .layer(TraceLayer::new_for_grpc().make_span_with(span_factory)) .add_service(NodeSvcServer::new(NodeSvcHandler::new( - task_center(), + tc, self.worker_deps, self.connection_manager, ))) diff --git a/crates/node/src/network_server/state.rs b/crates/node/src/network_server/state.rs index ab9cf4f38..b6640365b 100644 --- a/crates/node/src/network_server/state.rs +++ b/crates/node/src/network_server/state.rs @@ -9,9 +9,11 @@ // by the Apache License, Version 2.0. use metrics_exporter_prometheus::PrometheusHandle; +use restate_core::TaskCenter; #[derive(Clone, derive_builder::Builder)] pub struct NodeCtrlHandlerState { #[builder(default)] pub prometheus_handle: Option<PrometheusHandle>, + pub task_center: TaskCenter, }