Skip to content

Commit

Permalink
Use oneshot channel from the oneshot crate instead of a capacity 1 as…
Browse files Browse the repository at this point in the history
…ync_channel

we can't use the one from tokio bc recv_blocking panics when used in the runtime.
  • Loading branch information
rklaehn committed Aug 14, 2024
1 parent 2433d46 commit 9a936d0
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 29 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions iroh-blobs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ iroh-io = { version = "0.6.0", features = ["stats"] }
iroh-metrics = { version = "0.22.0", path = "../iroh-metrics", default-features = false }
iroh-net = { version = "0.22.0", path = "../iroh-net" }
num_cpus = "1.15.0"
oneshot = "0.1.8"
parking_lot = { version = "0.12.1", optional = true }
pin-project = "1.1.5"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
Expand Down
46 changes: 26 additions & 20 deletions iroh-blobs/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ use iroh_io::AsyncSliceReader;
use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use tokio::{io::AsyncWriteExt, sync::oneshot};
use tokio::io::AsyncWriteExt;
use tracing::trace_span;

mod import_flat_store;
Expand Down Expand Up @@ -534,25 +534,25 @@ pub(crate) enum ActorMessage {
/// Query method: get the rough entry status for a hash. Just complete, partial or not found.
EntryStatus {
hash: Hash,
tx: async_channel::Sender<ActorResult<EntryStatus>>,
tx: oneshot::Sender<ActorResult<EntryStatus>>,
},
#[cfg(test)]
/// Query method: get the full entry state for a hash, both in memory and in redb.
/// This is everything we got about the entry, including the actual inline outboard and data.
EntryState {
hash: Hash,
tx: async_channel::Sender<ActorResult<test_support::EntryStateResponse>>,
tx: oneshot::Sender<ActorResult<test_support::EntryStateResponse>>,
},
/// Query method: get the full entry state for a hash.
GetFullEntryState {
hash: Hash,
tx: async_channel::Sender<ActorResult<Option<EntryData>>>,
tx: oneshot::Sender<ActorResult<Option<EntryData>>>,
},
/// Modification method: set the full entry state for a hash.
SetFullEntryState {
hash: Hash,
entry: Option<EntryData>,
tx: async_channel::Sender<ActorResult<()>>,
tx: oneshot::Sender<ActorResult<()>>,
},
/// Modification method: get or create a file handle for a hash.
///
Expand All @@ -575,7 +575,7 @@ pub(crate) enum ActorMessage {
/// At this point the size, hash and outboard must already be known.
Import {
cmd: Import,
tx: async_channel::Sender<ActorResult<(TempTag, u64)>>,
tx: oneshot::Sender<ActorResult<(TempTag, u64)>>,
},
/// Modification method: export data from a redb store
///
Expand Down Expand Up @@ -924,18 +924,18 @@ impl StoreInner {
}

async fn entry_status(&self, hash: &Hash) -> OuterResult<EntryStatus> {
let (tx, rx) = async_channel::bounded(1);
let (tx, rx) = oneshot::channel();
self.tx
.send(ActorMessage::EntryStatus { hash: *hash, tx })
.await?;
Ok(rx.recv().await??)
Ok(rx.await??)
}

fn entry_status_sync(&self, hash: &Hash) -> OuterResult<EntryStatus> {
let (tx, rx) = async_channel::bounded(1);
let (tx, rx) = oneshot::channel();
self.tx
.send_blocking(ActorMessage::EntryStatus { hash: *hash, tx })?;
Ok(rx.recv_blocking()??)
Ok(rx.recv()??)
}

async fn complete(&self, entry: Entry) -> OuterResult<()> {
Expand Down Expand Up @@ -1131,7 +1131,7 @@ impl StoreInner {
let tag = self.temp.temp_tag(HashAndFormat { hash, format });
let hash = *tag.hash();
// blocking send for the import
let (tx, rx) = async_channel::bounded(1);
let (tx, rx) = oneshot::channel();
self.tx.send_blocking(ActorMessage::Import {
cmd: Import {
content_id: HashAndFormat { hash, format },
Expand All @@ -1141,7 +1141,7 @@ impl StoreInner {
},
tx,
})?;
Ok(rx.recv_blocking()??)
Ok(rx.recv()??)
}

fn temp_file_name(&self) -> PathBuf {
Expand Down Expand Up @@ -1234,18 +1234,24 @@ pub(crate) type ActorResult<T> = std::result::Result<T, ActorError>;
pub(crate) enum OuterError {
#[error("inner error: {0}")]
Inner(#[from] ActorError),
#[error("send error: {0}")]
Send(#[from] async_channel::SendError<ActorMessage>),
#[error("send error")]
Send,
#[error("progress send error: {0}")]
ProgressSend(#[from] ProgressSendError),
#[error("recv error: {0}")]
Recv(#[from] oneshot::error::RecvError),
Recv2(#[from] oneshot::RecvError),
#[error("recv error: {0}")]
AsyncChannelRecv(#[from] async_channel::RecvError),
#[error("join error: {0}")]
JoinTask(#[from] tokio::task::JoinError),
}

impl From<async_channel::SendError<ActorMessage>> for OuterError {
fn from(_e: async_channel::SendError<ActorMessage>) -> Self {
OuterError::Send
}
}

/// Result type for calling the redb actor from the store.
///
/// See [`OuterError`] for what can go wrong.
Expand Down Expand Up @@ -2235,7 +2241,7 @@ impl ActorState {
}
ActorMessage::EntryStatus { hash, tx } => {
let res = self.entry_status(tables, hash);
tx.send_blocking(res).ok();
tx.send(res).ok();
}
ActorMessage::Blobs { filter, tx } => {
let res = self.blobs(tables, filter);
Expand All @@ -2255,11 +2261,11 @@ impl ActorState {
}
#[cfg(test)]
ActorMessage::EntryState { hash, tx } => {
tx.send_blocking(self.entry_state(tables, hash)).ok();
tx.send(self.entry_state(tables, hash)).ok();
}
ActorMessage::GetFullEntryState { hash, tx } => {
let res = self.get_full_entry_state(tables, hash);
tx.send_blocking(res).ok();
tx.send(res).ok();
}
x => return Ok(Err(x)),
}
Expand All @@ -2274,7 +2280,7 @@ impl ActorState {
match msg {
ActorMessage::Import { cmd, tx } => {
let res = self.import(tables, cmd);
tx.send_blocking(res).ok();
tx.send(res).ok();
}
ActorMessage::SetTag { tag, value, tx } => {
let res = self.set_tag(tables, tag, value);
Expand Down Expand Up @@ -2305,7 +2311,7 @@ impl ActorState {
}
ActorMessage::SetFullEntryState { hash, entry, tx } => {
let res = self.set_full_entry_state(tables, hash, entry);
tx.send_blocking(res).ok();
tx.send(res).ok();
}
msg => {
// try to handle it as readonly
Expand Down
16 changes: 7 additions & 9 deletions iroh-blobs/src/store/fs/test_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use std::{
path::{Path, PathBuf},
};

use tokio::sync::oneshot;

use super::{
tables::{ReadableTables, Tables},
ActorError, ActorMessage, ActorResult, ActorState, DataLocation, EntryState, FilterPredicate,
Expand Down Expand Up @@ -106,29 +104,29 @@ impl Store {
impl StoreInner {
#[cfg(test)]
async fn entry_state(&self, hash: Hash) -> OuterResult<EntryStateResponse> {
let (tx, rx) = async_channel::bounded(1);
let (tx, rx) = ::oneshot::channel();
self.tx.send(ActorMessage::EntryState { hash, tx }).await?;
Ok(rx.recv().await??)
Ok(rx.await??)
}

async fn set_full_entry_state(&self, hash: Hash, entry: Option<EntryData>) -> OuterResult<()> {
let (tx, rx) = async_channel::bounded(1);
let (tx, rx) = ::oneshot::channel();
self.tx
.send(ActorMessage::SetFullEntryState { hash, entry, tx })
.await?;
Ok(rx.recv().await??)
Ok(rx.await??)
}

async fn get_full_entry_state(&self, hash: Hash) -> OuterResult<Option<EntryData>> {
let (tx, rx) = async_channel::bounded(1);
let (tx, rx) = ::oneshot::channel();
self.tx
.send(ActorMessage::GetFullEntryState { hash, tx })
.await?;
Ok(rx.recv().await??)
Ok(rx.await??)
}

async fn all_blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
let (tx, rx) = oneshot::channel();
let (tx, rx) = ::oneshot::channel();
let filter: FilterPredicate<Hash, EntryState> =
Box::new(|_i, k, v| Some((k.value(), v.value())));
self.tx.send(ActorMessage::Blobs { filter, tx }).await?;
Expand Down

0 comments on commit 9a936d0

Please sign in to comment.