Skip to content

Commit

Permalink
Fix PeriodicReader shutdown to invoke shutdown on exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas committed Dec 26, 2024
1 parent 36f9caf commit 58e33a3
Showing 1 changed file with 53 additions and 3 deletions.
56 changes: 53 additions & 3 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,12 @@ impl PeriodicReader {
{
let (message_sender, message_receiver): (Sender<Message>, Receiver<Message>) =
mpsc::channel();
let exporter_arc = Arc::new(exporter);
let reader = PeriodicReader {
inner: Arc::new(PeriodicReaderInner {
message_sender: Arc::new(message_sender),
producer: Mutex::new(None),
exporter: Arc::new(exporter),
exporter: exporter_arc.clone(),
}),
};
let cloned_reader = reader.clone();
Expand Down Expand Up @@ -213,7 +214,13 @@ impl PeriodicReader {
Ok(Message::Shutdown(response_sender)) => {
// Perform final export and break out of loop and exit the thread
otel_debug!(name: "PeriodReaderThreadExportingDueToShutdown");
if let Err(_e) = cloned_reader.collect_and_export(timeout) {
let export_result = cloned_reader.collect_and_export(timeout);
let shutdown_result = exporter_arc.shutdown();
otel_debug!(
name: "PeriodReaderInvokedExporterShutdown",
shutdown_result = format!("{:?}", shutdown_result)
);
if export_result.is_err() || shutdown_result.is_err() {
response_sender.send(false).unwrap();
} else {
response_sender.send(true).unwrap();
Expand Down Expand Up @@ -474,7 +481,7 @@ mod tests {
use opentelemetry::metrics::MeterProvider;
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc, Arc,
},
time::Duration,
Expand Down Expand Up @@ -525,6 +532,31 @@ mod tests {
}
}

#[derive(Debug, Clone, Default)]
struct MockMetricExporter {
is_shutdown: Arc<AtomicBool>,
}

#[async_trait]
impl PushMetricExporter for MockMetricExporter {
async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> {
Ok(())
}

async fn force_flush(&self) -> MetricResult<()> {
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
self.is_shutdown.store(true, Ordering::Relaxed);
Ok(())
}

fn temporality(&self) -> Temporality {
Temporality::Cumulative
}
}

#[test]
fn collection_triggered_by_interval_multiple() {
// Arrange
Expand Down Expand Up @@ -687,6 +719,24 @@ mod tests {
assert!(exporter.get_count() >= 2);
}

#[test]
fn shutdown_passed_to_exporter() {
// Arrange
let exporter = MockMetricExporter::default();
let reader = PeriodicReader::builder(exporter.clone()).build();

let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = meter_provider.meter("test");
let counter = meter.u64_counter("sync_counter").build();
counter.add(1, &[]);

// shutdown the provider, which should call shutdown on periodic reader
// which in turn should call shutdown on exporter.
let result = meter_provider.shutdown();
assert!(result.is_ok());
assert!(exporter.is_shutdown.load(Ordering::Relaxed));
}

#[test]
fn collection() {
collection_triggered_by_interval_helper();
Expand Down

0 comments on commit 58e33a3

Please sign in to comment.