From 51c6986774b25462b6b5dbfe1bffbcc54c60771b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin?= <94377405+kevin-legion@users.noreply.github.com> Date: Wed, 1 Jun 2022 10:04:04 -0400 Subject: [PATCH] Analytics: Fix and improve async span fn support --- Cargo.lock | 11 ++ crates/lgn-tracing/Cargo.toml | 1 + crates/lgn-tracing/proc-macros/Cargo.toml | 4 +- crates/lgn-tracing/proc-macros/src/lib.rs | 70 +++++++++-- crates/lgn-tracing/src/macros.rs | 32 +++++ .../lgn-tracing/src/spans/instrumentation.rs | 68 +++++++++++ crates/lgn-tracing/src/spans/mod.rs | 3 + tests/async-span-fn/Cargo.toml | 13 ++ tests/async-span-fn/src/main.rs | 114 ++++++++++++++++++ 9 files changed, 306 insertions(+), 10 deletions(-) create mode 100644 crates/lgn-tracing/src/spans/instrumentation.rs create mode 100644 tests/async-span-fn/Cargo.toml create mode 100644 tests/async-span-fn/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index a48d634fb3..8096a65a26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5357,6 +5357,7 @@ dependencies = [ "lgn-tracing-transit", "log", "memoffset", + "pin-project", "raw-cpuid", "thiserror", "thread-id", @@ -8566,6 +8567,16 @@ version = "0.4.0" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "95059e91184749cb66be6dc994f67f182b6d897cb3df74a5bf66b5e709295fd8" +[[package]] +name = "test-async-span-fn" +version = "0.1.0" +dependencies = [ + "futures", + "lgn-telemetry-sink", + "lgn-tracing", + "tokio", +] + [[package]] name = "testcontainers" version = "0.12.0" diff --git a/crates/lgn-tracing/Cargo.toml b/crates/lgn-tracing/Cargo.toml index 40d071c990..d8d0b130f4 100644 --- a/crates/lgn-tracing/Cargo.toml +++ b/crates/lgn-tracing/Cargo.toml @@ -20,6 +20,7 @@ chrono = "0.4" memoffset = "0.6" thread-id = "4.0" lazy_static = "1.4" +pin-project = "1.0.10" [dev-dependencies] criterion = "0.3" diff --git a/crates/lgn-tracing/proc-macros/Cargo.toml b/crates/lgn-tracing/proc-macros/Cargo.toml index da36752916..944485b649 100644 --- a/crates/lgn-tracing/proc-macros/Cargo.toml +++ b/crates/lgn-tracing/proc-macros/Cargo.toml @@ -9,6 +9,6 @@ license = "MIT OR Apache-2.0" proc-macro = true [dependencies] -syn = "1.0" -quote = "1.0" proc-macro2 = "1.0" +quote = "1.0" +syn = { version = "1.0", features = ["full", "visit-mut"] } diff --git a/crates/lgn-tracing/proc-macros/src/lib.rs b/crates/lgn-tracing/proc-macros/src/lib.rs index 2f96f11e60..54941d4de2 100644 --- a/crates/lgn-tracing/proc-macros/src/lib.rs +++ b/crates/lgn-tracing/proc-macros/src/lib.rs @@ -9,12 +9,13 @@ use std::collections::HashSet; use proc_macro2::Literal; -use quote::quote; +use quote::{quote, ToTokens}; use syn::{ parse::{Parse, ParseStream, Result}, parse_macro_input, parse_quote, punctuated::Punctuated, - Ident, ItemFn, Token, + visit_mut::VisitMut, + ExprAwait, Ident, ItemFn, Token, }; struct TraceArgs { @@ -35,6 +36,29 @@ impl Parse for TraceArgs { } } +struct AwaitVisitor; + +impl VisitMut for AwaitVisitor { + fn visit_expr_await_mut(&mut self, expr: &mut ExprAwait) { + // TODO: Use attrs + let ExprAwait { attrs: _, base, .. } = expr; + + let text = base.to_token_stream().to_string(); + + *expr = parse_quote! { + { + lgn_tracing::span!(_AWAIT, #text); + + lgn_tracing::spans::Instrumentation::new( + #base, + &_AWAIT, + &__lgn_tracing_is_idle + ) + }.await + }; + } +} + #[proc_macro_attribute] pub fn span_fn( args: proc_macro::TokenStream, @@ -55,12 +79,42 @@ pub fn span_fn( .alternative_name .map_or(function.sig.ident.to_string(), |n| n.to_string()); - function.block.stmts.insert( - 0, - parse_quote! { - lgn_tracing::span_scope!(_METADATA_FUNC, concat!(module_path!(), "::", #function_name)); - }, - ); + AwaitVisitor.visit_block_mut(&mut function.block); + + if let Some((last_stmt, stmts)) = function.block.stmts.split_last() { + function.block.stmts = vec![ + parse_quote! { + let __lgn_tracing_output = { + lgn_tracing::span!(_METADATA_FUNC, concat!(module_path!(), "::", #function_name)); + let __lgn_tracing_is_idle = std::sync::atomic::AtomicBool::new(false); + lgn_tracing::dispatch::on_begin_scope(&_METADATA_FUNC); + + #(#stmts)* + + let __lgn_tracing_output = { #last_stmt }; + lgn_tracing::dispatch::on_end_scope(&_METADATA_FUNC); + __lgn_tracing_output + }; + }, + parse_quote! { + return __lgn_tracing_output; + }, + ]; + } else { + function.block.stmts = vec![ + parse_quote! { + let __lgn_tracing_output = { + lgn_tracing::span!(_METADATA_FUNC, concat!(module_path!(), "::", #function_name)); + let __lgn_tracing_is_idle = std::sync::atomic::AtomicBool::new(false); + lgn_tracing::dispatch::on_begin_scope(&_METADATA_FUNC); + lgn_tracing::dispatch::on_end_scope(&_METADATA_FUNC); + }; + }, + parse_quote! { + return __lgn_tracing_output; + }, + ]; + } proc_macro::TokenStream::from(quote! { #function diff --git a/crates/lgn-tracing/src/macros.rs b/crates/lgn-tracing/src/macros.rs index 8e77725b83..bc0e46b611 100644 --- a/crates/lgn-tracing/src/macros.rs +++ b/crates/lgn-tracing/src/macros.rs @@ -1,3 +1,35 @@ +/// Creates a span. +/// +/// # Examples +/// +/// ``` +/// use lgn_tracing::span; +/// +/// # fn main() { +/// span!("scope", _MY_SPAN_NAME); +/// # } +/// ``` +/// +/// A static variable `_MY_SPAN_NAME` has been created +#[macro_export] +macro_rules! span { + ($scope_name:ident, $name:expr) => { + static $scope_name: $crate::spans::SpanMetadata = $crate::spans::SpanMetadata { + name: $name, + location: $crate::spans::SpanLocation { + lod: $crate::Verbosity::Max, + target: module_path!(), + module_path: module_path!(), + file: file!(), + line: line!(), + }, + }; + }; + ($name:expr) => { + $crate::span_scope!(_METADATA_NAMED, $name); + }; +} + /// Records a integer metric. /// /// # Examples diff --git a/crates/lgn-tracing/src/spans/instrumentation.rs b/crates/lgn-tracing/src/spans/instrumentation.rs new file mode 100644 index 0000000000..d0b636f00e --- /dev/null +++ b/crates/lgn-tracing/src/spans/instrumentation.rs @@ -0,0 +1,68 @@ +//! Async instrumentation + +#![allow(clippy::use_self)] + +use std::{ + future::Future, + pin::Pin, + sync::atomic::{AtomicBool, Ordering}, + task::{Context, Poll}, +}; + +use pin_project::pin_project; + +use crate::dispatch::{on_begin_scope, on_end_scope}; + +use super::SpanMetadata; + +#[pin_project] +pub struct Instrumentation<'a, T> { + #[pin] + inner: T, + span: &'static SpanMetadata, + is_idle: &'a AtomicBool, +} + +impl<'a, T> Instrumentation<'a, T> { + pub fn new(inner: T, span: &'static SpanMetadata, is_idle: &'a AtomicBool) -> Self { + Instrumentation { + inner, + span, + is_idle, + } + } + + pub fn begin(&self) { + on_begin_scope(self.span); + } + + pub fn end(&self) { + on_end_scope(self.span); + } +} + +impl<'a, T: Future> Future for Instrumentation<'a, T> { + type Output = T::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.as_mut().project(); + + let is_idle = this.is_idle.load(Ordering::Relaxed); + + if !is_idle { + on_end_scope(this.span); + + this.is_idle.store(true, Ordering::Relaxed); + } + + let output = this.inner.poll(cx); + + if output.is_ready() && is_idle { + on_begin_scope(this.span); + + this.is_idle.store(false, Ordering::Relaxed); + } + + output + } +} diff --git a/crates/lgn-tracing/src/spans/mod.rs b/crates/lgn-tracing/src/spans/mod.rs index 8a40a1e748..2b2dd15166 100644 --- a/crates/lgn-tracing/src/spans/mod.rs +++ b/crates/lgn-tracing/src/spans/mod.rs @@ -4,4 +4,7 @@ pub use block::*; mod events; pub use events::*; +mod instrumentation; +pub use instrumentation::*; + // todo: implement non thread based perf spans for other systems to be used diff --git a/tests/async-span-fn/Cargo.toml b/tests/async-span-fn/Cargo.toml new file mode 100644 index 0000000000..21c6943e4b --- /dev/null +++ b/tests/async-span-fn/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "test-async-span-fn" +version = "0.1.0" +edition = "2021" +homepage = "https://legionengine.com" +repository = "/~https://github.com/legion-labs/legion" +license = "MIT OR Apache-2.0" + +[dependencies] +futures = "0.3.21" +lgn-telemetry-sink = { path = "../../crates/lgn-telemetry-sink", version = "0.1.0" } +lgn-tracing = { path = "../../crates/lgn-tracing", version = "0.1.0" } +tokio = { version = "1", features = ["full"] } diff --git a/tests/async-span-fn/src/main.rs b/tests/async-span-fn/src/main.rs new file mode 100644 index 0000000000..f26b332350 --- /dev/null +++ b/tests/async-span-fn/src/main.rs @@ -0,0 +1,114 @@ +//! Dumb binary to test async span fn + +#![allow(clippy::never_loop)] + +use std::time::Duration; + +use futures::future::join_all; +use lgn_telemetry_sink::TelemetryGuard; +use lgn_tracing::{info, span_fn}; +use tokio::{fs::File, io::AsyncReadExt, time::sleep}; + +#[span_fn] +async fn empty_return() { + sleep(Duration::from_millis(1)).await; +} + +#[span_fn] +async fn iteration_with_cond() { + let a = 3; + + loop { + if a == 3 { + println!("a was 3"); + sleep(Duration::from_millis(1)).await; + } + + break; + } + + info!("inside my_function!"); +} + +#[span_fn] +async fn delayed_value() -> String { + sleep(Duration::from_millis(1)).await; + + let msg = "After".into(); + + sleep(Duration::from_millis(1)).await; + + msg +} + +#[span_fn] +fn consume_delayed_value(_: String) { + println!("Consumed a delayed value"); +} + +#[span_fn] +async fn delayed() { + println!("First"); + + sleep(Duration::from_millis(1)).await; + + println!("Second"); + + let msg = delayed_value().await; + + println!("{}", msg); + + consume_delayed_value(delayed_value().await); +} + +#[span_fn] +async fn read_txt() { + delayed().await; + + let mut file = File::open("./test.txt").await.unwrap(); + + let mut buffer = Vec::new(); + + file.read_to_end(&mut buffer).await.unwrap(); + + sleep(Duration::from_millis(100)).await; + + let _len = file.metadata().await.unwrap().len(); +} + +#[span_fn] +async fn read_all_txt() { + let mut counter = 0; + + let mut futures = Vec::new(); + + while counter < 3 { + let handle = async move { + read_txt().await; + }; + + futures.push(handle); + + counter += 1; + } + + join_all(futures).await; +} + +#[tokio::main] +async fn main() { + let _telemetry_guard = TelemetryGuard::default().unwrap(); + + delayed_value().await; + delayed_value().await; + + read_txt().await; + + delayed().await; + + iteration_with_cond().await; + + read_all_txt().await; + + empty_return().await; +}