From 58e33a332448405a0d9913a6ca22de54d16b2e2b Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Thu, 26 Dec 2024 13:06:53 -0800 Subject: [PATCH] Fix PeriodicReader shutdown to invoke shutdown on exporter --- .../src/metrics/periodic_reader.rs | 56 ++++++++++++++++++- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index a5cd70fd99..2cee6c4d0d 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -154,11 +154,12 @@ impl PeriodicReader { { let (message_sender, message_receiver): (Sender, Receiver) = mpsc::channel(); + let exporter_arc = Arc::new(exporter); let reader = PeriodicReader { inner: Arc::new(PeriodicReaderInner { message_sender: Arc::new(message_sender), producer: Mutex::new(None), - exporter: Arc::new(exporter), + exporter: exporter_arc.clone(), }), }; let cloned_reader = reader.clone(); @@ -213,7 +214,13 @@ impl PeriodicReader { Ok(Message::Shutdown(response_sender)) => { // Perform final export and break out of loop and exit the thread otel_debug!(name: "PeriodReaderThreadExportingDueToShutdown"); - if let Err(_e) = cloned_reader.collect_and_export(timeout) { + let export_result = cloned_reader.collect_and_export(timeout); + let shutdown_result = exporter_arc.shutdown(); + otel_debug!( + name: "PeriodReaderInvokedExporterShutdown", + shutdown_result = format!("{:?}", shutdown_result) + ); + if export_result.is_err() || shutdown_result.is_err() { response_sender.send(false).unwrap(); } else { response_sender.send(true).unwrap(); @@ -474,7 +481,7 @@ mod tests { use opentelemetry::metrics::MeterProvider; use std::{ sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, mpsc, Arc, }, time::Duration, @@ -525,6 +532,31 @@ mod tests { } } + #[derive(Debug, Clone, Default)] + struct MockMetricExporter { + is_shutdown: Arc, + } + + #[async_trait] + impl PushMetricExporter for MockMetricExporter { + async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> { + Ok(()) + } + + async fn force_flush(&self) -> MetricResult<()> { + Ok(()) + } + + fn shutdown(&self) -> MetricResult<()> { + self.is_shutdown.store(true, Ordering::Relaxed); + Ok(()) + } + + fn temporality(&self) -> Temporality { + Temporality::Cumulative + } + } + #[test] fn collection_triggered_by_interval_multiple() { // Arrange @@ -687,6 +719,24 @@ mod tests { assert!(exporter.get_count() >= 2); } + #[test] + fn shutdown_passed_to_exporter() { + // Arrange + let exporter = MockMetricExporter::default(); + let reader = PeriodicReader::builder(exporter.clone()).build(); + + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let meter = meter_provider.meter("test"); + let counter = meter.u64_counter("sync_counter").build(); + counter.add(1, &[]); + + // shutdown the provider, which should call shutdown on periodic reader + // which in turn should call shutdown on exporter. + let result = meter_provider.shutdown(); + assert!(result.is_ok()); + assert!(exporter.is_shutdown.load(Ordering::Relaxed)); + } + #[test] fn collection() { collection_triggered_by_interval_helper();