Skip to content

Commit

Permalink
Add an async collector plugin for tc stats
Browse files Browse the repository at this point in the history
Tc stats is made best effort as netlink operations could be blocking.
  • Loading branch information
mmynk committed Apr 1, 2024
1 parent f62de30 commit 83f218f
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 9 deletions.
20 changes: 11 additions & 9 deletions below/model/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub struct CollectorOptions {
pub cgroup_re: Option<Regex>,
pub gpu_stats_receiver:
Option<collector_plugin::Consumer<crate::gpu_stats_collector_plugin::SampleType>>,
pub tc_stats_receiver:
Option<collector_plugin::Consumer<crate::tc_collector_plugin::SampleType>>,
}

impl Default for CollectorOptions {
Expand All @@ -54,6 +56,7 @@ impl Default for CollectorOptions {
btrfs_min_pct: btrfs::DEFAULT_MIN_PCT,
cgroup_re: None,
gpu_stats_receiver: None,
tc_stats_receiver: None,
}
}
}
Expand Down Expand Up @@ -308,16 +311,15 @@ fn collect_sample(
}
}
},
tc: if !options.enable_tc_stats {
None
tc: if let Some(tc_stats_receiver) = &options.tc_stats_receiver {
Some(
tc_stats_receiver
.try_take()
.context("TC stats collector had an error")?
.unwrap_or_default(),
)
} else {
match tc::tc_stats() {
Ok(tc_stats) => Some(tc_stats),
Err(e) => {
error!(logger, "{:#}", e);
Default::default()
}
}
None
},
})
}
Expand Down
1 change: 1 addition & 0 deletions below/model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub mod sample;
mod sample_model;
pub mod system;
pub mod tc_model;
pub mod tc_collector_plugin;

open_source_shim!(pub);

Expand Down
35 changes: 35 additions & 0 deletions below/model/src/tc_collector_plugin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use anyhow::Result;
use async_trait::async_trait;
use slog::error;
use tc::TcStats;

use crate::collector_plugin::AsyncCollectorPlugin;

pub type SampleType = TcStats;

pub struct TcStatsCollectorPlugin {
logger: slog::Logger,
}

impl TcStatsCollectorPlugin {
pub fn new(logger: slog::Logger) -> Result<Self> {
Ok(Self { logger })
}
}

#[async_trait]
impl AsyncCollectorPlugin for TcStatsCollectorPlugin {
type T = TcStats;

async fn try_collect(&mut self) -> Result<Option<SampleType>> {
let stats = match tc::tc_stats() {
Ok(tc_stats) => Some(tc_stats),
Err(e) => {
error!(self.logger, "{:#}", e);
Default::default()
}
};

Ok(stats)
}
}
71 changes: 71 additions & 0 deletions below/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,70 @@ pub fn start_gpu_stats_thread_and_get_stats_receiver(
Ok(receiver)
}

fn start_tc_stats_thread_and_get_stats_receiver(
logger: slog::Logger,
interval: Duration,
) -> Result<model::collector_plugin::Consumer<model::tc_collector_plugin::SampleType>> {
let tc_collector = model::tc_collector_plugin::TcStatsCollectorPlugin::new(logger.clone())
.context("Failed to initialize TC stats collector")?;
let (mut collector, receiver) = model::collector_plugin::collector_consumer(tc_collector);

// Start a thread to collect TC stats
let target_interval = interval.clone();
thread::Builder::new()
.name("tc_stats_collector".to_owned())
.spawn(move || {
// Exponential backoff on unrecoverable errors
const EXP_BACKOFF_FACTOR: u32 = 2;
const MAX_BACKOFF_SECS: u64 = 900;
let max_backoff = Duration::from_secs(MAX_BACKOFF_SECS);
let mut interval = target_interval;
loop {
let collect_instant = Instant::now();
let rt = TB::new_current_thread()
.thread_name("create_fburl")
.build()
.expect("Failed to build tokio runtime.");
match rt.block_on(collector.collect_and_update()) {
Ok(_) => {
interval = target_interval;
}
Err(e) => {
interval = std::cmp::min(
interval
.saturating_mul(EXP_BACKOFF_FACTOR),
max_backoff,
);
error!(
logger,
"TC stats collection backing off {:?} because of unrecoverable error: {:?}",
interval,
e
);
}
}
let collect_duration = Instant::now().duration_since(collect_instant);

const COLLECT_DURATION_WARN_THRESHOLD: u64 = 2;
if collect_duration > Duration::from_secs(COLLECT_DURATION_WARN_THRESHOLD) {
warn!(
logger,
"TC collection took {} > {}",
collect_duration.as_secs_f64(),
COLLECT_DURATION_WARN_THRESHOLD
);
}
if interval > collect_duration {
let sleep_duration = interval - collect_duration;
std::thread::sleep(sleep_duration);
}
}
})
.expect("Failed to spawn thread");

Ok(receiver)
}

/// Returns true if other end disconnected, false otherwise
fn check_for_exitstat_errors(logger: &slog::Logger, receiver: &Receiver<Error>) -> bool {
// Print an error but don't exit on bpf issues. Do this b/c we can't always
Expand Down Expand Up @@ -1002,6 +1066,12 @@ fn record(
None
};

let tc_stats_receiver = if below_config.enable_tc_stats {
Some(start_tc_stats_thread_and_get_stats_receiver(logger.clone(), interval)?)
} else {
None
};

let mut collector = model::Collector::new(
logger.clone(),
model::CollectorOptions {
Expand All @@ -1017,6 +1087,7 @@ fn record(
btrfs_min_pct: below_config.btrfs_min_pct,
cgroup_re,
gpu_stats_receiver,
tc_stats_receiver,
},
);

Expand Down

0 comments on commit 83f218f

Please sign in to comment.