Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(redb-store): Optimization for small file import in redb store #2062

Merged
merged 176 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
176 commits
Select commit Hold shift + click to select a range
be6e8ff
Add optional redb support for foundational iroh-base types
rklaehn Feb 7, 2024
6eee85c
Add explicit feature
rklaehn Feb 7, 2024
81e1f7d
Make the store traits fallible...
rklaehn Feb 7, 2024
8f7b825
Merge branch 'main' into fallible-store-traits
rklaehn Feb 7, 2024
7a226ae
Make sure we can send the error over the wire for the various list bl…
rklaehn Feb 7, 2024
76ef755
Merge branch 'main' into fallible-store-traits
rklaehn Feb 7, 2024
bed2dc3
Add a trait alias to make the types a bit nicer
rklaehn Feb 7, 2024
1f0c942
WIP
rklaehn Feb 7, 2024
18604a9
Delete old code
rklaehn Feb 7, 2024
0b293d3
Merge branch 'fallible-store-traits' into batch-write-trait
rklaehn Feb 7, 2024
19fcef2
more batch writer plumbing
rklaehn Feb 7, 2024
ad41319
Use batch writer trait
rklaehn Feb 7, 2024
e56896b
switch over to the batch writer everywhere
rklaehn Feb 7, 2024
dc8cb42
Get rid of the last remaining instances of non batch usage
rklaehn Feb 7, 2024
9d42b31
keep only the batch writing
rklaehn Feb 7, 2024
33b171c
Add debug log at warn level when entry status produces an error
rklaehn Feb 7, 2024
b6776dd
Use RpcResult<...Item> instead of having a separate error case
rklaehn Feb 8, 2024
7b47509
Merge branch 'main' into fallible-store-traits
rklaehn Feb 8, 2024
bd36031
Merge branch 'fallible-store-traits' into batch-write-trait
rklaehn Feb 8, 2024
f40e43b
Add more comments about the batch writer
rklaehn Feb 8, 2024
f413cd8
WIP
rklaehn Feb 8, 2024
1cc9747
Get rid of OutboardMut and DataWriter associated types
rklaehn Feb 8, 2024
5ae880d
Merge branch 'batch-write-trait' into bao-file
rklaehn Feb 8, 2024
8fccf11
WIP
rklaehn Feb 8, 2024
09a0c3c
flatten the streams
rklaehn Feb 8, 2024
b2972b2
Merge branch 'fallible-store-traits' into bao-file
rklaehn Feb 8, 2024
f665670
Merge branch 'fallible-store-traits' into batch-write-trait
rklaehn Feb 8, 2024
745f457
Merge branch 'batch-write-trait' into bao-file
rklaehn Feb 8, 2024
f4c6d9f
local pool handle stuff
rklaehn Feb 8, 2024
d7f553b
Add some details about the two trait methods
rklaehn Feb 8, 2024
bf4ef92
Use future join to sync in parallel
rklaehn Feb 8, 2024
2002063
Merge branch 'batch-write-trait' into bao-file
rklaehn Feb 8, 2024
6c0b408
WIP
rklaehn Feb 9, 2024
af5a293
Merge branch 'main' into bao-file
rklaehn Feb 13, 2024
3bb3dba
remove some boxing
rklaehn Feb 13, 2024
d01a248
WIP
rklaehn Feb 13, 2024
e683a16
WIP
rklaehn Feb 13, 2024
ee82c16
simplify the traits
rklaehn Feb 13, 2024
12db8c2
don't emit error level warnings to avoid breaking the tests...
rklaehn Feb 13, 2024
d386636
Some renaming:
rklaehn Feb 19, 2024
8ef2bc3
Merge branch 'main' into simplify-store-triat
rklaehn Feb 19, 2024
1cfc7b3
Merge branch 'simplify-store-triat' into bao-file
rklaehn Feb 19, 2024
ebd58f5
Merge branch 'main' into simplify-store-triat
rklaehn Feb 19, 2024
40c97f9
Merge branch 'simplify-store-triat' into bao-file
rklaehn Feb 19, 2024
18a8082
Add some more docs about the trait hierarchy
rklaehn Feb 19, 2024
6d2016e
Merge branch 'main' into simplify-store-triat
rklaehn Feb 19, 2024
9e39e42
Merge branch 'simplify-store-triat' into bao-file
rklaehn Feb 19, 2024
241f8bc
Introduce BaoBlobSize enum
rklaehn Feb 20, 2024
a31f322
Merge branch 'main' into simplify-store-triat
rklaehn Feb 20, 2024
087e8b0
Merge branch 'simplify-store-triat' into bao-file
rklaehn Feb 20, 2024
b320597
Replace old mem implementation with new one
rklaehn Feb 20, 2024
77a9e1d
Implement inlining and flip some tests over from flat to redb
rklaehn Feb 21, 2024
fdf4e16
Mark as partial WIP
rklaehn Feb 21, 2024
af9c30e
Support partial download resume
rklaehn Feb 21, 2024
aeb709b
Some more refactoring
rklaehn Feb 21, 2024
efdf524
Leave out size header for outboards in all cases
rklaehn Feb 22, 2024
c4595ae
Hack: no sync store
rklaehn Feb 22, 2024
fd5a0bd
Clean up and factor out some io stuff
rklaehn Feb 23, 2024
8cf7600
Wire up dump
rklaehn Feb 23, 2024
bc16afe
Rename paths to data, temp and meta
rklaehn Feb 23, 2024
c993831
Start with a storage actor and deferring the actual storage
rklaehn Feb 23, 2024
a5fb0cc
make error handling nicer
rklaehn Feb 23, 2024
ef73c9c
Merge branch 'main' into bao-file
rklaehn Feb 27, 2024
4da5fd3
wire up and test in cli mode
rklaehn Feb 27, 2024
2a6e620
Last commit before purging the old impl
rklaehn Feb 28, 2024
5e20cc4
redb support for tags
rklaehn Feb 28, 2024
953e03d
use redb support for tags
rklaehn Feb 28, 2024
a3b5201
fsck thingie
rklaehn Feb 28, 2024
e048de3
more fsck (part of validate)
rklaehn Feb 28, 2024
b109b02
Silence some debug logging to make cli tests pass
rklaehn Feb 28, 2024
afa5be3
Lots of tests pass
rklaehn Feb 28, 2024
b41b8ae
Fix bug in size writing
rklaehn Feb 29, 2024
bcb218f
Comment out the tricky part of the partial tests, and start working o…
rklaehn Feb 29, 2024
7f67018
import test works
rklaehn Feb 29, 2024
51be8d4
clippy
rklaehn Feb 29, 2024
65fb0c9
rewrite stress test to be for redb
rklaehn Feb 29, 2024
af29e09
Merge branch 'main' into bao-file
rklaehn Feb 29, 2024
c72b2ef
pointless clippy shit
rklaehn Feb 29, 2024
da3c957
features stuff
rklaehn Feb 29, 2024
3ebcdfe
remove flatten_to_io
rklaehn Feb 29, 2024
9c30de2
Move some garbage collection internals out of the traits and into the…
rklaehn Feb 29, 2024
671f51d
Use flume channel instead of tokio oneshot channel
rklaehn Mar 1, 2024
ce3825c
Add update_options fn
rklaehn Mar 1, 2024
819c65e
Split options into inline options and path options
rklaehn Mar 1, 2024
d28faf4
Get rid of flat-db feature
rklaehn Mar 1, 2024
90f0de6
Add stress test for import of blobs via doc import
rklaehn Mar 1, 2024
28d0644
refactor flat file import code
rklaehn Mar 1, 2024
fe96ea9
Add configurable verbosity to validate call
rklaehn Mar 1, 2024
1d65e5d
apply inline options when importing old data
rklaehn Mar 1, 2024
2310518
Add entry removal
rklaehn Mar 1, 2024
722c9d4
Some more RPITIT goodness
rklaehn Mar 1, 2024
5da2fe0
Add ExportProgressCb type alias
rklaehn Mar 4, 2024
09d99b5
Change export to change entry from owned to external...
rklaehn Mar 4, 2024
250a9fa
cargo deny
rklaehn Mar 5, 2024
78e0e64
Merge branch 'main' into bao-file
rklaehn Mar 5, 2024
9e93824
Add handle_one to handle one message.
rklaehn Mar 5, 2024
c78b28f
more cargo deny, and more import tests
rklaehn Mar 5, 2024
478ef37
import override tests
rklaehn Mar 5, 2024
6b0f48c
add tests for import_stream as well
rklaehn Mar 5, 2024
d765764
add test case for get
rklaehn Mar 5, 2024
c8b2ecb
comment out some tests that don't seem to be platform independent
rklaehn Mar 5, 2024
4611aaf
Add test for insert via partial entry
rklaehn Mar 5, 2024
fc31aab
Some renaming
rklaehn Mar 6, 2024
1132e3f
Move large chunks of code into separate modules
rklaehn Mar 6, 2024
57dede2
More moving stuff around
rklaehn Mar 6, 2024
faa3780
Try to reproduce doc import file error
rklaehn Mar 6, 2024
138c967
MOAR!
rklaehn Mar 6, 2024
ea5f7cb
better error logging
rklaehn Mar 6, 2024
306cfe6
print more info to find error
rklaehn Mar 6, 2024
98a1344
moar logging
rklaehn Mar 6, 2024
c5251a3
even moar logging
rklaehn Mar 6, 2024
49de249
add back protected hashes
rklaehn Mar 6, 2024
d7fb118
change the time when protection happens
rklaehn Mar 6, 2024
c5a9c05
Merge branch 'main' into bao-file
rklaehn Mar 7, 2024
653909d
refactor(iroh-bytes): Create helper structs for tables (#2061)
rklaehn Mar 7, 2024
454c0df
Add explicit import and export messages
rklaehn Mar 7, 2024
dff79ed
Merge branch 'main' into bao-file
rklaehn Mar 7, 2024
145433b
Add explicit call for entry_state and make the full entry_status call…
rklaehn Mar 7, 2024
e4073bc
Implement batch processing
rklaehn Mar 7, 2024
d3b5e56
Small file optimization: if a file is tiny and to be copied, don't do…
rklaehn Mar 7, 2024
bfbfdce
Batching with limits in terms of messages and duration
rklaehn Mar 7, 2024
bd13a40
Explicit call to sync to try to fix entry test.
rklaehn Mar 11, 2024
011c8fc
PR review: add comments, use async, style fixes
rklaehn Mar 11, 2024
55e02c5
Merge branch 'main' into bao-file
rklaehn Mar 11, 2024
20ac5e5
Remove unused code
rklaehn Mar 11, 2024
73a6ea1
Merge branch 'bao-file' into fast-redb-store
rklaehn Mar 11, 2024
88d04fb
Rename redb store to file store
rklaehn Mar 11, 2024
115e769
clippy
rklaehn Mar 11, 2024
078aecb
use derive_more for CompleteMemOrFileStorage Debug
rklaehn Mar 11, 2024
cba2f5d
Add feature flag instead of #[allow(dead_code)]
rklaehn Mar 11, 2024
733b619
Merge branch 'bao-file' into fast-redb-store
rklaehn Mar 11, 2024
5710104
Be more paranoid about not killing the actor loop on an io error
rklaehn Mar 11, 2024
4a8af3c
Merge branch 'main' into bao-file
rklaehn Mar 12, 2024
ab8e159
Merge branch 'bao-file' into fast-redb-store
rklaehn Mar 12, 2024
e116993
Merge branch 'main' into bao-file
rklaehn Mar 12, 2024
12392aa
Merge branch 'bao-file' into fast-redb-store
rklaehn Mar 12, 2024
7a633c1
Merge branch 'main' into bao-file
rklaehn Mar 12, 2024
bfc47cd
Merge branch 'bao-file' into fast-redb-store
rklaehn Mar 12, 2024
28c9570
defer file deletion
rklaehn Mar 12, 2024
0da1894
Merge branch 'main' into bao-file
rklaehn Mar 12, 2024
5088b2d
Name the bao redb store thread
rklaehn Mar 12, 2024
aa3736b
PR review
rklaehn Mar 12, 2024
536e984
Add iroh doctor validate-blob-store
rklaehn Mar 13, 2024
e04c304
print to stdout
rklaehn Mar 13, 2024
aa3d0f9
Add repair flag to validate
rklaehn Mar 13, 2024
ab02afa
Don't do anything in case an entry that is external is being exported to
rklaehn Mar 13, 2024
327efe1
repair
rklaehn Mar 13, 2024
51565d3
Don't do anything in case an entry that is external is being exported to
rklaehn Mar 13, 2024
5d46746
Add repair flag to validate
rklaehn Mar 13, 2024
d1c9d40
Merge branch 'repair' into bao-file
rklaehn Mar 13, 2024
5e61af4
Merge branch 'bao-file' into fast-redb-store
rklaehn Mar 13, 2024
d378035
WIP validate fix
rklaehn Mar 13, 2024
470e31c
Fix clap repair arg.
rklaehn Mar 13, 2024
55b29a6
Merge branch 'bao-file' into fast-redb-store
rklaehn Mar 13, 2024
6303e49
Better reporting when deleting orphaned files and inline data.
rklaehn Mar 13, 2024
a45c10e
First batch of PR review fixes
rklaehn Mar 13, 2024
1afd7a8
Remove available_ranges for now
rklaehn Mar 13, 2024
2097ba1
more PR review
rklaehn Mar 13, 2024
84b03ad
Merge branch 'main' into bao-file
rklaehn Mar 13, 2024
a383f32
Add more comments about the queue size etc.
rklaehn Mar 13, 2024
65853c9
Merge branch 'bao-file' into fast-redb-store
rklaehn Mar 13, 2024
7612f2c
Add more comments about the queue size etc.
rklaehn Mar 13, 2024
5bddb4e
produce warnings on size mismatches
rklaehn Mar 13, 2024
4c1f836
try reactivating some of the unhappy case tests.
rklaehn Mar 13, 2024
c56f12e
refactor(iroh-bytes): Weak entry map (#2080)
rklaehn Mar 14, 2024
23ff8d1
Merge branch 'main' into bao-file
rklaehn Mar 14, 2024
ba40d01
Merge branch 'bao-file' into fast-redb-store
rklaehn Mar 14, 2024
34d2245
PR review
rklaehn Mar 14, 2024
693a44e
make batch options configurable
rklaehn Mar 14, 2024
2c56c7a
Move some stuff into test_support module and re-enable partial cli tests
rklaehn Mar 15, 2024
e9ee2d6
Merge branch 'main' into bao-file
rklaehn Mar 15, 2024
8406e26
remove more dead code
rklaehn Mar 15, 2024
c570af5
Merge branch 'bao-file' into fast-redb-store
rklaehn Mar 15, 2024
554d6a1
Merge branch 'main' into bao-file
rklaehn Mar 19, 2024
935a082
Merge branch 'bao-file' into fast-redb-store
rklaehn Mar 19, 2024
110b204
Merge branch 'main' into fast-redb-store
rklaehn Mar 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
572 changes: 343 additions & 229 deletions iroh-bytes/src/store/file.rs

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions iroh-bytes/src/store/file/import_flat_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ impl ActorState {
}

let txn = db.begin_write()?;
let mut tables = Tables::new(&txn)?;
let mut delete_after_commit = Default::default();
let mut tables = Tables::new(&txn, &mut delete_after_commit)?;
for (hash, entry) in index {
if tables.blobs.get(hash)?.is_some() {
tracing::info!("hash {} already exists in the db", hash.to_hex());
Expand All @@ -232,15 +233,15 @@ impl ActorState {
None
};
if let Err(cause) =
std::fs::rename(data_path, self.path_options.owned_data_path(&hash))
std::fs::rename(data_path, self.options.path.owned_data_path(&hash))
{
tracing::error!("failed to move data file: {}", cause);
continue;
}
if let Some(outboard_path) = outboard_path {
if let Err(cause) = copy_outboard(
&outboard_path,
&self.path_options.owned_outboard_path(&hash),
&self.options.path.owned_outboard_path(&hash),
) {
tracing::error!("failed to move outboard file: {}", cause);
continue;
Expand Down Expand Up @@ -281,7 +282,7 @@ impl ActorState {
if let Some(outboard_path) = outboard_path {
if let Err(cause) = copy_outboard(
&outboard_path,
&self.path_options.owned_outboard_path(&hash),
&self.options.path.owned_outboard_path(&hash),
) {
tracing::error!("failed to move outboard file: {}", cause);
continue;
Expand Down Expand Up @@ -321,15 +322,15 @@ impl ActorState {
None
};
if let Err(cause) =
std::fs::rename(data_path, self.path_options.owned_data_path(&hash))
std::fs::rename(data_path, self.options.path.owned_data_path(&hash))
{
tracing::error!("failed to move data file: {}", cause);
continue;
}
if let Some(outboard_path) = outboard_path {
if let Err(cause) = copy_outboard(
&outboard_path,
&self.path_options.owned_outboard_path(&hash),
&self.options.path.owned_outboard_path(&hash),
) {
tracing::error!("failed to move outboard file: {}", cause);
continue;
Expand Down
69 changes: 67 additions & 2 deletions iroh-bytes/src/store/file/tables.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
//! Table definitions and accessors for the redb database.
use std::collections::BTreeSet;

use redb::{ReadableTable, TableDefinition, TableError};

use iroh_base::hash::{Hash, HashAndFormat};

use super::EntryState;
use super::{EntryState, PathOptions};
use crate::util::Tag;

pub(super) const BLOBS_TABLE: TableDefinition<Hash, EntryState> = TableDefinition::new("blobs-0");
Expand Down Expand Up @@ -33,15 +35,27 @@ pub(super) struct Tables<'a, 'b> {
pub tags: redb::Table<'a, 'b, Tag, HashAndFormat>,
pub inline_data: redb::Table<'a, 'b, Hash, &'static [u8]>,
pub inline_outboard: redb::Table<'a, 'b, Hash, &'static [u8]>,
pub delete_after_commit: &'b mut DeleteSet,
}

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(super) enum BaoFilePart {
Outboard,
Data,
Sizes,
}

impl<'db, 'txn> Tables<'db, 'txn> {
pub fn new(tx: &'txn redb::WriteTransaction<'db>) -> std::result::Result<Self, TableError> {
pub fn new(
tx: &'txn redb::WriteTransaction<'db>,
delete_after_commit: &'txn mut DeleteSet,
) -> std::result::Result<Self, TableError> {
Ok(Self {
blobs: tx.open_table(BLOBS_TABLE)?,
tags: tx.open_table(TAGS_TABLE)?,
inline_data: tx.open_table(INLINE_DATA_TABLE)?,
inline_outboard: tx.open_table(INLINE_OUTBOARD_TABLE)?,
delete_after_commit,
})
}
}
Expand Down Expand Up @@ -95,3 +109,54 @@ impl ReadableTables for ReadOnlyTables<'_> {
&self.inline_outboard
}
}

/// Helper to keep track of files to delete after a transaction is committed.
#[derive(Debug, Default)]
pub(super) struct DeleteSet(BTreeSet<(Hash, BaoFilePart)>);

impl DeleteSet {
/// Mark a file as to be deleted after the transaction is committed.
pub fn insert(&mut self, hash: Hash, parts: impl IntoIterator<Item = BaoFilePart>) {
for part in parts {
self.0.insert((hash, part));
}
}

/// Mark a file as to be kept after the transaction is committed.
///
/// This will cancel any previous delete for the same file in the same transaction.
pub fn remove(&mut self, hash: Hash, parts: impl IntoIterator<Item = BaoFilePart>) {
for part in parts {
self.0.remove(&(hash, part));
}
}

/// Get the inner set of files to delete.
pub fn into_inner(self) -> BTreeSet<(Hash, BaoFilePart)> {
self.0
}

/// Apply the delete set and clear it.
///
/// This will delete all files marked for deletion and then clear the set.
/// Errors will just be logged.
pub fn apply_and_clear(&mut self, options: &PathOptions) {
for (hash, to_delete) in &self.0 {
tracing::info!("deleting {:?}", to_delete);
let path = match to_delete {
BaoFilePart::Data => options.owned_data_path(hash),
BaoFilePart::Outboard => options.owned_outboard_path(hash),
BaoFilePart::Sizes => options.owned_sizes_path(hash),
};
if let Err(cause) = std::fs::remove_file(&path) {
tracing::warn!(
"failed to delete {:?} {}: {}",
to_delete,
path.display(),
cause
);
}
}
self.0.clear();
}
}
18 changes: 9 additions & 9 deletions iroh-bytes/src/store/file/test_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl StoreInner {
.send_async(ActorMessage::Blobs { filter, tx })
.await?;
let blobs = rx.await?;
let res = blobs
let res = blobs?
.into_iter()
.map(|r| {
r.map(|(hash, _)| hash)
Expand All @@ -163,9 +163,9 @@ impl ActorState {
tables: &impl ReadableTables,
hash: Hash,
) -> ActorResult<Option<EntryData>> {
let data_path = self.path_options.owned_data_path(&hash);
let outboard_path = self.path_options.owned_outboard_path(&hash);
let sizes_path = self.path_options.owned_sizes_path(&hash);
let data_path = self.options.path.owned_data_path(&hash);
let outboard_path = self.options.path.owned_outboard_path(&hash);
let sizes_path = self.options.path.owned_sizes_path(&hash);
let entry = match tables.blobs().get(hash)? {
Some(guard) => match guard.value() {
EntryState::Complete {
Expand Down Expand Up @@ -243,9 +243,9 @@ impl ActorState {
hash: Hash,
entry: Option<EntryData>,
) -> ActorResult<()> {
let data_path = self.path_options.owned_data_path(&hash);
let outboard_path = self.path_options.owned_outboard_path(&hash);
let sizes_path = self.path_options.owned_sizes_path(&hash);
let data_path = self.options.path.owned_data_path(&hash);
let outboard_path = self.options.path.owned_outboard_path(&hash);
let sizes_path = self.options.path.owned_sizes_path(&hash);
// tabula rasa
std::fs::remove_file(&outboard_path).ok();
std::fs::remove_file(&data_path).ok();
Expand All @@ -260,15 +260,15 @@ impl ActorState {
let entry = match entry {
EntryData::Complete { data, outboard } => {
let data_size = data.len() as u64;
let data_location = if data_size > self.inline_options.max_data_inlined {
let data_location = if data_size > self.options.inline.max_data_inlined {
std::fs::write(data_path, &data)?;
DataLocation::Owned(data_size)
} else {
tables.inline_data.insert(hash, data.as_slice())?;
DataLocation::Inline(())
};
let outboard_size = outboard.len() as u64;
let outboard_location = if outboard_size > self.inline_options.max_outboard_inlined
let outboard_location = if outboard_size > self.options.inline.max_outboard_inlined
{
std::fs::write(outboard_path, &outboard)?;
OutboardLocation::Owned
Expand Down
2 changes: 2 additions & 0 deletions iroh-bytes/src/store/file/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ async fn create_test_db() -> (tempfile::TempDir, Store) {
let db_path = testdir.path().join("db.redb");
let options = Options {
path: PathOptions::new(testdir.path()),
batch: Default::default(),
inline: Default::default(),
};
let db = Store::new(db_path, options).await.unwrap();
Expand Down Expand Up @@ -803,6 +804,7 @@ async fn actor_store_smoke() {
let db_path = testdir.path().join("test.redb");
let options = Options {
path: PathOptions::new(testdir.path()),
batch: Default::default(),
inline: Default::default(),
};
let db = Store::new(db_path, options).await.unwrap();
Expand Down
118 changes: 118 additions & 0 deletions iroh-bytes/src/store/file/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{
fs::OpenOptions,
io::{self, Write},
path::Path,
time::{Duration, Instant},
};

/// A reader that calls a callback with the number of bytes read after each read.
Expand Down Expand Up @@ -63,3 +64,120 @@ pub fn read_and_remove(path: &Path) -> io::Result<Vec<u8>> {
std::fs::remove_file(path)?;
Ok(data)
}

/// A wrapper for a flume receiver that allows peeking at the next message.
#[derive(Debug)]
pub(super) struct PeekableFlumeReceiver<T> {
msg: Option<T>,
recv: flume::Receiver<T>,
}

#[allow(dead_code)]
impl<T> PeekableFlumeReceiver<T> {
pub fn new(recv: flume::Receiver<T>) -> Self {
Self { msg: None, recv }
}

/// Peek at the next message.
///
/// Will block if there are no messages.
/// Returns None only if there are no more messages (sender is dropped).
pub fn peek(&mut self) -> Option<&T> {
if self.msg.is_none() {
self.msg = self.recv.recv().ok();
}
self.msg.as_ref()
}

/// Receive the next message.
///
/// Will block if there are no messages.
/// Returns None only if there are no more messages (sender is dropped).
pub fn recv(&mut self) -> Option<T> {
if let Some(msg) = self.msg.take() {
return Some(msg);
}
self.recv.recv().ok()
}

/// Try to peek at the next message.
///
/// Will not block.
/// Returns None if reading would block, or if there are no more messages (sender is dropped).
pub fn try_peek(&mut self) -> Option<&T> {
if self.msg.is_none() {
self.msg = self.recv.try_recv().ok();
}
self.msg.as_ref()
}

/// Try to receive the next message.
///
/// Will not block.
/// Returns None if reading would block, or if there are no more messages (sender is dropped).
pub fn try_recv(&mut self) -> Option<T> {
if let Some(msg) = self.msg.take() {
return Some(msg);
}
self.recv.try_recv().ok()
}

pub fn recv_timeout(&mut self, timeout: std::time::Duration) -> Option<T> {
if let Some(msg) = self.msg.take() {
return Some(msg);
}
self.recv.recv_timeout(timeout).ok()
}

/// Create an iterator that pulls messages from the receiver for at most
/// `count` messages or `max_duration` time.
pub fn batch_iter(&mut self, count: usize, max_duration: Duration) -> BatchIter<T> {
BatchIter::new(self, count, max_duration)
}

/// Push back a message. This will only work if there is room for it.
/// Otherwise, it will fail and return the message.
pub fn push_back(&mut self, msg: T) -> std::result::Result<(), T> {
rklaehn marked this conversation as resolved.
Show resolved Hide resolved
if self.msg.is_none() {
self.msg = Some(msg);
Ok(())
} else {
Err(msg)
}
}
}

pub(super) struct BatchIter<'a, T> {
recv: &'a mut PeekableFlumeReceiver<T>,
start: Instant,
remaining: usize,
max_duration: Duration,
}

impl<'a, T> BatchIter<'a, T> {
fn new(recv: &'a mut PeekableFlumeReceiver<T>, count: usize, max_duration: Duration) -> Self {
Self {
recv,
start: Instant::now(),
remaining: count,
max_duration,
}
}
}

impl<'a, T> Iterator for BatchIter<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
if self.remaining == 0 {
return None;
}
let elapsed = self.start.elapsed();
if elapsed >= self.max_duration {
return None;
}
let remaining_time = self.max_duration - elapsed;
self.remaining -= 1;
self.recv.recv_timeout(remaining_time)
}
}
Loading
Loading