# refactor(iroh-bytes): Simplify store traits (#2023)
## Description

Simplify and rename some store traits, taking advantage of return
position impl trait in trait (RPITIT).

The main change is that map entries are no longer tied to one specific
map implementation but are just generic map entries, so the store type
parameter goes away. Beyond that, the PartialXXX traits are renamed to
XXXMut, which is more accurate: an immutable entry can also be partial.
Since MapEntry no longer guarantees that an entry is complete, the main
difference between MapEntry and MapEntryMut is now that the latter
allows writing.
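
To make the RPITIT point concrete, here is a minimal before/after sketch of the entry trait. The `MapOld`/`MapEntryOld` names and bounds are invented for contrast, not taken from the crate; the new shape follows the `impl MapEntry` blocks visible in the diff below.

```rust
use std::io;

use iroh_io::AsyncSliceReader;

use crate::store::BaoBlobSize;
use crate::Hash;

/// Hypothetical stand-in for the old store trait, which pinned the
/// reader to an associated type of one concrete store.
pub trait MapOld {
    type DataReader: AsyncSliceReader;
}

/// Before: entries were parameterized over the store `S`, so every
/// signature dragged the store's associated types along.
pub trait MapEntryOld<S: MapOld> {
    fn hash(&self) -> Hash;
    fn size(&self) -> u64;
    async fn data_reader(&self) -> io::Result<S::DataReader>;
}

/// After: return position impl trait in trait lets each method promise
/// "some reader", so the store type parameter goes away entirely.
pub trait MapEntry {
    fn hash(&self) -> Hash;
    fn size(&self) -> BaoBlobSize;
    fn is_complete(&self) -> bool;
    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader>;
}
```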

Edit: one additional change is the introduction of BaoBlobSize, a size
together with information about whether that size has been validated.
Why not just `(u64, bool)`? Because we might later add more information
to the unverified enum case.
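
For concreteness, a minimal sketch of what such a type can look like, consistent with the `BaoBlobSize::new(...)`, `BaoBlobSize::Verified(...)`, and `.value()` call sites in the diff below; the exact definition in the crate may differ.

```rust
/// A blob size, tagged with whether it has been verified (e.g. checked
/// against the bao outboard) or merely reported by a remote.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BaoBlobSize {
    /// Size reported by a remote peer, not yet validated.
    Unverified(u64),
    /// Size that has been validated.
    Verified(u64),
}

impl BaoBlobSize {
    /// Tag a raw size with its verification status.
    pub fn new(size: u64, verified: bool) -> Self {
        if verified {
            Self::Verified(size)
        } else {
            Self::Unverified(size)
        }
    }

    /// The raw size, whether or not it has been verified.
    pub fn value(&self) -> u64 {
        match self {
            Self::Unverified(size) | Self::Verified(size) => *size,
        }
    }
}
```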

## Notes & open questions

Note: this is not the end of the refactoring; it is being done incrementally.

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [ ] Tests if relevant.
rklaehn authored Feb 20, 2024
1 parent af7783e commit 27a8ef1
Showing 12 changed files with 195 additions and 153 deletions.
4 changes: 2 additions & 2 deletions iroh-bytes/src/export.rs
```diff
@@ -10,7 +10,7 @@ use tracing::trace;
 
 use crate::{
     format::collection::Collection,
-    store::{ExportMode, MapEntry, Store as BaoStore},
+    store::{BaoBlobSize, ExportMode, MapEntry, Store as BaoStore},
     util::progress::{IdGenerator, ProgressSender},
     Hash,
 };
@@ -99,7 +99,7 @@ pub enum ExportProgress {
         /// The hash of the entry.
         hash: Hash,
         /// The size of the entry in bytes.
-        size: u64,
+        size: BaoBlobSize,
         /// The path to the file where the data is exported.
         outpath: PathBuf,
         /// Operation-specific metadata.
```
21 changes: 13 additions & 8 deletions iroh-bytes/src/get/db.rs
```diff
@@ -6,6 +6,7 @@ use iroh_base::rpc::RpcError;
 use serde::{Deserialize, Serialize};
 
 use crate::protocol::RangeSpec;
+use crate::store::BaoBlobSize;
 use crate::store::FallibleProgressBatchWriter;
 use std::io;
 
@@ -22,7 +23,7 @@ use crate::{
         Stats,
     },
     protocol::{GetRequest, RangeSpecSeq},
-    store::{MapEntry, PartialMap, PartialMapEntry, Store as BaoStore},
+    store::{MapEntry, MapEntryMut, MapMut, Store as BaoStore},
     util::progress::{IdGenerator, ProgressSender},
     BlobFormat, HashAndFormat,
 };
@@ -143,7 +144,7 @@ async fn get_blob<
 }
 
 /// Given a partial entry, get the valid ranges.
-pub async fn valid_ranges<D: PartialMap>(entry: &D::PartialEntry) -> anyhow::Result<ChunkRanges> {
+pub async fn valid_ranges<D: MapMut>(entry: &D::EntryMut) -> anyhow::Result<ChunkRanges> {
     use tracing::trace as log;
     // compute the valid range from just looking at the data file
     let mut data_reader = entry.data_reader().await?;
@@ -203,6 +204,7 @@ async fn get_blob_inner<D: BaoStore>(
     let end = at_content.write_all_batch(&mut bw).await?;
     // sync the underlying storage, if needed
     bw.sync().await?;
+    drop(bw);
    db.insert_complete(entry).await?;
     // notify that we are done
     sender.send(DownloadProgress::Done { id }).await?;
@@ -216,7 +218,7 @@ async fn get_blob_inner<D: BaoStore>(
 async fn get_blob_inner_partial<D: BaoStore>(
     db: &D,
     at_header: AtBlobHeader,
-    entry: D::PartialEntry,
+    entry: D::EntryMut,
     sender: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
 ) -> Result<AtEndBlob, GetError> {
     // read the size. The size we get here is not verified, but since we use
@@ -253,6 +255,7 @@ async fn get_blob_inner_partial<D: BaoStore>(
     let at_end = at_content.write_all_batch(&mut bw).await?;
     // sync the underlying storage, if needed
     bw.sync().await?;
+    drop(bw);
     // we got to the end without error, so we can mark the entry as complete
     //
     // caution: this assumes that the request filled all the gaps in our local
@@ -278,7 +281,9 @@ pub async fn blob_info<D: BaoStore>(db: &D, hash: &Hash) -> io::Result<BlobInfo<
                 valid_ranges,
             }
         }
-        PossiblyPartialEntry::Complete(entry) => BlobInfo::Complete { size: entry.size() },
+        PossiblyPartialEntry::Complete(entry) => BlobInfo::Complete {
+            size: entry.size().value(),
+        },
         PossiblyPartialEntry::NotFound => BlobInfo::Missing,
     })
 }
@@ -467,7 +472,7 @@ pub enum BlobInfo<D: BaoStore> {
     /// we have the blob partially
     Partial {
         /// The partial entry.
-        entry: D::PartialEntry,
+        entry: D::EntryMut,
         /// The ranges that are available locally.
         valid_ranges: ChunkRanges,
     },
@@ -477,9 +482,9 @@ pub enum BlobInfo<D: BaoStore> {
 
 impl<D: BaoStore> BlobInfo<D> {
     /// The size of the blob, if known.
-    pub fn size(&self) -> Option<u64> {
+    pub fn size(&self) -> Option<BaoBlobSize> {
         match self {
-            BlobInfo::Complete { size } => Some(*size),
+            BlobInfo::Complete { size } => Some(BaoBlobSize::Verified(*size)),
             BlobInfo::Partial { entry, .. } => Some(entry.size()),
             BlobInfo::Missing => None,
         }
@@ -520,7 +525,7 @@ pub enum DownloadProgress {
         /// The hash of the entry.
         hash: Hash,
         /// The size of the entry in bytes.
-        size: u64,
+        size: BaoBlobSize,
         /// The ranges that are available locally.
         valid_ranges: RangeSpec,
     },
```
6 changes: 3 additions & 3 deletions iroh-bytes/src/provider.rs
```diff
@@ -9,7 +9,7 @@ use iroh_base::rpc::RpcError;
 use iroh_io::stats::{
     SliceReaderStats, StreamWriterStats, TrackingSliceReader, TrackingStreamWriter,
 };
-use iroh_io::{AsyncStreamWriter, TokioStreamWriter};
+use iroh_io::{AsyncSliceReader, AsyncStreamWriter, TokioStreamWriter};
 use serde::{Deserialize, Serialize};
 use tokio_util::task::LocalPoolHandle;
 use tracing::{debug, debug_span, info, trace, warn};
@@ -184,8 +184,8 @@ pub async fn transfer_collection<D: Map, E: EventSender>(
     // Response writer, containing the quinn stream.
     writer: &mut ResponseWriter<E>,
     // the collection to transfer
-    mut outboard: D::Outboard,
-    mut data: D::DataReader,
+    mut outboard: impl Outboard,
+    mut data: impl AsyncSliceReader,
     stats: &mut TransferStats,
 ) -> Result<SentStatus> {
     let hash = request.hash;
```
57 changes: 28 additions & 29 deletions iroh-bytes/src/store/flat.rs
```diff
@@ -131,12 +131,14 @@ use std::sync::{Arc, Mutex, RwLock};
 use std::time::SystemTime;
 
 use super::{
-    CombinedBatchWriter, EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry,
-    PartialMap, PartialMapEntry, PossiblyPartialEntry, ReadableStore, ValidateProgress,
+    BaoBatchWriter, BaoBlobSize, CombinedBatchWriter, EntryStatus, ExportMode, ImportMode,
+    ImportProgress, Map, MapEntry, MapEntryMut, MapMut, PossiblyPartialEntry, ReadableStore,
+    ValidateProgress,
 };
 use crate::util::progress::{IdGenerator, IgnoreProgressSender, ProgressSender};
 use crate::util::{LivenessTracker, Tag};
 use crate::{BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE};
+use bao_tree::io::fsm::Outboard;
 use bao_tree::io::outboard::{PostOrderMemOutboard, PreOrderOutboard};
 use bao_tree::io::sync::ReadAt;
 use bao_tree::ChunkRanges;
@@ -239,20 +241,20 @@ impl PartialEntryData {
     }
 }
 
-impl MapEntry<Store> for PartialEntry {
+impl MapEntry for EntryMut {
     fn hash(&self) -> Hash {
         self.hash
     }
 
-    fn size(&self) -> u64 {
-        self.size
+    fn size(&self) -> BaoBlobSize {
+        BaoBlobSize::new(self.size, self.is_complete())
     }
 
     async fn available_ranges(&self) -> io::Result<ChunkRanges> {
         Ok(ChunkRanges::all())
     }
 
-    async fn outboard(&self) -> io::Result<PreOrderOutboard<MemOrFile>> {
+    async fn outboard(&self) -> io::Result<impl Outboard> {
         let file = File::open(self.outboard_path.clone()).await?;
         Ok(PreOrderOutboard {
             root: self.hash.into(),
@@ -261,7 +263,7 @@ impl MapEntry<Store> for PartialEntry {
         })
     }
 
-    async fn data_reader(&self) -> io::Result<MemOrFile> {
+    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
         let file = File::open(self.data_path.clone()).await?;
         Ok(MemOrFile::File(file))
     }
@@ -271,7 +273,7 @@ impl MapEntry<Store> for PartialEntry {
     }
 }
 
-impl PartialEntry {
+impl EntryMut {
     async fn outboard_mut(&self) -> io::Result<PreOrderOutboard<File>> {
         let hash = self.hash;
         let size = self.size;
@@ -304,18 +306,16 @@ impl PartialEntry {
     }
 }
 
-impl PartialMapEntry<Store> for PartialEntry {
-    async fn batch_writer(&self) -> io::Result<<Store as PartialMap>::BatchWriter> {
+impl MapEntryMut for EntryMut {
+    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
         let data = self.data_writer().await?;
         let outboard = self.outboard_mut().await?;
         Ok(CombinedBatchWriter { data, outboard })
     }
 }
 
-impl PartialMap for Store {
-    type PartialEntry = PartialEntry;
-
-    type BatchWriter = CombinedBatchWriter<File, PreOrderOutboard<File>>;
+impl MapMut for Store {
+    type EntryMut = EntryMut;
 
     fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
         let state = self.0.state.read().unwrap();
@@ -331,7 +331,7 @@ impl PartialMap for Store {
     fn get_possibly_partial(&self, hash: &Hash) -> io::Result<PossiblyPartialEntry<Self>> {
         let state = self.0.state.read().unwrap();
         Ok(if let Some(entry) = state.partial.get(hash) {
-            PossiblyPartialEntry::Partial(PartialEntry {
+            PossiblyPartialEntry::Partial(EntryMut {
                 hash: *hash,
                 size: entry.size,
                 data_path: self.0.options.partial_data_path(*hash, &entry.uuid),
@@ -347,7 +347,7 @@ impl PartialMap for Store {
         })
     }
 
-    fn get_or_create_partial(&self, hash: Hash, size: u64) -> io::Result<Self::PartialEntry> {
+    fn get_or_create_partial(&self, hash: Hash, size: u64) -> io::Result<Self::EntryMut> {
         let mut state = self.0.state.write().unwrap();
         // this protects the entry from being deleted until the next mark phase
         //
@@ -368,15 +368,15 @@ impl PartialMap for Store {
             .or_insert_with(|| PartialEntryData::new(size, new_uuid()));
         let data_path = self.0.options.partial_data_path(hash, &entry.uuid);
         let outboard_path = self.0.options.partial_outboard_path(hash, &entry.uuid);
-        Ok(PartialEntry {
+        Ok(EntryMut {
             hash,
             size: entry.size,
             data_path,
             outboard_path,
         })
     }
 
-    async fn insert_complete(&self, entry: Self::PartialEntry) -> io::Result<()> {
+    async fn insert_complete(&self, entry: Self::EntryMut) -> io::Result<()> {
         let this = self.clone();
         tokio::task::spawn_blocking(move || this.insert_complete_sync(entry))
             .map(flatten_to_io)
@@ -449,23 +449,24 @@ pub struct Entry {
     is_complete: bool,
 }
 
-impl MapEntry<Store> for Entry {
+impl MapEntry for Entry {
     fn hash(&self) -> Hash {
         self.hash
     }
 
-    fn size(&self) -> u64 {
-        match &self.entry.data {
+    fn size(&self) -> BaoBlobSize {
+        let size = match &self.entry.data {
             Either::Left(bytes) => bytes.len() as u64,
             Either::Right((_, size)) => *size,
-        }
+        };
+        BaoBlobSize::new(size, self.is_complete())
     }
 
     async fn available_ranges(&self) -> io::Result<ChunkRanges> {
         Ok(ChunkRanges::all())
     }
 
-    async fn outboard(&self) -> io::Result<PreOrderOutboard<MemOrFile>> {
+    async fn outboard(&self) -> io::Result<impl Outboard> {
         let size = self.entry.size();
         let data = self.entry.outboard_reader().await?;
         Ok(PreOrderOutboard {
@@ -475,7 +476,7 @@ impl MapEntry<Store> for Entry {
         })
     }
 
-    async fn data_reader(&self) -> io::Result<MemOrFile> {
+    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
         self.entry.data_reader().await
     }
 
@@ -561,9 +562,9 @@ fn needs_outboard(size: u64) -> bool {
     size > (IROH_BLOCK_SIZE.bytes() as u64)
 }
 
-/// The [PartialMapEntry] implementation for [Store].
+/// The [MapEntryMut] implementation for [Store].
 #[derive(Debug, Clone)]
-pub struct PartialEntry {
+pub struct EntryMut {
     hash: Hash,
     size: u64,
     data_path: PathBuf,
@@ -572,8 +573,6 @@ pub struct PartialEntry {
 
 impl Map for Store {
     type Entry = Entry;
-    type Outboard = PreOrderOutboard<MemOrFile>;
-    type DataReader = MemOrFile;
     fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
         let state = self.0.state.read().unwrap();
         Ok(if let Some(entry) = state.complete.get(hash) {
@@ -1068,7 +1067,7 @@ impl Store {
         Ok(())
     }
 
-    fn insert_complete_sync(&self, entry: PartialEntry) -> io::Result<()> {
+    fn insert_complete_sync(&self, entry: EntryMut) -> io::Result<()> {
         let hash = entry.hash;
         let data_path = self.0.options.owned_data_path(&hash);
         let size = entry.size;
```