diff --git a/iroh/src/get.rs b/iroh/src/get.rs
new file mode 100644
index 0000000000..f5c064c145
--- /dev/null
+++ b/iroh/src/get.rs
@@ -0,0 +1,431 @@
+//! Functions to get blobs from peers
+
+use std::io;
+
+use anyhow::Context;
+use bao_tree::io::fsm::OutboardMut;
+use bao_tree::{ByteNum, ChunkNum};
+use iroh_bytes::baomap::range_collections::{range_set::RangeSetRange, RangeSet2};
+use iroh_bytes::{
+    baomap::{MapEntry, PartialMap, PartialMapEntry, Store as BaoStore},
+    collection::CollectionParser,
+    get::{
+        self,
+        fsm::{AtBlobHeader, AtEndBlob, ConnectedNext, EndBlobNext},
+        Stats,
+    },
+    protocol::{GetRequest, RangeSpecSeq},
+    provider::ShareProgress,
+    util::{
+        progress::{IdGenerator, ProgressSender},
+        Hash,
+    },
+    IROH_BLOCK_SIZE,
+};
+use iroh_io::AsyncSliceReader;
+use tracing::trace;
+
+use crate::util::progress::ProgressSliceWriter2;
+
+/// Get a blob or collection
+pub async fn get<D: BaoStore, C: CollectionParser>(
+    db: &D,
+    collection_parser: &C,
+    conn: quinn::Connection,
+    hash: Hash,
+    recursive: bool,
+    sender: impl ProgressSender<Msg = ShareProgress> + IdGenerator,
+) -> anyhow::Result<Stats> {
+    let res = if recursive {
+        get_collection(db, collection_parser, conn, &hash, sender).await
+    } else {
+        get_blob(db, conn, &hash, sender).await
+    };
+    if let Err(e) = res.as_ref() {
+        tracing::error!("get failed: {}", e);
+    }
+    res
+}
+
+/// Get a single blob.
+///
+/// If we already have a partial entry for the blob, only the missing ranges
+/// are requested; otherwise the whole blob is requested.
+pub async fn get_blob<D: BaoStore>(
+    db: &D,
+    conn: quinn::Connection,
+    hash: &Hash,
+    progress: impl ProgressSender<Msg = ShareProgress> + IdGenerator,
+) -> anyhow::Result<Stats> {
+    let end = if let Some(entry) = db.get_partial(hash) {
+        trace!("got partial data for {}", hash);
+
+        let required_ranges = get_missing_ranges_blob::<D>(&entry)
+            .await
+            .ok()
+            .unwrap_or_else(RangeSet2::all);
+        let request = GetRequest::new(*hash, RangeSpecSeq::new([required_ranges]));
+        // ranged request for just the missing data
+        let request = get::fsm::start(conn, iroh_bytes::protocol::Request::Get(request));
+        // create a new bidi stream
+        let connected = request.next().await?;
+        // next step. we have requested a single hash, so this must be StartRoot
+        let ConnectedNext::StartRoot(start) = connected.next().await? else {
+            anyhow::bail!("expected StartRoot");
+        };
+        // move to the header
+        let header = start.next();
+        // do the ceremony of getting the blob and adding it to the database
+
+        get_blob_inner_partial(db, header, entry, progress).await?
+    } else {
+        // full request
+        let request = get::fsm::start(
+            conn,
+            iroh_bytes::protocol::Request::Get(GetRequest::single(*hash)),
+        );
+        // create a new bidi stream
+        let connected = request.next().await?;
+        // next step. we have requested a single hash, so this must be StartRoot
+        let ConnectedNext::StartRoot(start) = connected.next().await? else {
+            anyhow::bail!("expected StartRoot");
+        };
+        // move to the header
+        let header = start.next();
+        // do the ceremony of getting the blob and adding it to the database
+        get_blob_inner(db, header, progress).await?
+    };
+
+    // we have requested a single hash, so we must be at closing
+    let EndBlobNext::Closing(end) = end.next() else {
+        anyhow::bail!("expected Closing");
+    };
+    // this closes the bidi stream. Do something with the stats?
+    let stats = end.next().await?;
+    anyhow::Ok(stats)
+}
+
+async fn get_missing_ranges_blob<D: PartialMap>(
+    entry: &D::PartialEntry,
+) -> anyhow::Result<RangeSet2<ChunkNum>> {
+    use tracing::trace as log;
+    // compute the valid range from just looking at the data file
+    let mut data_reader = entry.data_reader().await?;
+    let data_size = data_reader.len().await?;
+    let valid_from_data = RangeSet2::from(..ByteNum(data_size).full_chunks());
+    // compute the valid range from just looking at the outboard file
+    let mut outboard = entry.outboard().await?;
+    let valid_from_outboard = bao_tree::io::fsm::valid_ranges(&mut outboard).await?;
+    let valid: RangeSet2<ChunkNum> = valid_from_data.intersection(&valid_from_outboard);
+    let total_valid: u64 = valid
+        .iter()
+        .map(|x| match x {
+            RangeSetRange::Range(x) => x.end.to_bytes().0 - x.start.to_bytes().0,
+            RangeSetRange::RangeFrom(_) => 0,
+        })
+        .sum();
+    log!("valid_from_data: {:?}", valid_from_data);
+    log!("valid_from_outboard: {:?}", valid_from_outboard);
+    log!("total_valid: {}", total_valid);
+    let invalid = RangeSet2::all().difference(&valid);
+    Ok(invalid)
+}
+
+/// Get a blob that was requested completely.
+///
+/// We need to create our own files and handle the case where an outboard
+/// is not needed.
+async fn get_blob_inner<D: BaoStore>(
+    db: &D,
+    header: AtBlobHeader,
+    sender: impl ProgressSender<Msg = ShareProgress> + IdGenerator,
+) -> anyhow::Result<AtEndBlob> {
+    use iroh_io::AsyncSliceWriter;
+
+    let hash = header.hash();
+    // read the size
+    let (content, size) = header.next().await?;
+    // create the temp file pair
+    let entry = db.get_or_create_partial(hash, size)?;
+    // open the data file in any case
+    let df = entry.data_writer().await?;
+    let mut of: Option<D::OutboardMut> = if needs_outboard(size) {
+        Some(entry.outboard_mut().await?)
+    } else {
+        None
+    };
+    // allocate a new id for progress reports for this transfer
+    let id = sender.new_id();
+    sender.send(ShareProgress::Found { id, hash, size }).await?;
+    let sender2 = sender.clone();
+    let on_write = move |offset: u64, _length: usize| {
+        // if try send fails it means that the receiver has been dropped.
+        // in that case we want to abort the write_all_with_outboard.
+        sender2
+            .try_send(ShareProgress::Progress { id, offset })
+            .map_err(|e| {
+                tracing::info!("aborting download of {}", hash);
+                e
+            })?;
+        Ok(())
+    };
+    let mut pw = ProgressSliceWriter2::new(df, on_write);
+    // use the convenience method to write all to the two vfs objects
+    let end = content
+        .write_all_with_outboard(of.as_mut(), &mut pw)
+        .await?;
+    // sync the data file
+    pw.sync().await?;
+    // sync the outboard file, if we wrote one
+    if let Some(mut of) = of {
+        of.sync().await?;
+    }
+    db.insert_complete(entry).await?;
+    // notify that we are done
+    sender.send(ShareProgress::Done { id }).await?;
+    Ok(end)
+}
+
+fn needs_outboard(size: u64) -> bool {
+    size > (IROH_BLOCK_SIZE.bytes() as u64)
+}
+
+/// Get a blob that was requested partially.
+///
+/// We get passed the data and outboard ids. Partial downloads are only done
+/// for large blobs where the outboard is present.
+async fn get_blob_inner_partial<D: BaoStore>(
+    db: &D,
+    header: AtBlobHeader,
+    entry: D::PartialEntry,
+    sender: impl ProgressSender<Msg = ShareProgress> + IdGenerator,
+) -> anyhow::Result<AtEndBlob> {
+    // TODO: the data we get is validated at this point, but we need to check
+    // that it actually contains the requested ranges. Or DO WE?
+    use iroh_io::AsyncSliceWriter;
+
+    let hash = header.hash();
+    // read the size
+    let (content, size) = header.next().await?;
+    // open the data file in any case
+    let df = entry.data_writer().await?;
+    let mut of = if needs_outboard(size) {
+        Some(entry.outboard_mut().await?)
+    } else {
+        None
+    };
+    // allocate a new id for progress reports for this transfer
+    let id = sender.new_id();
+    sender.send(ShareProgress::Found { id, hash, size }).await?;
+    let sender2 = sender.clone();
+    let on_write = move |offset: u64, _length: usize| {
+        // if try send fails it means that the receiver has been dropped.
+        // in that case we want to abort the write_all_with_outboard.
+        sender2
+            .try_send(ShareProgress::Progress { id, offset })
+            .map_err(|e| {
+                tracing::info!("aborting download of {}", hash);
+                e
+            })?;
+        Ok(())
+    };
+    let mut pw = ProgressSliceWriter2::new(df, on_write);
+    // use the convenience method to write all to the two vfs objects
+    let end = content
+        .write_all_with_outboard(of.as_mut(), &mut pw)
+        .await?;
+    // sync the data file
+    pw.sync().await?;
+    // sync the outboard file
+    if let Some(mut of) = of {
+        of.sync().await?;
+    }
+    // actually store the data. it is up to the db to decide if it wants to
+    // rename the files or not.
+    db.insert_complete(entry).await?;
+    // notify that we are done
+    sender.send(ShareProgress::Done { id }).await?;
+    Ok(end)
+}
+
+/// Given a collection of hashes, figure out what is missing
+async fn get_missing_ranges_collection<D: PartialMap>(
+    db: &D,
+    collection: &Vec<Hash>,
+) -> io::Result<Vec<BlobInfo<D>>> {
+    let items = collection.iter().map(|hash| async move {
+        io::Result::Ok(if let Some(entry) = db.get_partial(hash) {
+            // first look for partial
+            trace!("got partial data for {}", hash);
+            let missing_chunks = get_missing_ranges_blob::<D>(&entry)
+                .await
+                .ok()
+                .unwrap_or_else(RangeSet2::all);
+            BlobInfo::Partial {
+                entry,
+                missing_chunks,
+            }
+        } else if db.get(hash).is_some() {
+            // then look for complete
+            BlobInfo::Complete
+        } else {
+            BlobInfo::Missing
+        })
+    });
+    let mut res = Vec::with_capacity(collection.len());
+    // todo: parallelize maybe?
+    for item in items {
+        res.push(item.await?);
+    }
+    Ok(res)
+}
+
+/// Get a collection
+pub async fn get_collection<D: BaoStore, C: CollectionParser>(
+    db: &D,
+    collection_parser: &C,
+    conn: quinn::Connection,
+    root_hash: &Hash,
+    sender: impl ProgressSender<Msg = ShareProgress> + IdGenerator,
+) -> anyhow::Result<Stats> {
+    use tracing::info as log;
+    let finishing = if let Some(entry) = db.get(root_hash) {
+        log!("already got collection - doing partial download");
+        // got the collection
+        let reader = entry.data_reader().await?;
+        let (mut collection, stats) = collection_parser.parse(0, reader).await?;
+        sender
+            .send(ShareProgress::FoundCollection {
+                hash: *root_hash,
+                num_blobs: stats.num_blobs,
+                total_blobs_size: stats.total_blob_size,
+            })
+            .await?;
+        let mut children: Vec<Hash> = vec![];
+        while let Some(hash) = collection.next().await? {
+            children.push(hash);
+        }
+        let missing_info = get_missing_ranges_collection(db, &children).await?;
+        if missing_info.iter().all(|x| matches!(x, BlobInfo::Complete)) {
+            log!("nothing to do");
+            return Ok(Stats::default());
+        }
+        let missing_iter = std::iter::once(RangeSet2::empty())
+            .chain(missing_info.iter().map(|x| x.missing_chunks()))
+            .collect::<Vec<_>>();
+        log!("requesting chunks {:?}", missing_iter);
+        let request = GetRequest::new(*root_hash, RangeSpecSeq::new(missing_iter));
+        let request = get::fsm::start(conn, request.into());
+        // create a new bidi stream
+        let connected = request.next().await?;
+        log!("connected");
+        // we have not requested the root, so this must be StartChild
+        let ConnectedNext::StartChild(start) = connected.next().await? else {
+            anyhow::bail!("expected StartChild");
+        };
+        let mut next = EndBlobNext::MoreChildren(start);
+        // read all the children
+        loop {
+            let start = match next {
+                EndBlobNext::MoreChildren(start) => start,
+                EndBlobNext::Closing(finish) => break finish,
+            };
+            let child_offset =
+                usize::try_from(start.child_offset()).context("child offset too large")?;
+            let (child_hash, info) =
+                match (children.get(child_offset), missing_info.get(child_offset)) {
+                    (Some(blob), Some(info)) => (*blob, info),
+                    _ => break start.finish(),
+                };
+            tracing::info!(
+                "requesting child {} {:?}",
+                child_hash,
+                info.missing_chunks()
+            );
+            let header = start.next(child_hash);
+            let end_blob = match info {
+                BlobInfo::Missing => get_blob_inner(db, header, sender.clone()).await?,
+                BlobInfo::Partial { entry, .. } => {
+                    get_blob_inner_partial(db, header, entry.clone(), sender.clone()).await?
+                }
+                BlobInfo::Complete => anyhow::bail!("got data we have not requested"),
+            };
+            next = end_blob.next();
+        }
+    } else {
+        tracing::info!("don't have collection - doing full download");
+        // don't have the collection, so probably got nothing
+        let request = get::fsm::start(
+            conn,
+            iroh_bytes::protocol::Request::Get(GetRequest::all(*root_hash)),
+        );
+        // create a new bidi stream
+        let connected = request.next().await?;
+        // next step. we requested the root, so this must be StartRoot
+        let ConnectedNext::StartRoot(start) = connected.next().await? else {
+            anyhow::bail!("expected StartRoot");
+        };
+        // move to the header
+        let header = start.next();
+        // read the blob and add it to the database
+        let end_root = get_blob_inner(db, header, sender.clone()).await?;
+        // read the collection fully for now
+        let entry = db.get(root_hash).context("just downloaded")?;
+        let reader = entry.data_reader().await?;
+        let (mut collection, stats) = collection_parser.parse(0, reader).await?;
+        sender
+            .send(ShareProgress::FoundCollection {
+                hash: *root_hash,
+                num_blobs: stats.num_blobs,
+                total_blobs_size: stats.total_blob_size,
+            })
+            .await?;
+        let mut children = vec![];
+        while let Some(hash) = collection.next().await? {
+            children.push(hash);
+        }
+        let mut next = end_root.next();
+        // read all the children
+        loop {
+            let start = match next {
+                EndBlobNext::MoreChildren(start) => start,
+                EndBlobNext::Closing(finish) => break finish,
+            };
+            let child_offset =
+                usize::try_from(start.child_offset()).context("child offset too large")?;
+            let child_hash = match children.get(child_offset) {
+                Some(blob) => *blob,
+                None => break start.finish(),
+            };
+            let header = start.next(child_hash);
+            let end_blob = get_blob_inner(db, header, sender.clone()).await?;
+            next = end_blob.next();
+        }
+    };
+    // this closes the bidi stream. Do something with the stats?
+    let stats = finishing.next().await?;
+    anyhow::Ok(stats)
+}
+
+#[derive(Debug, Clone)]
+enum BlobInfo<D: PartialMap> {
+    // we have the blob completely
+    Complete,
+    // we have the blob partially
+    Partial {
+        entry: D::PartialEntry,
+        missing_chunks: RangeSet2<ChunkNum>,
+    },
+    // we don't have the blob at all
+    Missing,
+}
+
+impl<D: PartialMap> BlobInfo<D> {
+    fn missing_chunks(&self) -> RangeSet2<ChunkNum> {
+        match self {
+            BlobInfo::Complete => RangeSet2::empty(),
+            BlobInfo::Partial { missing_chunks, .. } => missing_chunks.clone(),
+            BlobInfo::Missing => RangeSet2::all(),
+        }
+    }
+}
diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs
index e3d6c6e491..b8a363921c 100644
--- a/iroh/src/lib.rs
+++ b/iroh/src/lib.rs
@@ -8,6 +8,7 @@ pub mod baomap;
 #[cfg(feature = "iroh-collection")]
 pub mod collection;
 pub mod dial;
+pub mod get;
 pub mod node;
 pub mod rpc_protocol;
 pub mod util;
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index c4c7628fe1..c99d638ec6 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -23,31 +23,22 @@ use crate::rpc_protocol::{
     ProviderService, ShareRequest, ShutdownRequest, ValidateRequest, VersionRequest,
     VersionResponse, WatchRequest, WatchResponse,
 };
-use crate::util::progress::ProgressSliceWriter2;
 use anyhow::{Context, Result};
-use bao_tree::io::fsm::OutboardMut;
-use bao_tree::{ByteNum, ChunkNum};
 use bytes::Bytes;
 use futures::future::{BoxFuture, Shared};
 use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
-use iroh_bytes::baomap::{
-    range_collections::{range_set::RangeSetRange, RangeSet2},
-    ExportMode, Map, MapEntry, PartialMapEntry, ReadableStore, Store, ValidateProgress,
-};
+use iroh_bytes::baomap::{ExportMode, Map, MapEntry, ReadableStore, Store, ValidateProgress};
 use iroh_bytes::collection::{CollectionParser, NoCollectionParser};
-use iroh_bytes::get::fsm::{AtBlobHeader, AtEndBlob, ConnectedNext, EndBlobNext};
-use iroh_bytes::get::{self, Stats};
-use iroh_bytes::protocol::{GetRequest, RangeSpecSeq};
+use iroh_bytes::get::Stats;
+use iroh_bytes::protocol::GetRequest;
 use iroh_bytes::provider::ShareProgress;
 use iroh_bytes::util::progress::{FlumeProgressSender, IdGenerator, ProgressSender};
-use iroh_bytes::IROH_BLOCK_SIZE;
 use iroh_bytes::{
     protocol::{Closed, Request, RequestToken},
     provider::{CustomGetHandler, ProvideProgress, RequestAuthorizationHandler},
     util::runtime,
     util::Hash,
 };
-use iroh_io::AsyncSliceReader;
 use iroh_net::{
     config::Endpoint,
     derp::DerpMap,
@@ -753,15 +744,15 @@ impl<D: Store, C: CollectionParser> RpcHandler<D, C> {
         recursive: bool,
         sender: impl ProgressSender<Msg = ShareProgress> + IdGenerator,
     ) -> anyhow::Result<Stats> {
-        let res = if recursive {
-            self.get_collection(conn, &hash, sender).await
-        } else {
-            self.get_blob(conn, &hash, sender).await
-        };
-        if let Err(e) = res.as_ref() {
-            tracing::error!("get failed: {}", e);
-        }
-        res
+        crate::get::get(
+            &self.inner.db,
+            &self.collection_parser,
+            conn,
+            hash,
+            recursive,
+            sender,
+        )
+        .await
     }
 
     async fn export(
@@ -828,360 +819,6 @@ impl<D: Store, C: CollectionParser> RpcHandler<D, C> {
         anyhow::Ok(())
     }
 
-    /// Get a blob that was requested completely.
-    ///
-    /// We need to create our own files and handle the case where an outboard
-    /// is not needed.
-    async fn get_blob_inner(
-        db: &D,
-        header: AtBlobHeader,
-        sender: impl ProgressSender<Msg = ShareProgress> + IdGenerator,
-    ) -> anyhow::Result<AtEndBlob> {
-        use iroh_io::AsyncSliceWriter;
-
-        let hash = header.hash();
-        // read the size
-        let (content, size) = header.next().await?;
-        // create the temp file pair
-        let entry = db.get_or_create_partial(hash, size)?;
-        // open the data file in any case
-        let df = entry.data_writer().await?;
-        let mut of: Option<D::OutboardMut> = if needs_outboard(size) {
-            Some(entry.outboard_mut().await?)
-        } else {
-            None
-        };
-        // allocate a new id for progress reports for this transfer
-        let id = sender.new_id();
-        sender.send(ShareProgress::Found { id, hash, size }).await?;
-        let sender2 = sender.clone();
-        let on_write = move |offset: u64, _length: usize| {
-            // if try send fails it means that the receiver has been dropped.
-            // in that case we want to abort the write_all_with_outboard.
-            sender2
-                .try_send(ShareProgress::Progress { id, offset })
-                .map_err(|e| {
-                    tracing::info!("aborting download of {}", hash);
-                    e
-                })?;
-            Ok(())
-        };
-        let mut pw = ProgressSliceWriter2::new(df, on_write);
-        // use the convenience method to write all to the two vfs objects
-        let end = content
-            .write_all_with_outboard(of.as_mut(), &mut pw)
-            .await?;
-        // sync the data file
-        pw.sync().await?;
-        // sync the outboard file, if we wrote one
-        if let Some(mut of) = of {
-            of.sync().await?;
-        }
-        db.insert_complete(entry).await?;
-        // notify that we are done
-        sender.send(ShareProgress::Done { id }).await?;
-        Ok(end)
-    }
-
-    /// Get a blob that was requested partially.
-    ///
-    /// We get passed the data and outboard ids. Partial downloads are only done
-    /// for large blobs where the outboard is present.
-    async fn get_blob_inner_partial(
-        db: &D,
-        header: AtBlobHeader,
-        entry: D::PartialEntry,
-        sender: impl ProgressSender<Msg = ShareProgress> + IdGenerator,
-    ) -> anyhow::Result<AtEndBlob> {
-        // TODO: the data we get is validated at this point, but we need to check
-        // that it actually contains the requested ranges. Or DO WE?
-        use iroh_io::AsyncSliceWriter;
-
-        let hash = header.hash();
-        // read the size
-        let (content, size) = header.next().await?;
-        // open the data file in any case
-        let df = entry.data_writer().await?;
-        let mut of = if needs_outboard(size) {
-            Some(entry.outboard_mut().await?)
-        } else {
-            None
-        };
-        // allocate a new id for progress reports for this transfer
-        let id = sender.new_id();
-        sender.send(ShareProgress::Found { id, hash, size }).await?;
-        let sender2 = sender.clone();
-        let on_write = move |offset: u64, _length: usize| {
-            // if try send fails it means that the receiver has been dropped.
-            // in that case we want to abort the write_all_with_outboard.
-            sender2
-                .try_send(ShareProgress::Progress { id, offset })
-                .map_err(|e| {
-                    tracing::info!("aborting download of {}", hash);
-                    e
-                })?;
-            Ok(())
-        };
-        let mut pw = ProgressSliceWriter2::new(df, on_write);
-        // use the convenience method to write all to the two vfs objects
-        let end = content
-            .write_all_with_outboard(of.as_mut(), &mut pw)
-            .await?;
-        // sync the data file
-        pw.sync().await?;
-        // sync the outboard file
-        if let Some(mut of) = of {
-            of.sync().await?;
-        }
-        // actually store the data. it is up to the db to decide if it wants to
-        // rename the files or not.
-        db.insert_complete(entry).await?;
-        // notify that we are done
-        sender.send(ShareProgress::Done { id }).await?;
-        Ok(end)
-    }
-
-    async fn get_missing_ranges_blob(
-        entry: &D::PartialEntry,
-    ) -> anyhow::Result<RangeSet2<ChunkNum>> {
-        use tracing::trace as log;
-        // compute the valid range from just looking at the data file
-        let mut data_reader = entry.data_reader().await?;
-        let data_size = data_reader.len().await?;
-        let valid_from_data = RangeSet2::from(..ByteNum(data_size).full_chunks());
-        // compute the valid range from just looking at the outboard file
-        let mut outboard = entry.outboard().await?;
-        let valid_from_outboard = bao_tree::io::fsm::valid_ranges(&mut outboard).await?;
-        let valid: RangeSet2<ChunkNum> = valid_from_data.intersection(&valid_from_outboard);
-        let total_valid: u64 = valid
-            .iter()
-            .map(|x| match x {
-                RangeSetRange::Range(x) => x.end.to_bytes().0 - x.start.to_bytes().0,
-                RangeSetRange::RangeFrom(_) => 0,
-            })
-            .sum();
-        log!("valid_from_data: {:?}", valid_from_data);
-        log!("valid_from_outboard: {:?}", valid_from_data);
-        log!("total_valid: {}", total_valid);
-        let invalid = RangeSet2::all().difference(&valid);
-        Ok(invalid)
-    }
-
-    async fn get_blob(
-        &self,
-        conn: quinn::Connection,
-        hash: &Hash,
-        progress: impl ProgressSender<Msg = ShareProgress> + IdGenerator,
-    ) -> anyhow::Result<Stats> {
-        let db = &self.inner.db;
-        let end = if let Some(entry) = db.get_partial(hash) {
-            trace!("got partial data for {}", hash,);
-
-            let required_ranges = Self::get_missing_ranges_blob(&entry)
-                .await
-                .ok()
-                .unwrap_or_else(RangeSet2::all);
-            let request = GetRequest::new(*hash, RangeSpecSeq::new([required_ranges]));
-            // full request
-            let request = get::fsm::start(conn, iroh_bytes::protocol::Request::Get(request));
-            // create a new bidi stream
-            let connected = request.next().await?;
-            // next step. we have requested a single hash, so this must be StartRoot
-            let ConnectedNext::StartRoot(start) = connected.next().await? else {
-                anyhow::bail!("expected StartRoot");
-            };
-            // move to the header
-            let header = start.next();
-            // do the ceremony of getting the blob and adding it to the database
-
-            Self::get_blob_inner_partial(db, header, entry, progress).await?
-        } else {
-            // full request
-            let request = get::fsm::start(
-                conn,
-                iroh_bytes::protocol::Request::Get(GetRequest::single(*hash)),
-            );
-            // create a new bidi stream
-            let connected = request.next().await?;
-            // next step. we have requested a single hash, so this must be StartRoot
-            let ConnectedNext::StartRoot(start) = connected.next().await? else {
-                anyhow::bail!("expected StartRoot");
-            };
-            // move to the header
-            let header = start.next();
-            // do the ceremony of getting the blob and adding it to the database
-            Self::get_blob_inner(db, header, progress).await?
-        };
-
-        // we have requested a single hash, so we must be at closing
-        let EndBlobNext::Closing(end) = end.next() else {
-            anyhow::bail!("expected Closing");
-        };
-        // this closes the bidi stream. Do something with the stats?
-        let stats = end.next().await?;
-        anyhow::Ok(stats)
-    }
-
-    /// Given a collection of hashes, figure out what is missing
-    async fn get_missing_ranges_collection(
-        &self,
-        collection: &Vec<Hash>,
-    ) -> io::Result<Vec<BlobInfo<D>>> {
-        let db = &self.inner.db;
-        let items = collection.iter().map(|hash| async move {
-            io::Result::Ok(if let Some(entry) = db.get_partial(hash) {
-                // first look for partial
-                trace!("got partial data for {}", hash,);
-                let missing_chunks = Self::get_missing_ranges_blob(&entry)
-                    .await
-                    .ok()
-                    .unwrap_or_else(RangeSet2::all);
-                BlobInfo::Partial {
-                    entry,
-                    missing_chunks,
-                }
-            } else if db.get(hash).is_some() {
-                // then look for complete
-                BlobInfo::Complete
-            } else {
-                BlobInfo::Missing
-            })
-        });
-        let mut res = Vec::with_capacity(collection.len());
-        // todo: parallelize maybe?
-        for item in items {
-            res.push(item.await?);
-        }
-        Ok(res)
-    }
-
-    async fn get_collection(
-        &self,
-        conn: quinn::Connection,
-        root_hash: &Hash,
-        sender: impl ProgressSender<Msg = ShareProgress> + IdGenerator,
-    ) -> anyhow::Result<Stats> {
-        use tracing::info as log;
-        let db = &self.inner.db;
-        let finishing = if let Some(entry) = db.get(root_hash) {
-            log!("already got collection - doing partial download");
-            // got the collection
-            let reader = entry.data_reader().await?;
-            let (mut collection, stats) = self.collection_parser.parse(0, reader).await?;
-            sender
-                .send(ShareProgress::FoundCollection {
-                    hash: *root_hash,
-                    num_blobs: stats.num_blobs,
-                    total_blobs_size: stats.total_blob_size,
-                })
-                .await?;
-            let mut children: Vec<Hash> = vec![];
-            while let Some(hash) = collection.next().await? {
-                children.push(hash);
-            }
-            let missing_info = self.get_missing_ranges_collection(&children).await?;
-            if missing_info.iter().all(|x| matches!(x, BlobInfo::Complete)) {
-                log!("nothing to do");
-                return Ok(Stats::default());
-            }
-            let missing_iter = std::iter::once(RangeSet2::empty())
-                .chain(missing_info.iter().map(|x| x.missing_chunks()))
-                .collect::<Vec<_>>();
-            log!("requesting chunks {:?}", missing_iter);
-            let request = GetRequest::new(*root_hash, RangeSpecSeq::new(missing_iter));
-            let request = get::fsm::start(conn, request.into());
-            // create a new bidi stream
-            let connected = request.next().await?;
-            log!("connected");
-            // we have not requested the root, so this must be StartChild
-            let ConnectedNext::StartChild(start) = connected.next().await? else {
-                anyhow::bail!("expected StartChild");
-            };
-            let mut next = EndBlobNext::MoreChildren(start);
-            // read all the children
-            loop {
-                let start = match next {
-                    EndBlobNext::MoreChildren(start) => start,
-                    EndBlobNext::Closing(finish) => break finish,
-                };
-                let child_offset =
-                    usize::try_from(start.child_offset()).context("child offset too large")?;
-                let (child_hash, info) =
-                    match (children.get(child_offset), missing_info.get(child_offset)) {
-                        (Some(blob), Some(info)) => (*blob, info),
-                        _ => break start.finish(),
-                    };
-                tracing::info!(
-                    "requesting child {} {:?}",
-                    child_hash,
-                    info.missing_chunks()
-                );
-                let header = start.next(child_hash);
-                let end_blob = match info {
-                    BlobInfo::Missing => Self::get_blob_inner(db, header, sender.clone()).await?,
-                    BlobInfo::Partial { entry, .. } => {
-                        Self::get_blob_inner_partial(db, header, entry.clone(), sender.clone())
-                            .await?
-                    }
-                    BlobInfo::Complete => anyhow::bail!("got data we have not requested"),
-                };
-                next = end_blob.next();
-            }
-        } else {
-            tracing::info!("don't have collection - doing full download");
-            // don't have the collection, so probably got nothing
-            let request = get::fsm::start(
-                conn,
-                iroh_bytes::protocol::Request::Get(GetRequest::all(*root_hash)),
-            );
-            // create a new bidi stream
-            let connected = request.next().await?;
-            // next step. we have requested a single hash, so this must be StartRoot
-            let ConnectedNext::StartRoot(start) = connected.next().await? else {
-                anyhow::bail!("expected StartRoot");
-            };
-            // move to the header
-            let header = start.next();
-            // read the blob and add it to the database
-            let end_root = Self::get_blob_inner(db, header, sender.clone()).await?;
-            // read the collection fully for now
-            let entry = db.get(root_hash).context("just downloaded")?;
-            let reader = entry.data_reader().await?;
-            let (mut collection, stats) = self.collection_parser.parse(0, reader).await?;
-            sender
-                .send(ShareProgress::FoundCollection {
-                    hash: *root_hash,
-                    num_blobs: stats.num_blobs,
-                    total_blobs_size: stats.total_blob_size,
-                })
-                .await?;
-            let mut children = vec![];
-            while let Some(hash) = collection.next().await? {
-                children.push(hash);
-            }
-            let mut next = end_root.next();
-            // read all the children
-            loop {
-                let start = match next {
-                    EndBlobNext::MoreChildren(start) => start,
-                    EndBlobNext::Closing(finish) => break finish,
-                };
-                let child_offset =
-                    usize::try_from(start.child_offset()).context("child offset too large")?;
-                let child_hash = match children.get(child_offset) {
-                    Some(blob) => *blob,
-                    None => break start.finish(),
-                };
-                let header = start.next(child_hash);
-                let end_blob = Self::get_blob_inner(db, header, sender.clone()).await?;
-                next = end_blob.next();
-            }
-        };
-        // this closes the bidi stream. Do something with the stats?
-        let stats = finishing.next().await?;
-        anyhow::Ok(stats)
-    }
-
     async fn share0(
         self,
         msg: ShareRequest,
@@ -1424,29 +1061,6 @@ fn handle_rpc_request<D: Store, E: ServiceEndpoint<ProviderService>, C: Collecti
     });
 }
 
-#[derive(Debug, Clone)]
-enum BlobInfo<D: Store> {
-    // we have the blob completely
-    Complete,
-    // we have the blob partially
-    Partial {
-        entry: D::PartialEntry,
-        missing_chunks: RangeSet2<ChunkNum>,
-    },
-    // we don't have the blob at all
-    Missing,
-}
-
-impl<D: Store> BlobInfo<D> {
-    fn missing_chunks(&self) -> RangeSet2<ChunkNum> {
-        match self {
-            BlobInfo::Complete => RangeSet2::empty(),
-            BlobInfo::Partial { missing_chunks, .. } => missing_chunks.clone(),
-            BlobInfo::Missing => RangeSet2::all(),
-        }
-    }
-}
-
 /// Create a [`quinn::ServerConfig`] with the given keypair and limits.
 pub fn make_server_config(
     keypair: &Keypair,
@@ -1520,10 +1134,6 @@ impl RequestAuthorizationHandler for StaticTokenAuthHandler {
     }
 }
 
-fn needs_outboard(size: u64) -> bool {
-    size > (IROH_BLOCK_SIZE.bytes() as u64)
-}
-
 #[cfg(all(test, feature = "flat-db"))]
 mod tests {
     use anyhow::bail;
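Not part of the patch: a minimal sketch of how the now-public entry point could be driven directly from outside the node. It assumes an already-dialed quinn::Connection, a running tokio runtime, and a store implementing iroh_bytes::baomap::Store; the fetch_one helper and the flume channel wiring are illustrative assumptions, not APIs introduced by this PR.

// Hypothetical usage sketch (not part of the diff above).
use iroh_bytes::{
    collection::NoCollectionParser,
    provider::ShareProgress,
    util::{progress::FlumeProgressSender, Hash},
};

async fn fetch_one(
    db: &impl iroh_bytes::baomap::Store,
    conn: quinn::Connection,
    hash: Hash,
) -> anyhow::Result<()> {
    // progress events from the transfer arrive on a flume channel
    let (tx, rx) = flume::bounded::<ShareProgress>(32);
    // FlumeProgressSender implements both ProgressSender and IdGenerator,
    // which is what crate::get::get expects
    let sender = FlumeProgressSender::new(tx);
    // drain events concurrently; if we dropped `rx` instead, the try_send
    // inside the transfer would fail and abort the download
    let progress = tokio::spawn(async move {
        while let Ok(ev) = rx.recv_async().await {
            println!("progress: {ev:?}");
        }
    });
    // recursive = false: fetch a single blob, resuming partial data if any
    let stats = iroh::get::get(db, &NoCollectionParser, conn, hash, false, sender).await?;
    println!("done: {stats:?}");
    progress.await?;
    Ok(())
}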