feat: additional public get utils
This adds more higher-level utilities for getting data via
iroh-bytes.

get::db adds utilities for getting data into a store, taking into
account data that is already present locally.
get::request adds utilities for non-trivial iroh-bytes requests, such
as asking for the verified and unverified size of a blob.

## Notes & open questions

Note: some of this is not yet completely finished, but I think it is
good to have nonetheless. It is used in sendme, in the content
tracker example, and in the iroh gateway.
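
As a quick taste of the new surface, here is a minimal sketch of checking
local status with `get::db::blob_info` before deciding whether to fetch
(the `fetch_missing` helper is hypothetical; `blob_info` and `BlobInfo`
are added by this commit):

```rust
use iroh_bytes::get::db::{blob_info, BlobInfo};
use iroh_bytes::store::Store;
use iroh_bytes::Hash;

// Hypothetical helper: decide whether a network request is needed at all.
async fn fetch_missing<D: Store>(db: &D, hash: Hash) -> std::io::Result<()> {
    match blob_info(db, &hash).await? {
        BlobInfo::Complete { size } => {
            println!("already complete locally ({size} bytes), nothing to do");
        }
        BlobInfo::Partial { valid_ranges, .. } => {
            // Only the complement of `valid_ranges` needs to be requested;
            // get_to_db does this bookkeeping internally.
            println!("partially present, have chunk ranges {valid_ranges:?}");
        }
        BlobInfo::Missing => {
            println!("not present locally, a full request is needed");
        }
    }
    Ok(())
}
```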
rklaehn authored Dec 20, 2023
1 parent 4b18e67 commit 1389857
Showing 9 changed files with 411 additions and 150 deletions.
10 changes: 8 additions & 2 deletions iroh-bytes/src/get.rs
@@ -1,6 +1,6 @@
//! The client side API
//!
//! To get data, create a connection using the `dial` function or use any quinn
//! To get data, create a connection using [iroh-net] or use any quinn
//! connection that was obtained in another way.
//!
//! Create a request describing the data you want to get.
@@ -10,6 +10,8 @@
//!
//! For some states you have to provide additional arguments when calling next,
//! or you can choose to finish early.
//!
//! [iroh-net]: https://docs.rs/iroh-net
use std::error::Error;
use std::fmt::{self, Debug};
use std::time::{Duration, Instant};
@@ -25,6 +27,9 @@ use crate::protocol::RangeSpecSeq;
use crate::util::io::{TrackingReader, TrackingWriter};
use crate::IROH_BLOCK_SIZE;

pub mod db;
pub mod request;

/// Stats about the transfer.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Stats {
@@ -44,8 +49,9 @@ impl Stats {
}
}

/// Finite state machine for get responses
/// Finite state machine for get responses.
///
/// This is the low-level API for getting data from a peer.
#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
pub mod fsm {
use std::{io, result};
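The module docs above describe the state machine in prose; driving it for a
single-blob request looks roughly like this (a sketch: `ConnectedNext` and
`EndBlobNext` appear in this diff, while `fsm::start`, `GetRequest::single`,
and `concatenate_into_vec` are assumptions drawn from the rest of the crate):

```rust
use iroh_bytes::get::fsm::{self, ConnectedNext, EndBlobNext};
use iroh_bytes::protocol::GetRequest;
use iroh_bytes::Hash;

async fn get_single_blob(conn: quinn::Connection, hash: Hash) -> anyhow::Result<Vec<u8>> {
    // Request just the root blob, with all of its ranges.
    let request = GetRequest::single(hash);
    // initial -> connected: sends the request over a new bidi stream.
    let connected = fsm::start(conn, request).next().await?;
    // For a single-blob request, the first (and only) item is the root.
    let ConnectedNext::StartRoot(start) = connected.next().await? else {
        anyhow::bail!("unexpected response item");
    };
    // Read header and content, verifying against the hash on the fly.
    let (end, data) = start.next().concatenate_into_vec().await?;
    // We could finish early here; for a single blob nothing is left.
    let EndBlobNext::Closing(closing) = end.next() else {
        anyhow::bail!("expected request to be finished");
    };
    let _stats = closing.next().await?;
    Ok(data)
}
```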
237 changes: 183 additions & 54 deletions iroh/src/get.rs → iroh-bytes/src/get/db.rs
@@ -1,32 +1,39 @@
//! Functions to get blobs from peers
//! Functions that use the iroh-bytes protocol in conjunction with a bao store.
use std::path::PathBuf;
use std::time::Duration;

use bytes::Bytes;
use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
use iroh_base::{hash::Hash, rpc::RpcError};
use serde::{Deserialize, Serialize};

use crate::protocol::RangeSpec;
use std::io;

use anyhow::Context;
use bao_tree::io::fsm::OutboardMut;
use bao_tree::{ByteNum, ChunkRanges};
use iroh_bytes::hashseq::parse_hash_seq;
use iroh_bytes::protocol::RangeSpec;
use iroh_bytes::store::PossiblyPartialEntry;
use iroh_bytes::{
use crate::hashseq::parse_hash_seq;
use crate::store::PossiblyPartialEntry;
use crate::{
get::{
self,
fsm::{AtBlobHeader, AtEndBlob, ConnectedNext, EndBlobNext},
Stats,
},
protocol::{GetRequest, RangeSpecSeq},
provider::DownloadProgress,
store::{MapEntry, PartialMap, PartialMapEntry, Store as BaoStore},
util::progress::{IdGenerator, ProgressSender},
BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE,
BlobFormat, HashAndFormat, IROH_BLOCK_SIZE,
};
use iroh_io::AsyncSliceReader;
use anyhow::Context;
use bao_tree::io::fsm::OutboardMut;
use bao_tree::{ByteNum, ChunkRanges};
use iroh_io::{AsyncSliceReader, AsyncSliceWriter};
use tracing::trace;

use crate::util::progress::ProgressSliceWriter2;

/// Get a blob or collection
pub async fn get<D: BaoStore>(
/// Get a blob or collection into a store.
///
/// This considers data that is already in the store, and will only request
/// the remaining data.
pub async fn get_to_db<D: BaoStore>(
db: &D,
conn: quinn::Connection,
hash_and_format: &HashAndFormat,
@@ -64,7 +71,7 @@ async fn get_blob<D: BaoStore>(
}
PossiblyPartialEntry::Partial(entry) => {
trace!("got partial data for {}", hash);
let valid_ranges = get_valid_ranges::<D>(&entry)
let valid_ranges = valid_ranges::<D>(&entry)
.await
.ok()
.unwrap_or_else(ChunkRanges::all);
@@ -118,9 +125,8 @@
anyhow::Ok(stats)
}

pub(crate) async fn get_valid_ranges<D: PartialMap>(
entry: &D::PartialEntry,
) -> anyhow::Result<ChunkRanges> {
/// Given a partial entry, get the valid ranges.
pub async fn valid_ranges<D: PartialMap>(entry: &D::PartialEntry) -> anyhow::Result<ChunkRanges> {
use tracing::trace as log;
// compute the valid range from just looking at the data file
let mut data_reader = entry.data_reader().await?;
@@ -144,7 +150,6 @@ async fn get_blob_inner<D: BaoStore>(
at_header: AtBlobHeader,
sender: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
) -> anyhow::Result<AtEndBlob> {
use iroh_io::AsyncSliceWriter;
// read the size
let (at_content, size) = at_header.next().await?;
let hash = at_content.hash();
@@ -213,7 +218,6 @@ async fn get_blob_inner_partial<D: BaoStore>(
) -> anyhow::Result<AtEndBlob> {
// TODO: the data we get is validated at this point, but we need to check
// that it actually contains the requested ranges. Or DO WE?
use iroh_io::AsyncSliceWriter;

// read the size
let (at_content, size) = at_header.next().await?;
@@ -267,35 +271,32 @@ async fn get_blob_inner_partial<D: BaoStore>(
Ok(at_end)
}

/// Given a sequence of hashes, figure out what is missing
pub(crate) async fn get_missing_ranges_hash_seq<D: BaoStore>(
db: &D,
hash_seq: &[Hash],
) -> io::Result<Vec<BlobInfo<D>>> {
let items = hash_seq.iter().map(|hash| async move {
io::Result::Ok(match db.get_possibly_partial(hash) {
PossiblyPartialEntry::Partial(entry) => {
// first look for partial
trace!("got partial data for {}", hash);
let valid_ranges = get_valid_ranges::<D>(&entry)
.await
.ok()
.unwrap_or_else(ChunkRanges::all);
BlobInfo::Partial {
entry,
valid_ranges,
}
/// Get information about a blob in a store.
///
/// This will compute the valid ranges for partial blobs, so it is somewhat expensive for those.
pub async fn blob_info<D: BaoStore>(db: &D, hash: &Hash) -> io::Result<BlobInfo<D>> {
io::Result::Ok(match db.get_possibly_partial(hash) {
PossiblyPartialEntry::Partial(entry) => {
let valid_ranges = valid_ranges::<D>(&entry)
.await
.ok()
.unwrap_or_else(ChunkRanges::all);
BlobInfo::Partial {
entry,
valid_ranges,
}
PossiblyPartialEntry::Complete(entry) => BlobInfo::Complete { size: entry.size() },
PossiblyPartialEntry::NotFound => BlobInfo::Missing,
})
});
let mut res = Vec::with_capacity(hash_seq.len());
// todo: parallelize maybe?
for item in items {
res.push(item.await?);
}
Ok(res)
}
PossiblyPartialEntry::Complete(entry) => BlobInfo::Complete { size: entry.size() },
PossiblyPartialEntry::NotFound => BlobInfo::Missing,
})
}

/// Like `blob_info`, but for multiple hashes
async fn blob_infos<D: BaoStore>(db: &D, hash_seq: &[Hash]) -> io::Result<Vec<BlobInfo<D>>> {
let items = futures::stream::iter(hash_seq)
.then(|hash| blob_info(db, hash))
.collect::<Vec<_>>();
items.await.into_iter().collect()
}
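
The rewritten helper replaces the manual accumulation loop with a stream;
`then` awaits each future in order, so the lookups stay sequential and the
old `// todo: parallelize maybe?` still applies. The same pattern in
isolation, with a hypothetical file-size lookup standing in for `blob_info`:

```rust
use futures::StreamExt;
use std::io;
use std::path::PathBuf;

// Sequentially apply an async fallible function to each item, then
// collect Vec<io::Result<T>> into io::Result<Vec<T>>.
async fn file_sizes(paths: Vec<PathBuf>) -> io::Result<Vec<u64>> {
    let items = futures::stream::iter(paths)
        .then(|path| async move {
            io::Result::Ok(tokio::fs::metadata(&path).await?.len())
        })
        .collect::<Vec<_>>()
        .await;
    items.into_iter().collect()
}
```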

/// Get a sequence of hashes
@@ -331,7 +332,7 @@ async fn get_hash_seq<D: BaoStore>(
while let Some(hash) = hash_seq.next().await? {
children.push(hash);
}
let missing_info = get_missing_ranges_hash_seq(db, &children).await?;
let missing_info = blob_infos(db, &children).await?;
// send the info about what we have
for (i, info) in missing_info.iter().enumerate() {
if let Some(size) = info.size() {
@@ -446,18 +447,22 @@ async fn get_hash_seq<D: BaoStore>(
anyhow::Ok(stats)
}

/// Information about the status of a blob in a store.
#[derive(Debug, Clone)]
pub(crate) enum BlobInfo<D: BaoStore> {
// we have the blob completely
pub enum BlobInfo<D: BaoStore> {
/// We have the blob completely.
Complete {
/// The size of the entry in bytes.
size: u64,
},
// we have the blob partially
/// We have the blob partially.
Partial {
/// The partial entry.
entry: D::PartialEntry,
/// The ranges that are available locally.
valid_ranges: ChunkRanges,
},
// we don't have the blob at all
/// We don't have the blob at all.
Missing,
}

@@ -495,3 +500,127 @@ impl<D: BaoStore> BlobInfo<D> {
}
}
}

/// Progress updates for the get operation.
#[derive(Debug, Serialize, Deserialize)]
pub enum DownloadProgress {
/// Data was found locally.
FoundLocal {
/// The child offset.
child: u64,
/// The hash of the entry.
hash: Hash,
/// The size of the entry in bytes.
size: u64,
/// The ranges that are available locally.
valid_ranges: RangeSpec,
},
/// A new connection was established.
Connected,
/// An item was found with hash `hash`, from now on referred to via `id`.
Found {
/// A new unique id for this entry.
id: u64,
/// The child offset.
child: u64,
/// The hash of the entry.
hash: Hash,
/// The size of the entry in bytes.
size: u64,
},
/// A hash sequence was found with hash `hash`.
FoundHashSeq {
/// The hash of the hash sequence.
hash: Hash,
/// Number of children in the collection, if known.
children: u64,
},
/// We got progress ingesting item `id`.
Progress {
/// The unique id of the entry.
id: u64,
/// The offset of the progress, in bytes.
offset: u64,
},
/// We are done with `id`.
Done {
/// The unique id of the entry.
id: u64,
},
/// We are done with the network part - all data is local.
NetworkDone {
/// The number of bytes written.
bytes_written: u64,
/// The number of bytes read.
bytes_read: u64,
/// The time it took to transfer the data.
elapsed: Duration,
},
/// The download part is done for this id; we are now exporting the data
/// to the specified out path.
Export {
/// Unique id of the entry.
id: u64,
/// The hash of the entry.
hash: Hash,
/// The size of the entry in bytes.
size: u64,
/// The path to the file where the data is exported.
target: PathBuf,
},
/// We have made progress exporting the data.
///
/// This is only sent for large blobs.
ExportProgress {
/// Unique id of the entry that is being exported.
id: u64,
/// The offset of the progress, in bytes.
offset: u64,
},
/// We got an error and need to abort.
Abort(RpcError),
/// We are done with the whole operation.
AllDone,
}
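
A consumer of this progress API only needs an exhaustive match over the
events. A minimal logging handler as a sketch (how events are delivered,
e.g. over a channel or via RPC, is up to the caller and not shown here):

```rust
use iroh_bytes::get::db::DownloadProgress;

// Log a line per progress event; variants this consumer does not care
// about are explicitly ignored.
fn log_progress(event: &DownloadProgress) {
    match event {
        DownloadProgress::Connected => println!("connected"),
        DownloadProgress::FoundLocal { hash, size, .. } => {
            println!("{hash}: {size} bytes already present locally")
        }
        DownloadProgress::Found { id, hash, size, .. } => {
            println!("{hash}: downloading {size} bytes as item {id}")
        }
        DownloadProgress::Progress { id, offset } => {
            println!("item {id}: {offset} bytes so far")
        }
        DownloadProgress::Done { id } => println!("item {id}: verified and stored"),
        DownloadProgress::Abort(err) => eprintln!("download aborted: {err:?}"),
        DownloadProgress::AllDone => println!("all done"),
        _ => {} // FoundHashSeq, NetworkDone, Export, ExportProgress, ...
    }
}
```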

/// A slice writer that adds a synchronous progress callback
#[derive(Debug)]
struct ProgressSliceWriter2<W, F>(W, F);

impl<W: AsyncSliceWriter, F: Fn(u64, usize) -> io::Result<()> + 'static>
ProgressSliceWriter2<W, F>
{
/// Create a new `ProgressSliceWriter2` from an inner writer and a progress callback
pub fn new(inner: W, on_write: F) -> Self {
Self(inner, on_write)
}
}

impl<W: AsyncSliceWriter + 'static, F: Fn(u64, usize) -> io::Result<()> + 'static> AsyncSliceWriter
for ProgressSliceWriter2<W, F>
{
type WriteBytesAtFuture<'a> = LocalBoxFuture<'a, io::Result<()>>;
fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_> {
// todo: get rid of the boxing
async move {
(self.1)(offset, data.len())?;
self.0.write_bytes_at(offset, data).await
}
.boxed_local()
}

type WriteAtFuture<'a> = W::WriteAtFuture<'a>;
fn write_at<'a>(&'a mut self, offset: u64, bytes: &'a [u8]) -> Self::WriteAtFuture<'a> {
self.0.write_at(offset, bytes)
}

type SyncFuture<'a> = W::SyncFuture<'a>;
fn sync(&mut self) -> Self::SyncFuture<'_> {
self.0.sync()
}

type SetLenFuture<'a> = W::SetLenFuture<'a>;
fn set_len(&mut self, size: u64) -> Self::SetLenFuture<'_> {
self.0.set_len(size)
}
}
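
For illustration, the wrapper can be used to count bytes as they are
written (a sketch; since `ProgressSliceWriter2` is private to this module,
this mirrors how the elided body of `get_blob_inner` hooks progress
reporting into the writer):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Wrap `inner` so every chunk reports its length before it is written.
fn with_byte_counter<W: AsyncSliceWriter>(
    inner: W,
) -> (
    ProgressSliceWriter2<W, impl Fn(u64, usize) -> io::Result<()>>,
    Arc<AtomicU64>,
) {
    let written = Arc::new(AtomicU64::new(0));
    let counter = written.clone();
    let writer = ProgressSliceWriter2::new(inner, move |_offset, len| {
        // Runs synchronously before the write is forwarded to `inner`.
        counter.fetch_add(len as u64, Ordering::Relaxed);
        Ok(())
    });
    (writer, written)
}
```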
