From 6a630d925eebc00cd1739b8ac7aabc4def831702 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Wed, 18 Oct 2023 14:17:06 +0300 Subject: [PATCH] iroh-bytes: Get api improvements (#1660) ## Description A number of small improvements of the get side of the iroh-bytes API, needed for the tracker and the progress rework ## Notes & open questions I want to get those in separately to the above 2 PRs ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [ ] Tests if relevant. --- iroh-bytes/src/get.rs | 100 +++++++++++++++++--------- iroh-bytes/src/protocol/range_spec.rs | 4 ++ iroh-bytes/src/util.rs | 49 +++++++++++++ iroh/src/commands/get.rs | 2 +- 4 files changed, 120 insertions(+), 35 deletions(-) diff --git a/iroh-bytes/src/get.rs b/iroh-bytes/src/get.rs index 185f852deb..c8c8136c80 100644 --- a/iroh-bytes/src/get.rs +++ b/iroh-bytes/src/get.rs @@ -55,7 +55,6 @@ pub mod fsm { use super::*; use bao_tree::{ - blake3, io::{ fsm::{ OutboardMut, ResponseDecoderReading, ResponseDecoderReadingNext, @@ -98,6 +97,10 @@ pub mod fsm { pub fn new(owner: RangeSpecSeq) -> Self { Self(RangesIterInner::new(owner, |owner| owner.iter_non_empty())) } + + pub fn offset(&self) -> u64 { + self.0.with_dependent(|_owner, iter| iter.offset()) + } } impl Iterator for RangesIter { @@ -346,8 +349,8 @@ pub mod fsm { } /// Hash of the root blob - pub fn hash(&self) -> &Hash { - &self.hash + pub fn hash(&self) -> Hash { + self.hash } /// Go into the next state, reading the header @@ -437,18 +440,8 @@ pub mod fsm { /// Drain the response and throw away the result pub async fn drain(self) -> result::Result { - let (mut content, _size) = self.next().await?; - loop { - match content.next().await { - BlobContentNext::More((content1, Ok(_))) => { - content = content1; - } - BlobContentNext::More((_, Err(e))) => return Err(e), - BlobContentNext::Done(end) => { - return Ok(end); - } - } - } + let (content, _size) = self.next().await?; + content.drain().await } /// Concatenate the entire response into a vec @@ -458,23 +451,8 @@ pub mod fsm { pub async fn concatenate_into_vec( self, ) -> result::Result<(AtEndBlob, Vec), DecodeError> { - let (mut curr, size) = self.next().await?; - let mut res = Vec::with_capacity(size as usize); - let done = loop { - match curr.next().await { - BlobContentNext::More((next, data)) => { - if let BaoContentItem::Leaf(leaf) = data? { - res.extend_from_slice(&leaf.data); - } - curr = next; - } - BlobContentNext::Done(done) => { - // we are done with the root blob - break done; - } - } - }; - Ok((done, res)) + let (content, _size) = self.next().await?; + content.concatenate_into_vec().await } /// Write the entire blob to a slice writer. @@ -514,6 +492,11 @@ pub mod fsm { pub fn ranges(&self) -> &ChunkRanges { self.stream.ranges() } + + /// The current offset of the blob we are reading. + pub fn offset(&self) -> u64 { + self.misc.ranges_iter.offset() + } } /// State while we are reading content @@ -648,8 +631,52 @@ pub mod fsm { } /// The hash of the blob we are reading. - pub fn hash(&self) -> &blake3::Hash { - self.stream.hash() + pub fn hash(&self) -> Hash { + (*self.stream.hash()).into() + } + + /// The current offset of the blob we are reading. + pub fn offset(&self) -> u64 { + self.misc.ranges_iter.offset() + } + + /// Drain the response and throw away the result + pub async fn drain(self) -> result::Result { + let mut content = self; + loop { + match content.next().await { + BlobContentNext::More((content1, res)) => { + let _ = res?; + content = content1; + } + BlobContentNext::Done(end) => { + break Ok(end); + } + } + } + } + + /// Concatenate the entire response into a vec + pub async fn concatenate_into_vec( + self, + ) -> result::Result<(AtEndBlob, Vec), DecodeError> { + let mut res = Vec::with_capacity(1024); + let mut curr = self; + let done = loop { + match curr.next().await { + BlobContentNext::More((next, data)) => { + if let BaoContentItem::Leaf(leaf) = data? { + res.extend_from_slice(&leaf.data); + } + curr = next; + } + BlobContentNext::Done(done) => { + // we are done with the root blob + break done; + } + } + }; + Ok((done, res)) } /// Write the entire blob to a slice writer and to an optional outboard. @@ -711,6 +738,11 @@ pub mod fsm { } } } + + /// Immediately finish the get response without reading further + pub fn finish(self) -> AtClosing { + AtClosing::new(self.misc, self.stream.finish()) + } } /// State after we have read all the content for a blob diff --git a/iroh-bytes/src/protocol/range_spec.rs b/iroh-bytes/src/protocol/range_spec.rs index d4e0ae2979..f96b9f0cec 100644 --- a/iroh-bytes/src/protocol/range_spec.rs +++ b/iroh-bytes/src/protocol/range_spec.rs @@ -322,6 +322,10 @@ impl<'a> NonEmptyRequestRangeSpecIter<'a> { fn new(inner: RequestRangeSpecIter<'a>) -> Self { Self { inner, count: 0 } } + + pub(crate) fn offset(&self) -> u64 { + self.count + } } impl<'a> Iterator for NonEmptyRequestRangeSpecIter<'a> { diff --git a/iroh-bytes/src/util.rs b/iroh-bytes/src/util.rs index ebf5b8dfdd..4b501629e8 100644 --- a/iroh-bytes/src/util.rs +++ b/iroh-bytes/src/util.rs @@ -129,6 +129,42 @@ impl HashAndFormat { } } +impl Display for HashAndFormat { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut slice = [0u8; 65]; + hex::encode_to_slice(self.hash.as_bytes(), &mut slice[1..]).unwrap(); + match self.format { + BlobFormat::Raw => { + write!(f, "{}", std::str::from_utf8(&slice[1..]).unwrap()) + } + BlobFormat::HashSeq => { + slice[0] = b's'; + write!(f, "{}", std::str::from_utf8(&slice).unwrap()) + } + } + } +} + +impl FromStr for HashAndFormat { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let s = s.as_bytes(); + let mut hash = [0u8; 32]; + match s.len() { + 64 => { + hex::decode_to_slice(s, &mut hash)?; + Ok(Self::raw(hash.into())) + } + 65 if s[0].to_ascii_lowercase() == b's' => { + hex::decode_to_slice(&s[1..], &mut hash)?; + Ok(Self::hash_seq(hash.into())) + } + _ => anyhow::bail!("invalid hash and format"), + } + } +} + /// Hash type used throughout. #[derive(PartialEq, Eq, Copy, Clone, Hash)] pub struct Hash(blake3::Hash); @@ -561,4 +597,17 @@ mod tests { assert_eq!(ser.len(), 32); } + + #[test] + fn test_hash_and_format_parse() { + let hash = Hash::new("hello"); + + let expected = HashAndFormat::raw(hash); + let actual = expected.to_string().parse::().unwrap(); + assert_eq!(expected, actual); + + let expected = HashAndFormat::hash_seq(hash); + let actual = expected.to_string().parse::().unwrap(); + assert_eq!(expected, actual); + } } diff --git a/iroh/src/commands/get.rs b/iroh/src/commands/get.rs index 12d3919223..6286b68f76 100644 --- a/iroh/src/commands/get.rs +++ b/iroh/src/commands/get.rs @@ -181,7 +181,7 @@ async fn get_to_stdout_multi( curr: get::fsm::AtStartRoot, sender: FlumeProgressSender, ) -> Result { - let hash = *curr.hash(); + let hash = curr.hash(); let (mut next, _links, collection) = Collection::read_fsm(curr).await?; sender .send(GetProgress::FoundCollection {