Skip to content

Commit

Permalink
Analytics: Fix and improve async span fn support
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin-legion committed Jun 1, 2022
1 parent 1d3f87f commit cb88007
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 56 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/lgn-tracing/proc-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ license = "MIT OR Apache-2.0"
proc-macro = true

[dependencies]
syn = "1.0"
quote = "1.0"
proc-macro2 = "1.0"
quote = "1.0"
syn = { version = "1.0", features = ["full", "visit-mut"] }
83 changes: 50 additions & 33 deletions crates/lgn-tracing/proc-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
use std::collections::HashSet;

use proc_macro2::Literal;
use quote::quote;
use quote::{quote, ToTokens};
use syn::{
parse::{Parse, ParseStream, Result},
parse_macro_input, parse_quote,
punctuated::Punctuated,
Ident, ItemFn, Token,
visit_mut::VisitMut,
ExprAwait, Ident, ItemFn, Token,
};

struct TraceArgs {
Expand All @@ -35,6 +36,29 @@ impl Parse for TraceArgs {
}
}

struct AwaitVisitor;

impl VisitMut for AwaitVisitor {
fn visit_expr_await_mut(&mut self, expr: &mut ExprAwait) {
// TODO: Use attrs
let ExprAwait { attrs: _, base, .. } = expr;

let text = base.to_token_stream().to_string();

*expr = parse_quote! {
{
lgn_tracing::span!(_AWAIT, #text);

lgn_tracing::spans::Instrumentation::new(
#base,
&_AWAIT,
&_IS_IDLE
)
}.await
};
}
}

#[proc_macro_attribute]
pub fn span_fn(
args: proc_macro::TokenStream,
Expand All @@ -57,39 +81,32 @@ pub fn span_fn(
});
}

let stmts = function.block.stmts;
AwaitVisitor.visit_block_mut(&mut function.block);

if let Some((last_stmt, stmts)) = function.block.stmts.split_last() {
function.block.stmts = vec![parse_quote! {
{
lgn_tracing::span!(_METADATA_FUNC, concat!(module_path!(), "::", #function_name));
let __lgn_tracing_is_idle = std::sync::atomic::AtomicBool::new(false);
lgn_tracing::dispatch::on_begin_scope(&_METADATA_FUNC);

function.block.stmts = vec![
parse_quote! {
static _METADATA_FUNC: lgn_tracing::spans::SpanMetadata = lgn_tracing::spans::SpanMetadata {
name: concat!(module_path!(), "::", #function_name),
location: lgn_tracing::spans::SpanLocation {
lod: lgn_tracing::Verbosity::Max,
target: module_path!(),
module_path: module_path!(),
file: file!(),
line: line!()
}
};
},
parse_quote! {
lgn_tracing::dispatch::on_begin_scope(&_METADATA_FUNC);
},
parse_quote! {
let __lgn_tracing_future = async move {
#(#stmts)*
};
},
parse_quote! {
let __lgn_tracing_output = lgn_tracing::spans::Instrumentation::new(__lgn_tracing_future, &_METADATA_FUNC).await;
},
parse_quote! {
lgn_tracing::dispatch::on_end_scope(&_METADATA_FUNC);
},
parse_quote! {
return __lgn_tracing_output;
},
];

let __lgn_tracing_output = { #last_stmt };
lgn_tracing::dispatch::on_end_scope(&_METADATA_FUNC);
__lgn_tracing_output
}
}];
} else {
function.block.stmts = vec![parse_quote! {
{
lgn_tracing::span!(_METADATA_FUNC, concat!(module_path!(), "::", #function_name));
let __lgn_tracing_is_idle = std::sync::atomic::AtomicBool::new(false);
lgn_tracing::dispatch::on_begin_scope(&_METADATA_FUNC);
lgn_tracing::dispatch::on_end_scope(&_METADATA_FUNC);
}
}];
}

proc_macro::TokenStream::from(quote! {
#function
Expand Down
4 changes: 4 additions & 0 deletions crates/lgn-tracing/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ pub fn flush_thread_buffer() {

#[inline(always)]
pub fn on_begin_scope(scope: &'static SpanMetadata) {
println!("begin {}", scope.name);

on_thread_event(BeginThreadSpanEvent {
time: now(),
thread_span_desc: scope,
Expand All @@ -186,6 +188,8 @@ pub fn on_begin_scope(scope: &'static SpanMetadata) {

#[inline(always)]
pub fn on_end_scope(scope: &'static SpanMetadata) {
println!("end {}", scope.name);

on_thread_event(EndThreadSpanEvent {
time: now(),
thread_span_desc: scope,
Expand Down
32 changes: 32 additions & 0 deletions crates/lgn-tracing/src/macros.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,35 @@
/// Creates a span.
///
/// # Examples
///
/// ```
/// use lgn_tracing::span;
///
/// # fn main() {
/// span!("scope", _MY_SPAN_NAME);
/// # }
/// ```
///
/// A static variable `_MY_SPAN_NAME` has been created
#[macro_export]
macro_rules! span {
($scope_name:ident, $name:expr) => {
static $scope_name: $crate::spans::SpanMetadata = $crate::spans::SpanMetadata {
name: $name,
location: $crate::spans::SpanLocation {
lod: $crate::Verbosity::Max,
target: module_path!(),
module_path: module_path!(),
file: file!(),
line: line!(),
},
};
};
($name:expr) => {
$crate::span_scope!(_METADATA_NAMED, $name);
};
}

/// Records a integer metric.
///
/// # Examples
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::{
future::Future,
pin::Pin,
sync::atomic::{AtomicBool, Ordering},
task::{Context, Poll},
};

Expand All @@ -19,41 +20,48 @@ pin_project! {
#[pin]
inner: T,
span: &'static SpanMetadata,
// An [`Instrumentation`] is idle when it has been polled at least once
// and that the inner Future returned `Poll::Pending`.
// As soon as the inner Future returns `Poll::Ready` this attribute is set to `false`.
is_idle: bool
is_idle: &'static AtomicBool,
}
}

impl<T> Instrumentation<T> {
pub fn new(inner: T, span: &'static SpanMetadata) -> Self {
pub fn new(inner: T, span: &'static SpanMetadata, is_idle: &'static AtomicBool) -> Self {
Instrumentation {
inner,
span,
is_idle: false,
is_idle,
}
}

pub fn begin(&self) {
on_begin_scope(self.span);
}

pub fn end(&self) {
on_end_scope(self.span);
}
}

impl<T: Future> Future for Instrumentation<T> {
type Output = T::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();

let output = this.inner.poll(cx);
let is_idle = this.is_idle.load(Ordering::Relaxed);

if output.is_pending() && !*this.is_idle {
if !is_idle {
on_end_scope(this.span);

*this.is_idle = true;
this.is_idle.store(true, Ordering::Relaxed);
}

if output.is_ready() {
let output = this.inner.poll(cx);

if output.is_ready() && is_idle {
on_begin_scope(this.span);

*this.is_idle = false;
this.is_idle.store(false, Ordering::Relaxed);
}

output
Expand Down
5 changes: 3 additions & 2 deletions crates/lgn-tracing/src/spans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ pub use block::*;

mod events;
pub use events::*;
mod instrument;
pub use instrument::*;

mod instrumentation;
pub use instrumentation::*;

// todo: implement non thread based perf spans for other systems to be used
1 change: 1 addition & 0 deletions tests/async-span-fn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ repository = "/~https://github.com/legion-labs/legion"
license = "MIT OR Apache-2.0"

[dependencies]
futures = "0.3.21"
lgn-telemetry-sink = { path = "../../crates/lgn-telemetry-sink", version = "0.1.0" }
lgn-tracing = { path = "../../crates/lgn-tracing", version = "0.1.0" }
tokio = { version = "1", features = ["full"] }
Loading

0 comments on commit cb88007

Please sign in to comment.