WatchStream: Add additional constructor which causes stream to only yield when value changes #5420

Closed
extremeandy opened this issue Feb 2, 2023 · 6 comments · Fixed by #5432
Labels
A-tokio-stream Area: The tokio-stream crate C-feature-request Category: A feature request. M-sync Module: tokio/sync

Comments

@extremeandy

Is your feature request related to a problem? Please describe.

When using tokio_stream::wrappers::WatchStream, it always yields the current value first, even if that value has not changed:

/// This stream will always start by yielding the current value when the WatchStream is polled,
/// regardless of whether it was the initial value or sent afterwards.

I have a use case where I would like to (synchronously) borrow the current value from the Receiver, then construct a WatchStream of subsequent changes.

Describe the solution you'd like
Add a new constructor variant which only yields once a change is observed.

For instance:

/// Create a new `WatchStream` which will only yield after the value changes.
pub fn skip_unchanged(rx: Receiver<T>) -> Self {
    Self {
        inner: ReusableBoxFuture::new(make_future(rx)),
    }
}

Describe alternatives you've considered

Another possibility is to create a WatchStream and then call next() and await the Future. However, in my use case this adds complexity because I want to fetch the initial value synchronously.

@extremeandy extremeandy added A-tokio Area: The main tokio crate C-feature-request Category: A feature request. labels Feb 2, 2023
@Darksonn Darksonn added M-sync Module: tokio/sync A-tokio-stream Area: The tokio-stream crate and removed A-tokio Area: The main tokio crate labels Feb 2, 2023
@Darksonn
Contributor

Darksonn commented Feb 2, 2023

I would be happy to accept a PR that adds a constructor like that.

@brody4hire

brody4hire commented Feb 6, 2023

I just raised PR #5432.

> I have a use case where [...]

I think it would be really helpful if someone could come up with a concrete, real-world example if possible without exposing anything private.

> Another possibility is to create a WatchStream and then call next() and await the Future.

Assuming this does not lead to any race conditions.

@extremeandy
Author

extremeandy commented Feb 8, 2023

I will try to give a little more info about my use case.

struct SnapshotWithUpdates<T, S>
where
    S: Stream<Item = (String, T)>,
{
    snapshot: HashMap<String, T>,
    updates: S,
}

fn snapshot_with_updates<T>(
    receivers: HashMap<String, Receiver<T>>,
) -> SnapshotWithUpdates<T, impl Stream<Item = (String, T)>> {
    ...
}

snapshot_with_updates returns:

  1. An initial snapshot of the values in all the Receivers, i.e. HashMap<String, T>.
  2. A stream of all updates which follow. Note that this stream should ONLY include changes that were not included in the initial snapshot.

To do this, I want to take the current value from every Receiver to build the snapshot, and then listen only for changes on each Receiver to feed the flattened update stream.

@brody4hire

Thanks for the example, definitely helpful. I would probably favor simplifying it like this (does not compile):

fn snapshot_with_updates<T>(
    receivers: HashMap<String, Receiver<T>>,
) -> (HashMap<String, T>, impl Stream<Item = (String, T)>) {
    // ...
}

I wonder if we can use anything such as map or merge to help make this more idiomatic?

I would also appreciate it if you could take a look at what I proposed in PR #5432 and comment on whether it really helps, or whether there is anything else we should do.

@extremeandy
Author

extremeandy commented Feb 11, 2023

The struct with named fields was just to make it clearer what the two fields in the example were, but yes, a tuple works too!

Not sure what you mean about map/merge; to me this sort of operation (the example) seems quite use-case specific 🤔

@extremeandy
Author

extremeandy commented Mar 1, 2023

I realised after all this... that it's not quite the behaviour I needed 😅

What we have now is a WatchStream which will yield if the value has changed since the Receiver was created.
What I actually wanted was a WatchStream which will yield if the value has changed since the WatchStream was created.

I found a much simpler way to do this:

WatchStream::new(rx).skip(1)

Tada!

I'm sure someone will find this feature useful though! Thanks for all your work!
