-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sync: remove broadcast channel slot level closed flag #4867
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
The broadcast channel allows multiple senders to send messages to multiple receivers, where each receiver receives messages starting from when it subscribes. After all senders are dropped, the receivers will continue to receive all waiting messages in the buffer and then receive a `Closed` error. To mark that a channel has closed, it stores two closed flags, one on the channel level and another in the buffer slot *after* the last used slot (this may also be the earliest entry being kept for lagged receivers, see tokio-rs#2425). However, we don't need both closed flags, keeping the channel level closed flag is sufficient. Without the slot level closed flag, each receiver receives each message until it is up to date and for that receiver the channel is empty. Then, the actual return message is chosen depending on the channel level closed flag; if the channel is NOT closed, then `Empty` is returned, if the channel is closed then `Closed` is returned instead. With the modified logic, there is no longer a need to append a closed token to the internal buffer (by setting the slot level closed flag on the next slot). This fixes the off by one error described in tokio-rs#4814, which caused a receiver which was created after the channel was already closed to get `Empty` from `try_recv` (or hang forever when calling `recv`) instead of receiving `Closed`. As a bonus, we save a single `bool` on each buffer slot. Refs: tokio-rs#4814
Darksonn
approved these changes
Aug 10, 2022
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good.
replace unsafe call with `UnsafeCell::new` as the unsafe isn't necessary. Co-authored-by: Alice Ryhl <aliceryhl@google.com>
crapStone
pushed a commit
to Calciumdibromid/CaBr2
that referenced
this pull request
Sep 11, 2022
This PR contains the following updates: | Package | Type | Update | Change | |---|---|---|---| | [tokio](https://tokio.rs) ([source](/~https://github.com/tokio-rs/tokio)) | dependencies | minor | `1.20.1` -> `1.21.0` | | [tokio](https://tokio.rs) ([source](/~https://github.com/tokio-rs/tokio)) | dev-dependencies | minor | `1.20.1` -> `1.21.0` | --- ### Release Notes <details> <summary>tokio-rs/tokio</summary> ### [`v1.21.0`](/~https://github.com/tokio-rs/tokio/releases/tag/tokio-1.21.0) [Compare Source](tokio-rs/tokio@tokio-1.20.1...tokio-1.21.0) ##### 1.21.0 (September 2, 2022) This release is the first release of Tokio to intentionally support WASM. The `sync,macros,io-util,rt,time` features are stabilized on WASM. Additionally the wasm32-wasi target is given unstable support for the `net` feature. ##### Added - net: add `device` and `bind_device` methods to TCP/UDP sockets ([#​4882]) - net: add `tos` and `set_tos` methods to TCP and UDP sockets ([#​4877]) - net: add security flags to named pipe `ServerOptions` ([#​4845]) - signal: add more windows signal handlers ([#​4924]) - sync: add `mpsc::Sender::max_capacity` method ([#​4904]) - sync: implement Weak version of `mpsc::Sender` ([#​4595]) - task: add `LocalSet::enter` ([#​4765]) - task: stabilize `JoinSet` and `AbortHandle` ([#​4920]) - tokio: add `track_caller` to public APIs ([#​4805], [#​4848], [#​4852]) - wasm: initial support for `wasm32-wasi` target ([#​4716]) ##### Fixed - miri: improve miri compatibility by avoiding temporary references in `linked_list::Link` impls ([#​4841]) - signal: don't register write interest on signal pipe ([#​4898]) - sync: add `#[must_use]` to lock guards ([#​4886]) - sync: fix hang when calling `recv` on closed and reopened broadcast channel ([#​4867]) - task: propagate attributes on task-locals ([#​4837]) ##### Changed - fs: change panic to error in `File::start_seek` ([#​4897]) - io: reduce syscalls in `poll_read` ([#​4840]) - process: use blocking threadpool for child stdio I/O ([#​4824]) - signal: make `SignalKind` methods const ([#​4956]) ##### Internal changes - rt: extract `basic_scheduler::Config` ([#​4935]) - rt: move I/O driver into `runtime` module ([#​4942]) - rt: rename internal scheduler types ([#​4945]) ##### Documented - chore: fix typos and grammar ([#​4858], [#​4894], [#​4928]) - io: fix typo in `AsyncSeekExt::rewind` docs ([#​4893]) - net: add documentation to `try_read()` for zero-length buffers ([#​4937]) - runtime: remove incorrect panic section for `Builder::worker_threads` ([#​4849]) - sync: doc of `watch::Sender::send` improved ([#​4959]) - task: add cancel safety docs to `JoinHandle` ([#​4901]) - task: expand on cancellation of `spawn_blocking` ([#​4811]) - time: clarify that the first tick of `Interval::tick` happens immediately ([#​4951]) ##### Unstable - rt: add unstable option to disable the LIFO slot ([#​4936]) - task: fix incorrect signature in `Builder::spawn_on` ([#​4953]) - task: make `task::Builder::spawn*` methods fallible ([#​4823]) [#​4595]: tokio-rs/tokio#4595 [#​4716]: tokio-rs/tokio#4716 [#​4765]: tokio-rs/tokio#4765 [#​4805]: tokio-rs/tokio#4805 [#​4811]: tokio-rs/tokio#4811 [#​4823]: tokio-rs/tokio#4823 [#​4824]: tokio-rs/tokio#4824 [#​4837]: tokio-rs/tokio#4837 [#​4840]: tokio-rs/tokio#4840 [#​4841]: tokio-rs/tokio#4841 [#​4845]: tokio-rs/tokio#4845 [#​4848]: tokio-rs/tokio#4848 [#​4849]: tokio-rs/tokio#4849 [#​4852]: tokio-rs/tokio#4852 [#​4858]: tokio-rs/tokio#4858 [#​4867]: tokio-rs/tokio#4867 [#​4877]: tokio-rs/tokio#4877 [#​4882]: tokio-rs/tokio#4882 [#​4886]: tokio-rs/tokio#4886 [#​4893]: tokio-rs/tokio#4893 [#​4894]: tokio-rs/tokio#4894 [#​4897]: tokio-rs/tokio#4897 [#​4898]: tokio-rs/tokio#4898 [#​4901]: tokio-rs/tokio#4901 [#​4904]: tokio-rs/tokio#4904 [#​4920]: tokio-rs/tokio#4920 [#​4924]: tokio-rs/tokio#4924 [#​4928]: tokio-rs/tokio#4928 [#​4935]: tokio-rs/tokio#4935 [#​4936]: tokio-rs/tokio#4936 [#​4937]: tokio-rs/tokio#4937 [#​4942]: tokio-rs/tokio#4942 [#​4945]: tokio-rs/tokio#4945 [#​4951]: tokio-rs/tokio#4951 [#​4953]: tokio-rs/tokio#4953 [#​4956]: tokio-rs/tokio#4956 [#​4959]: tokio-rs/tokio#4959 </details> --- ### Configuration 📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about these updates again. --- - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, click this checkbox. --- This PR has been generated by [Renovate Bot](/~https://github.com/renovatebot/renovate). <!--renovate-debug:eyJjcmVhdGVkSW5WZXIiOiIzMi4xODcuMCIsInVwZGF0ZWRJblZlciI6IjMyLjE4Ny4wIn0=--> Co-authored-by: cabr2-bot <cabr2.help@gmail.com> Reviewed-on: https://codeberg.org/Calciumdibromid/CaBr2/pulls/1532 Reviewed-by: crapStone <crapstone@noreply.codeberg.org> Co-authored-by: Calciumdibromid Bot <cabr2_bot@noreply.codeberg.org> Co-committed-by: Calciumdibromid Bot <cabr2_bot@noreply.codeberg.org>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Fixes #4814
Motivation
There is an off by one error discovered by @nathaniel-daniel in #4814 which caused a
receiver which is created after a channel has already been closed to receive
Empty
instead of
Closed
when callingtry_recv
. This would also cause the async functionrecv
to wait forever.The broadcast channel stores two
closed
flags, one in each slot in the buffer (to indicatethe slot after the last filled slot) and a channel wide flag on the
tail
member. Theimplementation misses the slot level
closed
flag because itsnext
field doesn't accountfor the increment when it was added.
As @nathaniel-daniel mentions in their issue, this can be solved by updating the
next
field on the new receiver (seriously, this observation helped me a lot, thank you!).
However, after digging into the internals of the broadcast channel a bit, it looks like we
don't need both the slot level and the channel level
closed
flags. Instead, we can relyon only the channel level closed flag.
Instead, a receiver will continue to receive buffered values until it reaches the tail of the
buffer. At that point we check whether the channel level
closed
flag is set and returnEmpty
orClosed
. This fixes the off by one error and logically should work in all cases(and the tests all pass without modification).
Solution
The broadcast channel allows multiple senders to send messages to
multiple receivers, where each receiver receives messages starting from
when it subscribes. After all senders are dropped, the receivers will
continue to receive all waiting messages in the buffer and then receive
a
Closed
error.To mark that a channel has closed, it stores two closed flags, one on
the channel level and another in the buffer slot after the last used
slot (this may also be the earliest entry being kept for lagged
receivers, see #2425).
However, we don't need both closed flags, keeping the channel level
closed flag is sufficient.
Without the slot level closed flag, each receiver receives each message
until it is up to date and for that receiver the channel is empty. Then,
the actual return message is chosen depending on the channel level
closed flag; if the channel is NOT closed, then
Empty
is returned, ifthe channel is closed then
Closed
is returned instead.With the modified logic, there is no longer a need to append a closed
token to the internal buffer (by setting the slot level closed flag on
the next slot). This fixes the off by one error described in #4814,
which caused a receiver which was created after the channel was already
closed to get
Empty
fromtry_recv
(or hang forever when callingrecv
) instead of receivingClosed
.As a bonus, we save a single
bool
on each buffer slot.Refs: #4814