Skip to content

Commit

Permalink
feat(o11y): Configure opentelemetry filter at runtime (#7701)
Browse files Browse the repository at this point in the history
Enabling or disabling tracing at runtime can be useful to debug issues with nodes running too slowly. This allows us to avoid restarting a node, which is known to have a rather large impact on the node.

Tested that all combinations of log verbosity and opentelemetry verbosity work well together.
Tested by reconfiguring logging as follows:

```
echo '{"verbose_module":"", "opentelemetry_level": "DEBUG"}' > ~/.near/betanet/log_config.json ; killall -SIGHUP neard
```
  • Loading branch information
nikurt authored Sep 28, 2022
1 parent 8c2a379 commit c6f0059
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 67 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/o11y/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ opentelemetry.workspace = true
opentelemetry-otlp.workspace = true
opentelemetry-semantic-conventions.workspace = true
prometheus.workspace = true
serde.workspace = true
strum.workspace = true
thiserror.workspace = true
tokio.workspace = true
Expand Down
178 changes: 119 additions & 59 deletions core/o11y/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ use opentelemetry::sdk::trace::{self, IdGenerator, Sampler, Tracer};
use opentelemetry::sdk::Resource;
use opentelemetry::KeyValue;
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::path::PathBuf;
use tracing::level_filters::LevelFilter;
use tracing::subscriber::DefaultGuard;
use tracing_appender::non_blocking::NonBlocking;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::filter::{Filtered, ParseError};
use tracing_subscriber::fmt::format::{DefaultFields, Format};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::layer::{Layered, SubscriberExt};
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::reload::{Error, Handle};
use tracing_subscriber::{EnvFilter, Layer, Registry};
use tracing_subscriber::{fmt, reload, EnvFilter, Layer, Registry};

/// Custom tracing subscriber implementation that produces IO traces.
mod io_tracer;
Expand All @@ -41,16 +41,26 @@ macro_rules! io_trace {
($($fields:tt)*) => {};
}

static LOG_LAYER_RELOAD_HANDLE: OnceCell<
Handle<
Filtered<
tracing_subscriber::fmt::Layer<Registry, DefaultFields, Format, NonBlocking>,
EnvFilter,
Registry,
>,
Registry,
static LOG_LAYER_RELOAD_HANDLE: OnceCell<reload::Handle<EnvFilter, Registry>> = OnceCell::new();
static OTLP_LAYER_RELOAD_HANDLE: OnceCell<reload::Handle<LevelFilter, LogLayer<Registry>>> =
OnceCell::new();

type LogLayer<Inner> = Layered<
Filtered<
fmt::Layer<Inner, fmt::format::DefaultFields, fmt::format::Format, NonBlocking>,
reload::Layer<EnvFilter, Inner>,
Inner,
>,
> = OnceCell::new();
Inner,
>;

type TracingLayer<Inner> = Layered<
Filtered<OpenTelemetryLayer<Inner, Tracer>, reload::Layer<LevelFilter, Inner>, Inner>,
Inner,
>;

// Records the level of opentelemetry tracing verbosity configured via command-line flags at startup.
static DEFAULT_OTLP_LEVEL: OnceCell<OpenTelemetryLevel> = OnceCell::new();

/// The default value for the `RUST_LOG` environment variable if one isn't specified otherwise.
pub const DEFAULT_RUST_LOG: &'static str = "tokio_reactor=info,\
Expand Down Expand Up @@ -78,15 +88,15 @@ pub struct DefaultSubscriberGuard<S> {
// other way around, the events/spans generated while the subscriber drop guard runs would be
// lost.
subscriber: Option<S>,
local_subscriber_guard: Option<tracing::subscriber::DefaultGuard>,
local_subscriber_guard: Option<DefaultGuard>,
#[allow(dead_code)] // This field is never read, but has semantic purpose as a drop guard.
writer_guard: tracing_appender::non_blocking::WorkerGuard,
#[allow(dead_code)] // This field is never read, but has semantic purpose as a drop guard.
io_trace_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
}

// Doesn't define WARN and ERROR, because the highest verbosity of spans is INFO.
#[derive(Copy, Clone, Debug, clap::ArgEnum)]
#[derive(Copy, Clone, Debug, clap::ArgEnum, Serialize, Deserialize)]
pub enum OpenTelemetryLevel {
OFF,
INFO,
Expand Down Expand Up @@ -165,36 +175,41 @@ fn is_terminal() -> bool {
atty::is(atty::Stream::Stderr)
}

fn make_log_layer<S>(
fn add_log_layer<S>(
filter: EnvFilter,
writer: NonBlocking,
ansi: bool,
) -> Filtered<tracing_subscriber::fmt::Layer<S, DefaultFields, Format, NonBlocking>, EnvFilter, S>
subscriber: S,
) -> (LogLayer<S>, reload::Handle<EnvFilter, S>)
where
S: tracing::Subscriber + for<'span> LookupSpan<'span>,
S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
{
let layer = tracing_subscriber::fmt::layer()
let (filter, handle) = reload::Layer::<EnvFilter, S>::new(filter);

let layer = fmt::layer()
.with_ansi(ansi)
// Synthesizing ENTER and CLOSE events lets us log durations of spans to the log.
.with_span_events(
tracing_subscriber::fmt::format::FmtSpan::ENTER
| tracing_subscriber::fmt::format::FmtSpan::CLOSE,
)
.with_span_events(fmt::format::FmtSpan::ENTER | fmt::format::FmtSpan::CLOSE)
.with_writer(writer)
.with_filter(filter);
layer
let subscriber = subscriber.with(layer);
(subscriber, handle)
}

/// Constructs an OpenTelemetry layer which sends span data to an external collector.
//
// NB: this function is `async` because `install_batch(Tokio)` requires a tokio context to
// register timers and channels and whatnot.
async fn make_opentelemetry_layer<S>(
config: &Options,
) -> Filtered<OpenTelemetryLayer<S, Tracer>, LevelFilter, S>
async fn add_opentelemetry_layer<S>(
opentelemetry_level: OpenTelemetryLevel,
subscriber: S,
) -> (TracingLayer<S>, reload::Handle<LevelFilter, S>)
where
S: tracing::Subscriber + for<'span> LookupSpan<'span>,
S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
{
let filter = get_opentelemetry_filter(opentelemetry_level);
let (filter, handle) = reload::Layer::<LevelFilter, S>::new(filter);

let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
Expand All @@ -206,13 +221,13 @@ where
)
.install_batch(opentelemetry::runtime::Tokio)
.unwrap();
let filter = get_opentelemetry_filter(config);
let layer = tracing_opentelemetry::layer().with_tracer(tracer).with_filter(filter);
layer
let subscriber = subscriber.with(layer);
(subscriber, handle)
}

fn get_opentelemetry_filter(config: &Options) -> LevelFilter {
match config.opentelemetry {
pub fn get_opentelemetry_filter(opentelemetry_level: OpenTelemetryLevel) -> LevelFilter {
match opentelemetry_level {
OpenTelemetryLevel::OFF => LevelFilter::OFF,
OpenTelemetryLevel::INFO => LevelFilter::INFO,
OpenTelemetryLevel::DEBUG => LevelFilter::DEBUG,
Expand Down Expand Up @@ -270,13 +285,20 @@ pub async fn default_subscriber(
ColorOutput::Auto => std::env::var_os("NO_COLOR").is_none() && is_terminal(),
};

let log_layer = make_log_layer(env_filter, writer, ansi);
let (log_layer, handle) = tracing_subscriber::reload::Layer::new(log_layer);
LOG_LAYER_RELOAD_HANDLE.set(handle).unwrap();

let subscriber = tracing_subscriber::registry();
let subscriber = subscriber.with(log_layer);
let subscriber = subscriber.with(make_opentelemetry_layer(options).await);
// Record the initial OTLP level specified as a command-line flag. Use this recorded value to
// reset opentelemetry filter when the LogConfig file gets deleted.
DEFAULT_OTLP_LEVEL.set(options.opentelemetry).unwrap();

let (subscriber, handle) = add_log_layer(env_filter, writer, ansi, subscriber);
LOG_LAYER_RELOAD_HANDLE
.set(handle)
.unwrap_or_else(|_| panic!("Failed to set Log Layer Filter"));

let (subscriber, handle) = add_opentelemetry_layer(options.opentelemetry, subscriber).await;
OTLP_LAYER_RELOAD_HANDLE
.set(handle)
.unwrap_or_else(|_| panic!("Failed to set OTLP Layer Filter"));

#[allow(unused_mut)]
let mut io_trace_guard = None;
Expand All @@ -301,42 +323,80 @@ pub async fn default_subscriber(
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum ReloadError {
#[error("env_filter reload handle is not available")]
NoLogReloadHandle,
#[error("opentelemetry reload handle is not available")]
NoOpentelemetryReloadHandle,
#[error("could not set the new log filter")]
Reload(#[source] Error),
ReloadLogLayer(#[source] reload::Error),
#[error("could not set the new opentelemetry filter")]
ReloadOpentelemetryLayer(#[source] reload::Error),
#[error("could not create the log filter")]
Parse(#[source] BuildEnvFilterError),
#[error("env_filter reload handle is not available")]
NoReloadHandle,
}

/// Constructs an `EnvFilter` and sets it as the active filter in the default tracing subscriber.
/// Constructs new filters for the logging and opentelemetry layers.
///
/// Attempts to reload all available layers. Returns errors for each layer that failed to reload.
///
/// The newly constructed `EnvFilter` provides behavior equivalent to what can be obtained via
/// setting `RUST_LOG` environment variable and the `--verbose` command-line flag.
/// `rust_log` is equivalent to setting `RUST_LOG` environment variable.
/// `verbose` indicates whether `--verbose` command-line flag is present.
/// `verbose_module` is equivalent to the value of the `--verbose` command-line flag.
pub fn reload_log_layer(
pub fn reload(
rust_log: Option<&str>,
verbose_module: Option<&str>,
) -> Result<(), ReloadError> {
LOG_LAYER_RELOAD_HANDLE.get().map_or(Err(ReloadError::NoReloadHandle), |reload_handle| {
let mut builder = rust_log.map_or_else(
|| EnvFilterBuilder::from_env(),
|rust_log| EnvFilterBuilder::new(rust_log),
);
if let Some(module) = verbose_module {
builder = builder.verbose(Some(module));
}
let env_filter = builder.finish().map_err(ReloadError::Parse)?;
opentelemetry_level: Option<OpenTelemetryLevel>,
) -> Result<(), Vec<ReloadError>> {
let log_reload_result = LOG_LAYER_RELOAD_HANDLE.get().map_or(
Err(ReloadError::NoLogReloadHandle),
|reload_handle| {
let mut builder = rust_log.map_or_else(
|| EnvFilterBuilder::from_env(),
|rust_log| EnvFilterBuilder::new(rust_log),
);
if let Some(module) = verbose_module {
builder = builder.verbose(Some(module));
}
let env_filter = builder.finish().map_err(ReloadError::Parse)?;

reload_handle
.modify(|log_filter| {
*log_filter = env_filter;
})
.map_err(ReloadError::ReloadLogLayer)?;
Ok(())
},
);

let opentelemetry_level = opentelemetry_level
.unwrap_or(*DEFAULT_OTLP_LEVEL.get().unwrap_or(&OpenTelemetryLevel::OFF));
let opentelemetry_reload_result = OTLP_LAYER_RELOAD_HANDLE.get().map_or(
Err(ReloadError::NoOpentelemetryReloadHandle),
|reload_handle| {
reload_handle
.modify(|otlp_filter| {
*otlp_filter = get_opentelemetry_filter(opentelemetry_level);
})
.map_err(ReloadError::ReloadOpentelemetryLayer)?;
Ok(())
},
);

let mut errors: Vec<ReloadError> = vec![];
if let Err(err) = log_reload_result {
errors.push(err);
}
if let Err(err) = opentelemetry_reload_result {
errors.push(err);
}

reload_handle
.modify(|log_layer| {
*log_layer.filter_mut() = env_filter;
})
.map_err(ReloadError::Reload)?;
if errors.is_empty() {
Ok(())
})
} else {
Err(errors)
}
}

#[non_exhaustive]
Expand Down
11 changes: 7 additions & 4 deletions neard/src/log_config_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use near_o11y::{reload_log_layer, ReloadError};
use near_o11y::{reload, OpenTelemetryLevel, ReloadError};
use serde::{Deserialize, Serialize};
use std::io;
use std::io::ErrorKind;
Expand All @@ -13,6 +13,8 @@ struct LogConfig {
/// Some("") enables global debug logging.
/// Some("module") enables debug logging for "module".
pub verbose_module: Option<String>,
/// Verbosity level of collected traces.
pub opentelemetry_level: Option<OpenTelemetryLevel>,
}

pub(crate) struct LogConfigWatcher {
Expand All @@ -28,7 +30,7 @@ pub(crate) enum UpdateBehavior {
#[non_exhaustive]
enum LogConfigError {
#[error("Failed to reload the logging config")]
Reload(#[source] ReloadError),
Reload(Vec<ReloadError>),
#[error("Failed to reload the logging config")]
Parse(#[source] serde_json::Error),
#[error("Can't open or read the logging config file")]
Expand All @@ -42,17 +44,18 @@ impl LogConfigWatcher {
let log_config = serde_json::from_str::<LogConfig>(&log_config_str)
.map_err(LogConfigError::Parse)?;
info!(target: "neard", log_config=?log_config, "Changing the logging config.");
return reload_log_layer(
return reload(
log_config.rust_log.as_deref(),
log_config.verbose_module.as_deref(),
log_config.opentelemetry_level,
)
.map_err(LogConfigError::Reload);
}
Err(err) => match err.kind() {
ErrorKind::NotFound => {
if let UpdateBehavior::UpdateOrReset = update_behavior {
info!(target: "neard", logging_config_path=%self.watched_path.display(), ?err, "Reset the logging config because the logging config file doesn't exist.");
return reload_log_layer(None, None).map_err(LogConfigError::Reload);
return reload(None, None, None).map_err(LogConfigError::Reload);
}
Ok(())
}
Expand Down

0 comments on commit c6f0059

Please sign in to comment.