From afc73fb3dea0f1f4982319eb7fbe46ae7c999614 Mon Sep 17 00:00:00 2001 From: "Christopher J. Brody" Date: Mon, 6 Feb 2023 13:50:11 -0500 Subject: [PATCH 01/15] stream: add `WatchStream::new_on_changed`, add testing, etc. add a test case for existing `WatchStream::from` update existing WatchStream test function name apply minor documentation updates --- tokio-stream/src/wrappers/watch.rs | 12 +++++++++-- tokio-stream/tests/watch.rs | 33 +++++++++++++++++++++++++++++- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index c682c9c271d..d615036debf 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -10,8 +10,9 @@ use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// -/// This stream will always start by yielding the current value when the WatchStream is polled, -/// regardless of whether it was the initial value or sent afterwards. +/// This stream will start by yielding the current value when the WatchStream is polled, +/// regardless of whether it was the initial value or sent afterwards, +/// unless you use [`WatchStream::new_on_changed`]. /// /// # Examples /// @@ -66,6 +67,13 @@ impl WatchStream { inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), } } + + /// Create a new `WatchStream` that waits for the value to be changed. + pub fn new_on_changed(rx: Receiver) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } } impl Stream for WatchStream { diff --git a/tokio-stream/tests/watch.rs b/tokio-stream/tests/watch.rs index a56254edefd..0cd6bc5a94e 100644 --- a/tokio-stream/tests/watch.rs +++ b/tokio-stream/tests/watch.rs @@ -5,7 +5,7 @@ use tokio_stream::wrappers::WatchStream; use tokio_stream::StreamExt; #[tokio::test] -async fn message_not_twice() { +async fn watch_stream_message_not_twice() { let (tx, rx) = watch::channel("hello"); let mut counter = 0; @@ -27,3 +27,34 @@ async fn message_not_twice() { drop(tx); task.await.unwrap(); } + +#[tokio::test] +async fn watch_stream_from_rx() { + let (tx, rx) = watch::channel("hello"); + + let mut stream = WatchStream::from(rx); + + assert_eq!(stream.next().await.unwrap(), "hello"); + + tx.send("bye").unwrap(); + + assert_eq!(stream.next().await.unwrap(), "bye"); +} + +#[tokio::test] +async fn watch_stream_new_on_change() { + let (tx, rx) = watch::channel("hello"); + + let mut stream = WatchStream::new_on_changed(rx); + + let task = tokio::spawn(async move { + let value = stream.next().await.unwrap(); + if (value != "bye") { + panic!("received unexpected value: {:?}", value); + } + }); + + tx.send("bye").unwrap(); + + task.await.unwrap(); +} From 83888af690fe80bded5619eb757683fdfc63298b Mon Sep 17 00:00:00 2001 From: "Christopher J. Brody" Date: Mon, 6 Feb 2023 14:10:25 -0500 Subject: [PATCH 02/15] FIXUP: remove extra parens in test case --- tokio-stream/tests/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-stream/tests/watch.rs b/tokio-stream/tests/watch.rs index 0cd6bc5a94e..09750693c24 100644 --- a/tokio-stream/tests/watch.rs +++ b/tokio-stream/tests/watch.rs @@ -49,7 +49,7 @@ async fn watch_stream_new_on_change() { let task = tokio::spawn(async move { let value = stream.next().await.unwrap(); - if (value != "bye") { + if value != "bye" { panic!("received unexpected value: {:?}", value); } }); From bdc09c08a63589dfdd472a92ff37cd9fd71690cf Mon Sep 17 00:00:00 2001 From: "Christopher J. Brody" Date: Fri, 10 Feb 2023 11:22:21 -0500 Subject: [PATCH 03/15] rework test --- tokio-stream/tests/watch.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/tokio-stream/tests/watch.rs b/tokio-stream/tests/watch.rs index 09750693c24..c76c4153e03 100644 --- a/tokio-stream/tests/watch.rs +++ b/tokio-stream/tests/watch.rs @@ -1,8 +1,13 @@ #![cfg(feature = "sync")] +use std::pin::Pin; +use std::task::Context; +use futures::task::noop_waker; + use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; -use tokio_stream::StreamExt; +use tokio_stream::{Stream, StreamExt}; +use tokio_test::{assert_pending, task}; #[tokio::test] async fn watch_stream_message_not_twice() { @@ -47,14 +52,11 @@ async fn watch_stream_new_on_change() { let mut stream = WatchStream::new_on_changed(rx); - let task = tokio::spawn(async move { - let value = stream.next().await.unwrap(); - if value != "bye" { - panic!("received unexpected value: {:?}", value); - } - }); + assert_pending!( + Pin::new(&mut stream).poll_next(&mut Context::from_waker(&noop_waker())) + ); tx.send("bye").unwrap(); - task.await.unwrap(); + assert_eq!(stream.next().await.unwrap(), "bye"); } From 88d2feb03d598750eaf81f7af2e7a64be9a72df7 Mon Sep 17 00:00:00 2001 From: "Christopher J. Brody" Date: Fri, 10 Feb 2023 11:33:15 -0500 Subject: [PATCH 04/15] fix imports & fmt --- tokio-stream/tests/watch.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tokio-stream/tests/watch.rs b/tokio-stream/tests/watch.rs index c76c4153e03..4f82aea704d 100644 --- a/tokio-stream/tests/watch.rs +++ b/tokio-stream/tests/watch.rs @@ -1,13 +1,13 @@ #![cfg(feature = "sync")] +use futures::task::noop_waker; use std::pin::Pin; use std::task::Context; -use futures::task::noop_waker; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tokio_stream::{Stream, StreamExt}; -use tokio_test::{assert_pending, task}; +use tokio_test::{assert_pending}; #[tokio::test] async fn watch_stream_message_not_twice() { @@ -52,9 +52,7 @@ async fn watch_stream_new_on_change() { let mut stream = WatchStream::new_on_changed(rx); - assert_pending!( - Pin::new(&mut stream).poll_next(&mut Context::from_waker(&noop_waker())) - ); + assert_pending!(Pin::new(&mut stream).poll_next(&mut Context::from_waker(&noop_waker()))); tx.send("bye").unwrap(); From 48bbfd402e59ab3356d0b5cce3e2bfeeec6e621e Mon Sep 17 00:00:00 2001 From: "Christopher J. Brody" Date: Fri, 10 Feb 2023 11:47:42 -0500 Subject: [PATCH 05/15] fix an import again --- tokio-stream/tests/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-stream/tests/watch.rs b/tokio-stream/tests/watch.rs index 4f82aea704d..46d9c6d23c0 100644 --- a/tokio-stream/tests/watch.rs +++ b/tokio-stream/tests/watch.rs @@ -7,7 +7,7 @@ use std::task::Context; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tokio_stream::{Stream, StreamExt}; -use tokio_test::{assert_pending}; +use tokio_test::assert_pending; #[tokio::test] async fn watch_stream_message_not_twice() { From b8c31ea10afde2d955d7bde23b6cead9a43f34eb Mon Sep 17 00:00:00 2001 From: "Christopher J. Brody" Date: Sun, 12 Feb 2023 16:50:15 -0500 Subject: [PATCH 06/15] simplify test with utility from tokio-test --- tokio-stream/tests/watch.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tokio-stream/tests/watch.rs b/tokio-stream/tests/watch.rs index 46d9c6d23c0..39242553d5c 100644 --- a/tokio-stream/tests/watch.rs +++ b/tokio-stream/tests/watch.rs @@ -1,13 +1,10 @@ #![cfg(feature = "sync")] -use futures::task::noop_waker; -use std::pin::Pin; -use std::task::Context; - use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; -use tokio_stream::{Stream, StreamExt}; +use tokio_stream::StreamExt; use tokio_test::assert_pending; +use tokio_test::task::spawn; #[tokio::test] async fn watch_stream_message_not_twice() { @@ -52,7 +49,7 @@ async fn watch_stream_new_on_change() { let mut stream = WatchStream::new_on_changed(rx); - assert_pending!(Pin::new(&mut stream).poll_next(&mut Context::from_waker(&noop_waker()))); + assert_pending!(spawn(&mut stream).poll_next()); tx.send("bye").unwrap(); From 6ca8990053f8785925a3a06829be515d7060556a Mon Sep 17 00:00:00 2001 From: "Christopher J. Brody" Date: Sun, 12 Feb 2023 22:54:09 -0500 Subject: [PATCH 07/15] add example with new_on_changed & add comment to another example --- tokio-stream/src/wrappers/watch.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index d615036debf..6b6b146e4a7 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -41,6 +41,26 @@ use tokio::sync::watch::error::RecvError; /// let (tx, rx) = watch::channel("hello"); /// let mut rx = WatchStream::new(rx); /// +/// // existing rx output with "hello" is ignored here +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// Example with [`WatchStream::new_on_changed`]: +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// use tokio::sync::watch; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::new_on_changed(rx); +/// +/// // no output from rx is available at this point +/// /// tx.send("goodbye").unwrap(); /// assert_eq!(rx.next().await, Some("goodbye")); /// # } From 2db6ccbd89554b77295263e33ba7ff1dcd5d0c53 Mon Sep 17 00:00:00 2001 From: "Christopher J. Brody" Date: Sun, 12 Feb 2023 23:34:06 -0500 Subject: [PATCH 08/15] add another test to the example --- tokio-stream/Cargo.toml | 1 + tokio-stream/src/wrappers/watch.rs | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index 01acec3cd73..2251ea65bc6 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -37,6 +37,7 @@ async-stream = "0.3" parking_lot = "0.12.0" tokio-test = { path = "../tokio-test" } futures = { version = "0.3", default-features = false } +futures-test = "0.3.26" [package.metadata.docs.rs] all-features = true diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index 6b6b146e4a7..7cc66e0b96f 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -53,13 +53,18 @@ use tokio::sync::watch::error::RecvError; /// ``` /// # #[tokio::main] /// # async fn main() { -/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// use futures::task::Poll; +/// use futures_test::task::noop_context; +/// use std::pin::Pin; /// use tokio::sync::watch; +/// use tokio_stream::{Stream, StreamExt, wrappers::WatchStream}; /// /// let (tx, rx) = watch::channel("hello"); /// let mut rx = WatchStream::new_on_changed(rx); /// /// // no output from rx is available at this point +/// let from_poll = Pin::new(&mut rx).poll_next(&mut noop_context()); +/// assert_eq!(from_poll, Poll::Pending); /// /// tx.send("goodbye").unwrap(); /// assert_eq!(rx.next().await, Some("goodbye")); From 1ce7bbb30456a7b439d8dacca84c6b7946b222ae Mon Sep 17 00:00:00 2001 From: "Christopher J. Brody" Date: Tue, 14 Feb 2023 15:44:16 -0500 Subject: [PATCH 09/15] update sample comment & first_poll variable name --- tokio-stream/src/wrappers/watch.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index 7cc66e0b96f..c0ea52c878b 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -62,9 +62,9 @@ use tokio::sync::watch::error::RecvError; /// let (tx, rx) = watch::channel("hello"); /// let mut rx = WatchStream::new_on_changed(rx); /// -/// // no output from rx is available at this point -/// let from_poll = Pin::new(&mut rx).poll_next(&mut noop_context()); -/// assert_eq!(from_poll, Poll::Pending); +/// // no output from rx is available at this point - let's check: +/// let first_poll = Pin::new(&mut rx).poll_next(&mut noop_context()); +/// assert_eq!(first_poll, Poll::Pending); /// /// tx.send("goodbye").unwrap(); /// assert_eq!(rx.next().await, Some("goodbye")); From 95870cd27489e0580fe26645a392ee1212c49c33 Mon Sep 17 00:00:00 2001 From: "Christopher J. Brody" Date: Wed, 15 Feb 2023 18:57:02 -0500 Subject: [PATCH 10/15] sample poll test down to 1 line & update comment --- tokio-stream/src/wrappers/watch.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index c0ea52c878b..edbe5b01926 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -53,7 +53,6 @@ use tokio::sync::watch::error::RecvError; /// ``` /// # #[tokio::main] /// # async fn main() { -/// use futures::task::Poll; /// use futures_test::task::noop_context; /// use std::pin::Pin; /// use tokio::sync::watch; @@ -62,9 +61,8 @@ use tokio::sync::watch::error::RecvError; /// let (tx, rx) = watch::channel("hello"); /// let mut rx = WatchStream::new_on_changed(rx); /// -/// // no output from rx is available at this point - let's check: -/// let first_poll = Pin::new(&mut rx).poll_next(&mut noop_context()); -/// assert_eq!(first_poll, Poll::Pending); +/// // no output from rx is available at this point - let's check this: +/// assert!(Pin::new(&mut rx).poll_next(&mut noop_context()).is_pending()); /// /// tx.send("goodbye").unwrap(); /// assert_eq!(rx.next().await, Some("goodbye")); From 63e276a771787eaa797b64d72689d3f2d1845d4e Mon Sep 17 00:00:00 2001 From: Chris Brody Date: Fri, 17 Feb 2023 11:45:06 -0500 Subject: [PATCH 11/15] use `now_or_never` (needs `futures_util::future::FutureExt`) Co-authored-by: Alice Ryhl --- tokio-stream/src/wrappers/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index edbe5b01926..5289a65624a 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -62,7 +62,7 @@ use tokio::sync::watch::error::RecvError; /// let mut rx = WatchStream::new_on_changed(rx); /// /// // no output from rx is available at this point - let's check this: -/// assert!(Pin::new(&mut rx).poll_next(&mut noop_context()).is_pending()); +/// assert!(rx.next().now_or_never().is_none()); /// /// tx.send("goodbye").unwrap(); /// assert_eq!(rx.next().await, Some("goodbye")); From f413e2add922d4e825e2982892054b4cbd09309e Mon Sep 17 00:00:00 2001 From: "Christopher J. Brody" Date: Fri, 17 Feb 2023 11:49:13 -0500 Subject: [PATCH 12/15] update imports --- tokio-stream/Cargo.toml | 1 + tokio-stream/src/wrappers/watch.rs | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index 2251ea65bc6..d71825309f1 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -38,6 +38,7 @@ parking_lot = "0.12.0" tokio-test = { path = "../tokio-test" } futures = { version = "0.3", default-features = false } futures-test = "0.3.26" +futures-util = "0.3.26" [package.metadata.docs.rs] all-features = true diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index 5289a65624a..eb7bcd6d4e0 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -53,10 +53,9 @@ use tokio::sync::watch::error::RecvError; /// ``` /// # #[tokio::main] /// # async fn main() { -/// use futures_test::task::noop_context; -/// use std::pin::Pin; +/// use futures_util::future::FutureExt; /// use tokio::sync::watch; -/// use tokio_stream::{Stream, StreamExt, wrappers::WatchStream}; +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; /// /// let (tx, rx) = watch::channel("hello"); /// let mut rx = WatchStream::new_on_changed(rx); From 67d4c872bbd0cd8211fc00fc38fb4e323a27c55a Mon Sep 17 00:00:00 2001 From: "Christopher J. Brody" Date: Fri, 17 Feb 2023 11:57:20 -0500 Subject: [PATCH 13/15] rename function to `from_changes` --- tokio-stream/src/wrappers/watch.rs | 8 ++++---- tokio-stream/tests/watch.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index eb7bcd6d4e0..d91509ba642 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -12,7 +12,7 @@ use tokio::sync::watch::error::RecvError; /// /// This stream will start by yielding the current value when the WatchStream is polled, /// regardless of whether it was the initial value or sent afterwards, -/// unless you use [`WatchStream::new_on_changed`]. +/// unless you use [`WatchStream::from_changes`]. /// /// # Examples /// @@ -48,7 +48,7 @@ use tokio::sync::watch::error::RecvError; /// # } /// ``` /// -/// Example with [`WatchStream::new_on_changed`]: +/// Example with [`WatchStream::from_changes`]: /// /// ``` /// # #[tokio::main] @@ -58,7 +58,7 @@ use tokio::sync::watch::error::RecvError; /// use tokio_stream::{StreamExt, wrappers::WatchStream}; /// /// let (tx, rx) = watch::channel("hello"); -/// let mut rx = WatchStream::new_on_changed(rx); +/// let mut rx = WatchStream::from_changes(rx); /// /// // no output from rx is available at this point - let's check this: /// assert!(rx.next().now_or_never().is_none()); @@ -91,7 +91,7 @@ impl WatchStream { } /// Create a new `WatchStream` that waits for the value to be changed. - pub fn new_on_changed(rx: Receiver) -> Self { + pub fn from_changes(rx: Receiver) -> Self { Self { inner: ReusableBoxFuture::new(make_future(rx)), } diff --git a/tokio-stream/tests/watch.rs b/tokio-stream/tests/watch.rs index 39242553d5c..3a39aaf3db7 100644 --- a/tokio-stream/tests/watch.rs +++ b/tokio-stream/tests/watch.rs @@ -44,10 +44,10 @@ async fn watch_stream_from_rx() { } #[tokio::test] -async fn watch_stream_new_on_change() { +async fn watch_stream_from_changes() { let (tx, rx) = watch::channel("hello"); - let mut stream = WatchStream::new_on_changed(rx); + let mut stream = WatchStream::from_changes(rx); assert_pending!(spawn(&mut stream).poll_next()); From f1ddac9fc4c806edd2867f8c652467a75a8782f6 Mon Sep 17 00:00:00 2001 From: "Christopher J. Brody" Date: Sun, 19 Feb 2023 08:38:00 -0500 Subject: [PATCH 14/15] remove futures-test not needed from dev dependencies --- tokio-stream/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index d71825309f1..44b5ff463a4 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -37,7 +37,6 @@ async-stream = "0.3" parking_lot = "0.12.0" tokio-test = { path = "../tokio-test" } futures = { version = "0.3", default-features = false } -futures-test = "0.3.26" futures-util = "0.3.26" [package.metadata.docs.rs] From 7d0f3c562dc17d64995483afbbc9a221380c9957 Mon Sep 17 00:00:00 2001 From: "Christopher J. Brody" Date: Sun, 19 Feb 2023 09:25:10 -0500 Subject: [PATCH 15/15] fix sample import & remove futures-util from dev dependencies --- tokio-stream/Cargo.toml | 1 - tokio-stream/src/wrappers/watch.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index 44b5ff463a4..01acec3cd73 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -37,7 +37,6 @@ async-stream = "0.3" parking_lot = "0.12.0" tokio-test = { path = "../tokio-test" } futures = { version = "0.3", default-features = false } -futures-util = "0.3.26" [package.metadata.docs.rs] all-features = true diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index d91509ba642..ec8ead06da0 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -53,7 +53,7 @@ use tokio::sync::watch::error::RecvError; /// ``` /// # #[tokio::main] /// # async fn main() { -/// use futures_util::future::FutureExt; +/// use futures::future::FutureExt; /// use tokio::sync::watch; /// use tokio_stream::{StreamExt, wrappers::WatchStream}; ///