sync::broadcast returns Lagged error when capacity is not exceeded #2425
Comments
Darksonn added the A-tokio (Area: The main tokio crate), C-question (User questions that are neither feature requests nor bug reports), M-sync (Module: tokio/sync), and C-bug (Category: This is a bug.) labels and removed the C-question label on Apr 21, 2020
This appears to be a bug. I will take a look at it when I have time.
This is because the stream sends a
carllerche pushed a commit that referenced this issue on Apr 28, 2020

Broadcast uses a ring buffer to store values sent to the channel. In order to deal with slow receivers, the oldest values are overwritten with new values once the buffer wraps. A receiver should be able to calculate how many values it has missed.

Additionally, when the broadcast closes, a final value of `None` is sent to the channel. If the buffer has wrapped, this value overwrites the oldest value. This is an issue mainly in a single capacity broadcast when a value is sent and then the sender is dropped. The original value is immediately overwritten with `None`, meaning that receivers assume they have lagged behind.

**Solution**

A value of `None` is no longer sent to the channel when the final sender has been dropped. This solves the single capacity broadcast case by completely removing the behavior of overwriting values when the channel is closed.

Now, when the final sender is dropped, a closed bit is set on the next slot that the channel is supposed to send to. In the case of a fast receiver, if it finds a slot where the closed bit is set, it knows the channel is closed without locking the tail.

In the case of a slow receiver, it must first find out if it has missed any values. This is similar to before, but must be able to account for channel closure. If the channel is not closed, the oldest value may be located at index `n`. If the channel is closed, the oldest value is located at index `n - 1`. Knowing the index where the oldest value is located, a receiver can calculate how many values it may have missed and starts to catch up.

Closes #2425
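To make the failure mode in this commit message concrete, here is a minimal single-threaded sketch of a ring buffer with a per-receiver cursor. It is a toy model under stated assumptions, not tokio's implementation: the names `Ring`, `Recv`, `close_with_marker`, `close_with_flag`, and `demo` are all hypothetical, and the closed bit is simplified to one flag rather than a bit on the next slot. It only illustrates why pushing a `None` marker through a capacity-1 buffer makes a receiver believe it lagged, while a flag that consumes no slot does not.

```rust
// Toy, single-threaded model of a broadcast ring buffer. This is NOT tokio's
// implementation; every name below is hypothetical.

#[derive(Debug, PartialEq)]
enum Recv {
    Value(u32),
    Lagged(u64),
    Closed,
    Empty,
}

struct Ring {
    // Each slot holds `Some(value)` or `None` (the old-style close marker).
    slots: Vec<Option<u32>>,
    // Position the next write will use; it only ever grows.
    tail: u64,
    // Simplified new-style closure: a flag that does not occupy a slot.
    closed: bool,
}

impl Ring {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        Ring { slots: vec![None; capacity], tail: 0, closed: false }
    }

    fn write(&mut self, item: Option<u32>) {
        let idx = (self.tail % self.slots.len() as u64) as usize;
        self.slots[idx] = item; // overwrites the oldest entry once wrapped
        self.tail += 1;
    }

    fn send(&mut self, value: u32) {
        self.write(Some(value));
    }

    // Old behavior: closing pushes a `None` marker through the buffer.
    fn close_with_marker(&mut self) {
        self.write(None);
    }

    // Fixed behavior (simplified): closing sets a flag, no slot is consumed.
    fn close_with_flag(&mut self) {
        self.closed = true;
    }

    // `next` is the receiver's cursor: the position it wants to read.
    fn recv(&self, next: &mut u64) -> Recv {
        // Position of the oldest entry that has not been overwritten.
        let oldest = self.tail.saturating_sub(self.slots.len() as u64);
        if *next < oldest {
            let missed = oldest - *next;
            *next = oldest; // skip ahead to the oldest retained entry
            return Recv::Lagged(missed);
        }
        if *next == self.tail {
            return if self.closed { Recv::Closed } else { Recv::Empty };
        }
        let idx = (*next % self.slots.len() as u64) as usize;
        *next += 1;
        match self.slots[idx] {
            Some(v) => Recv::Value(v),
            None => Recv::Closed, // old-style marker reached
        }
    }
}

// Capacity 1: send one value, close, then read twice from position 0.
fn demo(use_marker: bool) -> (Recv, Recv) {
    let mut ring = Ring::new(1);
    ring.send(7);
    if use_marker {
        ring.close_with_marker();
    } else {
        ring.close_with_flag();
    }
    let mut cursor = 0;
    let first = ring.recv(&mut cursor);
    let second = ring.recv(&mut cursor);
    (first, second)
}
```

In this model, `demo(true)` reproduces the shape of the reported problem (the single value is overwritten by the marker, so the first read reports one missed value), while `demo(false)` delivers the value and then reports closure.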
hds added a commit to hds/tokio that referenced this issue on Jul 26, 2022

The broadcast channel allows multiple senders to send messages to multiple receivers, where each receiver receives messages starting from when it subscribes. After all senders are dropped, the receivers will continue to receive all waiting messages in the buffer and then receive a `Closed` error.

To mark that a channel has closed, it stores two closed flags, one on the channel level and another in the buffer slot *after* the last used slot (this may also be the earliest entry being kept for lagged receivers, see tokio-rs#2425).

However, we don't need both closed flags; keeping the channel level closed flag is sufficient. Without the slot level closed flag, each receiver receives each message until it is up to date and for that receiver the channel is empty. Then, the actual return message is chosen depending on the channel level closed flag: if the channel is NOT closed, then `Empty` is returned; if the channel is closed, then `Closed` is returned instead.

With the modified logic, there is no longer a need to append a closed token to the internal buffer (by setting the slot level closed flag on the next slot). This fixes the off-by-one error described in tokio-rs#4814, which caused a receiver which was created after the channel was already closed to get `Empty` from `try_recv` (or hang forever when calling `recv`) instead of receiving `Closed`.

As a bonus, we save a single `bool` on each buffer slot.
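The decision rule described in this commit message (drain what is available, then let the channel-level flag choose between `Empty` and `Closed`) can be sketched as follows. This is a hedged illustration only: `Channel`, `subscribe`, `close`, and `demo` are hypothetical names, the buffer is an unbounded `Vec` so lag handling is left out, and nothing here is taken from tokio's source.

```rust
// Toy model of the "channel-level closed flag only" rule. NOT tokio's code;
// all names are hypothetical and lag handling is deliberately omitted.

#[derive(Debug, PartialEq)]
enum TryRecvError {
    Empty,
    Closed,
}

struct Channel {
    buffer: Vec<u32>, // unbounded here for brevity
    closed: bool,     // the single, channel-level closed flag
}

impl Channel {
    fn new() -> Self {
        Channel { buffer: Vec::new(), closed: false }
    }

    fn send(&mut self, value: u32) {
        self.buffer.push(value);
    }

    // Dropping the last sender is modeled as setting the flag.
    fn close(&mut self) {
        self.closed = true;
    }

    // A new receiver starts at the current tail, so it only sees later messages.
    fn subscribe(&self) -> usize {
        self.buffer.len()
    }

    fn try_recv(&self, cursor: &mut usize) -> Result<u32, TryRecvError> {
        match self.buffer.get(*cursor) {
            Some(&value) => {
                *cursor += 1;
                Ok(value)
            }
            // Up to date: the channel-level flag alone picks the result.
            None if self.closed => Err(TryRecvError::Closed),
            None => Err(TryRecvError::Empty),
        }
    }
}

// One early receiver and one receiver created after the channel closed.
fn demo() -> Vec<Result<u32, TryRecvError>> {
    let mut ch = Channel::new();
    let mut early = ch.subscribe();
    let mut out = vec![ch.try_recv(&mut early)]; // nothing sent yet
    ch.send(1);
    ch.close();
    out.push(ch.try_recv(&mut early)); // buffered message is still delivered
    out.push(ch.try_recv(&mut early)); // drained and closed
    let mut late = ch.subscribe();
    out.push(ch.try_recv(&mut late)); // subscribed after close
    out
}
```

Because no closed token is stored in the buffer, a receiver that subscribes after closure has nothing to skip past in this model; it is immediately up to date and the flag yields `Closed`.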
Darksonn
pushed a commit
that referenced
this issue
Aug 10, 2022
Version
rustc 1.41.0 (5e1a79984 2020-01-27)
tokio: 0.2.18
Platform
Linux 5.3.0-45-generic #37~18.04.1-Ubuntu
Description
Hi! I ran into an issue with `sync::broadcast::channel` where buffering events up to the channel's capacity triggers a `RecvError::Lagged` error when the channel is finally read from. I expected this error to be triggered only if the channel's capacity is exceeded.
As an example, the following code errors out with `Err(Lagged(1))`.
It seems to be a threading issue, compared to the following, which works as expected.