Skip to content

Commit

Permalink
fix(iroh): do not establish connection if content already exists loca…
Browse files Browse the repository at this point in the history
…lly (#1969)

Avoid establishing a connection in `blobs.download` if content is
already locally available.

Closes #1947
  • Loading branch information
dignifiedquire authored Jan 23, 2024
1 parent f9ac5ad commit f7264ff
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 27 deletions.
33 changes: 25 additions & 8 deletions iroh-bytes/src/get/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::PathBuf;
use std::time::Duration;

use bytes::Bytes;
use futures::Future;
use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
use iroh_base::{hash::Hash, rpc::RpcError};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -33,26 +34,34 @@ use tracing::trace;
///
/// This considers data that is already in the store, and will only request
/// the remaining data.
pub async fn get_to_db<D: BaoStore>(
pub async fn get_to_db<
D: BaoStore,
C: FnOnce() -> F,
F: Future<Output = anyhow::Result<quinn::Connection>>,
>(
db: &D,
conn: quinn::Connection,
get_conn: C,
hash_and_format: &HashAndFormat,
sender: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
) -> anyhow::Result<Stats> {
let HashAndFormat { hash, format } = hash_and_format;
match format {
BlobFormat::Raw => get_blob(db, conn, hash, sender).await,
BlobFormat::HashSeq => get_hash_seq(db, conn, hash, sender).await,
BlobFormat::Raw => get_blob(db, get_conn, hash, sender).await,
BlobFormat::HashSeq => get_hash_seq(db, get_conn, hash, sender).await,
}
}

/// Get a blob that was requested completely.
///
/// We need to create our own files and handle the case where an outboard
/// is not needed.
async fn get_blob<D: BaoStore>(
async fn get_blob<
D: BaoStore,
C: FnOnce() -> F,
F: Future<Output = anyhow::Result<quinn::Connection>>,
>(
db: &D,
conn: quinn::Connection,
get_conn: C,
hash: &Hash,
progress: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
) -> anyhow::Result<Stats> {
Expand Down Expand Up @@ -87,6 +96,7 @@ async fn get_blob<D: BaoStore>(

let request = GetRequest::new(*hash, RangeSpecSeq::from_ranges([required_ranges]));
// full request
let conn = get_conn().await?;
let request = get::fsm::start(conn, request);
// create a new bidi stream
let connected = request.next().await?;
Expand All @@ -102,6 +112,7 @@ async fn get_blob<D: BaoStore>(
}
PossiblyPartialEntry::NotFound => {
// full request
let conn = get_conn().await?;
let request = get::fsm::start(conn, GetRequest::single(*hash));
// create a new bidi stream
let connected = request.next().await?;
Expand Down Expand Up @@ -300,9 +311,13 @@ async fn blob_infos<D: BaoStore>(db: &D, hash_seq: &[Hash]) -> io::Result<Vec<Bl
}

/// Get a sequence of hashes
async fn get_hash_seq<D: BaoStore>(
async fn get_hash_seq<
D: BaoStore,
C: FnOnce() -> F,
F: Future<Output = anyhow::Result<quinn::Connection>>,
>(
db: &D,
conn: quinn::Connection,
get_conn: C,
root_hash: &Hash,
sender: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
) -> anyhow::Result<Stats> {
Expand Down Expand Up @@ -359,6 +374,7 @@ async fn get_hash_seq<D: BaoStore>(
.collect::<Vec<_>>();
log!("requesting chunks {:?}", missing_iter);
let request = GetRequest::new(*root_hash, RangeSpecSeq::from_ranges(missing_iter));
let conn = get_conn().await?;
let request = get::fsm::start(conn, request);
// create a new bidi stream
let connected = request.next().await?;
Expand Down Expand Up @@ -399,6 +415,7 @@ async fn get_hash_seq<D: BaoStore>(
} else {
tracing::info!("don't have collection - doing full download");
// don't have the collection, so probably got nothing
let conn = get_conn().await?;
let request = get::fsm::start(conn, GetRequest::all(*root_hash));
// create a new bidi stream
let connected = request.next().await?;
Expand Down
38 changes: 19 additions & 19 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1116,33 +1116,27 @@ impl<D: BaoStore> RpcHandler<D> {
let format = msg.format;
let db = self.inner.db.clone();
let haf = HashAndFormat { hash, format };

let temp_pin = db.temp_tag(haf);
let conn = self
.inner
.endpoint
.connect(msg.peer, iroh_bytes::protocol::ALPN)
.await?;
let ep = self.inner.endpoint.clone();
let get_conn =
move || async move { ep.connect(msg.peer, iroh_bytes::protocol::ALPN).await };
progress.send(DownloadProgress::Connected).await?;
let progress2 = progress.clone();
let progress3 = progress.clone();

let db = self.inner.db.clone();
let db2 = db.clone();
let download = local.spawn_pinned(move || async move {
iroh_bytes::get::db::get_to_db(
&db2,
conn,
let this = self.clone();
let _export = local.spawn_pinned(move || async move {
let stats = iroh_bytes::get::db::get_to_db(
&db,
get_conn,
&HashAndFormat {
hash: msg.hash,
format: msg.format,
},
progress2,
progress.clone(),
)
.await
});
.await?;

let this = self.clone();
let _export = local.spawn_pinned(move || async move {
let stats = download.await.unwrap()?;
progress
.send(DownloadProgress::NetworkDone {
bytes_written: stats.bytes_written,
Expand All @@ -1153,7 +1147,13 @@ impl<D: BaoStore> RpcHandler<D> {
match msg.out {
DownloadLocation::External { path, in_place } => {
if let Err(cause) = this
.blob_export(path, hash, msg.format.is_hash_seq(), in_place, progress3)
.blob_export(
path,
hash,
msg.format.is_hash_seq(),
in_place,
progress.clone(),
)
.await
{
progress.send(DownloadProgress::Abort(cause.into())).await?;
Expand Down

0 comments on commit f7264ff

Please sign in to comment.