Skip to content

Commit

Permalink
fix(iroh): improve and test blob share (#1979)
Browse files Browse the repository at this point in the history
- adds `iroh.blobs.share`
- adds `iroh.blobs.status`
 
Closes #1978
  • Loading branch information
dignifiedquire authored Feb 2, 2024
1 parent 81124e6 commit 5db247f
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 40 deletions.
2 changes: 1 addition & 1 deletion iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ anyhow = { version = "1" }
bao-tree = { version = "0.9.1", features = ["tokio_fsm"], default-features = false }
bytes = "1"
data-encoding = "2.4.0"
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into"] }
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into", "from_str"] }
flume = "0.11"
futures = "0.3.25"
genawaiter = { version = "0.99", default-features = false, features = ["futures03"] }
Expand Down
125 changes: 125 additions & 0 deletions iroh/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use iroh_bytes::store::ValidateProgress;
// use iroh_bytes::util::progress::FlumeProgressSender;
use iroh_bytes::Hash;
use iroh_bytes::{BlobFormat, Tag};
use iroh_net::ticket::BlobTicket;
use iroh_net::{key::PublicKey, magic_endpoint::ConnectionInfo, NodeAddr};
use iroh_sync::actor::OpenState;
use iroh_sync::store::DownloadPolicy;
Expand Down Expand Up @@ -433,6 +434,77 @@ where
self.rpc.rpc(BlobDeleteBlobRequest { hash }).await??;
Ok(())
}

/// Share a blob.
pub async fn share(
&self,
hash: Hash,
blob_format: BlobFormat,
ticket_options: ShareTicketOptions,
) -> Result<BlobTicket> {
let NodeStatusResponse { addr, .. } = self.rpc.rpc(NodeStatusRequest).await??;
let mut node_addr = NodeAddr::new(addr.node_id);
match ticket_options {
ShareTicketOptions::DerpAndAddresses => {
node_addr = node_addr.with_direct_addresses(addr.direct_addresses().copied());
if let Some(url) = addr.derp_url() {
node_addr = node_addr.with_derp_url(url.clone());
}
}
ShareTicketOptions::Derp => {
if let Some(url) = addr.derp_url() {
node_addr = node_addr.with_derp_url(url.clone());
}
}
ShareTicketOptions::Addresses => {
node_addr = node_addr.with_direct_addresses(addr.direct_addresses().copied());
}
}

let ticket = BlobTicket::new(node_addr, hash, blob_format).expect("correct ticket");

Ok(ticket)
}

/// Get the status of a blob.
pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
// TODO: this could be implemented more efficiently
let reader = self.read(hash).await?;
if reader.is_complete {
Ok(BlobStatus::Complete { size: reader.size })
} else {
Ok(BlobStatus::Partial { size: reader.size })
}
}
}

/// Options when creating a ticket
#[derive(
Copy, Clone, PartialEq, Eq, Default, Debug, derive_more::Display, derive_more::FromStr,
)]
pub enum ShareTicketOptions {
/// Include both the derp URL and the direct addresses.
#[default]
DerpAndAddresses,
/// Only include the derp URL.
Derp,
/// Only include the direct addresses.
Addresses,
}

/// Status information about a blob.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlobStatus {
/// The blob is only stored partially.
Partial {
/// The size of the currently stored partial blob.
size: u64,
},
/// The blob is stored completely.
Complete {
/// The size of the blob.
size: u64,
},
}

/// Outcome of a blob add operation.
Expand Down Expand Up @@ -1611,4 +1683,57 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_blob_share() -> Result<()> {
let _guard = iroh_test::logging::setup();

let doc_store = iroh_sync::store::memory::Store::default();
let db = iroh_bytes::store::mem::Store::new();
let node = crate::node::Node::builder(db, doc_store).spawn().await?;

// create temp file
let temp_dir = tempfile::tempdir().context("tempdir")?;

let in_root = temp_dir.path().join("in");
tokio::fs::create_dir_all(in_root.clone())
.await
.context("create dir all")?;

let path = in_root.join("test-blob");
let size = 1024 * 128;
let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
let mut file = tokio::fs::File::create(path.clone())
.await
.context("create file")?;
file.write_all(&buf.clone()).await.context("write_all")?;
file.flush().await.context("flush")?;

let client = node.client();

let import_outcome = client
.blobs
.add_from_path(
path.to_path_buf(),
false,
SetTagOption::Auto,
WrapOption::NoWrap,
)
.await
.context("import file")?
.finish()
.await
.context("import finish")?;

let ticket = client
.blobs
.share(import_outcome.hash, BlobFormat::Raw, Default::default())
.await?;
assert_eq!(ticket.hash(), import_outcome.hash);

let status = client.blobs.status(import_outcome.hash).await?;
assert_eq!(status, BlobStatus::Complete { size });

Ok(())
}
}
58 changes: 19 additions & 39 deletions iroh/src/commands/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ use indicatif::{
ProgressStyle,
};
use iroh::{
client::Iroh,
client::{BlobStatus, Iroh, ShareTicketOptions},
rpc_protocol::{
BlobDownloadRequest, DownloadLocation, NodeStatusResponse, ProviderService, SetTagOption,
WrapOption,
BlobDownloadRequest, DownloadLocation, ProviderService, SetTagOption, WrapOption,
},
ticket::BlobTicket,
};
Expand Down Expand Up @@ -97,12 +96,9 @@ pub enum BlobCommands {
Share {
/// Hash of the blob to share.
hash: Hash,
/// Do not include DERP reion information in the ticket. (advanced)
#[clap(long, conflicts_with = "derp_only", default_value_t = false)]
no_derp: bool,
/// Include only the DERP url information in the ticket. (advanced)
#[clap(long, conflicts_with = "no_derp", default_value_t = false)]
derp_only: bool,
/// Options to configure the generated ticket.
#[clap(long, default_value_t = ShareTicketOptions::DerpAndAddresses)]
ticket_options: ShareTicketOptions,
/// If the blob is a collection, the requester will also fetch the listed blobs.
#[clap(long, default_value_t = false)]
recursive: bool,
Expand Down Expand Up @@ -267,47 +263,31 @@ impl BlobCommands {
} => add_with_opts(iroh, path, options).await,
Self::Share {
hash,
no_derp,
derp_only,
ticket_options,
recursive,
debug,
} => {
let NodeStatusResponse { addr, .. } = iroh.node.status().await?;
let node_addr = if no_derp {
NodeAddr::new(addr.node_id)
.with_direct_addresses(addr.direct_addresses().copied())
} else if derp_only {
if let Some(url) = addr.derp_url() {
NodeAddr::new(addr.node_id).with_derp_url(url.clone())
} else {
addr
}
} else {
addr
};

let blob_reader = iroh
.blobs
.read(hash)
.await
.context("failed to retrieve blob info")?;
let blob_status = if blob_reader.is_complete() {
"blob"
} else {
"incomplete blob"
};

let format = if recursive {
BlobFormat::HashSeq
} else {
BlobFormat::Raw
};

let ticket = BlobTicket::new(node_addr, hash, format).expect("correct ticket");
let status = iroh.blobs.status(hash).await?;
let ticket = iroh.blobs.share(hash, format, ticket_options).await?;

let (blob_status, size) = match (status, format) {
(BlobStatus::Complete { size }, BlobFormat::Raw) => ("blob", size),
(BlobStatus::Partial { size }, BlobFormat::Raw) => ("incomplete blob", size),
(BlobStatus::Complete { size }, BlobFormat::HashSeq) => ("collection", size),
(BlobStatus::Partial { size }, BlobFormat::HashSeq) => {
("incomplete collection", size)
}
};
println!(
"Ticket for {blob_status} {hash} ({})\n{ticket}",
HumanBytes(blob_reader.size())
HumanBytes(size)
);

if debug {
println!("{ticket:#?}")
}
Expand Down

0 comments on commit 5db247f

Please sign in to comment.