-
Notifications
You must be signed in to change notification settings - Fork 744
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
core: fix missed
register_callsite
error (#2938)
There are 2 triggers which will cause a subscriber to receive a call to `Subscriber::register_callsite` for a specific callsite. 1. The first time the event or span at that callsite is executed. 2. When a new subscriber is added or removed (for example, calls to `set_default` or `with_default`) It is trigger (2) that will cause a new subscriber to receive `Subscriber::register_callsite` for all the callsites which had already been registered before it became active. When a callsite is registered for trigger (1), the callsite starts in state `UNREGISTERED`. The first thread to encounter the callsite will transition it to `REGISTERING` and determine the overall interest for the callsite by registering with all known dispatchers (which will call into `Subscriber::register_callsite`). Once that is complete, the callsite is added to the list of all known callsites and its state is transitioned to `REGISTERED`. is (re)built for all known dispatchers. The callsite starts in state `UNREGISTERED`. The This calls down into `Subscriber::register_callsite` for each subscriber. Once that is complete, the callsite is added to the global list of known callsites. While the callsite interest is being rebuilt, other threads that encounter the callsite will be given `Interest::sometimes()` until the registration is complete. However, if a new subscriber is added during this window, all the interest for all callsites will be rebuilt, but because the new callsite (in state `REGISTERING`) won't be included because it isn't yet in the global list of callsites. This can cause a case where that new subscriber being added won't receive `Subscriber::register_callsite` before it receives the subsequent call to `Subscriber::event` or `Subscriber::new_span`. The documentation on [Registering Callsites] is not very explicit on this point, but it does suggest that `Subscriber::register_callsite` will be called before the call to either `Subscriber::event` or `Subscriber::new_span`, and the current behavior can break this implicit contract. [Registering Callsites]: https://docs.rs/tracing-core/0.1.32/tracing_core/callsite/index.html#registering-callsites This change swaps the order of rebuilding the callsite interest and adding the callsite to the global list so that the callsite gets pushed first, avoiding this window in which a subscriber won't get a call to `register_callsite`. As such, a callsite may have its interest read before it is set. In this case, the existing implementation will return `Interest::sometimes()` for the `DefaultCallsite` implementation. Other implementations (outside of the `tracing` project) may perform this differently, but in this case, there is no documented guarantee regarding the ordering. A regression test is included which provokes the race condition 100% of the time before the changes in this fix. Fixes: #2743 Co-authored-by: David Barsky <me@davidbarsky.com>
- Loading branch information
1 parent
6f08af0
commit 8a25a16
Showing
3 changed files
with
238 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
use std::{ | ||
ptr, | ||
sync::atomic::{AtomicPtr, Ordering}, | ||
thread::{self, JoinHandle}, | ||
time::Duration, | ||
}; | ||
|
||
use tracing_core::{ | ||
callsite::{Callsite as _, DefaultCallsite}, | ||
dispatcher::set_default, | ||
field::{FieldSet, Value}, | ||
span, Dispatch, Event, Kind, Level, Metadata, Subscriber, | ||
}; | ||
|
||
struct TestSubscriber { | ||
sleep: Duration, | ||
callsite: AtomicPtr<Metadata<'static>>, | ||
} | ||
|
||
impl TestSubscriber { | ||
fn new(sleep_micros: u64) -> Self { | ||
Self { | ||
sleep: Duration::from_micros(sleep_micros), | ||
callsite: AtomicPtr::new(ptr::null_mut()), | ||
} | ||
} | ||
} | ||
|
||
impl Subscriber for TestSubscriber { | ||
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest { | ||
if !self.sleep.is_zero() { | ||
thread::sleep(self.sleep); | ||
} | ||
|
||
self.callsite | ||
.store(metadata as *const _ as *mut _, Ordering::SeqCst); | ||
|
||
tracing_core::Interest::always() | ||
} | ||
|
||
fn event(&self, event: &tracing_core::Event<'_>) { | ||
let stored_callsite = self.callsite.load(Ordering::SeqCst); | ||
let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _; | ||
|
||
// This assert is the actual test. | ||
assert_eq!( | ||
stored_callsite, event_callsite, | ||
"stored callsite: {stored_callsite:#?} does not match event \ | ||
callsite: {event_callsite:#?}. Was `event` called before \ | ||
`register_callsite`?" | ||
); | ||
} | ||
|
||
fn enabled(&self, _metadata: &Metadata<'_>) -> bool { | ||
true | ||
} | ||
fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id { | ||
span::Id::from_u64(0) | ||
} | ||
fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {} | ||
fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {} | ||
fn enter(&self, _span: &tracing_core::span::Id) {} | ||
fn exit(&self, _span: &tracing_core::span::Id) {} | ||
} | ||
|
||
fn subscriber_thread(idx: usize, register_sleep_micros: u64) -> JoinHandle<()> { | ||
thread::Builder::new() | ||
.name(format!("subscriber-{idx}")) | ||
.spawn(move || { | ||
// We use a sleep to ensure the starting order of the 2 threads. | ||
let subscriber = TestSubscriber::new(register_sleep_micros); | ||
let _dispatch_guard = set_default(&Dispatch::new(subscriber)); | ||
|
||
static CALLSITE: DefaultCallsite = { | ||
// The values of the metadata are unimportant | ||
static META: Metadata<'static> = Metadata::new( | ||
"event ", | ||
"module::path", | ||
Level::INFO, | ||
None, | ||
None, | ||
None, | ||
FieldSet::new(&["message"], tracing_core::callsite::Identifier(&CALLSITE)), | ||
Kind::EVENT, | ||
); | ||
DefaultCallsite::new(&META) | ||
}; | ||
let _interest = CALLSITE.interest(); | ||
|
||
let meta = CALLSITE.metadata(); | ||
let field = meta.fields().field("message").unwrap(); | ||
let message = format!("event-from-{idx}", idx = idx); | ||
let values = [(&field, Some(&message as &dyn Value))]; | ||
let value_set = CALLSITE.metadata().fields().value_set(&values); | ||
|
||
Event::dispatch(meta, &value_set); | ||
|
||
// Wait a bit for everything to end (we don't want to remove the subscriber | ||
// immediately because that will influence the test). | ||
thread::sleep(Duration::from_millis(10)); | ||
}) | ||
.expect("failed to spawn thread") | ||
} | ||
|
||
/// Regression test for missing register_callsite call (#2743) | ||
/// | ||
/// This test provokes the race condition which causes the second subscriber to not receive a | ||
/// call to `register_callsite` before it receives a call to `event`. | ||
/// | ||
/// Because the test depends on the interaction of multiple dispatchers in different threads, | ||
/// it needs to be in a test file by itself. | ||
#[test] | ||
fn event_before_register() { | ||
let subscriber_1_register_sleep_micros = 100; | ||
let subscriber_2_register_sleep_micros = 0; | ||
|
||
let jh1 = subscriber_thread(1, subscriber_1_register_sleep_micros); | ||
|
||
// This delay ensures that the event callsite has interest() called first. | ||
thread::sleep(Duration::from_micros(50)); | ||
let jh2 = subscriber_thread(2, subscriber_2_register_sleep_micros); | ||
|
||
jh1.join().expect("failed to join thread"); | ||
jh2.join().expect("failed to join thread"); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
use std::{ | ||
ptr, | ||
sync::atomic::{AtomicPtr, Ordering}, | ||
thread::{self, JoinHandle}, | ||
time::Duration, | ||
}; | ||
|
||
use tracing::Subscriber; | ||
use tracing_core::{span, Metadata}; | ||
|
||
struct TestSubscriber { | ||
creator_thread: String, | ||
sleep: Duration, | ||
callsite: AtomicPtr<Metadata<'static>>, | ||
} | ||
|
||
impl TestSubscriber { | ||
fn new(sleep_micros: u64) -> Self { | ||
let creator_thread = thread::current() | ||
.name() | ||
.unwrap_or("<unknown thread>") | ||
.to_owned(); | ||
Self { | ||
creator_thread, | ||
sleep: Duration::from_micros(sleep_micros), | ||
callsite: AtomicPtr::new(ptr::null_mut()), | ||
} | ||
} | ||
} | ||
|
||
impl Subscriber for TestSubscriber { | ||
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest { | ||
if !self.sleep.is_zero() { | ||
thread::sleep(self.sleep); | ||
} | ||
|
||
self.callsite | ||
.store(metadata as *const _ as *mut _, Ordering::SeqCst); | ||
println!( | ||
"{creator} from {thread:?}: register_callsite: {callsite:#?}", | ||
creator = self.creator_thread, | ||
callsite = metadata as *const _, | ||
thread = thread::current().name(), | ||
); | ||
tracing_core::Interest::always() | ||
} | ||
|
||
fn event(&self, event: &tracing_core::Event<'_>) { | ||
let stored_callsite = self.callsite.load(Ordering::SeqCst); | ||
let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _; | ||
|
||
println!( | ||
"{creator} from {thread:?}: event (with callsite): {event_callsite:#?} (stored callsite: {stored_callsite:#?})", | ||
creator = self.creator_thread, | ||
thread = thread::current().name(), | ||
); | ||
|
||
// This assert is the actual test. | ||
assert_eq!( | ||
stored_callsite, event_callsite, | ||
"stored callsite: {stored_callsite:#?} does not match event \ | ||
callsite: {event_callsite:#?}. Was `event` called before \ | ||
`register_callsite`?" | ||
); | ||
} | ||
|
||
fn enabled(&self, _metadata: &Metadata<'_>) -> bool { | ||
true | ||
} | ||
fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id { | ||
span::Id::from_u64(0) | ||
} | ||
fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {} | ||
fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {} | ||
fn enter(&self, _span: &tracing_core::span::Id) {} | ||
fn exit(&self, _span: &tracing_core::span::Id) {} | ||
} | ||
|
||
fn subscriber_thread(idx: usize, register_sleep_micros: u64) -> JoinHandle<()> { | ||
thread::Builder::new() | ||
.name(format!("subscriber-{idx}")) | ||
.spawn(move || { | ||
// We use a sleep to ensure the starting order of the 2 threads. | ||
let subscriber = TestSubscriber::new(register_sleep_micros); | ||
let _subscriber_guard = tracing::subscriber::set_default(subscriber); | ||
|
||
tracing::info!("event-from-{idx}", idx = idx); | ||
|
||
// Wait a bit for everything to end (we don't want to remove the subscriber | ||
// immediately because that will mix up the test). | ||
thread::sleep(Duration::from_millis(100)); | ||
}) | ||
.expect("failed to spawn thread") | ||
} | ||
|
||
#[test] | ||
fn event_before_register() { | ||
let subscriber_1_register_sleep_micros = 100; | ||
let subscriber_2_register_sleep_micros = 0; | ||
|
||
let jh1 = subscriber_thread(1, subscriber_1_register_sleep_micros); | ||
|
||
// This delay ensures that the event!() in the first thread is executed first. | ||
thread::sleep(Duration::from_micros(50)); | ||
let jh2 = subscriber_thread(2, subscriber_2_register_sleep_micros); | ||
|
||
jh1.join().expect("failed to join thread"); | ||
jh2.join().expect("failed to join thread"); | ||
} |