diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 292ab0b8f2..43bfd0912e 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -782,4 +782,134 @@ mod tests { "Metrics should be available in exporter." ); } + + async fn some_async_function() -> u64 { + // No dependency on any particular async runtime. + std::thread::sleep(std::time::Duration::from_millis(1)); + 1 + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn async_inside_observable_callback_from_tokio_multi_with_one_worker() { + async_inside_observable_callback_helper(); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn async_inside_observable_callback_from_tokio_multi_with_two_worker() { + async_inside_observable_callback_helper(); + } + + #[tokio::test(flavor = "current_thread")] + async fn async_inside_observable_callback_from_tokio_current_thread() { + async_inside_observable_callback_helper(); + } + + #[test] + fn async_inside_observable_callback_from_regular_main() { + async_inside_observable_callback_helper(); + } + + fn async_inside_observable_callback_helper() { + let interval = std::time::Duration::from_millis(10); + let exporter = InMemoryMetricExporter::default(); + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let meter = meter_provider.meter("test"); + let _gauge = meter + .u64_observable_gauge("my_observable_gauge") + .with_callback(|observer| { + // using futures_executor::block_on intentionally and avoiding + // any particular async runtime. + let value = futures_executor::block_on(some_async_function()); + observer.observe(value, &[]); + }) + .build(); + + meter_provider.force_flush().expect("flush should succeed"); + let exported_metrics = exporter + .get_finished_metrics() + .expect("this should not fail"); + assert!( + !exported_metrics.is_empty(), + "Metrics should be available in exporter." + ); + } + + async fn some_tokio_async_function() -> u64 { + // Tokio specific async function + tokio::time::sleep(Duration::from_millis(1)).await; + 1 + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + + async fn tokio_async_inside_observable_callback_from_tokio_multi_with_one_worker() { + tokio_async_inside_observable_callback_helper(true); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn tokio_async_inside_observable_callback_from_tokio_multi_with_two_worker() { + tokio_async_inside_observable_callback_helper(true); + } + + #[tokio::test(flavor = "current_thread")] + #[ignore] //TODO: Investigate if this can be fixed. + async fn tokio_async_inside_observable_callback_from_tokio_current_thread() { + tokio_async_inside_observable_callback_helper(true); + } + + #[test] + fn tokio_async_inside_observable_callback_from_regular_main() { + tokio_async_inside_observable_callback_helper(false); + } + + fn tokio_async_inside_observable_callback_helper(use_current_tokio_runtime: bool) { + let interval = std::time::Duration::from_millis(10); + let exporter = InMemoryMetricExporter::default(); + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let meter = meter_provider.meter("test"); + + if use_current_tokio_runtime { + let rt = tokio::runtime::Handle::current().clone(); + let _gauge = meter + .u64_observable_gauge("my_observable_gauge") + .with_callback(move |observer| { + // call tokio specific async function from here + let value = rt.block_on(some_tokio_async_function()); + observer.observe(value, &[]); + }) + .build(); + // rt here is a reference to the current tokio runtime. + // Droppng it occurs when the tokio::main itself ends. + } else { + let rt = tokio::runtime::Runtime::new().unwrap(); + let _gauge = meter + .u64_observable_gauge("my_observable_gauge") + .with_callback(move |observer| { + // call tokio specific async function from here + let value = rt.block_on(some_tokio_async_function()); + observer.observe(value, &[]); + }) + .build(); + // rt is not dropped here as it is moved to the closure, + // and is dropped only when MeterProvider itself is dropped. + // This works when called from normal main. + }; + + meter_provider.force_flush().expect("flush should succeed"); + let exported_metrics = exporter + .get_finished_metrics() + .expect("this should not fail"); + assert!( + !exported_metrics.is_empty(), + "Metrics should be available in exporter." + ); + } }