From cbdda420106ee2479ccca35bfb2ec8ed8516af2f Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 20 Mar 2024 12:34:08 +0200 Subject: [PATCH 1/5] Make the entire bao-file module pub(crate) --- iroh-bytes/src/store.rs | 3 ++- iroh-bytes/src/store/bao_file.rs | 12 ++++++------ iroh-bytes/src/store/file.rs | 8 ++++---- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/iroh-bytes/src/store.rs b/iroh-bytes/src/store.rs index 9ff043c316..3f6847f908 100644 --- a/iroh-bytes/src/store.rs +++ b/iroh-bytes/src/store.rs @@ -1,6 +1,7 @@ //! Implementations of blob stores use crate::{BlobFormat, Hash, HashAndFormat}; -pub mod bao_file; + +mod bao_file; pub mod mem; pub mod readonly_mem; diff --git a/iroh-bytes/src/store/bao_file.rs b/iroh-bytes/src/store/bao_file.rs index d163e6d4af..f1d001bf57 100644 --- a/iroh-bytes/src/store/bao_file.rs +++ b/iroh-bytes/src/store/bao_file.rs @@ -82,7 +82,7 @@ struct DataPaths { /// For the memory variant, it does reading in a zero copy way, since storage /// is already a `Bytes`. #[derive(Default, derive_more::Debug)] -pub struct CompleteMemOrFileStorage { +pub struct CompleteStorage { /// data part, which can be in memory or on disk. #[debug("{:?}", data.as_ref().map_mem(|x| x.len()))] pub data: MemOrFile, @@ -91,7 +91,7 @@ pub struct CompleteMemOrFileStorage { pub outboard: MemOrFile, } -impl CompleteMemOrFileStorage { +impl CompleteStorage { /// Read from the data file at the given offset, until end of file or max bytes. pub fn read_data_at(&self, offset: u64, len: usize) -> Bytes { match &self.data { @@ -135,7 +135,7 @@ fn create_read_write(path: impl AsRef) -> io::Result { .open(path) } -/// Mutabie in memory storage for a bao file. +/// Mutable in memory storage for a bao file. /// /// This is used for incomplete files if they are not big enough to warrant /// writing to disk. We must keep track of ranges in both data and outboard @@ -400,7 +400,7 @@ impl FileStorage { /// The storage for a bao file. This can be either in memory or on disk. #[derive(Debug)] -pub enum BaoFileStorage { +pub(crate) enum BaoFileStorage { /// The entry is incomplete and in memory. /// /// Since it is incomplete, it must be writeable. @@ -417,7 +417,7 @@ pub enum BaoFileStorage { /// (memory or file). /// /// Writing to this is a no-op, since it is already complete. 
- Complete(CompleteMemOrFileStorage), + Complete(CompleteStorage), } impl Default for BaoFileStorage { @@ -668,7 +668,7 @@ impl BaoFileHandle { data: MemOrFile, outboard: MemOrFile, ) -> Self { - let storage = BaoFileStorage::Complete(CompleteMemOrFileStorage { data, outboard }); + let storage = BaoFileStorage::Complete(CompleteStorage { data, outboard }); Self(Arc::new(BaoFileHandleInner { storage: RwLock::new(storage), config, diff --git a/iroh-bytes/src/store/file.rs b/iroh-bytes/src/store/file.rs index 4ec9e393b3..79d4bcd366 100644 --- a/iroh-bytes/src/store/file.rs +++ b/iroh-bytes/src/store/file.rs @@ -99,7 +99,7 @@ mod validate; use crate::{ store::{ - bao_file::{BaoFileStorage, CompleteMemOrFileStorage}, + bao_file::{BaoFileStorage, CompleteStorage}, file::{ tables::BaoFilePart, util::{overwrite_and_sync, read_and_remove, ProgressReader}, @@ -1340,7 +1340,7 @@ impl ReadableStore for Store { } } -impl crate::store::traits::Store for Store { +impl super::Store for Store { async fn import_file( &self, path: PathBuf, @@ -2418,7 +2418,7 @@ fn complete_storage( path_options: &PathOptions, inline_options: &InlineOptions, delete_after_commit: &mut DeleteSet, -) -> ActorResult> { +) -> ActorResult> { let (data, outboard, _sizes) = match storage { BaoFileStorage::Complete(c) => return Ok(Err(c)), BaoFileStorage::IncompleteMem(storage) => { @@ -2496,5 +2496,5 @@ fn complete_storage( // mark sizes for deletion after commit in any case - a complete entry // does not need sizes. delete_after_commit.insert(*hash, [BaoFilePart::Sizes]); - Ok(Ok(CompleteMemOrFileStorage { data, outboard })) + Ok(Ok(CompleteStorage { data, outboard })) } From 11d4ffda01969a13774d421fe2e3f98244b20d7a Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 20 Mar 2024 15:15:48 +0200 Subject: [PATCH 2/5] disentangle feature flags --- iroh-bytes/src/store.rs | 2 + iroh-bytes/src/store/bao_file.rs | 246 ++++-------------- iroh-bytes/src/store/file.rs | 24 +- .../src/store/file/import_flat_store.rs | 7 +- iroh-bytes/src/store/file/test_support.rs | 6 +- iroh-bytes/src/store/file/tests.rs | 2 +- iroh-bytes/src/store/mem.rs | 4 +- iroh-bytes/src/store/mutable_mem_storage.rs | 123 +++++++++ iroh-bytes/src/util.rs | 39 ++- 9 files changed, 235 insertions(+), 218 deletions(-) create mode 100644 iroh-bytes/src/store/mutable_mem_storage.rs diff --git a/iroh-bytes/src/store.rs b/iroh-bytes/src/store.rs index 3f6847f908..f42204bf29 100644 --- a/iroh-bytes/src/store.rs +++ b/iroh-bytes/src/store.rs @@ -1,8 +1,10 @@ //! Implementations of blob stores use crate::{BlobFormat, Hash, HashAndFormat}; +#[cfg(feature = "file-db")] mod bao_file; pub mod mem; +mod mutable_mem_storage; pub mod readonly_mem; #[cfg(feature = "file-db")] diff --git a/iroh-bytes/src/store/bao_file.rs b/iroh-bytes/src/store/bao_file.rs index f1d001bf57..684607273e 100644 --- a/iroh-bytes/src/store/bao_file.rs +++ b/iroh-bytes/src/store/bao_file.rs @@ -31,11 +31,13 @@ use iroh_io::AsyncSliceReader; use crate::{ store::BaoBatchWriter, - util::{MemOrFile, SparseMemFile}, + util::{get_limited_slice, MemOrFile, SparseMemFile}, IROH_BLOCK_SIZE, }; use iroh_base::hash::Hash; +use super::mutable_mem_storage::{MutableMemStorage, SizeInfo}; + /// Data files are stored in 3 files. The data file, the outboard file, /// and a sizes file. The sizes file contains the size that the remote side told us /// when writing each data block. @@ -135,162 +137,6 @@ fn create_read_write(path: impl AsRef) -> io::Result { .open(path) } -/// Mutable in memory storage for a bao file. 
-/// -/// This is used for incomplete files if they are not big enough to warrant -/// writing to disk. We must keep track of ranges in both data and outboard -/// that have been written to, and track the most precise known size. -#[derive(Debug, Default)] -pub struct MutableMemStorage { - /// Data file, can be any size. - data: SparseMemFile, - /// Outboard file, must be a multiple of 64 bytes. - outboard: SparseMemFile, - /// Size that was announced as we wrote that chunk - sizes: SizeInfo, -} - -/// Keep track of the most precise size we know of. -/// -/// When in memory, we don't have to write the size for every chunk to a separate -/// slot, but can just keep the best one. -#[derive(Debug, Default)] -pub struct SizeInfo { - offset: u64, - size: u64, -} - -impl SizeInfo { - /// Create a new size info for a complete file of size `size`. - pub(crate) fn complete(size: u64) -> Self { - let mask = (1 << IROH_BLOCK_SIZE.0) - 1; - // offset of the last bao chunk in a file of size `size` - let last_chunk_offset = size & mask; - Self { - offset: last_chunk_offset, - size, - } - } - - /// Write a size at the given offset. The size at the highest offset is going to be kept. - fn write(&mut self, offset: u64, size: u64) { - // >= instead of > because we want to be able to update size 0, the initial value. - if offset >= self.offset { - self.offset = offset; - self.size = size; - } - } - - /// Persist into a file where each chunk has its own slot. - fn persist(&self, mut target: impl WriteAt) -> io::Result<()> { - if self.offset & ((IROH_BLOCK_SIZE.bytes() as u64) - 1) != 0 { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "offset not aligned", - )); - } - let size_offset = (self.offset >> IROH_BLOCK_SIZE.0) << 3; - target.write_all_at(size_offset, self.size.to_le_bytes().as_slice())?; - Ok(()) - } - - /// The current size, representing the most correct size we know. - pub fn current_size(&self) -> u64 { - self.size - } - - /// Convert to a vec in slot format. - pub fn to_vec(&self) -> Vec { - let mut res = Vec::new(); - self.persist(&mut res).expect("io error writing to vec"); - res - } -} - -impl MutableMemStorage { - /// Get the parts data, outboard and sizes - pub fn into_parts(self) -> (SparseMemFile, SparseMemFile, SizeInfo) { - (self.data, self.outboard, self.sizes) - } - - /// Create a new mutable mem storage from the given data - pub fn complete(bytes: Bytes) -> (Self, iroh_base::hash::Hash) { - let (outboard, hash) = raw_outboard(bytes.as_ref()); - let res = Self { - data: bytes.to_vec().into(), - outboard: outboard.into(), - sizes: SizeInfo::complete(bytes.len() as u64), - }; - (res, hash) - } - - /// Persist the batch to disk, creating a FileBatch. 
- fn persist(&self, paths: DataPaths) -> io::Result { - let mut data = create_read_write(&paths.data)?; - let mut outboard = create_read_write(&paths.outboard)?; - let mut sizes = create_read_write(&paths.sizes)?; - self.data.persist(&mut data)?; - self.outboard.persist(&mut outboard)?; - self.sizes.persist(&mut sizes)?; - data.sync_all()?; - outboard.sync_all()?; - sizes.sync_all()?; - Ok(FileStorage { - data, - outboard, - sizes, - }) - } - - pub(super) fn current_size(&self) -> u64 { - self.sizes.current_size() - } - - pub(super) fn read_data_at(&self, offset: u64, len: usize) -> Bytes { - copy_limited_slice(&self.data, offset, len) - } - - pub(super) fn data_len(&self) -> u64 { - self.data.len() as u64 - } - - pub(super) fn read_outboard_at(&self, offset: u64, len: usize) -> Bytes { - copy_limited_slice(&self.outboard, offset, len) - } - - pub(super) fn outboard_len(&self) -> u64 { - self.outboard.len() as u64 - } - - pub(super) fn write_batch( - &mut self, - size: u64, - batch: &[BaoContentItem], - ) -> std::io::Result<()> { - let tree = BaoTree::new(ByteNum(size), IROH_BLOCK_SIZE); - for item in batch { - match item { - BaoContentItem::Parent(parent) => { - if let Some(offset) = tree.pre_order_offset(parent.node) { - let o0 = offset - .checked_mul(64) - .expect("u64 overflow multiplying to hash pair offset"); - let o1 = o0.checked_add(32).expect("u64 overflow"); - let outboard = &mut self.outboard; - outboard.write_all_at(o0, parent.pair.0.as_bytes().as_slice())?; - outboard.write_all_at(o1, parent.pair.1.as_bytes().as_slice())?; - } - } - BaoContentItem::Leaf(leaf) => { - self.sizes.write(leaf.offset.0, size); - self.data.write_all_at(leaf.offset.0, leaf.data.as_ref())?; - } - } - } - Ok(()) - } -} - /// Read from the given file at the given offset, until end of file or max bytes. fn read_to_end(file: impl ReadAt, offset: u64, max: usize) -> io::Result { let mut res = BytesMut::new(); @@ -786,6 +632,53 @@ impl BaoFileHandle { } } +impl SizeInfo { + /// Persist into a file where each chunk has its own slot. + pub fn persist(&self, mut target: impl WriteAt) -> io::Result<()> { + if self.offset & ((IROH_BLOCK_SIZE.bytes() as u64) - 1) != 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "offset not aligned", + )); + } + let size_offset = (self.offset >> IROH_BLOCK_SIZE.0) << 3; + target.write_all_at(size_offset, self.size.to_le_bytes().as_slice())?; + Ok(()) + } + + /// Convert to a vec in slot format. + pub fn to_vec(&self) -> Vec { + let mut res = Vec::new(); + self.persist(&mut res).expect("io error writing to vec"); + res + } +} + +impl MutableMemStorage { + /// Persist the batch to disk, creating a FileBatch. + fn persist(&self, paths: DataPaths) -> io::Result { + let mut data = create_read_write(&paths.data)?; + let mut outboard = create_read_write(&paths.outboard)?; + let mut sizes = create_read_write(&paths.sizes)?; + self.data.persist(&mut data)?; + self.outboard.persist(&mut outboard)?; + self.sizes.persist(&mut sizes)?; + data.sync_all()?; + outboard.sync_all()?; + sizes.sync_all()?; + Ok(FileStorage { + data, + outboard, + sizes, + }) + } + + /// Get the parts data, outboard and sizes + pub fn into_parts(self) -> (SparseMemFile, SparseMemFile, SizeInfo) { + (self.data, self.outboard, self.sizes) + } +} + /// This is finally the thing for which we can implement BaoPairMut. 
/// /// It is a BaoFileHandle wrapped in an Option, so that we can take it out @@ -877,28 +770,6 @@ pub(crate) fn parse_hash_pair(buf: Bytes) -> io::Result<(blake3::Hash, blake3::H Ok((l_hash, r_hash)) } -pub(crate) fn limited_range(offset: u64, len: usize, buf_len: usize) -> std::ops::Range { - if offset < buf_len as u64 { - let start = offset as usize; - let end = start.saturating_add(len).min(buf_len); - start..end - } else { - 0..0 - } -} - -/// zero copy get a limited slice from a `Bytes` as a `Bytes`. -fn get_limited_slice(bytes: &Bytes, offset: u64, len: usize) -> Bytes { - bytes.slice(limited_range(offset, len, bytes.len())) -} - -/// copy a limited slice from a slice as a `Bytes`. -fn copy_limited_slice(bytes: &[u8], offset: u64, len: usize) -> Bytes { - bytes[limited_range(offset, len, bytes.len())] - .to_vec() - .into() -} - #[cfg(test)] pub mod test_support { use std::{io::Cursor, ops::Range}; @@ -918,6 +789,8 @@ pub mod test_support { use range_collections::RangeSet2; use tokio::io::AsyncRead; + use crate::util::limited_range; + use super::*; pub const IROH_BLOCK_SIZE: BlockSize = BlockSize(4); @@ -1206,18 +1079,3 @@ mod tests { println!("{:?}", handle); } } - -/// Compute raw outboard size, without the size header. -#[allow(dead_code)] -pub(crate) fn raw_outboard_size(size: u64) -> u64 { - bao_tree::io::outboard_size(size, IROH_BLOCK_SIZE) - 8 -} - -/// Compute raw outboard, without the size header. -#[allow(dead_code)] -pub(crate) fn raw_outboard(data: &[u8]) -> (Vec, Hash) { - let (mut outboard, hash) = bao_tree::io::outboard(data, IROH_BLOCK_SIZE); - // remove the size header - outboard.splice(0..8, []); - (outboard, hash.into()) -} diff --git a/iroh-bytes/src/store/file.rs b/iroh-bytes/src/store/file.rs index 79d4bcd366..c5411e3f9d 100644 --- a/iroh-bytes/src/store/file.rs +++ b/iroh-bytes/src/store/file.rs @@ -107,7 +107,7 @@ use crate::{ }, util::{ progress::{IdGenerator, IgnoreProgressSender, ProgressSendError, ProgressSender}, - LivenessTracker, MemOrFile, + raw_outboard_size, LivenessTracker, MemOrFile, }, Tag, TempTag, IROH_BLOCK_SIZE, }; @@ -118,7 +118,7 @@ use self::{tables::DeleteSet, util::PeekableFlumeReceiver}; use self::test_support::EntryData; use super::{ - bao_file::{raw_outboard_size, BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb}, + bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb}, temp_name, BaoBatchWriter, BaoBlobSize, EntryStatus, ExportMode, ExportProgressCb, ImportMode, ImportProgress, ReadableStore, TempCounterMap, ValidateProgress, }; @@ -337,9 +337,9 @@ impl redb::RedbValue for EntryState { #[derive(Debug, Clone)] pub struct InlineOptions { /// Maximum data size to inline. - max_data_inlined: u64, + pub max_data_inlined: u64, /// Maximum outboard size to inline. - max_outboard_inlined: u64, + pub max_outboard_inlined: u64, } impl InlineOptions { @@ -368,11 +368,11 @@ impl Default for InlineOptions { #[derive(Debug, Clone)] pub struct PathOptions { /// Path to the directory where data and outboard files are stored. - data_path: PathBuf, + pub data_path: PathBuf, /// Path to the directory where temp files are stored. /// This *must* be on the same device as `data_path`, since we need to /// atomically move temp files into place. - temp_path: PathBuf, + pub temp_path: PathBuf, } impl PathOptions { @@ -404,13 +404,13 @@ impl PathOptions { #[derive(Debug, Clone)] pub struct BatchOptions { /// Maximum number of actor messages to batch before creating a new read transaction. 
- max_read_batch: usize, - /// Maximum number of actor messages to batch before committing write transaction. - max_write_batch: usize, + pub max_read_batch: usize, /// Maximum duration to wait before committing a read transaction. - max_read_duration: Duration, + pub max_read_duration: Duration, + /// Maximum number of actor messages to batch before committing write transaction. + pub max_write_batch: usize, /// Maximum duration to wait before committing a write transaction. - max_write_duration: Duration, + pub max_write_duration: Duration, } impl Default for BatchOptions { @@ -426,7 +426,7 @@ impl Default for BatchOptions { /// Options for the file store. #[derive(Debug, Clone)] -pub struct Options { +pub(crate) struct Options { path: PathOptions, /// Inline storage options. inline: InlineOptions, diff --git a/iroh-bytes/src/store/file/import_flat_store.rs b/iroh-bytes/src/store/file/import_flat_store.rs index 7a4348d15c..66b0252c5b 100644 --- a/iroh-bytes/src/store/file/import_flat_store.rs +++ b/iroh-bytes/src/store/file/import_flat_store.rs @@ -9,11 +9,8 @@ use std::{ }; use crate::{ - store::{ - bao_file::raw_outboard_size, - file::{tables::Tables, DataLocation, EntryState, OutboardLocation}, - }, - util::Tag, + store::file::{tables::Tables, DataLocation, EntryState, OutboardLocation}, + util::{raw_outboard_size, Tag}, IROH_BLOCK_SIZE, }; diff --git a/iroh-bytes/src/store/file/test_support.rs b/iroh-bytes/src/store/file/test_support.rs index b57b7ecec2..d6c715d145 100644 --- a/iroh-bytes/src/store/file/test_support.rs +++ b/iroh-bytes/src/store/file/test_support.rs @@ -15,10 +15,8 @@ use super::{ OutboardLocation, OuterResult, Store, StoreInner, }; use crate::{ - store::{ - bao_file::{raw_outboard_size, SizeInfo}, - DbIter, - }, + store::{mutable_mem_storage::SizeInfo, DbIter}, + util::raw_outboard_size, Hash, }; use redb::ReadableTable; diff --git a/iroh-bytes/src/store/file/tests.rs b/iroh-bytes/src/store/file/tests.rs index 786e4d0472..9e03778767 100644 --- a/iroh-bytes/src/store/file/tests.rs +++ b/iroh-bytes/src/store/file/tests.rs @@ -3,11 +3,11 @@ use iroh_io::AsyncSliceReaderExt; use std::io::Cursor; use std::time::Duration; -use crate::store::bao_file::raw_outboard; use crate::store::bao_file::test_support::{ decode_response_into_batch, make_wire_data, random_test_data, simulate_remote, validate, }; use crate::store::{Map as _, MapEntry, MapEntryMut, MapMut, Store as _}; +use crate::util::raw_outboard; macro_rules! assert_matches { ($expression:expr, $pattern:pat) => { diff --git a/iroh-bytes/src/store/mem.rs b/iroh-bytes/src/store/mem.rs index a4d9d25fea..51100d2f7f 100644 --- a/iroh-bytes/src/store/mem.rs +++ b/iroh-bytes/src/store/mem.rs @@ -18,7 +18,9 @@ use std::{ }; use crate::{ - store::{bao_file::MutableMemStorage, BaoBlobSize, MapEntry, MapEntryMut, ReadableStore}, + store::{ + mutable_mem_storage::MutableMemStorage, BaoBlobSize, MapEntry, MapEntryMut, ReadableStore, + }, util::{ progress::{IdGenerator, IgnoreProgressSender, ProgressSender}, LivenessTracker, diff --git a/iroh-bytes/src/store/mutable_mem_storage.rs b/iroh-bytes/src/store/mutable_mem_storage.rs new file mode 100644 index 0000000000..41a22bf92c --- /dev/null +++ b/iroh-bytes/src/store/mutable_mem_storage.rs @@ -0,0 +1,123 @@ +use bao_tree::{ + io::{fsm::BaoContentItem, sync::WriteAt}, + BaoTree, ByteNum, +}; +use bytes::Bytes; + +use crate::{ + util::{copy_limited_slice, raw_outboard, SparseMemFile}, + IROH_BLOCK_SIZE, +}; + +/// Mutable in memory storage for a bao file. 
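+///
+/// Editorial sketch, not part of the patch: building a complete in-memory
+/// entry from a buffer and reading back its tracked size (crate-internal
+/// API; `Bytes` is from the `bytes` crate, and the buffer contents are
+/// illustrative):
+/// ```ignore
+/// // complete() hashes the buffer and derives the outboard and size info
+/// let (storage, hash) = MutableMemStorage::complete(Bytes::from(vec![0u8; 1024]));
+/// assert_eq!(storage.current_size(), 1024);
+/// ```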
+/// +/// This is used for incomplete files if they are not big enough to warrant +/// writing to disk. We must keep track of ranges in both data and outboard +/// that have been written to, and track the most precise known size. +#[derive(Debug, Default)] +pub struct MutableMemStorage { + /// Data file, can be any size. + pub data: SparseMemFile, + /// Outboard file, must be a multiple of 64 bytes. + pub outboard: SparseMemFile, + /// Size that was announced as we wrote that chunk + pub sizes: SizeInfo, +} + +/// Keep track of the most precise size we know of. +/// +/// When in memory, we don't have to write the size for every chunk to a separate +/// slot, but can just keep the best one. +#[derive(Debug, Default)] +pub struct SizeInfo { + pub offset: u64, + pub size: u64, +} + +impl SizeInfo { + /// Create a new size info for a complete file of size `size`. + pub(crate) fn complete(size: u64) -> Self { + let mask = (1 << IROH_BLOCK_SIZE.0) - 1; + // offset of the last bao chunk in a file of size `size` + let last_chunk_offset = size & mask; + Self { + offset: last_chunk_offset, + size, + } + } + + /// Write a size at the given offset. The size at the highest offset is going to be kept. + fn write(&mut self, offset: u64, size: u64) { + // >= instead of > because we want to be able to update size 0, the initial value. + if offset >= self.offset { + self.offset = offset; + self.size = size; + } + } + + /// The current size, representing the most correct size we know. + pub fn current_size(&self) -> u64 { + self.size + } +} + +impl MutableMemStorage { + /// Create a new mutable mem storage from the given data + pub fn complete(bytes: Bytes) -> (Self, iroh_base::hash::Hash) { + let (outboard, hash) = raw_outboard(bytes.as_ref()); + let res = Self { + data: bytes.to_vec().into(), + outboard: outboard.into(), + sizes: SizeInfo::complete(bytes.len() as u64), + }; + (res, hash) + } + + pub(super) fn current_size(&self) -> u64 { + self.sizes.current_size() + } + + pub(super) fn read_data_at(&self, offset: u64, len: usize) -> Bytes { + copy_limited_slice(&self.data, offset, len) + } + + pub(super) fn data_len(&self) -> u64 { + self.data.len() as u64 + } + + pub(super) fn read_outboard_at(&self, offset: u64, len: usize) -> Bytes { + copy_limited_slice(&self.outboard, offset, len) + } + + pub(super) fn outboard_len(&self) -> u64 { + self.outboard.len() as u64 + } + + pub(super) fn write_batch( + &mut self, + size: u64, + batch: &[BaoContentItem], + ) -> std::io::Result<()> { + let tree = BaoTree::new(ByteNum(size), IROH_BLOCK_SIZE); + for item in batch { + match item { + BaoContentItem::Parent(parent) => { + if let Some(offset) = tree.pre_order_offset(parent.node) { + let o0 = offset + .checked_mul(64) + .expect("u64 overflow multiplying to hash pair offset"); + let o1 = o0.checked_add(32).expect("u64 overflow"); + let outboard = &mut self.outboard; + outboard.write_all_at(o0, parent.pair.0.as_bytes().as_slice())?; + outboard.write_all_at(o1, parent.pair.1.as_bytes().as_slice())?; + } + } + BaoContentItem::Leaf(leaf) => { + self.sizes.write(leaf.offset.0, size); + self.data.write_all_at(leaf.offset.0, leaf.data.as_ref())?; + } + } + } + Ok(()) + } +} diff --git a/iroh-bytes/src/util.rs b/iroh-bytes/src/util.rs index d711a4f969..9654d3ce80 100644 --- a/iroh-bytes/src/util.rs +++ b/iroh-bytes/src/util.rs @@ -6,7 +6,7 @@ use range_collections::range_set::RangeSetRange; use serde::{Deserialize, Serialize}; use std::{borrow::Borrow, fmt, sync::Arc, time::SystemTime}; -use crate::{BlobFormat, Hash, 
HashAndFormat}; +use crate::{BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE}; pub mod io; mod mem_or_file; @@ -235,3 +235,40 @@ impl NonSend { } } } + +/// copy a limited slice from a slice as a `Bytes`. +pub(crate) fn copy_limited_slice(bytes: &[u8], offset: u64, len: usize) -> Bytes { + bytes[limited_range(offset, len, bytes.len())] + .to_vec() + .into() +} + +pub(crate) fn limited_range(offset: u64, len: usize, buf_len: usize) -> std::ops::Range { + if offset < buf_len as u64 { + let start = offset as usize; + let end = start.saturating_add(len).min(buf_len); + start..end + } else { + 0..0 + } +} + +/// zero copy get a limited slice from a `Bytes` as a `Bytes`. +#[allow(dead_code)] +pub(crate) fn get_limited_slice(bytes: &Bytes, offset: u64, len: usize) -> Bytes { + bytes.slice(limited_range(offset, len, bytes.len())) +} + +/// Compute raw outboard size, without the size header. +#[allow(dead_code)] +pub(crate) fn raw_outboard_size(size: u64) -> u64 { + bao_tree::io::outboard_size(size, IROH_BLOCK_SIZE) - 8 +} + +/// Compute raw outboard, without the size header. +pub(crate) fn raw_outboard(data: &[u8]) -> (Vec, Hash) { + let (mut outboard, hash) = bao_tree::io::outboard(data, IROH_BLOCK_SIZE); + // remove the size header + outboard.splice(0..8, []); + (outboard, hash.into()) +} From 7739016c5865555a7f544d60054dececb4f24f67 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 20 Mar 2024 15:21:06 +0200 Subject: [PATCH 3/5] Expose creating store with non default options --- iroh-bytes/src/store/file.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/iroh-bytes/src/store/file.rs b/iroh-bytes/src/store/file.rs index c5411e3f9d..77862e0634 100644 --- a/iroh-bytes/src/store/file.rs +++ b/iroh-bytes/src/store/file.rs @@ -426,12 +426,13 @@ impl Default for BatchOptions { /// Options for the file store. #[derive(Debug, Clone)] -pub(crate) struct Options { - path: PathOptions, +pub struct Options { + /// Path options. + pub path: PathOptions, /// Inline storage options. - inline: InlineOptions, + pub inline: InlineOptions, /// Transaction batching options. - batch: BatchOptions, + pub batch: BatchOptions, } #[derive(derive_more::Debug)] @@ -722,7 +723,8 @@ impl Store { Self::new(db_path, options).await } - async fn new(path: PathBuf, options: Options) -> io::Result { + /// Create a new store with custom options. 
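+    ///
+    /// Editorial sketch, not part of the patch: with `Options`,
+    /// `PathOptions`, `InlineOptions` and `BatchOptions` now public, a
+    /// caller could configure the store roughly like this (the paths and
+    /// database file name are illustrative):
+    /// ```ignore
+    /// let options = Options {
+    ///     // data and temp must be on the same device (see PathOptions docs)
+    ///     path: PathOptions {
+    ///         data_path: root.join("data"),
+    ///         temp_path: root.join("temp"),
+    ///     },
+    ///     inline: InlineOptions::default(),
+    ///     batch: BatchOptions::default(),
+    /// };
+    /// let store = Store::new(root.join("blobs.db"), options).await?;
+    /// ```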
+ pub async fn new(path: PathBuf, options: Options) -> io::Result { // spawn_blocking because StoreInner::new creates directories let rt = tokio::runtime::Handle::try_current() .map_err(|_| io::Error::new(io::ErrorKind::Other, "no tokio runtime"))?; From bf9aaa2baa8cd00fc1269172d73e8a00be9cb8b6 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 20 Mar 2024 16:16:18 +0200 Subject: [PATCH 4/5] Get rid of PossiblyPartialEntry --- iroh-bytes/src/get/db.rs | 31 +++++++------ iroh-bytes/src/store/file.rs | 22 ++------- iroh-bytes/src/store/mem.rs | 29 ++++-------- iroh-bytes/src/store/readonly_mem.rs | 69 ++++++---------------------- iroh-bytes/src/store/traits.rs | 33 ++++--------- iroh/src/node/rpc.rs | 8 ++-- iroh/tests/provide.rs | 2 +- 7 files changed, 57 insertions(+), 137 deletions(-) diff --git a/iroh-bytes/src/get/db.rs b/iroh-bytes/src/get/db.rs index e763e57645..31a7afdad3 100644 --- a/iroh-bytes/src/get/db.rs +++ b/iroh-bytes/src/get/db.rs @@ -12,7 +12,6 @@ use std::io; use crate::hashseq::parse_hash_seq; use crate::store::BaoBatchWriter; -use crate::store::PossiblyPartialEntry; use crate::{ export::ExportProgress, @@ -72,8 +71,8 @@ async fn get_blob< hash: &Hash, progress: impl ProgressSender + IdGenerator, ) -> Result { - let end = match db.get_possibly_partial(hash).await? { - PossiblyPartialEntry::Complete(entry) => { + let end = match db.get_mut(hash).await? { + Some(entry) if entry.is_complete() => { tracing::info!("already got entire blob"); progress .send(DownloadProgress::FoundLocal { @@ -85,7 +84,7 @@ async fn get_blob< .await?; return Ok(Stats::default()); } - PossiblyPartialEntry::Partial(entry) => { + Some(entry) => { trace!("got partial data for {}", hash); let valid_ranges = valid_ranges::(&entry) .await @@ -117,7 +116,7 @@ async fn get_blob< get_blob_inner_partial(db, header, entry, progress).await? } - PossiblyPartialEntry::NotFound => { + None => { // full request let conn = get_conn().await.map_err(GetError::Io)?; let request = get::fsm::start(conn, GetRequest::single(*hash)); @@ -270,8 +269,11 @@ async fn get_blob_inner_partial( /// /// This will compute the valid ranges for partial blobs, so it is somewhat expensive for those. pub async fn blob_info(db: &D, hash: &Hash) -> io::Result> { - io::Result::Ok(match db.get_possibly_partial(hash).await? { - PossiblyPartialEntry::Partial(entry) => { + io::Result::Ok(match db.get_mut(hash).await? { + Some(entry) if entry.is_complete() => BlobInfo::Complete { + size: entry.size().value(), + }, + Some(entry) => { let valid_ranges = valid_ranges::(&entry) .await .ok() @@ -281,10 +283,7 @@ pub async fn blob_info(db: &D, hash: &Hash) -> io::Result BlobInfo::Complete { - size: entry.size().value(), - }, - PossiblyPartialEntry::NotFound => BlobInfo::Missing, + None => BlobInfo::Missing, }) } @@ -308,8 +307,8 @@ async fn get_hash_seq< sender: impl ProgressSender + IdGenerator, ) -> Result { use tracing::info as log; - let finishing = - if let PossiblyPartialEntry::Complete(entry) = db.get_possibly_partial(root_hash).await? { + let finishing = match db.get_mut(root_hash).await? 
{ + Some(entry) if entry.is_complete() => { log!("already got collection - doing partial download"); // send info that we have the hashseq itself entirely sender @@ -404,7 +403,8 @@ async fn get_hash_seq< }; next = end_blob.next(); } - } else { + } + _ => { tracing::info!("don't have collection - doing full download"); // don't have the collection, so probably got nothing let conn = get_conn().await.map_err(GetError::Io)?; @@ -456,7 +456,8 @@ async fn get_hash_seq< let end_blob = get_blob_inner(db, header, sender.clone()).await?; next = end_blob.next(); } - }; + } + }; // this closes the bidi stream. Do something with the stats? let stats = finishing.next().await?; Ok(stats) diff --git a/iroh-bytes/src/store/file.rs b/iroh-bytes/src/store/file.rs index 77862e0634..e8671aad46 100644 --- a/iroh-bytes/src/store/file.rs +++ b/iroh-bytes/src/store/file.rs @@ -120,7 +120,7 @@ use self::test_support::EntryData; use super::{ bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb}, temp_name, BaoBatchWriter, BaoBlobSize, EntryStatus, ExportMode, ExportProgressCb, ImportMode, - ImportProgress, ReadableStore, TempCounterMap, ValidateProgress, + ImportProgress, Map, ReadableStore, TempCounterMap, ValidateProgress, }; /// Location of the data. @@ -1261,7 +1261,7 @@ impl From for io::Error { } } -impl crate::store::traits::Map for Store { +impl super::Map for Store { type Entry = Entry; async fn get(&self, hash: &Hash) -> io::Result> { @@ -1269,7 +1269,7 @@ impl crate::store::traits::Map for Store { } } -impl crate::store::traits::MapMut for Store { +impl super::MapMut for Store { type EntryMut = Entry; async fn get_or_create(&self, hash: Hash, _size: u64) -> io::Result { @@ -1280,20 +1280,8 @@ impl crate::store::traits::MapMut for Store { Ok(self.0.entry_status(hash).await?) } - async fn get_possibly_partial( - &self, - hash: &Hash, - ) -> io::Result> { - match self.0.get(*hash).await? 
{ - Some(entry) => Ok({ - if entry.is_complete() { - super::PossiblyPartialEntry::Complete(entry.into()) - } else { - super::PossiblyPartialEntry::Partial(entry.into()) - } - }), - None => Ok(super::PossiblyPartialEntry::NotFound), - } + async fn get_mut(&self, hash: &Hash) -> io::Result> { + self.get(hash).await } async fn insert_complete(&self, entry: Self::EntryMut) -> io::Result<()> { diff --git a/iroh-bytes/src/store/mem.rs b/iroh-bytes/src/store/mem.rs index 51100d2f7f..b6afecec54 100644 --- a/iroh-bytes/src/store/mem.rs +++ b/iroh-bytes/src/store/mem.rs @@ -29,7 +29,7 @@ use crate::{ }; use super::{ - temp_name, BaoBatchWriter, ExportMode, ExportProgressCb, ImportMode, ImportProgress, + temp_name, BaoBatchWriter, ExportMode, ExportProgressCb, ImportMode, ImportProgress, Map, TempCounterMap, }; @@ -315,7 +315,7 @@ impl AsyncSliceReader for OutboardReader { struct BatchWriter(Arc); -impl crate::store::BaoBatchWriter for BatchWriter { +impl super::BaoBatchWriter for BatchWriter { async fn write_batch( &mut self, size: u64, @@ -329,7 +329,7 @@ impl crate::store::BaoBatchWriter for BatchWriter { } } -impl crate::store::Map for Store { +impl super::Map for Store { type Entry = Entry; async fn get(&self, hash: &Hash) -> std::io::Result> { @@ -337,9 +337,13 @@ impl crate::store::Map for Store { } } -impl crate::store::MapMut for Store { +impl super::MapMut for Store { type EntryMut = Entry; + async fn get_mut(&self, hash: &Hash) -> std::io::Result> { + self.get(hash).await + } + async fn get_or_create(&self, hash: Hash, _size: u64) -> std::io::Result { let entry = Entry { inner: Arc::new(EntryInner { @@ -368,23 +372,6 @@ impl crate::store::MapMut for Store { }) } - async fn get_possibly_partial( - &self, - hash: &Hash, - ) -> std::io::Result> { - Ok(match self.inner.0.read().unwrap().entries.get(hash) { - Some(entry) => { - let entry = entry.clone(); - if entry.complete { - crate::store::PossiblyPartialEntry::Complete(entry) - } else { - crate::store::PossiblyPartialEntry::Partial(entry) - } - } - None => crate::store::PossiblyPartialEntry::NotFound, - }) - } - async fn insert_complete(&self, mut entry: Entry) -> std::io::Result<()> { let hash = entry.hash(); let mut inner = self.inner.0.write().unwrap(); diff --git a/iroh-bytes/src/store/readonly_mem.rs b/iroh-bytes/src/store/readonly_mem.rs index 9207c90bf0..c7740e4e15 100644 --- a/iroh-bytes/src/store/readonly_mem.rs +++ b/iroh-bytes/src/store/readonly_mem.rs @@ -10,7 +10,7 @@ use std::{ use crate::{ store::{ - EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry, MapEntryMut, MapMut, + EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry, MapEntryMut, ReadableStore, ValidateProgress, }, util::{ @@ -28,7 +28,7 @@ use futures::Stream; use iroh_io::AsyncSliceReader; use tokio::{io::AsyncWriteExt, sync::mpsc}; -use super::{BaoBatchWriter, BaoBlobSize, DbIter, ExportProgressCb, PossiblyPartialEntry}; +use super::{BaoBatchWriter, BaoBlobSize, DbIter, ExportProgressCb}; /// A readonly in memory database for iroh-bytes. /// @@ -107,7 +107,7 @@ impl Store { } /// Get the bytes associated with a hash, if they exist. 
- pub fn get(&self, hash: &Hash) -> Option { + pub fn get_content(&self, hash: &Hash) -> Option { let entry = self.0.get(hash)?; Some(entry.1.clone()) } @@ -136,7 +136,7 @@ impl Store { // create the directory in which the target file is tokio::fs::create_dir_all(parent).await?; let data = self - .get(&hash) + .get_content(&hash) .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "hash not found"))?; let mut offset = 0u64; @@ -159,12 +159,6 @@ pub struct Entry { data: Bytes, } -/// The [MapEntryMut] implementation for [Store]. -/// -/// This is an unoccupied type, since [Store] is does not allow creating partial entries. -#[derive(Debug, Clone)] -pub enum EntryMut {} - impl MapEntry for Entry { fn hash(&self) -> Hash { self.outboard.root().into() @@ -198,10 +192,14 @@ impl Map for Store { } } -impl MapMut for Store { - type EntryMut = EntryMut; +impl super::MapMut for Store { + type EntryMut = Entry; - async fn get_or_create(&self, _hash: Hash, _size: u64) -> io::Result { + async fn get_mut(&self, hash: &Hash) -> io::Result> { + self.get(hash).await + } + + async fn get_or_create(&self, _hash: Hash, _size: u64) -> io::Result { Err(io::Error::new( io::ErrorKind::Other, "cannot create temp entry in readonly database", @@ -219,19 +217,7 @@ impl MapMut for Store { self.entry_status_sync(hash) } - async fn get_possibly_partial(&self, hash: &Hash) -> io::Result> { - // return none because we do not have partial entries - Ok(if let Some((o, d)) = self.0.get(hash) { - PossiblyPartialEntry::Complete(Entry { - outboard: o.clone(), - data: d.clone(), - }) - } else { - PossiblyPartialEntry::NotFound - }) - } - - async fn insert_complete(&self, _entry: EntryMut) -> io::Result<()> { + async fn insert_complete(&self, _entry: Entry) -> io::Result<()> { // this is unreachable, since we cannot create partial entries unreachable!() } @@ -276,36 +262,7 @@ impl ReadableStore for Store { } } -impl MapEntry for EntryMut { - fn hash(&self) -> Hash { - // this is unreachable, since EntryMut can not be created - unreachable!() - } - - fn size(&self) -> BaoBlobSize { - // this is unreachable, since EntryMut can not be created - unreachable!() - } - - #[allow(refining_impl_trait)] - async fn outboard(&self) -> io::Result { - // this is unreachable, since EntryMut can not be created - unreachable!() - } - - #[allow(refining_impl_trait)] - async fn data_reader(&self) -> io::Result { - // this is unreachable, since EntryMut can not be created - unreachable!() - } - - fn is_complete(&self) -> bool { - // this is unreachable, since EntryMut can not be created - unreachable!() - } -} - -impl MapEntryMut for EntryMut { +impl MapEntryMut for Entry { async fn batch_writer(&self) -> io::Result { enum Bar {} impl BaoBatchWriter for Bar { diff --git a/iroh-bytes/src/store/traits.rs b/iroh-bytes/src/store/traits.rs index 67b755b082..44abaf9adc 100644 --- a/iroh-bytes/src/store/traits.rs +++ b/iroh-bytes/src/store/traits.rs @@ -39,19 +39,6 @@ pub enum EntryStatus { NotFound, } -/// An entry in a store that supports partial entries. -/// -/// This correspnds to [`EntryStatus`], but also includes the entry itself. -#[derive(Debug)] -pub enum PossiblyPartialEntry { - /// A complete entry. - Complete(D::Entry), - /// A partial entry. - Partial(D::EntryMut), - /// We got nothing. 
- NotFound, -} - /// The size of a bao file #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub enum BaoBlobSize { @@ -273,6 +260,15 @@ pub trait MapMut: Map { /// An entry that is possibly writable type EntryMut: MapEntryMut; + /// Get an existing entry as an EntryMut. + /// + /// For implementations where EntryMut and Entry are the same type, this is just an alias for + /// `get`. + fn get_mut( + &self, + hash: &Hash, + ) -> impl Future>> + Send; + /// Get an existing partial entry, or create a new one. /// /// We need to know the size of the partial entry. This might produce an @@ -294,17 +290,6 @@ pub trait MapMut: Map { /// Don't count on this to be efficient. fn entry_status_sync(&self, hash: &Hash) -> io::Result; - /// Get an existing entry. - /// - /// This will return either a complete entry, a partial entry, or not found. - /// - /// This function should not block to perform io. The knowledge about - /// partial entries must be present in memory. - fn get_possibly_partial( - &self, - hash: &Hash, - ) -> impl Future>> + Send; - /// Upgrade a partial entry to a complete entry. fn insert_complete(&self, entry: Self::EntryMut) -> impl Future> + Send; diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 5739fcfd5a..61a0c3db0c 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -16,7 +16,7 @@ use iroh_bytes::BlobFormat; use iroh_bytes::{ hashseq::parse_hash_seq, provider::AddProgress, - store::{PossiblyPartialEntry, Store as BaoStore, ValidateProgress}, + store::{Store as BaoStore, ValidateProgress}, util::progress::FlumeProgressSender, HashAndFormat, }; @@ -285,10 +285,12 @@ impl Handler { let db = self.inner.db.clone(); for hash in db.partial_blobs().await? { let hash = hash?; - let Ok(PossiblyPartialEntry::Partial(entry)) = db.get_possibly_partial(&hash).await - else { + let Ok(Some(entry)) = db.get_mut(&hash).await else { continue; }; + if entry.is_complete() { + continue; + } let size = 0; let expected_size = entry.size().value(); co.yield_(Ok(BlobListIncompleteResponse { diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs index b4bf2654a1..a745738e9e 100644 --- a/iroh/tests/provide.rs +++ b/iroh/tests/provide.rs @@ -240,7 +240,7 @@ where for (i, (expected_name, expected_hash)) in expects.iter().enumerate() { let (name, hash) = &collection[i]; let got = &children[&(i as u64)]; - let expected = mdb.get(expected_hash).unwrap(); + let expected = mdb.get_content(expected_hash).unwrap(); assert_eq!(expected_name, name); assert_eq!(expected_hash, hash); assert_eq!(expected, got); From 3f4a1fec390561c4ef81b9a148f5f71948b7c469 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 20 Mar 2024 16:20:33 +0200 Subject: [PATCH 5/5] more pub(crate) --- iroh-bytes/src/store/traits.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/iroh-bytes/src/store/traits.rs b/iroh-bytes/src/store/traits.rs index 44abaf9adc..ae93169f17 100644 --- a/iroh-bytes/src/store/traits.rs +++ b/iroh-bytes/src/store/traits.rs @@ -168,7 +168,7 @@ impl BaoBatchWriter for &mut W { /// A wrapper around a batch writer that calls a progress callback for one leaf /// per batch. #[derive(Debug)] -pub struct FallibleProgressBatchWriter(W, F); +pub(crate) struct FallibleProgressBatchWriter(W, F); impl io::Result<()> + 'static> FallibleProgressBatchWriter @@ -181,11 +181,6 @@ impl io::Result<()> + 'static> pub fn new(inner: W, on_write: F) -> Self { Self(inner, on_write) } - - /// Return the inner writer. 
- pub fn into_inner(self) -> W { - self.0 - } } impl io::Result<()> + 'static> BaoBatchWriter @@ -221,7 +216,7 @@ impl io::Result<()> + 'static> BaoBatchW /// This is just temporary to allow reusing the existing store implementations /// that have separate data and outboard writers. #[derive(Debug)] -pub struct CombinedBatchWriter { +pub(crate) struct CombinedBatchWriter { /// data part pub data: D, /// outboard part