Skip to content

Commit

Permalink
iroh-bytes: Get api improvements (#1660)
Browse files Browse the repository at this point in the history
## Description

A number of small improvements of the get side of the iroh-bytes API,
needed for the tracker and the progress rework

## Notes & open questions

I want to get those in separately to the above 2 PRs

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [ ] Tests if relevant.
  • Loading branch information
rklaehn authored Oct 18, 2023
1 parent 16773b0 commit 6a630d9
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 35 deletions.
100 changes: 66 additions & 34 deletions iroh-bytes/src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ pub mod fsm {
use super::*;

use bao_tree::{
blake3,
io::{
fsm::{
OutboardMut, ResponseDecoderReading, ResponseDecoderReadingNext,
Expand Down Expand Up @@ -98,6 +97,10 @@ pub mod fsm {
pub fn new(owner: RangeSpecSeq) -> Self {
Self(RangesIterInner::new(owner, |owner| owner.iter_non_empty()))
}

pub fn offset(&self) -> u64 {
self.0.with_dependent(|_owner, iter| iter.offset())
}
}

impl Iterator for RangesIter {
Expand Down Expand Up @@ -346,8 +349,8 @@ pub mod fsm {
}

/// Hash of the root blob
pub fn hash(&self) -> &Hash {
&self.hash
pub fn hash(&self) -> Hash {
self.hash
}

/// Go into the next state, reading the header
Expand Down Expand Up @@ -437,18 +440,8 @@ pub mod fsm {

/// Drain the response and throw away the result
pub async fn drain(self) -> result::Result<AtEndBlob, DecodeError> {
let (mut content, _size) = self.next().await?;
loop {
match content.next().await {
BlobContentNext::More((content1, Ok(_))) => {
content = content1;
}
BlobContentNext::More((_, Err(e))) => return Err(e),
BlobContentNext::Done(end) => {
return Ok(end);
}
}
}
let (content, _size) = self.next().await?;
content.drain().await
}

/// Concatenate the entire response into a vec
Expand All @@ -458,23 +451,8 @@ pub mod fsm {
pub async fn concatenate_into_vec(
self,
) -> result::Result<(AtEndBlob, Vec<u8>), DecodeError> {
let (mut curr, size) = self.next().await?;
let mut res = Vec::with_capacity(size as usize);
let done = loop {
match curr.next().await {
BlobContentNext::More((next, data)) => {
if let BaoContentItem::Leaf(leaf) = data? {
res.extend_from_slice(&leaf.data);
}
curr = next;
}
BlobContentNext::Done(done) => {
// we are done with the root blob
break done;
}
}
};
Ok((done, res))
let (content, _size) = self.next().await?;
content.concatenate_into_vec().await
}

/// Write the entire blob to a slice writer.
Expand Down Expand Up @@ -514,6 +492,11 @@ pub mod fsm {
pub fn ranges(&self) -> &ChunkRanges {
self.stream.ranges()
}

/// The current offset of the blob we are reading.
pub fn offset(&self) -> u64 {
self.misc.ranges_iter.offset()
}
}

/// State while we are reading content
Expand Down Expand Up @@ -648,8 +631,52 @@ pub mod fsm {
}

/// The hash of the blob we are reading.
pub fn hash(&self) -> &blake3::Hash {
self.stream.hash()
pub fn hash(&self) -> Hash {
(*self.stream.hash()).into()
}

/// The current offset of the blob we are reading.
pub fn offset(&self) -> u64 {
self.misc.ranges_iter.offset()
}

/// Drain the response and throw away the result
pub async fn drain(self) -> result::Result<AtEndBlob, DecodeError> {
let mut content = self;
loop {
match content.next().await {
BlobContentNext::More((content1, res)) => {
let _ = res?;
content = content1;
}
BlobContentNext::Done(end) => {
break Ok(end);
}
}
}
}

/// Concatenate the entire response into a vec
pub async fn concatenate_into_vec(
self,
) -> result::Result<(AtEndBlob, Vec<u8>), DecodeError> {
let mut res = Vec::with_capacity(1024);
let mut curr = self;
let done = loop {
match curr.next().await {
BlobContentNext::More((next, data)) => {
if let BaoContentItem::Leaf(leaf) = data? {
res.extend_from_slice(&leaf.data);
}
curr = next;
}
BlobContentNext::Done(done) => {
// we are done with the root blob
break done;
}
}
};
Ok((done, res))
}

/// Write the entire blob to a slice writer and to an optional outboard.
Expand Down Expand Up @@ -711,6 +738,11 @@ pub mod fsm {
}
}
}

/// Immediately finish the get response without reading further
pub fn finish(self) -> AtClosing {
AtClosing::new(self.misc, self.stream.finish())
}
}

/// State after we have read all the content for a blob
Expand Down
4 changes: 4 additions & 0 deletions iroh-bytes/src/protocol/range_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ impl<'a> NonEmptyRequestRangeSpecIter<'a> {
fn new(inner: RequestRangeSpecIter<'a>) -> Self {
Self { inner, count: 0 }
}

pub(crate) fn offset(&self) -> u64 {
self.count
}
}

impl<'a> Iterator for NonEmptyRequestRangeSpecIter<'a> {
Expand Down
49 changes: 49 additions & 0 deletions iroh-bytes/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,42 @@ impl HashAndFormat {
}
}

impl Display for HashAndFormat {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut slice = [0u8; 65];
hex::encode_to_slice(self.hash.as_bytes(), &mut slice[1..]).unwrap();
match self.format {
BlobFormat::Raw => {
write!(f, "{}", std::str::from_utf8(&slice[1..]).unwrap())
}
BlobFormat::HashSeq => {
slice[0] = b's';
write!(f, "{}", std::str::from_utf8(&slice).unwrap())
}
}
}
}

impl FromStr for HashAndFormat {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let s = s.as_bytes();
let mut hash = [0u8; 32];
match s.len() {
64 => {
hex::decode_to_slice(s, &mut hash)?;
Ok(Self::raw(hash.into()))
}
65 if s[0].to_ascii_lowercase() == b's' => {
hex::decode_to_slice(&s[1..], &mut hash)?;
Ok(Self::hash_seq(hash.into()))
}
_ => anyhow::bail!("invalid hash and format"),
}
}
}

/// Hash type used throughout.
#[derive(PartialEq, Eq, Copy, Clone, Hash)]
pub struct Hash(blake3::Hash);
Expand Down Expand Up @@ -561,4 +597,17 @@ mod tests {

assert_eq!(ser.len(), 32);
}

#[test]
fn test_hash_and_format_parse() {
let hash = Hash::new("hello");

let expected = HashAndFormat::raw(hash);
let actual = expected.to_string().parse::<HashAndFormat>().unwrap();
assert_eq!(expected, actual);

let expected = HashAndFormat::hash_seq(hash);
let actual = expected.to_string().parse::<HashAndFormat>().unwrap();
assert_eq!(expected, actual);
}
}
2 changes: 1 addition & 1 deletion iroh/src/commands/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ async fn get_to_stdout_multi(
curr: get::fsm::AtStartRoot,
sender: FlumeProgressSender<GetProgress>,
) -> Result<get::Stats> {
let hash = *curr.hash();
let hash = curr.hash();
let (mut next, _links, collection) = Collection::read_fsm(curr).await?;
sender
.send(GetProgress::FoundCollection {
Expand Down

0 comments on commit 6a630d9

Please sign in to comment.