Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: remove broadcast channel slot level closed flag #4867

Merged
merged 2 commits into from
Aug 10, 2022

Conversation

hds
Copy link
Contributor

@hds hds commented Jul 26, 2022

Fixes #4814

Motivation

There is an off by one error discovered by @nathaniel-daniel in #4814 which caused a
receiver which is created after a channel has already been closed to receive Empty
instead of Closed when calling try_recv. This would also cause the async function
recv to wait forever.

The broadcast channel stores two closed flags, one in each slot in the buffer (to indicate
the slot after the last filled slot) and a channel wide flag on the tail member. The
implementation misses the slot level closed flag because its next field doesn't account
for the increment when it was added.

As @nathaniel-daniel mentions in their issue, this can be solved by updating the next
field on the new receiver (seriously, this observation helped me a lot, thank you!).

However, after digging into the internals of the broadcast channel a bit, it looks like we
don't need both the slot level and the channel level closed flags. Instead, we can rely
on only the channel level closed flag.

Instead, a receiver will continue to receive buffered values until it reaches the tail of the
buffer. At that point we check whether the channel level closed flag is set and return
Empty or Closed. This fixes the off by one error and logically should work in all cases
(and the tests all pass without modification).

Solution

The broadcast channel allows multiple senders to send messages to
multiple receivers, where each receiver receives messages starting from
when it subscribes. After all senders are dropped, the receivers will
continue to receive all waiting messages in the buffer and then receive
a Closed error.

To mark that a channel has closed, it stores two closed flags, one on
the channel level and another in the buffer slot after the last used
slot (this may also be the earliest entry being kept for lagged
receivers, see #2425).

However, we don't need both closed flags, keeping the channel level
closed flag is sufficient.

Without the slot level closed flag, each receiver receives each message
until it is up to date and for that receiver the channel is empty. Then,
the actual return message is chosen depending on the channel level
closed flag; if the channel is NOT closed, then Empty is returned, if
the channel is closed then Closed is returned instead.

With the modified logic, there is no longer a need to append a closed
token to the internal buffer (by setting the slot level closed flag on
the next slot). This fixes the off by one error described in #4814,
which caused a receiver which was created after the channel was already
closed to get Empty from try_recv (or hang forever when calling
recv) instead of receiving Closed.

As a bonus, we save a single bool on each buffer slot.

Refs: #4814

The broadcast channel allows multiple senders to send messages to
multiple receivers, where each receiver receives messages starting from
when it subscribes. After all senders are dropped, the receivers will
continue to receive all waiting messages in the buffer and then receive
a `Closed` error.

To mark that a channel has closed, it stores two closed flags, one on
the channel level and another in the buffer slot *after* the last used
slot (this may also be the earliest entry being kept for lagged
receivers, see tokio-rs#2425).

However, we don't need both closed flags, keeping the channel level
closed flag is sufficient.

Without the slot level closed flag, each receiver receives each message
until it is up to date and for that receiver the channel is empty. Then,
the actual return message is chosen depending on the channel level
closed flag; if the channel is NOT closed, then `Empty` is returned, if
the channel is closed then `Closed` is returned instead.

With the modified logic, there is no longer a need to append a closed
token to the internal buffer (by setting the slot level closed flag on
the next slot). This fixes the off by one error described in tokio-rs#4814,
which caused a receiver which was created after the channel was already
closed to get `Empty` from `try_recv` (or hang forever when calling
`recv`) instead of receiving `Closed`.

As a bonus, we save a single `bool` on each buffer slot.

Refs: tokio-rs#4814
@github-actions github-actions bot added the R-loom Run loom tests on this PR label Jul 26, 2022
@Darksonn Darksonn added A-tokio Area: The main tokio crate M-sync Module: tokio/sync labels Jul 26, 2022
Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good.

tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
replace unsafe call with `UnsafeCell::new` as the unsafe isn't necessary.

Co-authored-by: Alice Ryhl <aliceryhl@google.com>
@Darksonn Darksonn merged commit 9d9488d into tokio-rs:master Aug 10, 2022
crapStone pushed a commit to Calciumdibromid/CaBr2 that referenced this pull request Sep 11, 2022
This PR contains the following updates:

| Package | Type | Update | Change |
|---|---|---|---|
| [tokio](https://tokio.rs) ([source](/~https://github.com/tokio-rs/tokio)) | dependencies | minor | `1.20.1` -> `1.21.0` |
| [tokio](https://tokio.rs) ([source](/~https://github.com/tokio-rs/tokio)) | dev-dependencies | minor | `1.20.1` -> `1.21.0` |

---

### Release Notes

<details>
<summary>tokio-rs/tokio</summary>

### [`v1.21.0`](/~https://github.com/tokio-rs/tokio/releases/tag/tokio-1.21.0)

[Compare Source](tokio-rs/tokio@tokio-1.20.1...tokio-1.21.0)

##### 1.21.0 (September 2, 2022)

This release is the first release of Tokio to intentionally support WASM. The `sync,macros,io-util,rt,time` features are stabilized on WASM. Additionally the wasm32-wasi target is given unstable support for the `net` feature.

##### Added

-   net: add `device` and `bind_device` methods to TCP/UDP sockets ([#&#8203;4882])
-   net: add `tos` and `set_tos` methods to TCP and UDP sockets ([#&#8203;4877])
-   net: add security flags to named pipe `ServerOptions` ([#&#8203;4845])
-   signal: add more windows signal handlers ([#&#8203;4924])
-   sync: add `mpsc::Sender::max_capacity` method ([#&#8203;4904])
-   sync: implement Weak version of `mpsc::Sender` ([#&#8203;4595])
-   task: add `LocalSet::enter` ([#&#8203;4765])
-   task: stabilize `JoinSet` and `AbortHandle` ([#&#8203;4920])
-   tokio: add `track_caller` to public APIs ([#&#8203;4805], [#&#8203;4848], [#&#8203;4852])
-   wasm: initial support for `wasm32-wasi` target ([#&#8203;4716])

##### Fixed

-   miri: improve miri compatibility by avoiding temporary references in `linked_list::Link` impls ([#&#8203;4841])
-   signal: don't register write interest on signal pipe ([#&#8203;4898])
-   sync: add `#[must_use]` to lock guards ([#&#8203;4886])
-   sync: fix hang when calling `recv` on closed and reopened broadcast channel ([#&#8203;4867])
-   task: propagate attributes on task-locals ([#&#8203;4837])

##### Changed

-   fs: change panic to error in `File::start_seek` ([#&#8203;4897])
-   io: reduce syscalls in `poll_read` ([#&#8203;4840])
-   process: use blocking threadpool for child stdio I/O ([#&#8203;4824])
-   signal: make `SignalKind` methods const ([#&#8203;4956])

##### Internal changes

-   rt: extract `basic_scheduler::Config` ([#&#8203;4935])
-   rt: move I/O driver into `runtime` module ([#&#8203;4942])
-   rt: rename internal scheduler types ([#&#8203;4945])

##### Documented

-   chore: fix typos and grammar ([#&#8203;4858], [#&#8203;4894], [#&#8203;4928])
-   io: fix typo in `AsyncSeekExt::rewind` docs ([#&#8203;4893])
-   net: add documentation to `try_read()` for zero-length buffers ([#&#8203;4937])
-   runtime: remove incorrect panic section for `Builder::worker_threads` ([#&#8203;4849])
-   sync: doc of `watch::Sender::send` improved ([#&#8203;4959])
-   task: add cancel safety docs to `JoinHandle` ([#&#8203;4901])
-   task: expand on cancellation of `spawn_blocking` ([#&#8203;4811])
-   time: clarify that the first tick of `Interval::tick` happens immediately ([#&#8203;4951])

##### Unstable

-   rt: add unstable option to disable the LIFO slot ([#&#8203;4936])
-   task: fix incorrect signature in `Builder::spawn_on` ([#&#8203;4953])
-   task: make `task::Builder::spawn*` methods fallible ([#&#8203;4823])

[#&#8203;4595]: tokio-rs/tokio#4595

[#&#8203;4716]: tokio-rs/tokio#4716

[#&#8203;4765]: tokio-rs/tokio#4765

[#&#8203;4805]: tokio-rs/tokio#4805

[#&#8203;4811]: tokio-rs/tokio#4811

[#&#8203;4823]: tokio-rs/tokio#4823

[#&#8203;4824]: tokio-rs/tokio#4824

[#&#8203;4837]: tokio-rs/tokio#4837

[#&#8203;4840]: tokio-rs/tokio#4840

[#&#8203;4841]: tokio-rs/tokio#4841

[#&#8203;4845]: tokio-rs/tokio#4845

[#&#8203;4848]: tokio-rs/tokio#4848

[#&#8203;4849]: tokio-rs/tokio#4849

[#&#8203;4852]: tokio-rs/tokio#4852

[#&#8203;4858]: tokio-rs/tokio#4858

[#&#8203;4867]: tokio-rs/tokio#4867

[#&#8203;4877]: tokio-rs/tokio#4877

[#&#8203;4882]: tokio-rs/tokio#4882

[#&#8203;4886]: tokio-rs/tokio#4886

[#&#8203;4893]: tokio-rs/tokio#4893

[#&#8203;4894]: tokio-rs/tokio#4894

[#&#8203;4897]: tokio-rs/tokio#4897

[#&#8203;4898]: tokio-rs/tokio#4898

[#&#8203;4901]: tokio-rs/tokio#4901

[#&#8203;4904]: tokio-rs/tokio#4904

[#&#8203;4920]: tokio-rs/tokio#4920

[#&#8203;4924]: tokio-rs/tokio#4924

[#&#8203;4928]: tokio-rs/tokio#4928

[#&#8203;4935]: tokio-rs/tokio#4935

[#&#8203;4936]: tokio-rs/tokio#4936

[#&#8203;4937]: tokio-rs/tokio#4937

[#&#8203;4942]: tokio-rs/tokio#4942

[#&#8203;4945]: tokio-rs/tokio#4945

[#&#8203;4951]: tokio-rs/tokio#4951

[#&#8203;4953]: tokio-rs/tokio#4953

[#&#8203;4956]: tokio-rs/tokio#4956

[#&#8203;4959]: tokio-rs/tokio#4959

</details>

---

### Configuration

📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined).

🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied.

♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox.

🔕 **Ignore**: Close this PR and you won't be reminded about these updates again.

---

 - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, click this checkbox.

---

This PR has been generated by [Renovate Bot](/~https://github.com/renovatebot/renovate).
<!--renovate-debug:eyJjcmVhdGVkSW5WZXIiOiIzMi4xODcuMCIsInVwZGF0ZWRJblZlciI6IjMyLjE4Ny4wIn0=-->

Co-authored-by: cabr2-bot <cabr2.help@gmail.com>
Reviewed-on: https://codeberg.org/Calciumdibromid/CaBr2/pulls/1532
Reviewed-by: crapStone <crapstone@noreply.codeberg.org>
Co-authored-by: Calciumdibromid Bot <cabr2_bot@noreply.codeberg.org>
Co-committed-by: Calciumdibromid Bot <cabr2_bot@noreply.codeberg.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-sync Module: tokio/sync R-loom Run loom tests on this PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Resubscribing to a closed broadcast receiver will hang on a call to recv
2 participants