Skip to content

Commit

Permalink
Fix partial downloads for individual blobs (#1903)
Browse files Browse the repository at this point in the history
## Description

To check if data is available, it is not enough to call get_partial. We
now have get_possibly_partial which returns all 3 possible cases
complete/partial/missing.

contains was renamed to entry_status and removed to PartialMap.
get_partial was replaced with get_possibly_partial and extended as
described above.

Unfortunately a bit of repetition due to the code duplication in the
downloader.

## Notes & open questions

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
  • Loading branch information
rklaehn authored Dec 19, 2023
1 parent 5eb329a commit b390353
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 191 deletions.
103 changes: 58 additions & 45 deletions iroh-bytes/src/store/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ use std::time::SystemTime;

use super::{
EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry, PartialMap,
PartialMapEntry, ReadableStore, ValidateProgress,
PartialMapEntry, PossiblyPartialEntry, ReadableStore, ValidateProgress,
};
use crate::util::progress::{IdGenerator, IgnoreProgressSender, ProgressSender};
use crate::util::{LivenessTracker, Tag};
Expand Down Expand Up @@ -321,14 +321,34 @@ impl PartialMap for Store {

type PartialEntry = PartialEntry;

fn get_partial(&self, hash: &Hash) -> Option<Self::PartialEntry> {
let entry = self.0.state.read().unwrap().partial.get(hash)?.clone();
Some(PartialEntry {
hash: blake3::Hash::from(*hash),
size: entry.size,
data_path: self.0.options.partial_data_path(*hash, &entry.uuid),
outboard_path: self.0.options.partial_outboard_path(*hash, &entry.uuid),
})
fn entry_status(&self, hash: &Hash) -> EntryStatus {
let state = self.0.state.read().unwrap();
if state.complete.contains_key(hash) {
EntryStatus::Complete
} else if state.partial.contains_key(hash) {
EntryStatus::Partial
} else {
EntryStatus::NotFound
}
}

fn get_possibly_partial(&self, hash: &Hash) -> PossiblyPartialEntry<Self> {
let state = self.0.state.read().unwrap();
if let Some(entry) = state.partial.get(hash) {
PossiblyPartialEntry::Partial(PartialEntry {
hash: blake3::Hash::from(*hash),
size: entry.size,
data_path: self.0.options.partial_data_path(*hash, &entry.uuid),
outboard_path: self.0.options.partial_outboard_path(*hash, &entry.uuid),
})
} else if let Some(entry) = state.complete.get(hash) {
state
.get_entry(hash, entry, &self.0.options)
.map(PossiblyPartialEntry::Complete)
.unwrap_or(PossiblyPartialEntry::NotFound)
} else {
PossiblyPartialEntry::NotFound
}
}

fn get_or_create_partial(&self, hash: Hash, size: u64) -> io::Result<Self::PartialEntry> {
Expand Down Expand Up @@ -574,31 +594,7 @@ impl Map for Store {
fn get(&self, hash: &Hash) -> Option<Self::Entry> {
let state = self.0.state.read().unwrap();
if let Some(entry) = state.complete.get(hash) {
tracing::trace!("got complete: {} {}", hash, entry.size);
let outboard = state.load_outboard(entry.size, hash)?;
// check if we have the data cached
let data = state.data.get(hash).cloned();
Some(Entry {
hash: blake3::Hash::from(*hash),
is_complete: true,
entry: EntryData {
data: if let Some(data) = data {
Either::Left(data)
} else {
// get the data path
let path = if entry.owned_data {
// use the path for the data in the default location
self.owned_data_path(hash)
} else {
// use the first external path. if we don't have any
// we don't have a valid entry
entry.external_path()?.clone()
};
Either::Right((path, entry.size))
},
outboard: Either::Left(outboard),
},
})
state.get_entry(hash, entry, &self.0.options)
} else if let Some(entry) = state.partial.get(hash) {
let data_path = self.0.options.partial_data_path(*hash, &entry.uuid);
let outboard_path = self.0.options.partial_outboard_path(*hash, &entry.uuid);
Expand All @@ -621,17 +617,6 @@ impl Map for Store {
None
}
}

fn contains(&self, hash: &Hash) -> EntryStatus {
let state = self.0.state.read().unwrap();
if state.complete.contains_key(hash) {
EntryStatus::Complete
} else if state.partial.contains_key(hash) {
EntryStatus::Partial
} else {
EntryStatus::NotFound
}
}
}

impl ReadableStore for Store {
Expand Down Expand Up @@ -808,6 +793,34 @@ impl State {
Some(Bytes::from(size.to_le_bytes().to_vec()))
}
}

fn get_entry(&self, hash: &Hash, entry: &CompleteEntry, options: &Options) -> Option<Entry> {
tracing::trace!("got complete: {} {}", hash, entry.size);
let outboard = self.load_outboard(entry.size, hash)?;
// check if we have the data cached
let data = self.data.get(hash).cloned();
Some(Entry {
hash: blake3::Hash::from(*hash),
is_complete: true,
entry: EntryData {
data: if let Some(data) = data {
Either::Left(data)
} else {
// get the data path
let path = if entry.owned_data {
// use the path for the data in the default location
options.owned_data_path(hash)
} else {
// use the first external path. if we don't have any
// we don't have a valid entry
entry.external_path()?.clone()
};
Either::Right((path, entry.size))
},
outboard: Either::Left(outboard),
},
})
}
}

enum ImportFile {
Expand Down
39 changes: 21 additions & 18 deletions iroh-bytes/src/store/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::time::SystemTime;

use super::flatten_to_io;
use super::temp_name;
use super::PossiblyPartialEntry;
use super::TempCounterMap;
use crate::{
store::{
Expand Down Expand Up @@ -308,17 +309,6 @@ impl Map for Store {
None
}
}

fn contains(&self, hash: &Hash) -> EntryStatus {
let state = self.0.state.read().unwrap();
if state.complete.contains_key(hash) {
EntryStatus::Complete
} else if state.partial.contains_key(hash) {
EntryStatus::Partial
} else {
EntryStatus::NotFound
}
}
}

impl ReadableStore for Store {
Expand Down Expand Up @@ -385,14 +375,27 @@ impl PartialMap for Store {

type PartialEntry = PartialEntry;

fn get_partial(&self, hash: &Hash) -> Option<PartialEntry> {
fn entry_status(&self, hash: &Hash) -> EntryStatus {
let state = self.0.state.read().unwrap();
let (data, outboard) = state.partial.get(hash)?;
Some(PartialEntry {
hash: (*hash).into(),
outboard: outboard.clone(),
data: data.clone(),
})
if state.complete.contains_key(hash) {
EntryStatus::Complete
} else if state.partial.contains_key(hash) {
EntryStatus::Partial
} else {
EntryStatus::NotFound
}
}

fn get_possibly_partial(&self, hash: &Hash) -> PossiblyPartialEntry<Self> {
let state = self.0.state.read().unwrap();
match state.partial.get(hash) {
Some((data, outboard)) => PossiblyPartialEntry::Partial(PartialEntry {
hash: (*hash).into(),
outboard: outboard.clone(),
data: data.clone(),
}),
None => PossiblyPartialEntry::NotFound,
}
}

fn get_or_create_partial(&self, hash: Hash, size: u64) -> io::Result<PartialEntry> {
Expand Down
27 changes: 18 additions & 9 deletions iroh-bytes/src/store/readonly_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use futures::{
};
use tokio::{io::AsyncWriteExt, sync::mpsc};

use super::PossiblyPartialEntry;

/// A readonly in memory database for iroh-bytes.
///
/// This is basically just a HashMap, so it does not allow for any modifications
Expand Down Expand Up @@ -207,13 +209,6 @@ impl Map for Store {
data: d.clone(),
})
}

fn contains(&self, hash: &Hash) -> EntryStatus {
match self.0.contains_key(hash) {
true => EntryStatus::Complete,
false => EntryStatus::NotFound,
}
}
}

impl PartialMap for Store {
Expand All @@ -230,9 +225,23 @@ impl PartialMap for Store {
))
}

fn get_partial(&self, _hash: &Hash) -> Option<PartialEntry> {
fn entry_status(&self, hash: &Hash) -> EntryStatus {
match self.0.contains_key(hash) {
true => EntryStatus::Complete,
false => EntryStatus::NotFound,
}
}

fn get_possibly_partial(&self, hash: &Hash) -> PossiblyPartialEntry<Self> {
// return none because we do not have partial entries
None
if let Some((o, d)) = self.0.get(hash) {
PossiblyPartialEntry::Complete(Entry {
outboard: o.clone(),
data: d.clone(),
})
} else {
PossiblyPartialEntry::NotFound
}
}

fn insert_complete(&self, _entry: PartialEntry) -> BoxFuture<'_, io::Result<()>> {
Expand Down
31 changes: 22 additions & 9 deletions iroh-bytes/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ pub enum EntryStatus {
NotFound,
}

/// An entry in a store that supports partial entries.
///
/// This correspnds to [`EntryStatus`], but also includes the entry itself.
#[derive(Debug)]
pub enum PossiblyPartialEntry<D: PartialMap> {
/// A complete entry.
Complete(D::Entry),
/// A partial entry.
Partial(D::PartialEntry),
/// We got nothing.
NotFound,
}

/// An entry for one hash in a bao collection
///
/// The entry has the ability to provide you with an (outboard, data)
Expand Down Expand Up @@ -86,12 +99,6 @@ pub trait Map: Clone + Send + Sync + 'static {
/// This function should not block to perform io. The knowledge about
/// existing entries must be present in memory.
fn get(&self, hash: &Hash) -> Option<Self::Entry>;

/// Find out if the data behind a `hash` is complete, partial, or not present.
///
/// Note that this does not actually verify the on-disc data, but only checks in which section
/// of the store the entry is present.
fn contains(&self, hash: &Hash) -> EntryStatus;
}

/// A partial entry
Expand Down Expand Up @@ -119,13 +126,19 @@ pub trait PartialMap: Map {
/// error e.g. if there is not enough space on disk.
fn get_or_create_partial(&self, hash: Hash, size: u64) -> io::Result<Self::PartialEntry>;

/// Get an existing partial entry.
/// Find out if the data behind a `hash` is complete, partial, or not present.
///
/// Note that this does not actually verify the on-disc data, but only checks in which section
/// of the store the entry is present.
fn entry_status(&self, hash: &Hash) -> EntryStatus;

/// Get an existing entry.
///
/// This will return `None` if there is no partial entry for this hash.
/// This will return either a complete entry, a partial entry, or not found.
///
/// This function should not block to perform io. The knowledge about
/// partial entries must be present in memory.
fn get_partial(&self, hash: &Hash) -> Option<Self::PartialEntry>;
fn get_possibly_partial(&self, hash: &Hash) -> PossiblyPartialEntry<Self>;

/// Upgrade a partial entry to a complete entry.
fn insert_complete(&self, entry: Self::PartialEntry) -> BoxFuture<'_, io::Result<()>>;
Expand Down
Loading

0 comments on commit b390353

Please sign in to comment.