Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Analytics: Fix and improve async span fn support #1946

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/lgn-tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ chrono = "0.4"
memoffset = "0.6"
thread-id = "4.0"
lazy_static = "1.4"
pin-project = "1.0.10"

[dev-dependencies]
criterion = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions crates/lgn-tracing/proc-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ license = "MIT OR Apache-2.0"
proc-macro = true

[dependencies]
syn = "1.0"
quote = "1.0"
proc-macro2 = "1.0"
quote = "1.0"
syn = { version = "1.0", features = ["full", "visit-mut"] }
70 changes: 62 additions & 8 deletions crates/lgn-tracing/proc-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
use std::collections::HashSet;

use proc_macro2::Literal;
use quote::quote;
use quote::{quote, ToTokens};
use syn::{
parse::{Parse, ParseStream, Result},
parse_macro_input, parse_quote,
punctuated::Punctuated,
Ident, ItemFn, Token,
visit_mut::VisitMut,
ExprAwait, Ident, ItemFn, Token,
};

struct TraceArgs {
Expand All @@ -35,6 +36,29 @@ impl Parse for TraceArgs {
}
}

struct AwaitVisitor;

impl VisitMut for AwaitVisitor {
fn visit_expr_await_mut(&mut self, expr: &mut ExprAwait) {
// TODO: Use attrs
let ExprAwait { attrs: _, base, .. } = expr;

let text = base.to_token_stream().to_string();

*expr = parse_quote! {
{
lgn_tracing::span!(_AWAIT, #text);

lgn_tracing::spans::Instrumentation::new(
#base,
&_AWAIT,
&__lgn_tracing_is_idle
)
}.await
};
}
}

#[proc_macro_attribute]
pub fn span_fn(
args: proc_macro::TokenStream,
Expand All @@ -55,12 +79,42 @@ pub fn span_fn(
.alternative_name
.map_or(function.sig.ident.to_string(), |n| n.to_string());

function.block.stmts.insert(
0,
parse_quote! {
lgn_tracing::span_scope!(_METADATA_FUNC, concat!(module_path!(), "::", #function_name));
},
);
AwaitVisitor.visit_block_mut(&mut function.block);

if let Some((last_stmt, stmts)) = function.block.stmts.split_last() {
function.block.stmts = vec![
parse_quote! {
let __lgn_tracing_output = {
lgn_tracing::span!(_METADATA_FUNC, concat!(module_path!(), "::", #function_name));
let __lgn_tracing_is_idle = std::sync::atomic::AtomicBool::new(false);
lgn_tracing::dispatch::on_begin_scope(&_METADATA_FUNC);

#(#stmts)*

let __lgn_tracing_output = { #last_stmt };
lgn_tracing::dispatch::on_end_scope(&_METADATA_FUNC);
__lgn_tracing_output
};
},
parse_quote! {
return __lgn_tracing_output;
},
];
} else {
function.block.stmts = vec![
parse_quote! {
let __lgn_tracing_output = {
lgn_tracing::span!(_METADATA_FUNC, concat!(module_path!(), "::", #function_name));
let __lgn_tracing_is_idle = std::sync::atomic::AtomicBool::new(false);
lgn_tracing::dispatch::on_begin_scope(&_METADATA_FUNC);
lgn_tracing::dispatch::on_end_scope(&_METADATA_FUNC);
};
},
parse_quote! {
return __lgn_tracing_output;
},
];
}

proc_macro::TokenStream::from(quote! {
#function
Expand Down
32 changes: 32 additions & 0 deletions crates/lgn-tracing/src/macros.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,35 @@
/// Creates a span.
///
/// # Examples
///
/// ```
/// use lgn_tracing::span;
///
/// # fn main() {
/// span!("scope", _MY_SPAN_NAME);
/// # }
/// ```
///
/// A static variable `_MY_SPAN_NAME` has been created
#[macro_export]
macro_rules! span {
($scope_name:ident, $name:expr) => {
static $scope_name: $crate::spans::SpanMetadata = $crate::spans::SpanMetadata {
name: $name,
location: $crate::spans::SpanLocation {
lod: $crate::Verbosity::Max,
target: module_path!(),
module_path: module_path!(),
file: file!(),
line: line!(),
},
};
};
($name:expr) => {
$crate::span_scope!(_METADATA_NAMED, $name);
};
}

/// Records a integer metric.
///
/// # Examples
Expand Down
68 changes: 68 additions & 0 deletions crates/lgn-tracing/src/spans/instrumentation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
//! Async instrumentation

#![allow(clippy::use_self)]

use std::{
future::Future,
pin::Pin,
sync::atomic::{AtomicBool, Ordering},
task::{Context, Poll},
};

use pin_project::pin_project;

use crate::dispatch::{on_begin_scope, on_end_scope};

use super::SpanMetadata;

#[pin_project]
pub struct Instrumentation<'a, T> {
#[pin]
inner: T,
span: &'static SpanMetadata,
is_idle: &'a AtomicBool,
}

impl<'a, T> Instrumentation<'a, T> {
pub fn new(inner: T, span: &'static SpanMetadata, is_idle: &'a AtomicBool) -> Self {
Instrumentation {
inner,
span,
is_idle,
}
}

pub fn begin(&self) {
on_begin_scope(self.span);
}

pub fn end(&self) {
on_end_scope(self.span);
}
}

impl<'a, T: Future> Future for Instrumentation<'a, T> {
type Output = T::Output;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();

let is_idle = this.is_idle.load(Ordering::Relaxed);

if !is_idle {
on_end_scope(this.span);

this.is_idle.store(true, Ordering::Relaxed);
}

let output = this.inner.poll(cx);

if output.is_ready() && is_idle {
on_begin_scope(this.span);

this.is_idle.store(false, Ordering::Relaxed);
}

output
}
}
3 changes: 3 additions & 0 deletions crates/lgn-tracing/src/spans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@ pub use block::*;
mod events;
pub use events::*;

mod instrumentation;
pub use instrumentation::*;

// todo: implement non thread based perf spans for other systems to be used
13 changes: 13 additions & 0 deletions tests/async-span-fn/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "test-async-span-fn"
version = "0.1.0"
edition = "2021"
homepage = "https://legionengine.com"
repository = "/~https://github.com/legion-labs/legion"
license = "MIT OR Apache-2.0"

[dependencies]
futures = "0.3.21"
lgn-telemetry-sink = { path = "../../crates/lgn-telemetry-sink", version = "0.1.0" }
lgn-tracing = { path = "../../crates/lgn-tracing", version = "0.1.0" }
tokio = { version = "1", features = ["full"] }
114 changes: 114 additions & 0 deletions tests/async-span-fn/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//! Dumb binary to test async span fn

#![allow(clippy::never_loop)]

use std::time::Duration;

use futures::future::join_all;
use lgn_telemetry_sink::TelemetryGuard;
use lgn_tracing::{info, span_fn};
use tokio::{fs::File, io::AsyncReadExt, time::sleep};

#[span_fn]
async fn empty_return() {
sleep(Duration::from_millis(1)).await;
}

#[span_fn]
async fn iteration_with_cond() {
let a = 3;

loop {
if a == 3 {
println!("a was 3");
sleep(Duration::from_millis(1)).await;
}

break;
}

info!("inside my_function!");
}

#[span_fn]
async fn delayed_value() -> String {
sleep(Duration::from_millis(1)).await;

let msg = "After".into();

sleep(Duration::from_millis(1)).await;

msg
}

#[span_fn]
fn consume_delayed_value(_: String) {
println!("Consumed a delayed value");
}

#[span_fn]
async fn delayed() {
println!("First");

sleep(Duration::from_millis(1)).await;

println!("Second");

let msg = delayed_value().await;

println!("{}", msg);

consume_delayed_value(delayed_value().await);
}

#[span_fn]
async fn read_txt() {
delayed().await;

let mut file = File::open("./test.txt").await.unwrap();

let mut buffer = Vec::new();

file.read_to_end(&mut buffer).await.unwrap();

sleep(Duration::from_millis(100)).await;

let _len = file.metadata().await.unwrap().len();
}

#[span_fn]
async fn read_all_txt() {
let mut counter = 0;

let mut futures = Vec::new();

while counter < 3 {
let handle = async move {
read_txt().await;
};

futures.push(handle);

counter += 1;
}

join_all(futures).await;
}

#[tokio::main]
async fn main() {
let _telemetry_guard = TelemetryGuard::default().unwrap();

delayed_value().await;
delayed_value().await;

read_txt().await;

delayed().await;

iteration_with_cond().await;

read_all_txt().await;

empty_return().await;
}