Skip to content

Commit

Permalink
sync: add WatchStream::from_changes (tokio-rs#5432)
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Brody authored and amab8901 committed Feb 27, 2023
1 parent 04087b9 commit 552fa7e
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 3 deletions.
34 changes: 32 additions & 2 deletions tokio-stream/src/wrappers/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use tokio::sync::watch::error::RecvError;

/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
///
/// This stream will always start by yielding the current value when the WatchStream is polled,
/// regardless of whether it was the initial value or sent afterwards.
/// This stream will start by yielding the current value when the WatchStream is polled,
/// regardless of whether it was the initial value or sent afterwards,
/// unless you use [`WatchStream<T>::from_changes`].
///
/// # Examples
///
Expand Down Expand Up @@ -40,6 +41,28 @@ use tokio::sync::watch::error::RecvError;
/// let (tx, rx) = watch::channel("hello");
/// let mut rx = WatchStream::new(rx);
///
/// // existing rx output with "hello" is ignored here
///
/// tx.send("goodbye").unwrap();
/// assert_eq!(rx.next().await, Some("goodbye"));
/// # }
/// ```
///
/// Example with [`WatchStream<T>::from_changes`]:
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use futures::future::FutureExt;
/// use tokio::sync::watch;
/// use tokio_stream::{StreamExt, wrappers::WatchStream};
///
/// let (tx, rx) = watch::channel("hello");
/// let mut rx = WatchStream::from_changes(rx);
///
/// // no output from rx is available at this point - let's check this:
/// assert!(rx.next().now_or_never().is_none());
///
/// tx.send("goodbye").unwrap();
/// assert_eq!(rx.next().await, Some("goodbye"));
/// # }
Expand All @@ -66,6 +89,13 @@ impl<T: 'static + Clone + Send + Sync> WatchStream<T> {
inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
}
}

/// Create a new `WatchStream` that waits for the value to be changed.
pub fn from_changes(rx: Receiver<T>) -> Self {
Self {
inner: ReusableBoxFuture::new(make_future(rx)),
}
}
}

impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
Expand Down
30 changes: 29 additions & 1 deletion tokio-stream/tests/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tokio_stream::StreamExt;
use tokio_test::assert_pending;
use tokio_test::task::spawn;

#[tokio::test]
async fn message_not_twice() {
async fn watch_stream_message_not_twice() {
let (tx, rx) = watch::channel("hello");

let mut counter = 0;
Expand All @@ -27,3 +29,29 @@ async fn message_not_twice() {
drop(tx);
task.await.unwrap();
}

#[tokio::test]
async fn watch_stream_from_rx() {
let (tx, rx) = watch::channel("hello");

let mut stream = WatchStream::from(rx);

assert_eq!(stream.next().await.unwrap(), "hello");

tx.send("bye").unwrap();

assert_eq!(stream.next().await.unwrap(), "bye");
}

#[tokio::test]
async fn watch_stream_from_changes() {
let (tx, rx) = watch::channel("hello");

let mut stream = WatchStream::from_changes(rx);

assert_pending!(spawn(&mut stream).poll_next());

tx.send("bye").unwrap();

assert_eq!(stream.next().await.unwrap(), "bye");
}

0 comments on commit 552fa7e

Please sign in to comment.