refactor(iroh)!: remove tags from downloader (#2348)
## Description

Remove tags from downloader. Adding temp or persistent tags is now the
responsibility of the caller.

## Breaking Changes

- `iroh_blobs::downloader::DownloadRequest`
    - field `tag` is removed
    - fn `untagged` is removed
    - fn `tag` is removed

Docs have been updated to indicate that tagging is now the responsibility of
the caller; see the migration sketch below.
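
For callers that previously relied on `DownloadRequest::untagged` or `.tag(...)`, the replacement pattern is to hold a temp tag for the duration of the download and to create a persistent tag (if desired) once it completes, mirroring what `iroh/src/node/rpc.rs` now does. A minimal sketch, assuming a `Store`, a `Downloader`, and the node addresses are already available from the surrounding application code (the function name and the use of `anyhow` are illustrative, not part of this change):

```rust
use anyhow::Result;
use iroh_blobs::{
    downloader::{DownloadRequest, Downloader},
    store::Store,
    util::SetTagOption,
    HashAndFormat,
};
use iroh_net::NodeAddr;

/// Sketch of the caller-side pattern after this change: hold a temp tag
/// while the download runs, then persist a tag once it has completed.
async fn download_and_tag<S: Store>(
    store: &S,
    downloader: &Downloader,
    content: HashAndFormat,
    nodes: Vec<NodeAddr>,
    tag: SetTagOption,
) -> Result<()> {
    // Protect the (partial) blob from garbage collection for the
    // duration of the download.
    let temp_tag = store.temp_tag(content);

    // The request itself no longer carries a tag.
    let req = DownloadRequest::new(content, nodes);
    let handle = downloader.queue(req).await;
    handle.await?;

    // Persist a tag so the data survives after the temp tag is dropped.
    match tag {
        SetTagOption::Named(tag) => {
            store.set_tag(tag, Some(content)).await?;
        }
        SetTagOption::Auto => {
            store.create_tag(content).await?;
        }
    }
    drop(temp_tag);
    Ok(())
}
```

Dropping the temp tag only after the persistent tag is written avoids a window in which GC could collect the freshly downloaded blob.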

## Notes & open questions


## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.
rklaehn authored Jun 7, 2024
1 parent 35ce780 commit 82aa93f
Showing 6 changed files with 29 additions and 133 deletions.
68 changes: 13 additions & 55 deletions iroh-blobs/src/downloader.rs
@@ -51,8 +51,7 @@ use tracing::{debug, error_span, trace, warn, Instrument};
use crate::{
get::{db::DownloadProgress, Stats},
store::Store,
util::{progress::ProgressSender, SetTagOption, TagSet},
TempTag,
util::progress::ProgressSender,
};

mod get;
@@ -188,45 +187,29 @@ impl Default for RetryConfig {
pub struct DownloadRequest {
kind: DownloadKind,
nodes: Vec<NodeAddr>,
tag: Option<SetTagOption>,
progress: Option<ProgressSubscriber>,
}

impl DownloadRequest {
/// Create a new download request.
///
/// The blob will be auto-tagged after the download to prevent it from being garbage collected.
/// It is the responsibility of the caller to ensure that the data is tagged either with a
/// temp tag or with a persistent tag to make sure the data is not garbage collected during
/// the download.
///
/// If this is not done, the download will proceed as normal, but there is no guarantee
/// that the data is still available when the download is complete.
pub fn new(
resource: impl Into<DownloadKind>,
nodes: impl IntoIterator<Item = impl Into<NodeAddr>>,
) -> Self {
Self {
kind: resource.into(),
nodes: nodes.into_iter().map(|n| n.into()).collect(),
tag: Some(SetTagOption::Auto),
progress: None,
}
}

/// Create a new untagged download request.
///
/// The blob will not be tagged, so only use this if the blob is already protected from garbage
/// collection through other means.
pub fn untagged(
resource: HashAndFormat,
nodes: impl IntoIterator<Item = impl Into<NodeAddr>>,
) -> Self {
let mut r = Self::new(resource, nodes);
r.tag = None;
r
}

/// Set a tag to apply to the blob after download.
pub fn tag(mut self, tag: SetTagOption) -> Self {
self.tag = Some(tag);
self
}

/// Pass a progress sender to receive progress updates.
pub fn progress_sender(mut self, sender: ProgressSubscriber) -> Self {
self.progress = Some(sender);
@@ -351,14 +334,7 @@ impl Downloader {
store: store.clone(),
};

let service = Service::new(
store,
getter,
dialer,
concurrency_limits,
retry_config,
msg_rx,
);
let service = Service::new(getter, dialer, concurrency_limits, retry_config, msg_rx);

service.run().instrument(error_span!("downloader", %me))
};
@@ -450,8 +426,6 @@ struct IntentHandlers {
struct RequestInfo {
/// Registered intents with progress senders and result callbacks.
intents: HashMap<IntentId, IntentHandlers>,
/// Tags requested for the blob to be created once the download finishes.
tags: TagSet,
}

/// Information about a request in progress.
@@ -462,8 +436,6 @@ struct ActiveRequestInfo {
cancellation: CancellationToken,
/// Peer doing this request attempt.
node: NodeId,
/// Temporary tag to protect the partial blob from being garbage collected.
temp_tag: TempTag,
}

#[derive(Debug, Default)]
@@ -531,7 +503,7 @@ enum NodeState<'a, Conn> {
}

#[derive(Debug)]
struct Service<G: Getter, D: Dialer, DB: Store> {
struct Service<G: Getter, D: Dialer> {
/// The getter performs individual requests.
getter: G,
/// Map to query for nodes that we believe have the data we are looking for.
@@ -562,12 +534,9 @@ struct Service<G: Getter, D: Dialer, DB: Store> {
in_progress_downloads: JoinSet<(DownloadKind, InternalDownloadResult)>,
/// Progress tracker
progress_tracker: ProgressTracker,
/// The [`Store`] where tags are saved after a download completes.
db: DB,
}
impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D, DB> {
impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
fn new(
db: DB,
getter: G,
dialer: D,
concurrency_limits: ConcurrencyLimits,
@@ -590,7 +559,6 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,
in_progress_downloads: Default::default(),
progress_tracker: ProgressTracker::new(),
queue: Default::default(),
db,
}
}

@@ -614,7 +582,7 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,
match res {
Ok((kind, result)) => {
trace!(%kind, "tick: transfer completed");
self.on_download_completed(kind, result).await;
self.on_download_completed(kind, result);
}
Err(err) => {
warn!(?err, "transfer task panicked");
@@ -679,7 +647,6 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,
let DownloadRequest {
kind,
nodes,
tag,
progress,
} = request;
debug!(%kind, nodes=?nodes.iter().map(|n| n.node_id.fmt_short()).collect::<Vec<_>>(), "queue intent");
@@ -732,9 +699,6 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,
// store the request info
let request_info = self.requests.entry(kind).or_default();
request_info.intents.insert(intent_id, intent_handlers);
if let Some(tag) = &tag {
request_info.tags.insert(tag.clone());
}
}

/// Cancels a download intent.
@@ -797,7 +761,7 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,
}
}

async fn on_download_completed(&mut self, kind: DownloadKind, result: InternalDownloadResult) {
fn on_download_completed(&mut self, kind: DownloadKind, result: InternalDownloadResult) {
// first remove the request
let active_request_info = self
.active_requests
@@ -807,7 +771,7 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,
// get general request info
let request_info = self.requests.remove(&kind).expect("request was active");

let ActiveRequestInfo { node, temp_tag, .. } = active_request_info;
let ActiveRequestInfo { node, .. } = active_request_info;

// get node info
let node_info = self
@@ -867,10 +831,6 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,

if finalize {
let result = result.map_err(|_| DownloadError::DownloadFailed);
if result.is_ok() {
request_info.tags.apply(&self.db, kind.0).await.ok();
}
drop(temp_tag);
self.finalize_download(kind, request_info.intents, result);
} else {
// reinsert the download at the front of the queue to try from the next node
@@ -1124,11 +1084,9 @@ impl<DB: Store, G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D,

// create the active request state
let cancellation = CancellationToken::new();
let temp_tag = self.db.temp_tag(kind.0);
let state = ActiveRequestInfo {
cancellation: cancellation.clone(),
node,
temp_tag,
};
let conn = node_info.conn.clone();
let get_fut = self.getter.get(kind, conn, progress_sender);
2 changes: 1 addition & 1 deletion iroh-blobs/src/downloader/invariants.rs
@@ -5,7 +5,7 @@
use super::*;

/// invariants for the service.
impl<G: Getter<Connection = D::Connection>, D: Dialer, S: Store> Service<G, D, S> {
impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
/// Checks the various invariants the service must maintain
#[track_caller]
pub(in crate::downloader) fn check_invariants(&self) {
4 changes: 1 addition & 3 deletions iroh-blobs/src/downloader/test.rs
@@ -39,14 +39,12 @@ impl Downloader {
retry_config: RetryConfig,
) -> Self {
let (msg_tx, msg_rx) = mpsc::channel(super::SERVICE_CHANNEL_CAPACITY);
let db = crate::store::mem::Store::default();

LocalPoolHandle::new(1).spawn_pinned(move || async move {
// we want to see the logs of the service
let _guard = iroh_test::logging::setup();

let service =
Service::new(db, getter, dialer, concurrency_limits, retry_config, msg_rx);
let service = Service::new(getter, dialer, concurrency_limits, retry_config, msg_rx);
service.run().await
});

51 changes: 1 addition & 50 deletions iroh-blobs/src/util.rs
@@ -11,7 +11,7 @@ use std::{
time::SystemTime,
};

use crate::{store::Store, BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE};
use crate::{BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE};

pub mod io;
mod mem_or_file;
@@ -126,55 +126,6 @@ impl Tag {
}
}

/// A set of merged [`SetTagOption`]s for a blob.
#[derive(Debug, Default)]
pub struct TagSet {
auto: bool,
named: Vec<Tag>,
}

impl TagSet {
/// Insert a new tag into the set.
pub fn insert(&mut self, tag: SetTagOption) {
match tag {
SetTagOption::Auto => self.auto = true,
SetTagOption::Named(tag) => {
if !self.named.iter().any(|t| t == &tag) {
self.named.push(tag)
}
}
}
}

/// Convert the [`TagSet`] into a list of [`SetTagOption`].
pub fn into_tags(self) -> impl Iterator<Item = SetTagOption> {
self.auto
.then_some(SetTagOption::Auto)
.into_iter()
.chain(self.named.into_iter().map(SetTagOption::Named))
}

/// Apply the tags in the [`TagSet`] to the database.
pub async fn apply<D: Store>(
self,
db: &D,
hash_and_format: HashAndFormat,
) -> std::io::Result<()> {
let tags = self.into_tags();
for tag in tags {
match tag {
SetTagOption::Named(tag) => {
db.set_tag(tag, Some(hash_and_format)).await?;
}
SetTagOption::Auto => {
db.create_tag(hash_and_format).await?;
}
}
}
Ok(())
}
}

/// Option for commands that allow setting a tag
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum SetTagOption {
2 changes: 1 addition & 1 deletion iroh-docs/src/engine/live.rs
@@ -738,7 +738,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
self.queued_hashes.insert(hash, namespace);
self.downloader.nodes_have(hash, vec![node]).await;
} else if !only_if_missing || self.missing_hashes.contains(&hash) {
let req = DownloadRequest::untagged(HashAndFormat::raw(hash), vec![node]);
let req = DownloadRequest::new(HashAndFormat::raw(hash), vec![node]);
let handle = self.downloader.queue(req).await;

self.queued_hashes.insert(hash, namespace);
35 changes: 12 additions & 23 deletions iroh/src/node/rpc.rs
@@ -1012,25 +1012,34 @@ where
mode,
} = req;
let hash_and_format = HashAndFormat { hash, format };
let temp_tag = db.temp_tag(hash_and_format);
let stats = match mode {
DownloadMode::Queued => {
download_queued(
endpoint,
downloader,
hash_and_format,
nodes,
tag,
progress.clone(),
)
.await?
}
DownloadMode::Direct => {
download_direct_from_nodes(db, endpoint, hash_and_format, nodes, tag, progress.clone())
download_direct_from_nodes(db, endpoint, hash_and_format, nodes, progress.clone())
.await?
}
};

progress.send(DownloadProgress::AllDone(stats)).await.ok();
match tag {
SetTagOption::Named(tag) => {
db.set_tag(tag, Some(hash_and_format)).await?;
}
SetTagOption::Auto => {
db.create_tag(hash_and_format).await?;
}
}
drop(temp_tag);

Ok(())
}
@@ -1040,17 +1049,14 @@ async fn download_queued(
downloader: &Downloader,
hash_and_format: HashAndFormat,
nodes: Vec<NodeAddr>,
tag: SetTagOption,
progress: FlumeProgressSender<DownloadProgress>,
) -> Result<Stats> {
let mut node_ids = Vec::with_capacity(nodes.len());
for node in nodes {
node_ids.push(node.node_id);
endpoint.add_node_addr(node)?;
}
let req = DownloadRequest::new(hash_and_format, node_ids)
.progress_sender(progress)
.tag(tag);
let req = DownloadRequest::new(hash_and_format, node_ids).progress_sender(progress);
let handle = downloader.queue(req).await;
let stats = handle.await?;
Ok(stats)
@@ -1061,7 +1067,6 @@ async fn download_direct_from_nodes<D>(
endpoint: Endpoint,
hash_and_format: HashAndFormat,
nodes: Vec<NodeAddr>,
tag: SetTagOption,
progress: FlumeProgressSender<DownloadProgress>,
) -> Result<Stats>
where
@@ -1076,7 +1081,6 @@ where
endpoint.clone(),
hash_and_format,
node,
tag.clone(),
progress.clone(),
)
.await
@@ -1096,13 +1100,11 @@ async fn download_direct<D>(
endpoint: Endpoint,
hash_and_format: HashAndFormat,
node: NodeAddr,
tag: SetTagOption,
progress: FlumeProgressSender<DownloadProgress>,
) -> Result<Stats>
where
D: BaoStore,
{
let temp_pin = db.temp_tag(hash_and_format);
let get_conn = {
let progress = progress.clone();
move || async move {
@@ -1114,18 +1116,5 @@ where

let res = iroh_blobs::get::db::get_to_db(db, get_conn, &hash_and_format, progress).await;

if res.is_ok() {
match tag {
SetTagOption::Named(tag) => {
db.set_tag(tag, Some(hash_and_format)).await?;
}
SetTagOption::Auto => {
db.create_tag(hash_and_format).await?;
}
}
}

drop(temp_pin);

res.map_err(Into::into)
}
