-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: add WatchStream::from_changes
, add testing, etc.
#5432
stream: add WatchStream::from_changes
, add testing, etc.
#5432
Conversation
add a test case for existing `WatchStream::from` update existing WatchStream test function name apply minor documentation updates
tokio-stream/src/wrappers/watch.rs
Outdated
@@ -66,6 +67,13 @@ impl<T: 'static + Clone + Send + Sync> WatchStream<T> { | |||
inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), | |||
} | |||
} | |||
|
|||
/// Create a new `WatchStream` that waits for the value to be changed. | |||
pub fn new_on_changed(rx: Receiver<T>) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would be open to changing this function name. I tried to come up with something consistent with the Receiver::changed
function that we await for, but doesn't feel as pretty as I would like.
I thought of from_changes
and new_from_changes
, but do not feel as consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about new_skip_unchanged
? Or new_skip_until_changed
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think new_on_changed
is fine.
…atch-stream-on-changed
…atch-stream-on-changed
…atch-stream-on-changed
I have simplified the test with help from the tokio-test utility and have added an example to the documentation. I have also aded a test to the documentation to assert that the watch stream is still pending until new data is available. If we keep this, it may be enough overlap to remove the test case I added. |
I think this should be better now, please let me know if anything else is needed from my end to get this accepted and merged. I think there is an overlap between the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other than this comment, it looks good to me.
Co-authored-by: Alice Ryhl <aliceryhl@google.com>
WatchStream::new_on_changed
, add testing, etc.WatchStream:: from_changes
, add testing, etc.
WatchStream:: from_changes
, add testing, etc.WatchStream::from_changes
, add testing, etc.
…atch-stream-on-changed
tokio-stream/Cargo.toml
Outdated
futures-test = "0.3.26" | ||
futures-util = "0.3.26" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need these. We already import futures
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that FutureExt
is available from the futures
crate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed futures-test
. Unfortunately I still need futures-util
to import FutureExt
as I needed to get now_or_never()
to work.
I noticed that tokio/src/sync/watch.rs
is using recv.changed().now_or_never()
from crate::sync::watch::channel(0i32)
in some tests. I would like to investigate this a little further. Any pointers would still be highly appreciated as before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
futures::future::FutureExt
worked for me thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.
other minor updates:
WatchStream::from
resolves #5420
Motivation
see #5420
It looks like
WatchStream::from
was not covered by documentation or integration test cases, adding one here as well.Quick disclaimer that I am still quite new with Rust in contrast to my years in quite a few other major systems programming languages.
Solution
WatchStream::from_changes
, with implementation as suggested in WatchStream: Add additional constructor which causes stream to only yield when value changes #5420, along with a test case and minor documentation updates