# refactor(iroh-bytes): Async bao store (n0-computer#2043)
## Description

Asyncify bao store traits

Asyncify almost the entire surface area of the bao store traits, to make
an actor-based implementation that lives on its own `std::thread::Thread`
at least theoretically possible.
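
For a sense of what this enables, here is a minimal sketch (not part of this PR) of an actor-based store method: the trait method awaits a reply from an actor thread that owns all store state. The `Command` enum, `ActorStore`, `lookup_status`, and the use of `flume` plus `tokio::sync::oneshot` are assumptions for illustration only.

```rust
use std::io;
use std::thread;

use iroh_bytes::store::EntryStatus; // assumed import path
use iroh_bytes::Hash;
use tokio::sync::oneshot;

// One message per store operation; each carries a oneshot reply channel.
enum Command {
    EntryStatus {
        hash: Hash,
        reply: oneshot::Sender<io::Result<EntryStatus>>,
    },
}

#[derive(Clone)]
struct ActorStore {
    tx: flume::Sender<Command>,
}

impl ActorStore {
    fn new() -> Self {
        let (tx, rx) = flume::unbounded();
        // All store state lives on this dedicated thread.
        thread::spawn(move || {
            while let Ok(cmd) = rx.recv() {
                match cmd {
                    Command::EntryStatus { hash, reply } => {
                        // placeholder for the real (possibly blocking) lookup
                        let _ = reply.send(lookup_status(&hash));
                    }
                }
            }
        });
        Self { tx }
    }

    // Matches the new async trait shape: callers simply .await the result.
    async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
        let (reply, rx) = oneshot::channel();
        self.tx
            .send(Command::EntryStatus { hash: *hash, reply })
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "actor dead"))?;
        rx.await
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "actor dead"))?
    }
}

fn lookup_status(_hash: &Hash) -> io::Result<EntryStatus> {
    // hypothetical: the actor would consult its own state here
    Ok(EntryStatus::NotFound)
}
```

With this shape the store handle is cheap to clone, and every call is one channel round-trip to the actor thread.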

## Notes & open questions

Note: there is a sync method `fn entry_status_sync` so that we don't have
to rewrite the document sync algorithm. It should be replaced ASAP, since
it will not be very efficient.
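
Concretely, the async trait method is just a thin delegation to the sync helper; this is taken directly from the flat and mem store impls in this diff:

```rust
fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
    let state = self.0.state.read().unwrap();
    Ok(if state.complete.contains_key(hash) {
        EntryStatus::Complete
    } else if state.partial.contains_key(hash) {
        EntryStatus::Partial
    } else {
        EntryStatus::NotFound
    })
}

async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
    self.entry_status_sync(hash)
}
```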

Note 2: there is one unrelated change: `get_or_create_partial` is renamed
to just `get_or_create`, since the caller has no control over whether the
created entry is partial. If the entry is already complete, what do you
do? You return the complete entry.
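
For callers, the renamed contract looks roughly like this. A hedged sketch: `ensure_entry` is a hypothetical helper and the trait import paths are assumptions.

```rust
use std::io;

use iroh_bytes::store::{MapEntry, MapMut}; // assumed paths
use iroh_bytes::Hash;

// Hypothetical helper, not part of this PR: get a writable entry and
// only write if the store does not already have the complete blob.
async fn ensure_entry<D: MapMut>(db: &D, hash: Hash, size: u64) -> io::Result<()> {
    let entry = db.get_or_create(hash, size).await?;
    if entry.is_complete() {
        // already complete: get_or_create hands back the complete entry
        return Ok(());
    }
    // otherwise obtain a writer and fill in the missing ranges
    let _bw = entry.batch_writer().await?;
    Ok(())
}
```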

## Change checklist

- [ ] Self-review.
- [ ] Documentation updates if relevant.
- [ ] Tests if relevant.
rklaehn authored and fubuloubu committed Feb 27, 2024
1 parent e80d894 commit 9369ff4
Showing 14 changed files with 132 additions and 94 deletions.
2 changes: 1 addition & 1 deletion iroh-bytes/src/export.rs
@@ -69,7 +69,7 @@ pub async fn export_blob<D: BaoStore>(
}
trace!("exporting blob {} to {}", hash, outpath.display());
let id = progress.new_id();
let entry = db.get(&hash)?.context("entry not there")?;
let entry = db.get(&hash).await?.context("entry not there")?;
progress
.send(ExportProgress::Found {
id,
4 changes: 2 additions & 2 deletions iroh-bytes/src/format/collection.rs
@@ -168,12 +168,12 @@ impl Collection {
where
D: crate::store::Map,
{
let links_entry = db.get(root)?.context("links not found")?;
let links_entry = db.get(root).await?.context("links not found")?;
anyhow::ensure!(links_entry.is_complete(), "links not complete");
let links_bytes = links_entry.data_reader().await?.read_to_end().await?;
let mut links = HashSeq::try_from(links_bytes)?;
let meta_hash = links.pop_front().context("meta hash not found")?;
let meta_entry = db.get(&meta_hash)?.context("meta not found")?;
let meta_entry = db.get(&meta_hash).await?.context("meta not found")?;
anyhow::ensure!(links_entry.is_complete(), "links not complete");
let meta_bytes = meta_entry.data_reader().await?.read_to_end().await?;
let meta: CollectionMeta = postcard::from_bytes(&meta_bytes)?;
11 changes: 6 additions & 5 deletions iroh-bytes/src/get/db.rs
@@ -72,7 +72,7 @@ async fn get_blob<
hash: &Hash,
progress: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
) -> Result<Stats, GetError> {
let end = match db.get_possibly_partial(hash)? {
let end = match db.get_possibly_partial(hash).await? {
PossiblyPartialEntry::Complete(entry) => {
tracing::info!("already got entire blob");
progress
@@ -174,7 +174,7 @@ async fn get_blob_inner<D: BaoStore>(
let hash = at_content.hash();
let child_offset = at_content.offset();
// get or create the partial entry
let entry = db.get_or_create_partial(hash, size)?;
let entry = db.get_or_create(hash, size).await?;
// open the data file in any case
let bw = entry.batch_writer().await?;
// allocate a new id for progress reports for this transfer
@@ -270,7 +270,7 @@ async fn get_blob_inner_partial<D: BaoStore>(
///
/// This will compute the valid ranges for partial blobs, so it is somewhat expensive for those.
pub async fn blob_info<D: BaoStore>(db: &D, hash: &Hash) -> io::Result<BlobInfo<D>> {
io::Result::Ok(match db.get_possibly_partial(hash)? {
io::Result::Ok(match db.get_possibly_partial(hash).await? {
PossiblyPartialEntry::Partial(entry) => {
let valid_ranges = valid_ranges::<D>(&entry)
.await
@@ -309,7 +309,7 @@ async fn get_hash_seq<
) -> Result<Stats, GetError> {
use tracing::info as log;
let finishing =
if let PossiblyPartialEntry::Complete(entry) = db.get_possibly_partial(root_hash)? {
if let PossiblyPartialEntry::Complete(entry) = db.get_possibly_partial(root_hash).await? {
log!("already got collection - doing partial download");
// send info that we have the hashseq itself entirely
sender
@@ -421,7 +421,8 @@ async fn get_hash_seq<
let end_root = get_blob_inner(db, header, sender.clone()).await?;
// read the collection fully for now
let entry = db
.get(root_hash)?
.get(root_hash)
.await?
.ok_or_else(|| GetError::LocalFailure(anyhow!("just downloaded but not in db")))?;
let reader = entry.data_reader().await?;
let (mut collection, count) = parse_hash_seq(reader).await.map_err(|err| {
4 changes: 2 additions & 2 deletions iroh-bytes/src/provider.rs
@@ -357,7 +357,7 @@ pub async fn handle_get<D: Map, E: EventSender>(
.await;

// 4. Attempt to find hash
match db.get(&hash)? {
match db.get(&hash).await? {
// Collection or blob request
Some(entry) => {
let mut stats = Box::<TransferStats>::default();
@@ -492,7 +492,7 @@ pub async fn send_blob<D: Map, W: AsyncStreamWriter>(
ranges: &RangeSpec,
writer: W,
) -> Result<(SentStatus, u64, SliceReaderStats)> {
match db.get(&name)? {
match db.get(&name).await? {
Some(entry) => {
let outboard = entry.outboard().await?;
let size = outboard.tree().size().0;
23 changes: 14 additions & 9 deletions iroh-bytes/src/store/flat.rs
@@ -317,7 +317,7 @@ impl MapEntryMut for EntryMut {
impl MapMut for Store {
type EntryMut = EntryMut;

fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
let state = self.0.state.read().unwrap();
Ok(if state.complete.contains_key(hash) {
EntryStatus::Complete
@@ -328,7 +328,11 @@ impl MapMut for Store {
})
}

fn get_possibly_partial(&self, hash: &Hash) -> io::Result<PossiblyPartialEntry<Self>> {
async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
self.entry_status_sync(hash)
}

async fn get_possibly_partial(&self, hash: &Hash) -> io::Result<PossiblyPartialEntry<Self>> {
let state = self.0.state.read().unwrap();
Ok(if let Some(entry) = state.partial.get(hash) {
PossiblyPartialEntry::Partial(EntryMut {
@@ -347,7 +351,7 @@
})
}

fn get_or_create_partial(&self, hash: Hash, size: u64) -> io::Result<Self::EntryMut> {
async fn get_or_create(&self, hash: Hash, size: u64) -> io::Result<Self::EntryMut> {
let mut state = self.0.state.write().unwrap();
// this protects the entry from being deleted until the next mark phase
//
@@ -573,7 +577,7 @@ pub struct EntryMut {

impl Map for Store {
type Entry = Entry;
fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
let state = self.0.state.read().unwrap();
Ok(if let Some(entry) = state.complete.get(hash) {
state.get_entry(hash, entry, &self.0.options)
@@ -602,7 +606,7 @@ impl Map for Store {
}

impl ReadableStore for Store {
fn blobs(
async fn blobs(
&self,
) -> io::Result<Box<dyn Iterator<Item = io::Result<Hash>> + Send + Sync + 'static>> {
let inner = self.0.state.read().unwrap();
@@ -621,7 +625,7 @@ impl ReadableStore for Store {
Box::new(items)
}

fn tags(
async fn tags(
&self,
) -> io::Result<
Box<dyn Iterator<Item = io::Result<(Tag, HashAndFormat)>> + Send + Sync + 'static>,
@@ -639,7 +643,7 @@
unimplemented!()
}

fn partial_blobs(
async fn partial_blobs(
&self,
) -> io::Result<Box<dyn Iterator<Item = io::Result<Hash>> + Send + Sync + 'static>> {
let lock = self.0.state.read().unwrap();
@@ -737,14 +741,15 @@ impl super::Store for Store {
TempTag::new(tag, Some(self.0.clone()))
}

fn clear_live(&self) {
async fn clear_live(&self) {
let mut state = self.0.state.write().unwrap();
state.live.clear();
}

fn add_live(&self, elements: impl IntoIterator<Item = Hash>) {
fn add_live(&self, elements: impl IntoIterator<Item = Hash>) -> impl Future<Output = ()> {
let mut state = self.0.state.write().unwrap();
state.live.extend(elements);
futures::future::ready(())
}

fn is_live(&self, hash: &Hash) -> bool {
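
Worth noting in the hunk above: `add_live` keeps a plain `fn` signature returning `impl Future` instead of becoming an `async fn`. A plausible reading (not stated in the PR) is that the mutation should happen eagerly at call time, under the write lock, with the returned future already resolved:

```rust
fn add_live(&self, elements: impl IntoIterator<Item = Hash>) -> impl Future<Output = ()> {
    // runs immediately when called, even if the future is never polled
    let mut state = self.0.state.write().unwrap();
    state.live.extend(elements);
    // nothing left to do asynchronously
    futures::future::ready(())
}
```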
24 changes: 15 additions & 9 deletions iroh-bytes/src/store/mem.rs
@@ -40,6 +40,7 @@ use bao_tree::ChunkRanges;
use bytes::Bytes;
use bytes::BytesMut;
use derive_more::From;
use futures::Future;
use futures::FutureExt;
use futures::{Stream, StreamExt};
use iroh_io::{AsyncSliceReader, AsyncSliceWriter};
@@ -271,7 +272,7 @@ impl MapEntry for EntryMut {
impl Map for Store {
type Entry = Entry;

fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
let state = self.0.state.read().unwrap();
// look up the ids
Ok(if let Some((data, outboard)) = state.complete.get(hash) {
@@ -303,7 +304,7 @@ impl Map for Store {
}

impl ReadableStore for Store {
fn blobs(&self) -> io::Result<DbIter<Hash>> {
async fn blobs(&self) -> io::Result<DbIter<Hash>> {
Ok(Box::new(
self.0
.state
@@ -318,7 +319,7 @@ impl ReadableStore for Store {
))
}

fn tags(&self) -> io::Result<DbIter<(Tag, HashAndFormat)>> {
async fn tags(&self) -> io::Result<DbIter<(Tag, HashAndFormat)>> {
let tags = self
.0
.state
@@ -340,7 +341,7 @@ impl ReadableStore for Store {
Ok(())
}

fn partial_blobs(&self) -> io::Result<DbIter<Hash>> {
async fn partial_blobs(&self) -> io::Result<DbIter<Hash>> {
let state = self.0.state.read().unwrap();
let hashes = state.partial.keys().copied().map(Ok).collect::<Vec<_>>();
Ok(Box::new(hashes.into_iter()))
@@ -363,7 +364,7 @@
impl MapMut for Store {
type EntryMut = EntryMut;

fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
let state = self.0.state.read().unwrap();
Ok(if state.complete.contains_key(hash) {
EntryStatus::Complete
@@ -374,7 +375,11 @@ impl MapMut for Store {
})
}

fn get_possibly_partial(&self, hash: &Hash) -> io::Result<PossiblyPartialEntry<Self>> {
async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
self.entry_status_sync(hash)
}

async fn get_possibly_partial(&self, hash: &Hash) -> io::Result<PossiblyPartialEntry<Self>> {
let state = self.0.state.read().unwrap();
Ok(match state.partial.get(hash) {
Some((data, outboard)) => PossiblyPartialEntry::Partial(EntryMut {
@@ -386,7 +391,7 @@ impl MapMut for Store {
})
}

fn get_or_create_partial(&self, hash: Hash, size: u64) -> io::Result<EntryMut> {
async fn get_or_create(&self, hash: Hash, size: u64) -> io::Result<EntryMut> {
let tree = BaoTree::new(ByteNum(size), IROH_BLOCK_SIZE);
let outboard_size =
usize::try_from(outboard_size(size, IROH_BLOCK_SIZE)).map_err(data_too_large)?;
@@ -517,14 +522,15 @@ impl super::Store for Store {
TempTag::new(tag, Some(self.0.clone()))
}

fn clear_live(&self) {
async fn clear_live(&self) {
let mut state = self.0.state.write().unwrap();
state.live.clear();
}

fn add_live(&self, live: impl IntoIterator<Item = Hash>) {
fn add_live(&self, live: impl IntoIterator<Item = Hash>) -> impl Future<Output = ()> {
let mut state = self.0.state.write().unwrap();
state.live.extend(live);
futures::future::ready(())
}

fn is_live(&self, hash: &Hash) -> bool {
26 changes: 16 additions & 10 deletions iroh-bytes/src/store/readonly_mem.rs
@@ -25,7 +25,7 @@ use bao_tree::{
ChunkRanges,
};
use bytes::Bytes;
use futures::Stream;
use futures::{Future, Stream};
use iroh_io::AsyncSliceReader;
use tokio::{io::AsyncWriteExt, sync::mpsc};

@@ -195,7 +195,7 @@ impl MapEntry for Entry {
impl Map for Store {
type Entry = Entry;

fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
Ok(self.0.get(hash).map(|(o, d)| Entry {
outboard: o.clone(),
data: d.clone(),
@@ -206,21 +206,25 @@ impl Map for Store {
impl MapMut for Store {
type EntryMut = EntryMut;

fn get_or_create_partial(&self, _hash: Hash, _size: u64) -> io::Result<EntryMut> {
async fn get_or_create(&self, _hash: Hash, _size: u64) -> io::Result<EntryMut> {
Err(io::Error::new(
io::ErrorKind::Other,
"cannot create temp entry in readonly database",
))
}

fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
Ok(match self.0.contains_key(hash) {
true => EntryStatus::Complete,
false => EntryStatus::NotFound,
})
}

fn get_possibly_partial(&self, hash: &Hash) -> io::Result<PossiblyPartialEntry<Self>> {
async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
self.entry_status_sync(hash)
}

async fn get_possibly_partial(&self, hash: &Hash) -> io::Result<PossiblyPartialEntry<Self>> {
// return none because we do not have partial entries
Ok(if let Some((o, d)) = self.0.get(hash) {
PossiblyPartialEntry::Complete(Entry {
@@ -239,7 +243,7 @@ impl MapMut for Store {
}

impl ReadableStore for Store {
fn blobs(&self) -> io::Result<DbIter<Hash>> {
async fn blobs(&self) -> io::Result<DbIter<Hash>> {
Ok(Box::new(
self.0
.keys()
@@ -250,7 +254,7 @@ impl ReadableStore for Store {
))
}

fn tags(&self) -> io::Result<DbIter<(Tag, HashAndFormat)>> {
async fn tags(&self) -> io::Result<DbIter<(Tag, HashAndFormat)>> {
Ok(Box::new(std::iter::empty()))
}

@@ -272,7 +276,7 @@
self.export_impl(hash, target, mode, progress).await
}

fn partial_blobs(&self) -> io::Result<DbIter<Hash>> {
async fn partial_blobs(&self) -> io::Result<DbIter<Hash>> {
Ok(Box::new(std::iter::empty()))
}
}
@@ -361,7 +365,7 @@ impl super::Store for Store {
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
}

fn clear_live(&self) {}
async fn clear_live(&self) {}

async fn set_tag(&self, _name: Tag, _hash: Option<HashAndFormat>) -> io::Result<()> {
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
@@ -375,7 +379,9 @@ impl super::Store for Store {
TempTag::new(inner, None)
}

fn add_live(&self, _live: impl IntoIterator<Item = Hash>) {}
fn add_live(&self, _live: impl IntoIterator<Item = Hash>) -> impl Future<Output = ()> {
futures::future::ready(())
}

async fn delete(&self, _hashes: Vec<Hash>) -> io::Result<()> {
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))