Skip to content

Commit

Permalink
Add Condvar::wait_while convenience methods
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanhitc committed May 29, 2022
1 parent 6f6e021 commit 686db47
Showing 1 changed file with 249 additions and 0 deletions.
249 changes: 249 additions & 0 deletions src/condvar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use core::{
};
use lock_api::RawMutex as RawMutex_;
use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
use std::ops::DerefMut;
use std::time::{Duration, Instant};

/// A type indicating whether a timed wait on a condition variable returned
Expand All @@ -29,6 +30,17 @@ impl WaitTimeoutResult {
}
}

/// A type indicating how many times a thread was blocked during wait_while.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct WaitWhileResult(u32);

impl WaitWhileResult {
#[inline]
pub fn num_iters(self) -> u32 {
self.0
}
}

/// A Condition Variable
///
/// Condition variables represent the ability to block a thread such that it
Expand Down Expand Up @@ -383,6 +395,134 @@ impl Condvar {
let deadline = util::to_deadline(timeout);
self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
}

#[inline]
fn wait_while_until_internal<T, F>(
&self,
mutex_guard: &mut MutexGuard<'_, T>,
mut condition: F,
timeout: Option<Instant>,
) -> (WaitWhileResult, WaitTimeoutResult)
where
T: ?Sized,
F: FnMut(&mut T) -> bool,
{
let mut result = WaitWhileResult(0);
let mut timeout_result = WaitTimeoutResult(false);

while !timeout_result.timed_out() && condition(mutex_guard.deref_mut()) {
result.0 += 1;
timeout_result =
self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, timeout);
}

(result, timeout_result)
}
/// Blocks the current thread until this condition variable receives a
/// notification. If the provided condition evaluates to `false`, then the
/// thread is no longer blocked and the operation is completed. If the
/// condition evaluates to `true`, then the thread is blocked again and
/// waits for another notification before repeating this process.
///
/// This function will atomically unlock the mutex specified (represented by
/// `mutex_guard`) and block the current thread. This means that any calls
/// to `notify_*()` which happen logically after the mutex is unlocked are
/// candidates to wake this thread up. When this function call returns, the
/// lock specified will have been re-acquired.
///
/// # Panics
///
/// This function will panic if another thread is waiting on the `Condvar`
/// with a different `Mutex` object.
#[inline]
pub fn wait_while<T, F>(
&self,
mutex_guard: &mut MutexGuard<'_, T>,
condition: F,
) -> WaitWhileResult
where
T: ?Sized,
F: FnMut(&mut T) -> bool,
{
self.wait_while_until_internal(mutex_guard, condition, None)
.0
}

/// Waits on this condition variable for a notification, timing out after
/// the specified time instant. If the provided condition evaluates to
/// `false`, then the thread is no longer blocked and the operation is
/// completed. If the condition evaluates to `true`, then the thread is
/// blocked again and waits for another notification before repeating
/// this process.
///
/// The semantics of this function are equivalent to `wait()` except that
/// the thread will be blocked roughly until `timeout` is reached. This
/// method should not be used for precise timing due to anomalies such as
/// preemption or platform differences that may not cause the maximum
/// amount of time waited to be precisely `timeout`.
///
/// Note that the best effort is made to ensure that the time waited is
/// measured with a monotonic clock, and not affected by the changes made to
/// the system time.
///
/// The returned `WaitTimeoutResult` value indicates if the timeout is
/// known to have elapsed.
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
///
/// # Panics
///
/// This function will panic if another thread is waiting on the `Condvar`
/// with a different `Mutex` object.
#[inline]
pub fn wait_while_until<T, F>(
&self,
mutex_guard: &mut MutexGuard<'_, T>,
condition: F,
timeout: Instant,
) -> (WaitWhileResult, WaitTimeoutResult)
where
T: ?Sized,
F: FnMut(&mut T) -> bool,
{
self.wait_while_until_internal(mutex_guard, condition, Some(timeout))
}

/// Waits on this condition variable for a notification, timing out after a
/// specified duration. If the provided condition evaluates to `false`,
/// then the thread is no longer blocked and the operation is completed.
/// If the condition evaluates to `true`, then the thread is blocked again
/// and waits for another notification before repeating this process.
///
/// The semantics of this function are equivalent to `wait()` except that
/// the thread will be blocked for roughly no longer than `timeout`. This
/// method should not be used for precise timing due to anomalies such as
/// preemption or platform differences that may not cause the maximum
/// amount of time waited to be precisely `timeout`.
///
/// Note that the best effort is made to ensure that the time waited is
/// measured with a monotonic clock, and not affected by the changes made to
/// the system time.
///
/// The returned `WaitTimeoutResult` value indicates if the timeout is
/// known to have elapsed.
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
#[inline]
pub fn wait_while_for<T: ?Sized, F>(
&self,
mutex_guard: &mut MutexGuard<'_, T>,
condition: F,
timeout: Duration,
) -> (WaitWhileResult, WaitTimeoutResult)
where
F: FnMut(&mut T) -> bool,
{
let deadline = util::to_deadline(timeout);
self.wait_while_until_internal(mutex_guard, condition, deadline)
}
}

impl Default for Condvar {
Expand All @@ -404,6 +544,8 @@ mod tests {
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;
use std::thread::sleep;
use std::thread::JoinHandle;
use std::time::Duration;
use std::time::Instant;

Expand Down Expand Up @@ -572,6 +714,113 @@ mod tests {
drop(g);
}

fn spawn_wait_while_notifier(
mutex: Arc<Mutex<u32>>,
cv: Arc<Condvar>,
num_iters: u32,
timeout: Option<Instant>,
) -> JoinHandle<()> {
thread::spawn(move || {
for epoch in 1..=num_iters {
// spin to wait for main test thread to block
// before notifying it to wake back up and check
// its condition.
let mut sleep_backoff = Duration::from_millis(1);
let _mutex_guard = loop {
let mutex_guard = mutex.lock();

if let Some(timeout) = timeout {
if Instant::now() >= timeout {
return;
}
}

if *mutex_guard == epoch {
break mutex_guard;
}

drop(mutex_guard);

// give main test thread a good chance to
// acquire the lock before this thread does.
sleep(sleep_backoff);
sleep_backoff *= 2;
};

cv.notify_one();
}
})
}

#[test]
fn wait_while_until_internal_does_not_wait_if_initially_false() {
let mutex = Arc::new(Mutex::new(()));
let cv = Arc::new(Condvar::new());

let mut mutex_guard = mutex.lock();

let result = cv
.wait_while_until_internal(&mut mutex_guard, |_| false, None)
.0;

assert!(result.num_iters() == 0);
}

#[test]
fn wait_while_until_internal_times_out_before_false() {
let mutex = Arc::new(Mutex::new(0));
let cv = Arc::new(Condvar::new());

let condition = |counter: &mut u32| {
*counter += 1;
true
};

let mut mutex_guard = mutex.lock();
let timeout = Some(Instant::now() + Duration::from_millis(50));
let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), u32::MAX, timeout);

let (result, timeout_result) =
cv.wait_while_until_internal(&mut mutex_guard, condition, timeout);

assert!(timeout_result.timed_out());
assert!(result.num_iters() > 0);
assert!(result.num_iters() < u32::MAX);

// prevent deadlock with notifier
drop(mutex_guard);
handle.join().unwrap();
}

#[test]
fn wait_while_until_internal() {
let mutex = Arc::new(Mutex::new(0));
let cv = Arc::new(Condvar::new());

let num_iters = 4;

let condition = |counter: &mut u32| {
*counter += 1;
*counter <= num_iters
};

let mut mutex_guard = mutex.lock();
let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, None);

let (result, timeout_result) =
cv.wait_while_until_internal(&mut mutex_guard, condition, None);

assert!(!timeout_result.timed_out());
assert!(result.num_iters() == num_iters);
assert!(*mutex_guard == num_iters + 1);

let result = cv.wait_while(&mut mutex_guard, condition);
handle.join().unwrap();

assert!(result.num_iters() == 0);
assert!(*mutex_guard == num_iters + 2);
}

#[test]
#[should_panic]
fn two_mutexes() {
Expand Down

0 comments on commit 686db47

Please sign in to comment.