From b8155763dbbd0fa7be2c2ca4047e7f241e881a0c Mon Sep 17 00:00:00 2001
From: Kasey
Date: Thu, 12 Oct 2023 12:35:11 -0400
Subject: [PATCH 1/5] fix: `doc export` exports the latest entry at a given key (#1629)

## Description

We currently only export via the latest key & the author id, but in the CLI and console we don't have a way of viewing other authors. This needs to be fixed ASAP for demo purposes, and we should have a longer discussion about how we expose the other peers' author ids to the CLI and console: /~https://github.com/n0-computer/iroh/issues/1628

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
---
 iroh/src/commands/sync.rs | 50 ++++++++++++++++++++++++++++++++------------------
 1 file changed, 32 insertions(+), 18 deletions(-)

diff --git a/iroh/src/commands/sync.rs b/iroh/src/commands/sync.rs
index 36c8d65c03..747e50e6be 100644
--- a/iroh/src/commands/sync.rs
+++ b/iroh/src/commands/sync.rs
@@ -175,7 +175,7 @@ pub enum DocCommands {
         #[clap(short, long)]
         in_place: bool,
     },
-    /// Export data from a document
+    /// Export the most recent data for a key from a document
    Export {
         /// Document to operate on.
         ///
         /// Within the Iroh console, the active document can also be set with `doc switch`.
         #[clap(short, long)]
         doc: Option<NamespaceId>,
-        /// Author of the entry.
-        ///
-        /// Required unless the author is set through the IROH_AUTHOR environment variable.
-        /// Within the Iroh console, the active author can also be set with `author switch`.
-        #[clap(short, long)]
-        author: Option<AuthorId>,
         /// Key to the entry (parsed as UTF-8 string)
+        ///
+        /// When just the key is present, will export the latest entry for that key.
         key: String,
         /// Path to export to
         #[clap(short, long)]
@@ -423,21 +419,19 @@ impl DocCommands {
             import_coordinator(doc, author, root_prefix, prefix, stream, size, files).await?;
             println!("Success! ({})", HumanDuration(start.elapsed()));
         }
-        Self::Export {
-            doc,
-            author,
-            key,
-            out,
-        } => {
+        Self::Export { doc, key, out } => {
             let doc = get_doc(iroh, env, doc).await?;
-            let author = env.author(author)?;
             let key_str = key.clone();
             let key = key.as_bytes().to_vec();
             let path: PathBuf = canonicalize_path(&out)?;
-            let entry = doc
-                .get_one(author, key)
-                .await?
-                .ok_or_else(|| anyhow!(""))?;
+            let stream = doc.get_many(GetFilter::Key(key)).await?;
+            let entry = match get_latest(stream).await? {
+                None => {
+                    println!("");
+                    return Ok(());
+                }
+                Some(e) => e,
+            };
             match doc.read(&entry).await {
                 Ok(mut content) => {
                     if let Some(dir) = path.parent() {
@@ -857,3 +851,23 @@ impl ImportProgressBar {
         self.mp.clear().ok();
     }
 }
+
+/// Get the latest entry for a key. If `None`, then an entry of the given key
+/// could not be found.
+async fn get_latest(stream: impl Stream<Item = Result<Entry>>) -> Result<Option<Entry>> {
+    let entry = stream
+        .try_fold(None, |acc: Option<Entry>, cur: Entry| async move {
+            match acc {
+                None => Ok(Some(cur)),
+                Some(prev) => {
+                    if cur.timestamp() > prev.timestamp() {
+                        Ok(Some(cur))
+                    } else {
+                        Ok(Some(prev))
+                    }
+                }
+            }
+        })
+        .await?;
+    Ok(entry)
+}

From 550303cfaef0a16968e8992f1f04415325d71c22 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?=
Date: Thu, 12 Oct 2023 20:32:29 +0300
Subject: [PATCH 2/5] fix(iroh-bytes): handle case of 0 sent bytes in send stats (#1625)

Before, it would panic when 0 bytes were sent.

/~https://github.com/n0-computer/iroh/issues/1624

## Description

Use `checked_div`.

## Notes & open questions

## Change checklist

- [x] Self-review.
- [x] ~Documentation updates if relevant.~
- [x] ~Tests if relevant.~
---
 iroh-bytes/src/provider.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/iroh-bytes/src/provider.rs b/iroh-bytes/src/provider.rs
index 8781b6fd9d..21ad4c2f7e 100644
--- a/iroh-bytes/src/provider.rs
+++ b/iroh-bytes/src/provider.rs
@@ -532,7 +532,7 @@ impl ResponseWriter {
         let other_duration = total_duration
             .saturating_sub(send_duration)
             .saturating_sub(read_duration);
-        let avg_send_size = total_sent_bytes / send.stats.count;
+        let avg_send_size = total_sent_bytes.checked_div(send.stats.count).unwrap_or(0);
         info!(
             "sent {} bytes in {}s",
             total_sent_bytes,

From d187827ef28bbe8ec67b0ac3a1f506e495fc2f50 Mon Sep 17 00:00:00 2001
From: Asmir Avdicevic
Date: Thu, 12 Oct 2023 19:48:48 +0200
Subject: [PATCH 3/5] chore(derp): update default derpers (#1622)

## Description

We have a new set of derpers that now run on a properly managed stack. We should default to those and will aim to keep them fresh.

## Notes & open questions

## Change checklist

- [ ] Self-review.
- [ ] Documentation updates if relevant.
- [ ] Tests if relevant.
---
 iroh-net/src/defaults.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/iroh-net/src/defaults.rs b/iroh-net/src/defaults.rs
index 544dc0be73..2861c412bc 100644
--- a/iroh-net/src/defaults.rs
+++ b/iroh-net/src/defaults.rs
@@ -3,16 +3,16 @@
 use crate::derp::{DerpMap, DerpNode, DerpRegion, UseIpv4, UseIpv6};
 
 /// Hostname of the default NA Derp.
-pub const NA_DERP_HOSTNAME: &str = "derp.iroh.network.";
+pub const NA_DERP_HOSTNAME: &str = "use1-1.derp.iroh.network.";
 /// IPv4 of the default NA Derp.
-pub const NA_DERP_IPV4: std::net::Ipv4Addr = std::net::Ipv4Addr::new(35, 175, 99, 113);
+pub const NA_DERP_IPV4: std::net::Ipv4Addr = std::net::Ipv4Addr::new(34, 207, 161, 128);
 /// NA region id
 pub const NA_REGION_ID: u16 = 1;
 
 /// Hostname of the default EU Derp.
-pub const EU_DERP_HOSTNAME: &str = "eu1.derp.iroh.network.";
+pub const EU_DERP_HOSTNAME: &str = "euw1-1.derp.iroh.network.";
 /// IPv4 of the default EU Derp.
-pub const EU_DERP_IPV4: std::net::Ipv4Addr = std::net::Ipv4Addr::new(52, 30, 229, 248);
+pub const EU_DERP_IPV4: std::net::Ipv4Addr = std::net::Ipv4Addr::new(34, 253, 75, 5);
 /// EU region id
 pub const EU_REGION_ID: u16 = 2;

From ef8c64b79c480ee6f133f1bb0386aea3c1686576 Mon Sep 17 00:00:00 2001
From: Franz Heinzmann
Date: Thu, 12 Oct 2023 19:51:49 +0200
Subject: [PATCH 4/5] fix: actually transfer newer entries for identical keys (#1630)

## Description

We had a bug that was simply never tested for: when both peers have an entry with identical ranger keys (i.e. identical namespace, author, and key), and peer A received an entry from peer B, peer A would never send back its own matching entry, even if its entry had a higher timestamp. This PR fixes this!

The actual fix is a single line: /~https://github.com/n0-computer/iroh/pull/1630/files#diff-be755583e5a892a8b9a6329e59cfc17e9633447fb7992db74eef670ba1508ccbR378

In addition, the PR adds both a manual test for this and a proptest strategy, which both properly fail without the change. It also renames the prefix methods on the ranger store trait that were introduced in #1535 to have clearer names (which helped me while tracking this down), and improves naming and logic around the prefix handling in other places. A minimal sketch of the fixed rule is shown after the checklist below.

## Notes & open questions

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
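Below is a minimal, self-contained sketch of the reconciliation rule this patch fixes. The `entries_to_send` helper and the `BTreeMap<String, u64>` stand-ins (keys mapping to timestamps) are hypothetical, purely for illustration; the real change is the `their_entry.value() >= our_entry.value()` check in the `filter_map` in `iroh-sync/src/ranger.rs` below.

```rust
use std::collections::BTreeMap;

/// Decide which of our entries to send back for a range, given the entries
/// the peer just sent us. Keys map to timestamps here; in iroh-sync the
/// "value" is an entry whose `Ord` impl encodes timestamp-then-hash.
fn entries_to_send(
    ours: &BTreeMap<String, u64>,
    theirs: &BTreeMap<String, u64>,
) -> Vec<(String, u64)> {
    ours.iter()
        // The buggy version skipped our entry whenever the peer sent *any*
        // entry with the same key, i.e. `!theirs.contains_key(key)`.
        .filter(|&(key, value)| match theirs.get(key) {
            // The peer's entry is at least as new: nothing to send back.
            Some(their_value) => their_value < value,
            // The peer does not have this key at all: send ours.
            None => true,
        })
        .map(|(key, value)| (key.clone(), *value))
        .collect()
}

fn main() {
    // Identical key on both sides, but our timestamp is higher, so our
    // entry must still be sent back: the exact case this patch fixes.
    let ours = BTreeMap::from([("foo".to_string(), 2u64)]);
    let theirs = BTreeMap::from([("foo".to_string(), 1u64)]);
    assert_eq!(entries_to_send(&ours, &theirs), vec![("foo".to_string(), 2)]);
}
```

With the buggy condition, `entries_to_send` would return nothing for this input, which is exactly what the new `test_equal_key_higher_value` test and the `_SimpleStoreSyncU8Args` proptest regression below catch.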
---
 iroh-sync/proptest-regressions/ranger.txt |   1 +
 iroh-sync/src/ranger.rs                   | 185 +++++++++++++++-------
 iroh-sync/src/store/fs.rs                 |   7 +-
 iroh-sync/src/store/memory.rs             |   7 +-
 iroh-sync/src/sync.rs                     |   2 +-
 5 files changed, 134 insertions(+), 68 deletions(-)

diff --git a/iroh-sync/proptest-regressions/ranger.txt b/iroh-sync/proptest-regressions/ranger.txt
index 5986c689de..8e8177c0e4 100644
--- a/iroh-sync/proptest-regressions/ranger.txt
+++ b/iroh-sync/proptest-regressions/ranger.txt
@@ -6,3 +6,4 @@
 # everyone who runs the test benefits from these saved cases.
 cc 797e83179f8684388880e25a6fac7b4047eb15b03c55c1fb725b82bdbd0a4369 # shrinks to a = {TestKey("3"): ()}, b = {TestKey(""): (), TestKey("3"): (), TestKey("4"): (), TestKey("5"): (), TestKey("a"): (), TestKey("b"): (), TestKey("c"): ()}
 cc f5b7604319ead6181c2ff42e53f05e2c6f0298adf0b38ea4ae4710c43abb7663 # shrinks to input = _SimpleStoreSyncArgs { alice: [(3, ()), (a, ())], bob: [(, ()), (0, ()), (b, ())] }
+cc 41d9d33f002235dfe4bed83621fe79348725bbe00931451782025d98c1b81522 # shrinks to input = _SimpleStoreSyncU8Args { alice: [("", 58)], bob: [("", 0)] }
diff --git a/iroh-sync/src/ranger.rs b/iroh-sync/src/ranger.rs
index 96d0fade14..446b7613bc 100644
--- a/iroh-sync/src/ranger.rs
+++ b/iroh-sync/src/ranger.rs
@@ -44,7 +44,7 @@ pub trait RangeKey: Sized + Debug + Ord + PartialEq + Clone + 'static {
     fn is_prefix_of(&self, other: &Self) -> bool;
 
     /// Returns true if `other` is a prefix of `self`.
-    fn has_prefix(&self, other: &Self) -> bool {
+    fn is_prefixed_by(&self, other: &Self) -> bool {
         other.is_prefix_of(self)
     }
 }
@@ -249,10 +249,10 @@ pub trait Store<E: RangeEntry>: Sized {
     fn get_range(&self, range: Range<E::Key>) -> Result<Self::RangeIterator<'_>, Self::Error>;
 
     /// Returns all entries whose key starts with the given `prefix`.
-    fn get_prefix(&self, prefix: &E::Key) -> Result<Self::RangeIterator<'_>, Self::Error>;
+    fn prefixed_by(&self, prefix: &E::Key) -> Result<Self::RangeIterator<'_>, Self::Error>;
 
     /// Returns all entries that share a prefix with `key`, including the entry for `key` itself.
-    fn get_with_parents(&self, key: &E::Key) -> Result<Self::ParentIterator<'_>, Self::Error>;
+    fn prefixes_of(&self, key: &E::Key) -> Result<Self::ParentIterator<'_>, Self::Error>;
 
     /// Get all entries in the store
     fn all(&self) -> Result<Self::RangeIterator<'_>, Self::Error>;
@@ -366,15 +366,18 @@
             None
         } else {
             Some(
+                // We get the range of the item from our store. From this set, we remove all
+                // entries whose key is contained in the peer's set and whose value is not
+                // higher than the peer entry's value.
                 self.store
                     .get_range(range.clone())?
-                    .filter_map(|existing| match existing {
-                        Ok(existing) => {
-                            if !values
-                                .iter()
-                                .any(|(entry, _)| existing.key() == entry.key())
-                            {
-                                Some(Ok(existing))
+                    .filter_map(|our_entry| match our_entry {
+                        Ok(our_entry) => {
+                            if !values.iter().any(|(their_entry, _)| {
+                                our_entry.key() == their_entry.key()
+                                    && their_entry.value() >= our_entry.value()
+                            }) {
+                                Some(Ok(our_entry))
                             } else {
                                 None
                             }
@@ -563,15 +566,15 @@
     /// Returns `true` if the entry was inserted.
     /// Returns `false` if it was not inserted.
     pub fn put(&mut self, entry: E) -> Result<InsertOutcome, S::Error> {
-        let parents = self.store.get_with_parents(entry.key())?;
+        let prefix_entry = self.store.prefixes_of(entry.key())?;
         // First we check if our entry is strictly greater than all parent elements.
         // From the willow spec:
         // "Remove all entries whose timestamp is strictly less than the timestamp of any other entry [..]
         // whose path is a prefix of p." and then "remove all but those whose record has the greatest hash component".
         // This is the contract of the `Ord` impl for `E::Value`.
- for e in parents { - let e = e?; - if entry.value() < e.value() { + for prefix_entry in prefix_entry { + let prefix_entry = prefix_entry?; + if entry.value() <= prefix_entry.value() { return Ok(InsertOutcome::NotInserted); } } @@ -579,7 +582,7 @@ where // Now we remove all entries that have our key as a prefix and are older than our entry. let removed = self .store - .remove_prefix_filtered(entry.key(), |value| value < entry.value())?; + .remove_prefix_filtered(entry.key(), |value| entry.value() >= value)?; // Insert our new entry. self.store.put(entry)?; @@ -659,17 +662,18 @@ mod tests { impl RangeKey for &'static str { fn is_prefix_of(&self, other: &Self) -> bool { - self.starts_with(other) + other.starts_with(self) } } impl RangeKey for String { fn is_prefix_of(&self, other: &Self) -> bool { - self.starts_with(other) + other.starts_with(self) } } impl RangeValue for &'static [u8] {} impl RangeValue for i32 {} + impl RangeValue for u8 {} impl RangeValue for () {} impl Store<(K, V)> for SimpleStore @@ -745,7 +749,7 @@ mod tests { } // TODO: Not horrible. - fn get_with_parents(&self, key: &K) -> Result, Self::Error> { + fn prefixes_of(&self, key: &K) -> Result, Self::Error> { let mut res = vec![]; for (k, v) in self.data.iter() { if k.is_prefix_of(key) { @@ -755,7 +759,7 @@ mod tests { Ok(res.into_iter()) } - fn get_prefix(&self, prefix: &K) -> Result, Self::Error> { + fn prefixed_by(&self, prefix: &K) -> Result, Self::Error> { let iter = self.data.iter(); Ok(SimpleRangeIterator { iter, @@ -841,7 +845,6 @@ mod tests { assert_eq!(res.bob_to_alice[0].parts.len(), 2); assert!(res.bob_to_alice[0].parts[0].is_range_fingerprint()); assert!(res.bob_to_alice[0].parts[1].is_range_fingerprint()); - // Last response from Alice assert_eq!(res.alice_to_bob[1].parts.len(), 3); assert!(res.alice_to_bob[1].parts[0].is_range_fingerprint()); @@ -940,6 +943,16 @@ mod tests { assert_eq!(res.bob_to_alice.len(), 1, "B -> A message count"); } + #[test] + fn test_equal_key_higher_value() { + let alice_set = [("foo", 2)]; + let bob_set = [("foo", 1)]; + + let res = sync(&alice_set, &bob_set); + assert_eq!(res.alice_to_bob.len(), 2, "A -> B message count"); + assert_eq!(res.bob_to_alice.len(), 1, "B -> A message count"); + } + #[test] fn test_multikey() { /// Uses the blanket impl of [`RangeKey]` for `T: AsRef<[u8]>` in this module. 
@@ -1176,13 +1189,23 @@ mod tests { sync_with_validate_cb_and_assert(alice_set, bob_set, &alice_validate_cb, &bob_validate_cb) } - fn insert_if_larger(map: &mut BTreeMap, k: K, v: V) { - let insert = match map.get(&k) { - None => true, - Some(v2) => v > *v2, - }; + fn insert_if_larger(map: &mut BTreeMap, key: K, value: V) { + let mut insert = true; + for (k, v) in map.iter() { + if k.is_prefix_of(&key) && v >= &value { + insert = false; + } + } if insert { - map.insert(k, v); + #[allow(clippy::needless_bool)] + map.retain(|k, v| { + if key.is_prefix_of(k) && value >= *v { + false + } else { + true + } + }); + map.insert(key, value); } } @@ -1198,37 +1221,58 @@ mod tests { F1: Fn(&SimpleStore, &(K, V), ContentStatus) -> bool, F2: Fn(&SimpleStore, &(K, V), ContentStatus) -> bool, { - let mut expected_set = BTreeMap::new(); - let mut alice = Peer::<(K, V), SimpleStore>::default(); - for e in alice_set { - alice.put(e.clone()).unwrap(); - insert_if_larger(&mut expected_set, e.0.clone(), e.1.clone()); - } - let mut bob = Peer::<(K, V), SimpleStore>::default(); - for e in bob_set { - bob.put(e.clone()).unwrap(); - insert_if_larger(&mut expected_set, e.0.clone(), e.1.clone()); - } - let res = sync_exchange_messages(alice, bob, alice_validate_cb, bob_validate_cb, 100); + let expected_set = { + let mut expected_set = BTreeMap::new(); + let mut alice_expected = BTreeMap::new(); + for e in alice_set { + alice.put(e.clone()).unwrap(); + insert_if_larger(&mut expected_set, e.0.clone(), e.1.clone()); + insert_if_larger(&mut alice_expected, e.0.clone(), e.1.clone()); + } + let alice_expected = alice_expected.into_iter().collect::>(); + let alice_now: Vec<_> = alice.all().unwrap().collect::>().unwrap(); + assert_eq!( + alice_expected, alice_now, + "alice initial set does not match" + ); - res.print_messages(); + let mut bob_expected = BTreeMap::new(); + for e in bob_set { + bob.put(e.clone()).unwrap(); + insert_if_larger(&mut expected_set, e.0.clone(), e.1.clone()); + insert_if_larger(&mut bob_expected, e.0.clone(), e.1.clone()); + } + let bob_expected = bob_expected.into_iter().collect::>(); + let bob_now: Vec<_> = bob.all().unwrap().collect::>().unwrap(); + assert_eq!(bob_expected, bob_now, "bob initial set does not match"); + + expected_set.into_iter().collect::>() + }; + + let res = sync_exchange_messages(alice, bob, alice_validate_cb, bob_validate_cb, 100); let alice_now: Vec<_> = res.alice.all().unwrap().collect::>().unwrap(); - assert_eq!( - alice_now.into_iter().collect::>(), - expected_set.clone().into_iter().collect::>(), - "alice" - ); + if alice_now != expected_set { + res.print_messages(); + println!("alice_init: {alice_set:?}"); + println!("bob_init: {bob_set:?}"); + println!("expected: {expected_set:?}"); + println!("alice_now: {alice_now:?}"); + panic!("alice_now does not match expected"); + } let bob_now: Vec<_> = res.bob.all().unwrap().collect::>().unwrap(); - assert_eq!( - bob_now.into_iter().collect::>(), - expected_set.into_iter().collect::>(), - "bob" - ); + if bob_now != expected_set { + res.print_messages(); + println!("alice_init: {alice_set:?}"); + println!("bob_init: {bob_set:?}"); + println!("expected: {expected_set:?}"); + println!("bob_now: {bob_now:?}"); + panic!("bob_now does not match expected"); + } // Check that values were never sent twice let mut alice_sent = BTreeMap::new(); @@ -1374,18 +1418,30 @@ mod tests { assert_eq!(excluded[3].0, "hog"); } - type TestSet = BTreeMap; + type TestSetStringUnit = BTreeMap; + type TestSetStringU8 = BTreeMap; fn test_key() -> impl 
Strategy { "[a-z0-9]{0,5}" } - fn test_set() -> impl Strategy { + fn test_set_string_unit() -> impl Strategy { prop::collection::btree_map(test_key(), Just(()), 0..10) } - fn test_vec() -> impl Strategy> { - test_set().prop_map(|m| m.into_iter().collect::>()) + fn test_set_string_u8() -> impl Strategy { + prop::collection::btree_map(test_key(), test_value_u8(), 0..10) + } + + fn test_value_u8() -> impl Strategy { + 0u8..u8::MAX + } + + fn test_vec_string_unit() -> impl Strategy> { + test_set_string_unit().prop_map(|m| m.into_iter().collect::>()) + } + fn test_vec_string_u8() -> impl Strategy> { + test_set_string_u8().prop_map(|m| m.into_iter().collect::>()) } fn test_range() -> impl Strategy> { @@ -1393,7 +1449,7 @@ mod tests { (test_key(), test_key()).prop_map(|(x, y)| Range::new(x, y)) } - fn mk_test_set(values: impl IntoIterator>) -> TestSet { + fn mk_test_set(values: impl IntoIterator>) -> TestSetStringUnit { values .into_iter() .map(|v| v.as_ref().to_string()) @@ -1412,6 +1468,13 @@ mod tests { let _res = sync(&alice, &bob); } + #[test] + fn simple_store_sync_x() { + let alice = mk_test_vec(["1", "3"]); + let bob = mk_test_vec(["2"]); + let _res = sync(&alice, &bob); + } + #[test] fn simple_store_sync_2() { let alice = mk_test_vec(["1", "3"]); @@ -1428,8 +1491,16 @@ mod tests { #[proptest] fn simple_store_sync( - #[strategy(test_vec())] alice: Vec<(String, ())>, - #[strategy(test_vec())] bob: Vec<(String, ())>, + #[strategy(test_vec_string_unit())] alice: Vec<(String, ())>, + #[strategy(test_vec_string_unit())] bob: Vec<(String, ())>, + ) { + let _res = sync(&alice, &bob); + } + + #[proptest] + fn simple_store_sync_u8( + #[strategy(test_vec_string_u8())] alice: Vec<(String, u8)>, + #[strategy(test_vec_string_u8())] bob: Vec<(String, u8)>, ) { let _res = sync(&alice, &bob); } @@ -1466,7 +1537,7 @@ mod tests { #[proptest] fn simple_store_get_ranges( - #[strategy(test_set())] contents: BTreeMap, + #[strategy(test_set_string_unit())] contents: BTreeMap, #[strategy(test_range())] range: Range, ) { let (expected, actual) = store_get_ranges_test::, _>(contents, range); diff --git a/iroh-sync/src/store/fs.rs b/iroh-sync/src/store/fs.rs index bae2c3f751..e608080449 100644 --- a/iroh-sync/src/store/fs.rs +++ b/iroh-sync/src/store/fs.rs @@ -648,10 +648,7 @@ impl crate::ranger::Store for StoreInstance { } type ParentIterator<'a> = ParentIterator<'a>; - fn get_with_parents( - &self, - id: &RecordIdentifier, - ) -> Result, Self::Error> { + fn prefixes_of(&self, id: &RecordIdentifier) -> Result, Self::Error> { ParentIterator::create( &self.store.db, id.namespace(), @@ -660,7 +657,7 @@ impl crate::ranger::Store for StoreInstance { ) } - fn get_prefix(&self, prefix: &RecordIdentifier) -> Result> { + fn prefixed_by(&self, prefix: &RecordIdentifier) -> Result> { let start = prefix.as_byte_tuple(); let end = prefix_range_end(&start); let iter = RangeIterator::with_range( diff --git a/iroh-sync/src/store/memory.rs b/iroh-sync/src/store/memory.rs index 7a1b183dd9..baf6742528 100644 --- a/iroh-sync/src/store/memory.rs +++ b/iroh-sync/src/store/memory.rs @@ -516,10 +516,7 @@ impl crate::ranger::Store for ReplicaStoreInstance { // TODO: Not horrible. 
type ParentIterator<'a> = std::vec::IntoIter>; - fn get_with_parents( - &self, - id: &RecordIdentifier, - ) -> Result, Self::Error> { + fn prefixes_of(&self, id: &RecordIdentifier) -> Result, Self::Error> { let mut entries = vec![]; let mut key = id.key().to_vec(); while !key.is_empty() { @@ -534,7 +531,7 @@ impl crate::ranger::Store for ReplicaStoreInstance { Ok(entries.into_iter()) } - fn get_prefix( + fn prefixed_by( &self, prefix: &RecordIdentifier, ) -> std::result::Result, Self::Error> { diff --git a/iroh-sync/src/sync.rs b/iroh-sync/src/sync.rs index bbd5092c85..e85c4626fa 100644 --- a/iroh-sync/src/sync.rs +++ b/iroh-sync/src/sync.rs @@ -775,7 +775,7 @@ impl Debug for RecordIdentifier { impl RangeKey for RecordIdentifier { fn is_prefix_of(&self, other: &Self) -> bool { - self.as_bytes().starts_with(other.as_bytes()) + other.as_bytes().starts_with(self.as_bytes()) } } From 14c4497309b084c06f7be185b3297fd99dfc1f90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Thu, 12 Oct 2023 21:04:26 +0300 Subject: [PATCH 5/5] Move bao stores (#1626) ## Description Move the bao store implementations to iroh-bytes, and a number of smaller cleanups. ## Notes & open questions Should we do a reexport of the 2 stores somewhere in iroh with a full name, like `DocStore` and `BaoStore`? ~@divagant-martian : should the downloader move as well?~ nope, it is using iroh-net, and we don't want an iroh-net dependency in iroh-bytes. ## Change checklist - [ ] Self-review. - [ ] Documentation updates if relevant. - [ ] Tests if relevant. --- Cargo.lock | 2 +- iroh-bytes/Cargo.toml | 6 +- iroh-bytes/src/get.rs | 2 +- iroh-bytes/src/hashseq.rs | 2 +- iroh-bytes/src/lib.rs | 4 +- iroh-bytes/src/protocol.rs | 2 +- iroh-bytes/src/provider.rs | 2 +- iroh/src/baomap.rs => iroh-bytes/src/store.rs | 41 +++++---- .../baomap => iroh-bytes/src/store}/flat.rs | 24 ++--- .../baomap => iroh-bytes/src/store}/mem.rs | 37 +++----- .../src/store}/readonly_mem.rs | 24 ++--- iroh-bytes/src/{baomap.rs => store/traits.rs} | 84 +---------------- iroh-bytes/src/util.rs | 91 +++++++++++++++++-- iroh/Cargo.toml | 3 +- iroh/examples/client.rs | 2 +- iroh/examples/collection.rs | 4 +- iroh/examples/hello-world.rs | 4 +- iroh/examples/rpc.rs | 6 +- iroh/examples/sync.rs | 16 ++-- iroh/src/client.rs | 6 +- iroh/src/collection.rs | 8 +- iroh/src/commands.rs | 3 +- iroh/src/commands/add.rs | 4 +- iroh/src/commands/get.rs | 8 +- iroh/src/commands/node.rs | 13 ++- iroh/src/commands/sync.rs | 8 +- iroh/src/commands/validate.rs | 2 +- iroh/src/dial.rs | 3 +- iroh/src/downloader.rs | 7 +- iroh/src/downloader/get.rs | 9 +- iroh/src/get.rs | 7 +- iroh/src/lib.rs | 1 - iroh/src/node.rs | 36 ++++---- iroh/src/rpc_protocol.rs | 13 ++- iroh/src/sync_engine.rs | 2 +- iroh/src/sync_engine/live.rs | 12 +-- iroh/src/sync_engine/rpc.rs | 2 +- iroh/src/util/path.rs | 6 +- iroh/tests/cli.rs | 12 +-- iroh/tests/gc.rs | 37 ++++---- iroh/tests/provide.rs | 18 ++-- iroh/tests/sync.rs | 10 +- 42 files changed, 285 insertions(+), 298 deletions(-) rename iroh/src/baomap.rs => iroh-bytes/src/store.rs (61%) rename {iroh/src/baomap => iroh-bytes/src/store}/flat.rs (99%) rename {iroh/src/baomap => iroh-bytes/src/store}/mem.rs (96%) rename {iroh/src/baomap => iroh-bytes/src/store}/readonly_mem.rs (97%) rename iroh-bytes/src/{baomap.rs => store/traits.rs} (88%) diff --git a/Cargo.lock b/Cargo.lock index 43fa5e0e36..6b8ad2a7fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2009,7 +2009,6 @@ dependencies = [ "quinn", "rand", "range-collections", - 
"reflink-copy", "regex", "rustyline", "serde", @@ -2067,6 +2066,7 @@ dependencies = [ "quinn", "rand", "range-collections", + "reflink-copy", "self_cell", "serde", "serde-error", diff --git a/iroh-bytes/Cargo.toml b/iroh-bytes/Cargo.toml index 673c15c20b..4376837f23 100644 --- a/iroh-bytes/Cargo.toml +++ b/iroh-bytes/Cargo.toml @@ -30,13 +30,14 @@ postcard = { version = "1", default-features = false, features = ["alloc", "use- quinn = "0.10" rand = "0.8" range-collections = "0.4.0" +reflink-copy = { version = "0.1.8", optional = true } self_cell = "1.0.1" serde = { version = "1", features = ["derive"] } serde-error = "0.1.2" smallvec = { version = "1.10.0", features = ["serde", "const_new"] } subtle = "2.4" thiserror = "1" -tokio = { version = "1", features = [] } +tokio = { version = "1" } tokio-util = { version = "0.7", features = ["io-util", "io", "rt"] } tracing = "0.1" tracing-futures = "0.2.5" @@ -49,4 +50,5 @@ serde_test = "1.0.176" tokio = { version = "1", features = ["macros", "test-util"] } [features] -default = [] +default = ["flat-db"] +flat-db = ["reflink-copy", "tokio/fs"] diff --git a/iroh-bytes/src/get.rs b/iroh-bytes/src/get.rs index 012e67fd5a..185f852deb 100644 --- a/iroh-bytes/src/get.rs +++ b/iroh-bytes/src/get.rs @@ -14,7 +14,7 @@ use std::error::Error; use std::fmt::{self, Debug}; use std::time::{Duration, Instant}; -use crate::util::Hash; +use crate::Hash; use anyhow::Result; use bao_tree::io::fsm::BaoContentItem; use bao_tree::ChunkNum; diff --git a/iroh-bytes/src/hashseq.rs b/iroh-bytes/src/hashseq.rs index a7c757f5cd..2ed440c1c5 100644 --- a/iroh-bytes/src/hashseq.rs +++ b/iroh-bytes/src/hashseq.rs @@ -1,5 +1,5 @@ //! traits related to collections of blobs -use crate::util::Hash; +use crate::Hash; use bytes::Bytes; use iroh_io::{AsyncSliceReader, AsyncSliceReaderExt}; use std::{fmt::Debug, io}; diff --git a/iroh-bytes/src/lib.rs b/iroh-bytes/src/lib.rs index 1f53ddcde1..ff2a547d51 100644 --- a/iroh-bytes/src/lib.rs +++ b/iroh-bytes/src/lib.rs @@ -3,14 +3,14 @@ #![deny(missing_docs, rustdoc::broken_intra_doc_links)] #![recursion_limit = "256"] -pub mod baomap; pub mod get; pub mod hashseq; pub mod protocol; pub mod provider; +pub mod store; pub mod util; -pub use crate::util::Hash; +pub use crate::util::{BlobFormat, Hash, HashAndFormat, Tag, TempTag}; use bao_tree::BlockSize; /// Block size used by iroh, 2^4*1024 = 16KiB diff --git a/iroh-bytes/src/protocol.rs b/iroh-bytes/src/protocol.rs index 3fbb459079..ce532071e6 100644 --- a/iroh-bytes/src/protocol.rs +++ b/iroh-bytes/src/protocol.rs @@ -356,7 +356,7 @@ use serde::{Deserialize, Serialize}; mod range_spec; pub use range_spec::{NonEmptyRequestRangeSpecIter, RangeSpec, RangeSpecSeq}; -use crate::util::Hash; +use crate::Hash; /// Maximum message size is limited to 100MiB for now. 
pub const MAX_MESSAGE_SIZE: usize = 1024 * 1024 * 100; diff --git a/iroh-bytes/src/provider.rs b/iroh-bytes/src/provider.rs index 21ad4c2f7e..8d815f6fb1 100644 --- a/iroh-bytes/src/provider.rs +++ b/iroh-bytes/src/provider.rs @@ -14,9 +14,9 @@ use serde::{Deserialize, Serialize}; use tracing::{debug, debug_span, info, trace, warn}; use tracing_futures::Instrument; -use crate::baomap::*; use crate::hashseq::parse_hash_seq; use crate::protocol::{GetRequest, RangeSpec, Request, RequestToken}; +use crate::store::*; use crate::util::{BlobFormat, RpcError, Tag}; use crate::Hash; diff --git a/iroh/src/baomap.rs b/iroh-bytes/src/store.rs similarity index 61% rename from iroh/src/baomap.rs rename to iroh-bytes/src/store.rs index ba2821b144..b72841d665 100644 --- a/iroh/src/baomap.rs +++ b/iroh-bytes/src/store.rs @@ -1,9 +1,16 @@ -//! Various database implementations for storing blob data +//! Implementations of blob stores +use crate::{ + util::{BlobFormat, HashAndFormat}, + Hash, +}; +pub mod mem; +pub mod readonly_mem; + #[cfg(feature = "flat-db")] pub mod flat; -pub mod mem; -pub mod readonly_mem; +mod traits; +pub use traits::*; fn flatten_to_io( e: std::result::Result, tokio::task::JoinError>, @@ -34,19 +41,19 @@ struct TempCounters { } impl TempCounters { - fn counter(&mut self, format: iroh_bytes::util::BlobFormat) -> &mut u64 { + fn counter(&mut self, format: BlobFormat) -> &mut u64 { match format { - iroh_bytes::util::BlobFormat::Raw => &mut self.raw, - iroh_bytes::util::BlobFormat::HashSeq => &mut self.hash_seq, + BlobFormat::Raw => &mut self.raw, + BlobFormat::HashSeq => &mut self.hash_seq, } } - fn inc(&mut self, format: iroh_bytes::util::BlobFormat) { + fn inc(&mut self, format: BlobFormat) { let counter = self.counter(format); *counter = counter.checked_add(1).unwrap(); } - fn dec(&mut self, format: iroh_bytes::util::BlobFormat) { + fn dec(&mut self, format: BlobFormat) { let counter = self.counter(format); *counter = counter.saturating_sub(1); } @@ -57,16 +64,16 @@ impl TempCounters { } #[derive(Debug, Clone, Default)] -struct TempCounterMap(std::collections::BTreeMap); +struct TempCounterMap(std::collections::BTreeMap); impl TempCounterMap { - fn inc(&mut self, value: &iroh_bytes::util::HashAndFormat) { - let iroh_bytes::util::HashAndFormat { hash, format } = value; + fn inc(&mut self, value: &HashAndFormat) { + let HashAndFormat { hash, format } = value; self.0.entry(*hash).or_default().inc(*format) } - fn dec(&mut self, value: &iroh_bytes::util::HashAndFormat) { - let iroh_bytes::util::HashAndFormat { hash, format } = value; + fn dec(&mut self, value: &HashAndFormat) { + let HashAndFormat { hash, format } = value; let counters = self.0.get_mut(hash).unwrap(); counters.dec(*format); if counters.is_empty() { @@ -74,18 +81,18 @@ impl TempCounterMap { } } - fn contains(&self, hash: &iroh_bytes::Hash) -> bool { + fn contains(&self, hash: &Hash) -> bool { self.0.contains_key(hash) } - fn keys(&self) -> impl Iterator { + fn keys(&self) -> impl Iterator { let mut res = Vec::new(); for (k, v) in self.0.iter() { if v.raw > 0 { - res.push(iroh_bytes::util::HashAndFormat::raw(*k)); + res.push(HashAndFormat::raw(*k)); } if v.hash_seq > 0 { - res.push(iroh_bytes::util::HashAndFormat::hash_seq(*k)); + res.push(HashAndFormat::hash_seq(*k)); } } res.into_iter() diff --git a/iroh/src/baomap/flat.rs b/iroh-bytes/src/store/flat.rs similarity index 99% rename from iroh/src/baomap/flat.rs rename to iroh-bytes/src/store/flat.rs index f759d9bf9a..5939b49010 100644 --- a/iroh/src/baomap/flat.rs +++ 
b/iroh-bytes/src/store/flat.rs @@ -130,6 +130,13 @@ use std::str::FromStr; use std::sync::{Arc, Mutex, RwLock}; use std::time::SystemTime; +use super::{ + EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry, PartialMap, + PartialMapEntry, ReadableStore, ValidateProgress, +}; +use crate::util::progress::{IdGenerator, IgnoreProgressSender, ProgressSender}; +use crate::util::{BlobFormat, HashAndFormat, LivenessTracker, Tag}; +use crate::{Hash, TempTag, IROH_BLOCK_SIZE}; use bao_tree::io::outboard::{PostOrderMemOutboard, PreOrderOutboard}; use bao_tree::io::sync::ReadAt; use bao_tree::{blake3, ChunkRanges}; @@ -138,13 +145,6 @@ use bytes::Bytes; use futures::future::BoxFuture; use futures::future::Either; use futures::{Future, FutureExt, Stream, StreamExt}; -use iroh_bytes::baomap::{ - self, EntryStatus, ExportMode, ImportMode, ImportProgress, LivenessTracker, Map, MapEntry, - PartialMap, PartialMapEntry, ReadableStore, TempTag, ValidateProgress, -}; -use iroh_bytes::util::progress::{IdGenerator, IgnoreProgressSender, ProgressSender}; -use iroh_bytes::util::{BlobFormat, HashAndFormat, Tag}; -use iroh_bytes::{Hash, IROH_BLOCK_SIZE}; use iroh_io::{AsyncSliceReader, AsyncSliceWriter, File}; use tokio::io::AsyncWriteExt; use tokio::sync::mpsc; @@ -685,7 +685,7 @@ impl ReadableStore for Store { } } -impl baomap::Store for Store { +impl super::Store for Store { fn import_file( &self, path: PathBuf, @@ -916,7 +916,7 @@ impl Store { Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?) })?; progress.blocking_send(ImportProgress::OutboardDone { id, hash })?; - use baomap::Store; + use super::Store; // from here on, everything related to the hash is protected by the temp tag let tag = self.temp_tag(HashAndFormat { hash, format }); let hash = *tag.hash(); @@ -1195,7 +1195,7 @@ impl Store { complete_path: PathBuf, partial_path: PathBuf, meta_path: PathBuf, - rt: iroh_bytes::util::runtime::Handle, + rt: crate::util::runtime::Handle, ) -> anyhow::Result { tracing::debug!( "loading database from {} {}", @@ -1465,7 +1465,7 @@ impl Store { complete_path: impl AsRef, partial_path: impl AsRef, meta_path: impl AsRef, - rt: &iroh_bytes::util::runtime::Handle, + rt: &crate::util::runtime::Handle, ) -> anyhow::Result { let complete_path = complete_path.as_ref().to_path_buf(); let partial_path = partial_path.as_ref().to_path_buf(); @@ -1480,7 +1480,7 @@ impl Store { complete_path: impl AsRef, partial_path: impl AsRef, meta_path: impl AsRef, - rt: &iroh_bytes::util::runtime::Handle, + rt: &crate::util::runtime::Handle, ) -> anyhow::Result { let complete_path = complete_path.as_ref().to_path_buf(); let partial_path = partial_path.as_ref().to_path_buf(); diff --git a/iroh/src/baomap/mem.rs b/iroh-bytes/src/store/mem.rs similarity index 96% rename from iroh/src/baomap/mem.rs rename to iroh-bytes/src/store/mem.rs index 7ca4ad519d..caf1c632e1 100644 --- a/iroh/src/baomap/mem.rs +++ b/iroh-bytes/src/store/mem.rs @@ -15,6 +15,17 @@ use std::time::SystemTime; use super::flatten_to_io; use super::temp_name; use super::TempCounterMap; +use crate::{ + store::{ + EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry, PartialMap, + PartialMapEntry, ReadableStore, ValidateProgress, + }, + util::{ + progress::{IdGenerator, IgnoreProgressSender, ProgressSender}, + runtime, BlobFormat, HashAndFormat, LivenessTracker, + }, + Hash, Tag, TempTag, IROH_BLOCK_SIZE, +}; use bao_tree::blake3; use bao_tree::io::fsm::Outboard; use bao_tree::io::outboard::PreOrderOutboard; @@ -28,27 +39,7 @@ use 
derive_more::From; use futures::future::BoxFuture; use futures::FutureExt; use futures::{Stream, StreamExt}; -use iroh_bytes::baomap; -use iroh_bytes::baomap::EntryStatus; -use iroh_bytes::baomap::ExportMode; -use iroh_bytes::baomap::ImportMode; -use iroh_bytes::baomap::ImportProgress; -use iroh_bytes::baomap::LivenessTracker; -use iroh_bytes::baomap::PartialMap; -use iroh_bytes::baomap::PartialMapEntry; -use iroh_bytes::baomap::TempTag; -use iroh_bytes::baomap::ValidateProgress; -use iroh_bytes::baomap::{Map, MapEntry, ReadableStore}; -use iroh_bytes::util::progress::IdGenerator; -use iroh_bytes::util::progress::IgnoreProgressSender; -use iroh_bytes::util::progress::ProgressSender; -use iroh_bytes::util::runtime; -use iroh_bytes::util::BlobFormat; -use iroh_bytes::util::HashAndFormat; -use iroh_bytes::util::Tag; -use iroh_bytes::{Hash, IROH_BLOCK_SIZE}; -use iroh_io::AsyncSliceReader; -use iroh_io::AsyncSliceWriter; +use iroh_io::{AsyncSliceReader, AsyncSliceWriter}; use tokio::sync::mpsc; /// A mutable file like object that can be used for partial entries. @@ -458,7 +449,7 @@ impl PartialMap for Store { } } -impl baomap::Store for Store { +impl super::Store for Store { fn import_file( &self, path: std::path::PathBuf, @@ -619,7 +610,7 @@ impl Store { data: outboard.into(), }; let hash = hash.into(); - use baomap::Store; + use super::Store; let tag = self.temp_tag(HashAndFormat { hash, format }); self.0 .state diff --git a/iroh/src/baomap/readonly_mem.rs b/iroh-bytes/src/store/readonly_mem.rs similarity index 97% rename from iroh/src/baomap/readonly_mem.rs rename to iroh-bytes/src/store/readonly_mem.rs index f10053cc13..3140904387 100644 --- a/iroh/src/baomap/readonly_mem.rs +++ b/iroh-bytes/src/store/readonly_mem.rs @@ -8,6 +8,17 @@ use std::{ sync::Arc, }; +use crate::{ + store::{ + EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry, PartialMap, + PartialMapEntry, ReadableStore, ValidateProgress, + }, + util::{ + progress::{IdGenerator, ProgressSender}, + BlobFormat, HashAndFormat, Tag, + }, + Hash, TempTag, IROH_BLOCK_SIZE, +}; use bao_tree::{ blake3, io::{ @@ -21,17 +32,6 @@ use futures::{ future::{self, BoxFuture}, FutureExt, Stream, }; -use iroh_bytes::{ - baomap::{ - self, EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry, PartialMap, - PartialMapEntry, ReadableStore, TempTag, ValidateProgress, - }, - util::{ - progress::{IdGenerator, ProgressSender}, - BlobFormat, HashAndFormat, Tag, - }, - Hash, IROH_BLOCK_SIZE, -}; use tokio::{io::AsyncWriteExt, sync::mpsc}; /// A readonly in memory database for iroh-bytes. @@ -320,7 +320,7 @@ impl PartialMapEntry for PartialEntry { } } -impl baomap::Store for Store { +impl super::Store for Store { fn import_file( &self, data: PathBuf, diff --git a/iroh-bytes/src/baomap.rs b/iroh-bytes/src/store/traits.rs similarity index 88% rename from iroh-bytes/src/baomap.rs rename to iroh-bytes/src/store/traits.rs index 1b3d2451b7..3fc850f905 100644 --- a/iroh-bytes/src/baomap.rs +++ b/iroh-bytes/src/store/traits.rs @@ -1,5 +1,5 @@ //! Traits for in-memory or persistent maps of blob with bao encoded outboards. 
-use std::{collections::BTreeSet, io, path::PathBuf, sync::Arc};
+use std::{collections::BTreeSet, io, path::PathBuf};
 
 use crate::{
     hashseq::parse_hash_seq,
@@ -7,7 +7,7 @@ use crate::{
         progress::{IdGenerator, ProgressSender},
         BlobFormat, HashAndFormat, RpcError, Tag,
     },
-    Hash,
+    Hash, TempTag,
 };
 use bao_tree::{blake3, ChunkRanges};
 use bytes::Bytes;
@@ -283,86 +283,6 @@ pub trait Store: ReadableStore + PartialMap {
     fn delete(&self, hash: &Hash) -> BoxFuture<'_, io::Result<()>>;
 }
 
-/// A trait for things that can track liveness of blobs and collections.
-///
-/// This trait works together with [TempTag] to keep track of the liveness of a
-/// blob or collection.
-///
-/// It is important to include the format in the liveness tracking, since
-/// protecting a collection means protecting the blob and all its children,
-/// whereas protecting a raw blob only protects the blob itself.
-pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static {
-    /// Called on clone
-    fn on_clone(&self, inner: &HashAndFormat);
-    /// Called on drop
-    fn on_drop(&self, inner: &HashAndFormat);
-}
-
-/// A hash and format pair that is protected from garbage collection.
-///
-/// If format is raw, this will protect just the blob
-/// If format is collection, this will protect the collection and all blobs in it
-#[derive(Debug)]
-pub struct TempTag {
-    /// The hash and format we are pinning
-    inner: HashAndFormat,
-    /// liveness tracker
-    liveness: Option<Arc<dyn LivenessTracker>>,
-}
-
-impl TempTag {
-    /// Create a new temp tag for the given hash and format
-    ///
-    /// This should only be used by store implementations.
-    ///
-    /// The caller is responsible for increasing the refcount on creation and to
-    /// make sure that temp tags that are created between a mark phase and a sweep
-    /// phase are protected.
-    pub fn new(inner: HashAndFormat, liveness: Option<Arc<dyn LivenessTracker>>) -> Self {
-        if let Some(liveness) = liveness.as_ref() {
-            liveness.on_clone(&inner);
-        }
-        Self { inner, liveness }
-    }
-
-    /// The hash and format of the pinned item
-    pub fn inner(&self) -> &HashAndFormat {
-        &self.inner
-    }
-
-    /// The hash of the pinned item
-    pub fn hash(&self) -> &Hash {
-        &self.inner.hash
-    }
-
-    /// The format of the pinned item
-    pub fn format(&self) -> BlobFormat {
-        self.inner.format
-    }
-
-    /// Keep the item alive until the end of the process
-    pub fn leak(mut self) {
-        // set the liveness tracker to None, so that the refcount is not decreased
-        // during drop. This means that the refcount will never reach 0 and the
-        // item will not be gced until the end of the process.
-        self.liveness = None;
-    }
-}
-
-impl Clone for TempTag {
-    fn clone(&self) -> Self {
-        Self::new(self.inner, self.liveness.clone())
-    }
-}
-
-impl Drop for TempTag {
-    fn drop(&mut self) {
-        if let Some(liveness) = self.liveness.as_ref() {
-            liveness.on_drop(&self.inner);
-        }
-    }
-}
-
 /// Implementation of the gc method.
async fn gc_mark_task<'a>(
     store: &'a impl Store,
diff --git a/iroh-bytes/src/util.rs b/iroh-bytes/src/util.rs
index 644223b35d..ebf5b8dfdd 100644
--- a/iroh-bytes/src/util.rs
+++ b/iroh-bytes/src/util.rs
@@ -9,7 +9,7 @@ use serde::{
     ser::SerializeTuple, Deserialize, Deserializer, Serialize, Serializer,
 };
-use std::{borrow::Borrow, fmt, result, str::FromStr, time::SystemTime};
+use std::{borrow::Borrow, fmt, result, str::FromStr, sync::Arc, time::SystemTime};
 use thiserror::Error;
 pub mod io;
 pub mod progress;
@@ -102,15 +102,6 @@ impl Tag {
     }
 }
 
-/// Option for commands that allow setting a tag
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
-pub enum SetTagOption {
-    /// A tag will be automatically generated
-    Auto,
-    /// The tag is explicitly named
-    Named(Tag),
-}
-
 /// A hash and format pair
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
 pub struct HashAndFormat {
@@ -369,6 +360,86 @@ impl MaxSize for Hash {
     const POSTCARD_MAX_SIZE: usize = 32;
 }
 
+/// A trait for things that can track liveness of blobs and collections.
+///
+/// This trait works together with [TempTag] to keep track of the liveness of a
+/// blob or collection.
+///
+/// It is important to include the format in the liveness tracking, since
+/// protecting a collection means protecting the blob and all its children,
+/// whereas protecting a raw blob only protects the blob itself.
+pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static {
+    /// Called on clone
+    fn on_clone(&self, inner: &HashAndFormat);
+    /// Called on drop
+    fn on_drop(&self, inner: &HashAndFormat);
+}
+
+/// A hash and format pair that is protected from garbage collection.
+///
+/// If format is raw, this will protect just the blob
+/// If format is collection, this will protect the collection and all blobs in it
+#[derive(Debug)]
+pub struct TempTag {
+    /// The hash and format we are pinning
+    inner: HashAndFormat,
+    /// liveness tracker
+    liveness: Option<Arc<dyn LivenessTracker>>,
+}
+
+impl TempTag {
+    /// Create a new temp tag for the given hash and format
+    ///
+    /// This should only be used by store implementations.
+    ///
+    /// The caller is responsible for increasing the refcount on creation and to
+    /// make sure that temp tags that are created between a mark phase and a sweep
+    /// phase are protected.
+    pub fn new(inner: HashAndFormat, liveness: Option<Arc<dyn LivenessTracker>>) -> Self {
+        if let Some(liveness) = liveness.as_ref() {
+            liveness.on_clone(&inner);
+        }
+        Self { inner, liveness }
+    }
+
+    /// The hash and format of the pinned item
+    pub fn inner(&self) -> &HashAndFormat {
+        &self.inner
+    }
+
+    /// The hash of the pinned item
+    pub fn hash(&self) -> &Hash {
+        &self.inner.hash
+    }
+
+    /// The format of the pinned item
+    pub fn format(&self) -> BlobFormat {
+        self.inner.format
+    }
+
+    /// Keep the item alive until the end of the process
+    pub fn leak(mut self) {
+        // set the liveness tracker to None, so that the refcount is not decreased
+        // during drop. This means that the refcount will never reach 0 and the
+        // item will not be gced until the end of the process.
+ self.liveness = None; + } +} + +impl Clone for TempTag { + fn clone(&self) -> Self { + Self::new(self.inner, self.liveness.clone()) + } +} + +impl Drop for TempTag { + fn drop(&mut self) { + if let Some(liveness) = self.liveness.as_ref() { + liveness.on_drop(&self.inner); + } + } +} + const CID_PREFIX: [u8; 4] = [ 0x01, // version 0x55, // raw codec diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index e8ce7e3d4f..470f13d0d1 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -68,13 +68,12 @@ colored = { version = "2.0.4", optional = true } # Examples ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"], optional = true } -reflink-copy = { version = "0.1.8", optional = true } [features] default = ["cli", "metrics"] cli = ["clap", "config", "console", "dirs-next", "indicatif", "multibase", "quic-rpc/quinn-transport", "tokio/rt-multi-thread", "tracing-subscriber", "flat-db", "shell-words", "shellexpand", "rustyline", "colored", "toml", "human-time", "comfy-table", "dialoguer"] metrics = ["iroh-metrics"] -flat-db = ["reflink-copy"] +flat-db = ["iroh-bytes/flat-db"] test = [] example-sync = ["cli"] diff --git a/iroh/examples/client.rs b/iroh/examples/client.rs index b20706a335..45618ed29a 100644 --- a/iroh/examples/client.rs +++ b/iroh/examples/client.rs @@ -14,7 +14,7 @@ use tokio_stream::StreamExt; #[tokio::main] async fn main() -> anyhow::Result<()> { let rt = runtime::Handle::from_current(1)?; - let db = iroh::baomap::mem::Store::new(rt.clone()); + let db = iroh_bytes::store::mem::Store::new(rt.clone()); let store = iroh_sync::store::memory::Store::default(); let node = Node::builder(db.clone(), store) .runtime(&rt) diff --git a/iroh/examples/collection.rs b/iroh/examples/collection.rs index 7d503f0cea..d85fb36afc 100644 --- a/iroh/examples/collection.rs +++ b/iroh/examples/collection.rs @@ -8,7 +8,7 @@ //! $ cargo run -p collection use iroh::bytes::util::runtime; use iroh::collection::{Blob, Collection}; -use iroh_bytes::util::BlobFormat; +use iroh_bytes::BlobFormat; use tracing_subscriber::{prelude::*, EnvFilter}; // set the RUST_LOG env var to one of {debug,info,warn} to see logging info @@ -24,7 +24,7 @@ pub fn setup_logging() { async fn main() -> anyhow::Result<()> { setup_logging(); // create a new database and add two blobs - let (mut db, names) = iroh::baomap::readonly_mem::Store::new([ + let (mut db, names) = iroh_bytes::store::readonly_mem::Store::new([ ("blob1", b"the first blob of bytes".to_vec()), ("blob2", b"the second blob of bytes".to_vec()), ]); diff --git a/iroh/examples/hello-world.rs b/iroh/examples/hello-world.rs index 534b9aaf23..7d82289bab 100644 --- a/iroh/examples/hello-world.rs +++ b/iroh/examples/hello-world.rs @@ -6,7 +6,7 @@ //! run this example from the project root: //! 
$ cargo run --example hello-world use iroh::bytes::util::runtime; -use iroh_bytes::util::BlobFormat; +use iroh_bytes::BlobFormat; use tracing_subscriber::{prelude::*, EnvFilter}; // set the RUST_LOG env var to one of {debug,info,warn} to see logging info @@ -22,7 +22,7 @@ pub fn setup_logging() { async fn main() -> anyhow::Result<()> { setup_logging(); // create a new, empty in memory database - let mut db = iroh::baomap::readonly_mem::Store::default(); + let mut db = iroh_bytes::store::readonly_mem::Store::default(); // create an in-memory doc store (not used in the example) let doc_store = iroh_sync::store::memory::Store::default(); // create a new iroh runtime with 1 worker thread, reusing the existing tokio runtime diff --git a/iroh/examples/rpc.rs b/iroh/examples/rpc.rs index 94660ced56..8225065639 100644 --- a/iroh/examples/rpc.rs +++ b/iroh/examples/rpc.rs @@ -11,7 +11,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use clap::Parser; use iroh::rpc_protocol::{ProviderRequest, ProviderResponse}; use iroh::{bytes::util::runtime, rpc_protocol::ProviderService}; -use iroh_bytes::baomap::Store; +use iroh_bytes::store::Store; use iroh_net::key::SecretKey; use quic_rpc::transport::quinn::QuinnServerEndpoint; use quic_rpc::ServiceEndpoint; @@ -92,11 +92,11 @@ async fn main() -> anyhow::Result<()> { match args.path { Some(path) => { tokio::fs::create_dir_all(&path).await?; - let db = iroh::baomap::flat::Store::load(&path, &path, &path, &rt).await?; + let db = iroh_bytes::store::flat::Store::load(&path, &path, &path, &rt).await?; run(db).await } None => { - let db = iroh::baomap::mem::Store::new(rt); + let db = iroh_bytes::store::mem::Store::new(rt); run(db).await } } diff --git a/iroh/examples/sync.rs b/iroh/examples/sync.rs index 393b0e83fc..4485559dbf 100644 --- a/iroh/examples/sync.rs +++ b/iroh/examples/sync.rs @@ -27,9 +27,8 @@ use iroh::{ }; use iroh_bytes::util::runtime; use iroh_bytes::{ - baomap::{ImportMode, Map, MapEntry, Store as BaoStore}, - util::progress::IgnoreProgressSender, - util::BlobFormat, + store::{ImportMode, Map, MapEntry, Store as BaoStore}, + util::{progress::IgnoreProgressSender, BlobFormat}, }; use iroh_gossip::{ net::{Gossip, GOSSIP_ALPN}, @@ -228,7 +227,7 @@ async fn run(args: Args) -> anyhow::Result<()> { // create a bao store for the iroh-bytes blobs let blob_path = storage_path.join("blobs"); std::fs::create_dir_all(&blob_path)?; - let db = iroh::baomap::flat::Store::load(&blob_path, &blob_path, &blob_path, &rt).await?; + let db = iroh_bytes::store::flat::Store::load(&blob_path, &blob_path, &blob_path, &rt).await?; // create the live syncer let downloader = Downloader::new(db.clone(), endpoint.clone(), rt.clone()).await; @@ -348,7 +347,7 @@ struct ReplState { store: store::fs::Store, author: Author, doc: Doc, - db: iroh::baomap::flat::Store, + db: iroh_bytes::store::flat::Store, ticket: Ticket, log_filter: LogLevelReload, current_watch: Arc>>, @@ -1003,13 +1002,16 @@ mod iroh_bytes_handlers { #[derive(Debug, Clone)] pub struct IrohBytesHandlers { - db: iroh::baomap::flat::Store, + db: iroh_bytes::store::flat::Store, rt: iroh_bytes::util::runtime::Handle, event_sender: NoopEventSender, auth_handler: Arc, } impl IrohBytesHandlers { - pub fn new(rt: iroh_bytes::util::runtime::Handle, db: iroh::baomap::flat::Store) -> Self { + pub fn new( + rt: iroh_bytes::util::runtime::Handle, + db: iroh_bytes::store::flat::Store, + ) -> Self { Self { db, rt, diff --git a/iroh/src/client.rs b/iroh/src/client.rs index 1f4032cebd..2392bf1d8d 100644 --- 
a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -13,10 +13,10 @@ use anyhow::{anyhow, Result}; use bytes::Bytes; use futures::stream::BoxStream; use futures::{SinkExt, Stream, StreamExt, TryStreamExt}; -use iroh_bytes::baomap::ValidateProgress; use iroh_bytes::provider::AddProgress; -use iroh_bytes::util::{BlobFormat, SetTagOption, Tag}; +use iroh_bytes::store::ValidateProgress; use iroh_bytes::Hash; +use iroh_bytes::{BlobFormat, Tag}; use iroh_net::{key::PublicKey, magic_endpoint::ConnectionInfo, PeerAddr}; use iroh_sync::{store::GetFilter, AuthorId, Entry, NamespaceId}; use quic_rpc::{RpcClient, ServiceConnection}; @@ -35,7 +35,7 @@ use crate::rpc_protocol::{ DocStartSyncRequest, DocSubscribeRequest, DocTicket, GetProgress, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest, - NodeStatusResponse, ProviderService, ShareMode, WrapOption, + NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption, }; use crate::sync_engine::{LiveEvent, LiveStatus}; diff --git a/iroh/src/collection.rs b/iroh/src/collection.rs index d9b08619ea..d1bcedf3b4 100644 --- a/iroh/src/collection.rs +++ b/iroh/src/collection.rs @@ -4,12 +4,12 @@ use std::collections::BTreeMap; use anyhow::Context; use bao_tree::blake3; use bytes::Bytes; -use iroh_bytes::baomap::{MapEntry, TempTag}; use iroh_bytes::get::fsm::EndBlobNext; use iroh_bytes::get::Stats; use iroh_bytes::hashseq::HashSeq; +use iroh_bytes::store::MapEntry; use iroh_bytes::util::BlobFormat; -use iroh_bytes::{baomap, Hash}; +use iroh_bytes::{Hash, TempTag}; use iroh_io::AsyncSliceReaderExt; use serde::{Deserialize, Serialize}; @@ -113,7 +113,7 @@ impl Collection { /// It does not require that all child blobs are stored in the store. pub async fn load(db: &D, root: &Hash) -> anyhow::Result where - D: baomap::Map, + D: iroh_bytes::store::Map, { let links_entry = db.get(root).context("links not found")?; anyhow::ensure!(links_entry.is_complete(), "links not complete"); @@ -135,7 +135,7 @@ impl Collection { /// as a TempTag. 
pub async fn store(self, db: &D) -> anyhow::Result where - D: baomap::Store, + D: iroh_bytes::store::Store, { let (links, meta) = self.into_parts(); let meta_bytes = postcard::to_stdvec(&meta)?; diff --git a/iroh/src/commands.rs b/iroh/src/commands.rs index 18e0d61a52..65e755178b 100644 --- a/iroh/src/commands.rs +++ b/iroh/src/commands.rs @@ -16,8 +16,7 @@ use indicatif::{ use iroh::client::quic::Iroh; use iroh::dial::Ticket; use iroh::rpc_protocol::*; -use iroh_bytes::util::{BlobFormat, SetTagOption, Tag}; -use iroh_bytes::{protocol::RequestToken, util::runtime, Hash}; +use iroh_bytes::{protocol::RequestToken, util::runtime, BlobFormat, Hash, Tag}; use iroh_net::PeerAddr; use iroh_net::{ key::{PublicKey, SecretKey}, diff --git a/iroh/src/commands/add.rs b/iroh/src/commands/add.rs index b538f473c0..d8bf070a61 100644 --- a/iroh/src/commands/add.rs +++ b/iroh/src/commands/add.rs @@ -10,12 +10,12 @@ use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressStyle}; use iroh::{ client::Iroh, dial::Ticket, - rpc_protocol::{ProviderService, WrapOption}, + rpc_protocol::{ProviderService, SetTagOption, WrapOption}, }; use iroh_bytes::{ protocol::RequestToken, provider::AddProgress, - util::{BlobFormat, HashAndFormat, SetTagOption, Tag}, + util::{BlobFormat, HashAndFormat, Tag}, Hash, }; use quic_rpc::ServiceConnection; diff --git a/iroh/src/commands/get.rs b/iroh/src/commands/get.rs index e3b60475fe..12d3919223 100644 --- a/iroh/src/commands/get.rs +++ b/iroh/src/commands/get.rs @@ -5,7 +5,7 @@ use bao_tree::ChunkRanges; use futures::StreamExt; use iroh::{ collection::Collection, - rpc_protocol::{BlobDownloadRequest, DownloadLocation}, + rpc_protocol::{BlobDownloadRequest, DownloadLocation, SetTagOption}, util::progress::ProgressSliceWriter, }; use iroh_bytes::{ @@ -20,7 +20,7 @@ use iroh_bytes::{ provider::GetProgress, util::{ progress::{FlumeProgressSender, IdGenerator, ProgressSender}, - BlobFormat, SetTagOption, + BlobFormat, }, }; use iroh_io::ConcatenateSliceWriter; @@ -64,8 +64,8 @@ impl GetInteractive { } } tokio::fs::create_dir_all(&temp_dir).await?; - let db: iroh::baomap::flat::Store = - iroh::baomap::flat::Store::load(&temp_dir, &temp_dir, &temp_dir, &self.rt).await?; + let db = + iroh_bytes::store::flat::Store::load(&temp_dir, &temp_dir, &temp_dir, &self.rt).await?; // TODO: we don't need sync here, maybe disable completely? 
let doc_store = iroh_sync::store::memory::Store::default(); // spin up temp node and ask it to download the data for us diff --git a/iroh/src/commands/node.rs b/iroh/src/commands/node.rs index fcaa310be6..c872db5b87 100644 --- a/iroh/src/commands/node.rs +++ b/iroh/src/commands/node.rs @@ -8,13 +8,15 @@ use std::{ use anyhow::{ensure, Context, Result}; use iroh::{ - baomap::flat::{self, Store as BaoFsStore}, client::quic::RPC_ALPN, node::{Node, StaticTokenAuthHandler}, rpc_protocol::{ProviderRequest, ProviderResponse, ProviderService}, util::{fs::load_secret_key, path::IrohPaths}, }; -use iroh_bytes::{baomap::Store as BaoStore, protocol::RequestToken, util::runtime}; +use iroh_bytes::{ + protocol::RequestToken, store::flat::Store as BaoFsStore, store::Store as BaoStore, + util::runtime, +}; use iroh_net::{ derp::{DerpMap, DerpMode}, key::SecretKey, @@ -95,9 +97,10 @@ async fn start_daemon_node( let peer_data_path = path_with_env(IrohPaths::PeerData)?; tokio::fs::create_dir_all(&blob_dir).await?; tokio::fs::create_dir_all(&partial_blob_dir).await?; - let bao_store = flat::Store::load(&blob_dir, &partial_blob_dir, &meta_dir, rt) - .await - .with_context(|| format!("Failed to load iroh database from {}", blob_dir.display()))?; + let bao_store = + iroh_bytes::store::flat::Store::load(&blob_dir, &partial_blob_dir, &meta_dir, rt) + .await + .with_context(|| format!("Failed to load iroh database from {}", blob_dir.display()))?; let key = Some(path_with_env(IrohPaths::SecretKey)?); let doc_store = iroh_sync::store::fs::Store::new(path_with_env(IrohPaths::DocsDatabase)?)?; spawn_daemon_node(rt, bao_store, doc_store, key, peer_data_path, opts).await diff --git a/iroh/src/commands/sync.rs b/iroh/src/commands/sync.rs index 747e50e6be..6b835cc322 100644 --- a/iroh/src/commands/sync.rs +++ b/iroh/src/commands/sync.rs @@ -16,15 +16,11 @@ use tokio::io::AsyncReadExt; use iroh::{ client::quic::{Doc, Iroh}, - rpc_protocol::{DocTicket, ShareMode, WrapOption}, + rpc_protocol::{DocTicket, SetTagOption, ShareMode, WrapOption}, sync_engine::{LiveEvent, Origin}, util::fs::{path_content_info, PathContent}, }; -use iroh_bytes::{ - provider::AddProgress, - util::{SetTagOption, Tag}, - Hash, -}; +use iroh_bytes::{provider::AddProgress, Hash, Tag}; use iroh_sync::{store::GetFilter, AuthorId, Entry, NamespaceId}; use crate::config::ConsoleEnv; diff --git a/iroh/src/commands/validate.rs b/iroh/src/commands/validate.rs index e493950658..3cf94cf4b7 100644 --- a/iroh/src/commands/validate.rs +++ b/iroh/src/commands/validate.rs @@ -5,7 +5,7 @@ use console::{style, Emoji}; use futures::StreamExt; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use iroh::client::quic::Iroh; -use iroh_bytes::{baomap::ValidateProgress, Hash}; +use iroh_bytes::{store::ValidateProgress, Hash}; pub async fn run(iroh: &Iroh, repair: bool) -> Result<()> { let mut state = ValidateProgressState::new(); diff --git a/iroh/src/dial.rs b/iroh/src/dial.rs index 120fd17467..17943fad37 100644 --- a/iroh/src/dial.rs +++ b/iroh/src/dial.rs @@ -8,8 +8,7 @@ use std::str::FromStr; use anyhow::{ensure, Context, Result}; use iroh_bytes::protocol::RequestToken; -use iroh_bytes::util::BlobFormat; -use iroh_bytes::Hash; +use iroh_bytes::{BlobFormat, Hash}; use iroh_net::derp::{DerpMap, DerpMode}; use iroh_net::key::SecretKey; use iroh_net::PeerAddr; diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs index 5c918f8732..f3251906fa 100644 --- a/iroh/src/downloader.rs +++ b/iroh/src/downloader.rs @@ -33,12 +33,7 @@ use std::{ use 
bao_tree::ChunkRanges; use futures::{future::LocalBoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; -use iroh_bytes::{ - baomap::{Store, TempTag}, - protocol::RangeSpecSeq, - util::HashAndFormat, - Hash, -}; +use iroh_bytes::{protocol::RangeSpecSeq, store::Store, Hash, HashAndFormat, TempTag}; use iroh_net::{key::PublicKey, MagicEndpoint}; use tokio::sync::{mpsc, oneshot}; use tokio_util::{sync::CancellationToken, time::delay_queue}; diff --git a/iroh/src/downloader/get.rs b/iroh/src/downloader/get.rs index 625e4ba413..92d5698de7 100644 --- a/iroh/src/downloader/get.rs +++ b/iroh/src/downloader/get.rs @@ -4,19 +4,16 @@ use anyhow::Context; use bao_tree::io::fsm::OutboardMut; use bao_tree::ChunkRanges; use futures::FutureExt; -use iroh_bytes::baomap::TempTag; -use iroh_bytes::hashseq::parse_hash_seq; -use iroh_bytes::util::{BlobFormat, HashAndFormat}; use iroh_bytes::{ - baomap::{MapEntry, PartialMapEntry, Store}, get::{ self, fsm::{AtBlobHeader, AtEndBlob, ConnectedNext, EndBlobNext}, Stats, }, + hashseq::parse_hash_seq, protocol::{GetRequest, RangeSpecSeq}, - util::Hash, - IROH_BLOCK_SIZE, + store::{MapEntry, PartialMapEntry, Store}, + BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE, }; #[cfg(feature = "metrics")] use iroh_metrics::{inc, inc_by}; diff --git a/iroh/src/get.rs b/iroh/src/get.rs index 9b40f43322..2300b4ba62 100644 --- a/iroh/src/get.rs +++ b/iroh/src/get.rs @@ -5,11 +5,9 @@ use std::io; use anyhow::Context; use bao_tree::io::fsm::OutboardMut; use bao_tree::{ByteNum, ChunkRanges}; -use iroh_bytes::baomap::range_collections::range_set::RangeSetRange; use iroh_bytes::hashseq::parse_hash_seq; -use iroh_bytes::util::{BlobFormat, HashAndFormat}; +use iroh_bytes::store::range_collections::range_set::RangeSetRange; use iroh_bytes::{ - baomap::{MapEntry, PartialMap, PartialMapEntry, Store as BaoStore}, get::{ self, fsm::{AtBlobHeader, AtEndBlob, ConnectedNext, EndBlobNext}, @@ -17,11 +15,12 @@ use iroh_bytes::{ }, protocol::{GetRequest, RangeSpecSeq}, provider::GetProgress, + store::{MapEntry, PartialMap, PartialMapEntry, Store as BaoStore}, util::{ progress::{IdGenerator, ProgressSender}, Hash, }, - IROH_BLOCK_SIZE, + BlobFormat, HashAndFormat, IROH_BLOCK_SIZE, }; use iroh_io::AsyncSliceReader; use tracing::trace; diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index 7fd5e80a98..4230c9e702 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -5,7 +5,6 @@ pub use iroh_bytes as bytes; pub use iroh_net as net; pub use iroh_sync as sync; -pub mod baomap; pub mod client; pub mod collection; pub mod dial; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 8319c1bbab..17286f698d 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -18,19 +18,20 @@ use std::time::Duration; use anyhow::{anyhow, bail, Context, Result}; use futures::future::{BoxFuture, Shared}; use futures::{FutureExt, Stream, StreamExt, TryFutureExt}; -use iroh_bytes::baomap::{ +use iroh_bytes::hashseq::parse_hash_seq; +use iroh_bytes::provider::GetProgress; +use iroh_bytes::store::{ ExportMode, GcMarkEvent, GcSweepEvent, ImportProgress, Map, MapEntry, ReadableStore, Store as BaoStore, ValidateProgress, }; -use iroh_bytes::hashseq::parse_hash_seq; -use iroh_bytes::provider::GetProgress; use iroh_bytes::util::progress::{FlumeProgressSender, IdGenerator, ProgressSender}; -use iroh_bytes::util::{BlobFormat, HashAndFormat, RpcResult, SetTagOption}; +use iroh_bytes::util::RpcResult; use iroh_bytes::{ protocol::{Closed, Request, RequestToken}, provider::{AddProgress, RequestAuthorizationHandler}, 
    util::runtime,
    util::Hash,
+   BlobFormat, HashAndFormat, TempTag,
 };
 use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
 use iroh_io::AsyncSliceReader;
@@ -64,7 +65,7 @@ use crate::rpc_protocol::{
     NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest,
     NodeConnectionsResponse, NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse,
     NodeStatusRequest, NodeStatusResponse, NodeWatchRequest, NodeWatchResponse, ProviderRequest,
-    ProviderResponse, ProviderService,
+    ProviderResponse, ProviderService, SetTagOption,
 };
 use crate::sync_engine::{SyncEngine, SYNC_ALPN};
@@ -103,8 +104,12 @@ impl Default for GcPolicy {
 /// Builder for the [`Node`].
 ///
-/// You must supply a blob store. Various store implementations are available
-/// in [`crate::baomap`]. Everything else is optional.
+/// You must supply a blob store and a document store.
+///
+/// Blob store implementations are available in [`iroh_bytes::store`].
+/// Document store implementations are available in [`iroh_sync::store`].
+///
+/// Everything else is optional.
 ///
 /// Finally you can create and run the node by calling [`Builder::spawn`].
 ///
@@ -520,7 +525,7 @@ where
                 tokio::time::sleep(gc_period).await;
                 tracing::debug!("Starting GC");
                 callbacks
-                    .send(Event::Db(iroh_bytes::baomap::Event::GcStarted))
+                    .send(Event::Db(iroh_bytes::store::Event::GcStarted))
                     .await;
                 db.clear_live();
                 let doc_hashes = match ds.content_hashes() {
@@ -579,7 +584,7 @@ where
                     }
                 }
                 callbacks
-                    .send(Event::Db(iroh_bytes::baomap::Event::GcCompleted))
+                    .send(Event::Db(iroh_bytes::store::Event::GcCompleted))
                     .await;
             }
         }
@@ -682,7 +687,7 @@ pub enum Event {
     /// Events from the iroh-bytes transfer protocol.
     ByteProvide(iroh_bytes::provider::Event),
     /// Events from database
-    Db(iroh_bytes::baomap::Event),
+    Db(iroh_bytes::store::Event),
 }
 
 impl<D: BaoStore> Node<D> {
@@ -1093,7 +1098,7 @@ impl<D: BaoStore> RpcHandler<D> {
             rpc_protocol::WrapOption,
         };
         use futures::TryStreamExt;
-        use iroh_bytes::baomap::{ImportMode, TempTag};
+        use iroh_bytes::store::ImportMode;
         use std::collections::BTreeMap;
 
         let progress = FlumeProgressSender::new(progress);
@@ -1664,7 +1669,7 @@ mod tests {
    #[tokio::test]
    async fn test_ticket_multiple_addrs() {
        let rt = test_runtime();
-        let (db, hashes) = crate::baomap::readonly_mem::Store::new([("test", b"hello")]);
+        let (db, hashes) = iroh_bytes::store::readonly_mem::Store::new([("test", b"hello")]);
         let doc_store = iroh_sync::store::memory::Store::default();
         let hash = hashes["test"].into();
         let node = Node::builder(db, doc_store)
@@ -1681,10 +1686,9 @@
     #[tokio::test]
     async fn test_node_add_blob_stream() -> Result<()> {
-        use iroh_bytes::util::SetTagOption;
         use std::io::Cursor;
         let rt = runtime::Handle::from_current(1)?;
-        let db = crate::baomap::mem::Store::new(rt);
+        let db = iroh_bytes::store::mem::Store::new(rt);
         let doc_store = iroh_sync::store::memory::Store::default();
         let node = Node::builder(db, doc_store)
             .bind_addr((Ipv4Addr::UNSPECIFIED, 0).into())
@@ -1706,10 +1710,8 @@
     #[tokio::test]
     async fn test_node_add_tagged_blob_event() -> Result<()> {
-        use iroh_bytes::util::SetTagOption;
-
         let rt = runtime::Handle::from_current(1)?;
-        let db = crate::baomap::mem::Store::new(rt);
+        let db = iroh_bytes::store::mem::Store::new(rt);
         let doc_store = iroh_sync::store::memory::Store::default();
         let node = Node::builder(db, doc_store)
             .bind_addr((Ipv4Addr::UNSPECIFIED, 0).into())
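A note for readers following the rename: database events now surface as `iroh_bytes::store::Event`, wrapped in the node-level `Event::Db`. A minimal sketch of forwarding them into a channel, patterned on the `attach_db_events` helper in the gc tests further down (the helper name here is illustrative, not part of this diff):

    use futures::FutureExt;
    use iroh::node::{Event, Node};

    // Forward store events (e.g. GcStarted/GcCompleted) out of the node's
    // subscription callback into a flume channel for later inspection.
    async fn store_events<D: iroh_bytes::store::Store>(
        node: &Node<D>,
    ) -> flume::Receiver<iroh_bytes::store::Event> {
        let (tx, rx) = flume::unbounded();
        node.subscribe(move |ev| {
            let tx = tx.clone();
            async move {
                if let Event::Db(ev) = ev {
                    tx.into_send_async(ev).await.ok();
                }
            }
            .boxed()
        })
        .await
        .unwrap();
        rx
    }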
diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs
index eb17740265..865d339940 100644
--- a/iroh/src/rpc_protocol.rs
+++ b/iroh/src/rpc_protocol.rs
@@ -11,7 +11,7 @@ use std::{collections::HashMap, fmt, net::SocketAddr, path::PathBuf, str::FromSt
 use bytes::Bytes;
 use derive_more::{From, TryInto};
-use iroh_bytes::util::{BlobFormat, SetTagOption, Tag};
+use iroh_bytes::util::{BlobFormat, Tag};
 pub use iroh_bytes::{protocol::RequestToken, provider::GetProgress, Hash};
 use iroh_gossip::proto::util::base32;
 use iroh_net::{
@@ -30,13 +30,22 @@ use quic_rpc::{
 };
 use serde::{Deserialize, Serialize};
 
-pub use iroh_bytes::{baomap::ValidateProgress, provider::AddProgress, util::RpcResult};
+pub use iroh_bytes::{provider::AddProgress, store::ValidateProgress, util::RpcResult};
 
 use crate::sync_engine::{LiveEvent, LiveStatus};
 
 /// A 32-byte key or token
 pub type KeyBytes = [u8; 32];
 
+/// Option for commands that allow setting a tag
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
+pub enum SetTagOption {
+    /// A tag will be automatically generated
+    Auto,
+    /// The tag is explicitly named
+    Named(Tag),
+}
+
 /// A request to the node to provide the data at the given path
 ///
 /// Will produce a stream of [`AddProgress`] messages.
diff --git a/iroh/src/sync_engine.rs b/iroh/src/sync_engine.rs
index 959c634202..ffb9e428a9 100644
--- a/iroh/src/sync_engine.rs
+++ b/iroh/src/sync_engine.rs
@@ -3,7 +3,7 @@
 //! [`iroh_sync::Replica`] is also called documents here.
 
 use anyhow::anyhow;
-use iroh_bytes::{baomap::Store as BaoStore, util::runtime::Handle};
+use iroh_bytes::{store::Store as BaoStore, util::runtime::Handle};
 use iroh_gossip::net::Gossip;
 use iroh_net::{MagicEndpoint, PeerAddr};
 use iroh_sync::{
diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs
index 5c12b26751..a34f16d6d3 100644
--- a/iroh/src/sync_engine/live.rs
+++ b/iroh/src/sync_engine/live.rs
@@ -12,11 +12,7 @@ use futures::{
     stream::{BoxStream, FuturesUnordered, StreamExt},
     FutureExt, TryFutureExt,
 };
-use iroh_bytes::{
-    baomap::{self, EntryStatus},
-    util::runtime::Handle,
-    Hash,
-};
+use iroh_bytes::{store::EntryStatus, util::runtime::Handle, Hash};
 use iroh_gossip::{
     net::{Event, Gossip},
     proto::TopicId,
@@ -181,7 +177,7 @@ impl<S: Store> LiveSync<S> {
     ///
     /// This spawns a background actor to handle gossip events and forward operations over broadcast
     /// messages.
-    pub fn spawn<B: baomap::Store>(
+    pub fn spawn<B: iroh_bytes::store::Store>(
         rt: Handle,
         endpoint: MagicEndpoint,
         replica_store: S,
@@ -310,7 +306,7 @@ impl<S: Store> LiveSync<S> {
 }
 
 // Currently peers might double-sync in both directions.
-struct Actor<S: Store, B: baomap::Store> {
+struct Actor<S: Store, B: iroh_bytes::store::Store> {
     endpoint: MagicEndpoint,
     gossip: Gossip,
     bao_store: B,
@@ -360,7 +356,7 @@ struct Actor<S: Store, B: baomap::Store> {
 #[derive(Debug, Clone, Copy)]
 pub struct RemovalToken(u64);
 
-impl<S: Store, B: baomap::Store> Actor<S, B> {
+impl<S: Store, B: iroh_bytes::store::Store> Actor<S, B> {
     pub fn new(
         endpoint: MagicEndpoint,
         gossip: Gossip,
diff --git a/iroh/src/sync_engine/rpc.rs b/iroh/src/sync_engine/rpc.rs
index 309c0f3720..9533fd1c45 100644
--- a/iroh/src/sync_engine/rpc.rs
+++ b/iroh/src/sync_engine/rpc.rs
@@ -3,7 +3,7 @@
 use anyhow::anyhow;
 use futures::{FutureExt, Stream};
 use iroh_bytes::{
-    baomap::Store as BaoStore,
+    store::Store as BaoStore,
     util::{BlobFormat, RpcError},
 };
 use iroh_sync::{store::Store, sync::Namespace};
diff --git a/iroh/src/util/path.rs b/iroh/src/util/path.rs
index f07b708f13..30d2f27115 100644
--- a/iroh/src/util/path.rs
+++ b/iroh/src/util/path.rs
@@ -9,13 +9,13 @@ pub enum IrohPaths {
     /// Path to the node's secret key for the [`iroh_net::key::PublicKey`].
     #[strum(serialize = "keypair")]
     SecretKey,
-    /// Path to the node's [flat-file store](crate::baomap::flat) for complete blobs.
+    /// Path to the node's [flat-file store](iroh_bytes::store::flat) for complete blobs.
     #[strum(serialize = "blobs.v0")]
     BaoFlatStoreComplete,
-    /// Path to the node's [flat-file store](crate::baomap::flat) for partial blobs.
+    /// Path to the node's [flat-file store](iroh_bytes::store::flat) for partial blobs.
     #[strum(serialize = "blobs-partial.v0")]
     BaoFlatStorePartial,
-    /// Path to the node's [flat-file store](crate::baomap::flat) for metadata such as the tags table.
+    /// Path to the node's [flat-file store](iroh_bytes::store::flat) for metadata such as the tags table.
     #[strum(serialize = "blobs-meta.v0")]
     BaoFlatStoreMeta,
     /// Path to the [iroh-sync document database](iroh_sync::store::fs::Store)
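Since `SetTagOption` now lives with the RPC types instead of in `iroh_bytes::util`, callers construct it roughly like this (a minimal sketch, assuming `Tag`'s `From<&str>` impl):

    use iroh::rpc_protocol::SetTagOption;
    use iroh_bytes::Tag;

    // Let the node generate a tag automatically for the added blob.
    let auto = SetTagOption::Auto;

    // Pin the blob under an explicit, caller-chosen tag instead.
    let named = SetTagOption::Named(Tag::from("my-data"));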
diff --git a/iroh/tests/cli.rs b/iroh/tests/cli.rs
index 56f1549d77..190d8c6e52 100644
--- a/iroh/tests/cli.rs
+++ b/iroh/tests/cli.rs
@@ -149,7 +149,7 @@ fn make_partial(
     dir: impl AsRef<Path>,
     op: impl Fn(Hash, u64) -> MakePartialResult,
 ) -> io::Result<()> {
-    use iroh::baomap::flat::FileName;
+    use iroh_bytes::store::flat::FileName;
     let dir = dir.as_ref();
     let mut files = BTreeMap::<Hash, (Option<u64>, bool)>::new();
     for entry in std::fs::read_dir(dir)? {
@@ -159,15 +159,15 @@ fn make_partial(
         }
         let name = entry.file_name();
         let Some(name) = name.to_str() else { continue };
-        let Ok(name) = iroh::baomap::flat::FileName::from_str(name) else {
+        let Ok(name) = iroh_bytes::store::flat::FileName::from_str(name) else {
             continue;
         };
         match name {
-            iroh::baomap::flat::FileName::Data(hash) => {
+            iroh_bytes::store::flat::FileName::Data(hash) => {
                 let data = files.entry(hash).or_default();
                 data.0 = Some(entry.metadata()?.len());
             }
-            iroh::baomap::flat::FileName::Outboard(hash) => {
+            iroh_bytes::store::flat::FileName::Outboard(hash) => {
                 let data = files.entry(hash).or_default();
                 data.1 = true;
             }
@@ -317,8 +317,8 @@ fn cli_provide_from_stdin_to_stdout() -> Result<()> {
 #[cfg(all(unix, feature = "cli"))]
 #[test]
 fn cli_provide_persistence() -> anyhow::Result<()> {
-    use iroh::baomap::flat::Store;
-    use iroh_bytes::baomap::ReadableStore;
+    use iroh_bytes::store::flat::Store;
+    use iroh_bytes::store::ReadableStore;
     use nix::{
         sys::signal::{self, Signal},
         unistd::Pid,
diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs
index e60165c947..9b161cdb40 100644
--- a/iroh/tests/gc.rs
+++ b/iroh/tests/gc.rs
@@ -3,12 +3,12 @@ use std::time::Duration;
 use anyhow::Result;
 use bytes::Bytes;
 use futures::FutureExt;
-use iroh::{baomap, node::Node};
+use iroh::node::Node;
 use rand::RngCore;
 
 use iroh_bytes::{
-    baomap::{EntryStatus, Map, Store},
     hashseq::HashSeq,
+    store::{EntryStatus, Map, Store},
     util::{runtime, BlobFormat, HashAndFormat, Tag},
 };
@@ -32,7 +32,7 @@ async fn wrap_in_node<S>(
     gc_period: Duration,
 ) -> Node<S>
 where
-    S: iroh_bytes::baomap::Store,
+    S: iroh_bytes::store::Store,
 {
     let doc_store = iroh_sync::store::memory::Store::default();
     Node::builder(bao_store, doc_store)
@@ -43,9 +43,9 @@ where
         .unwrap()
 }
 
-async fn attach_db_events<D: iroh_bytes::baomap::Store>(
+async fn attach_db_events<D: iroh_bytes::store::Store>(
     node: &Node<D>,
-) -> flume::Receiver<iroh_bytes::baomap::Event> {
+) -> flume::Receiver<iroh_bytes::store::Event> {
     let (db_send, db_recv) = flume::unbounded();
     node.subscribe(move |ev| {
         let db_send = db_send.clone();
@@ -62,25 +62,25 @@ async fn attach_db_events<D: iroh_bytes::baomap::Store>(
-    Node<baomap::mem::Store>,
-    baomap::mem::Store,
-    flume::Receiver<iroh_bytes::baomap::Event>,
+    Node<iroh_bytes::store::mem::Store>,
+    iroh_bytes::store::mem::Store,
+    flume::Receiver<iroh_bytes::store::Event>,
 ) {
     let rt = test_runtime();
-    let bao_store = baomap::mem::Store::new(rt.clone());
+    let bao_store = iroh_bytes::store::mem::Store::new(rt.clone());
     let node = wrap_in_node(bao_store.clone(), rt, Duration::from_millis(50)).await;
     let db_recv = attach_db_events(&node).await;
     (node, bao_store, db_recv)
 }
 
-async fn step(evs: &flume::Receiver<iroh_bytes::baomap::Event>) {
+async fn step(evs: &flume::Receiver<iroh_bytes::store::Event>) {
     while let Ok(ev) = evs.recv_async().await {
-        if let iroh_bytes::baomap::Event::GcCompleted = ev {
+        if let iroh_bytes::store::Event::GcCompleted = ev {
             break;
         }
     }
     while let Ok(ev) = evs.recv_async().await {
-        if let iroh_bytes::baomap::Event::GcCompleted = ev {
+        if let iroh_bytes::store::Event::GcCompleted = ev {
             break;
         }
     }
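One detail worth calling out in `step` above: it waits for two `GcCompleted` events, not one. The first completion may belong to a sweep that was already in flight when the caller started waiting, so only the second guarantees that a full mark-and-sweep pass ran strictly after the call. A hedged usage sketch, assuming the `entry_status` accessor on the store trait (the surrounding setup follows the helpers above):

    // After the last reference to a blob is gone (e.g. its tag was deleted),
    // wait for one full GC cycle before asserting the data was swept.
    step(&db_recv).await;
    assert_eq!(bao_store.entry_status(&hash), EntryStatus::NotFound);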
@@ -205,15 +205,14 @@ mod flat {
         ChunkRanges,
     };
     use bytes::Bytes;
-    use iroh::baomap;
     use iroh_io::AsyncSliceWriter;
     use testdir::testdir;
 
     use iroh_bytes::{
-        baomap::{PartialMap, PartialMapEntry, Store, TempTag},
         hashseq::HashSeq,
+        store::{PartialMap, PartialMapEntry, Store},
         util::{BlobFormat, HashAndFormat, Tag},
-        IROH_BLOCK_SIZE,
+        TempTag, IROH_BLOCK_SIZE,
     };
 
     fn path(root: PathBuf, suffix: &'static str) -> impl Fn(&iroh_bytes::Hash) -> PathBuf {
@@ -282,7 +281,8 @@ mod flat {
         let outboard_path = outboard_path(dir.clone());
 
         let bao_store =
-            baomap::flat::Store::load(dir.clone(), dir.clone(), dir.clone(), &rt).await?;
+            iroh_bytes::store::flat::Store::load(dir.clone(), dir.clone(), dir.clone(), &rt)
+                .await?;
         let node = wrap_in_node(bao_store.clone(), rt, Duration::from_millis(0)).await;
         let evs = attach_db_events(&node).await;
         let data1 = create_test_data(123456);
@@ -381,7 +381,7 @@ mod flat {
     ///
     /// During this time, the partial entry is protected by a temp tag.
     #[allow(dead_code)]
-    async fn simulate_download_protected<S: baomap::Store>(
+    async fn simulate_download_protected<S: Store>(
         bao_store: &S,
         data: Bytes,
     ) -> io::Result<TempTag> {
@@ -439,7 +439,8 @@ mod flat {
         let count_partial_outboard = count_partial_outboard(dir.clone());
 
         let bao_store =
-            baomap::flat::Store::load(dir.clone(), dir.clone(), dir.clone(), &rt).await?;
+            iroh_bytes::store::flat::Store::load(dir.clone(), dir.clone(), dir.clone(), &rt)
+                .await?;
         let node = wrap_in_node(bao_store.clone(), rt, Duration::from_millis(0)).await;
         let evs = attach_db_events(&node).await;
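The flat-store tests above load the on-disk store through its new home. Outside the tests, the daemon startup earlier in this patch does the same; roughly (a sketch with illustrative directory names):

    use iroh_bytes::util::runtime;

    // Complete blobs, partial blobs, and metadata each get their own directory;
    // the store type moved from iroh::baomap::flat to iroh_bytes::store::flat.
    let rt = runtime::Handle::from_current(1)?;
    let bao_store =
        iroh_bytes::store::flat::Store::load(&blob_dir, &partial_blob_dir, &meta_dir, &rt)
            .await?;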
diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs
index b7dce326f3..ee1df80001 100644
--- a/iroh/tests/provide.rs
+++ b/iroh/tests/provide.rs
@@ -24,7 +24,6 @@ use tokio::sync::mpsc;
 use bao_tree::{blake3, ChunkNum, ChunkRanges};
 use iroh_bytes::{
-    baomap::{PartialMap, Store},
     get::{
         fsm::ConnectedNext,
         fsm::{self, DecodeError},
@@ -32,6 +31,7 @@ use iroh_bytes::{
     },
     protocol::{GetRequest, RangeSpecSeq, RequestToken},
     provider::{self, RequestAuthorizationHandler},
+    store::{PartialMap, Store},
     util::{runtime, BlobFormat},
     Hash,
 };
@@ -152,7 +152,7 @@ async fn multiple_clients() -> Result<()> {
     let content = b"hello world!";
     let addr = "127.0.0.1:0".parse().unwrap();
-    let mut db = iroh::baomap::readonly_mem::Store::default();
+    let mut db = iroh_bytes::store::readonly_mem::Store::default();
     let expect_hash = db.insert(content.as_slice());
     let expect_name = "hello_world".to_string();
     let collection = Collection::new(
@@ -224,7 +224,7 @@ where
     let mut expects = Vec::new();
     let num_blobs = file_opts.len();
 
-    let (mut mdb, lookup) = iroh::baomap::readonly_mem::Store::new(file_opts.clone());
+    let (mut mdb, lookup) = iroh_bytes::store::readonly_mem::Store::new(file_opts.clone());
     let mut blobs = Vec::new();
     let mut total_blobs_size = 0u64;
 
@@ -348,7 +348,7 @@ async fn test_server_close() {
     let rt = test_runtime();
     // Prepare a Provider transferring a file.
     let _guard = iroh_test::logging::setup();
-    let mut db = iroh::baomap::readonly_mem::Store::default();
+    let mut db = iroh_bytes::store::readonly_mem::Store::default();
     let child_hash = db.insert(b"hello there");
     let collection = Collection::new(
         vec![Blob {
@@ -409,8 +409,8 @@ async fn test_server_close() {
 /// returns the database and the root hash of the collection
 fn create_test_db(
     entries: impl IntoIterator<Item = (impl Into<String>, impl AsRef<[u8]>)>,
-) -> (iroh::baomap::readonly_mem::Store, Hash) {
-    let (mut db, hashes) = iroh::baomap::readonly_mem::Store::new(entries);
+) -> (iroh_bytes::store::readonly_mem::Store, Hash) {
+    let (mut db, hashes) = iroh_bytes::store::readonly_mem::Store::new(entries);
     let collection = Collection::new(
         hashes
             .into_iter()
@@ -459,7 +459,7 @@ async fn test_not_found() {
     let _ = iroh_test::logging::setup();
     let rt = test_runtime();
 
-    let db = iroh::baomap::readonly_mem::Store::default();
+    let db = iroh_bytes::store::readonly_mem::Store::default();
     let hash = blake3::hash(b"hello").into();
     let addr = (Ipv6Addr::UNSPECIFIED, 0).into();
     let node = match test_node(db, addr).runtime(&rt).spawn().await {
@@ -501,7 +501,7 @@ async fn test_chunk_not_found_1() {
     let _ = iroh_test::logging::setup();
     let rt = test_runtime();
 
-    let db = iroh::baomap::mem::Store::new(rt.clone());
+    let db = iroh_bytes::store::mem::Store::new(rt.clone());
     let data = (0..1024 * 64).map(|i| i as u8).collect::<Vec<_>>();
     let hash = blake3::hash(&data).into();
     let _entry = db.get_or_create_partial(hash, data.len() as u64).unwrap();
@@ -666,7 +666,7 @@ async fn test_size_request_blob() {
     let rt = test_runtime();
     let expected = make_test_data(1024 * 64 + 1234);
     let last_chunk = last_chunk(&expected);
-    let (db, hashes) = iroh::baomap::readonly_mem::Store::new([("test", &expected)]);
+    let (db, hashes) = iroh_bytes::store::readonly_mem::Store::new([("test", &expected)]);
     let hash = Hash::from(*hashes.values().next().unwrap());
     let addr = "127.0.0.1:0".parse().unwrap();
     let node = test_node(db, addr).runtime(&rt).spawn().await.unwrap();
diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs
index 3790254c5c..6fe28723b0 100644
--- a/iroh/tests/sync.rs
+++ b/iroh/tests/sync.rs
@@ -30,8 +30,8 @@ fn test_runtime() -> runtime::Handle {
 fn test_node(
     rt: runtime::Handle,
     addr: SocketAddr,
-) -> Builder<iroh::baomap::mem::Store, iroh_sync::store::memory::Store> {
-    let db = iroh::baomap::mem::Store::new(rt.clone());
+) -> Builder<iroh_bytes::store::mem::Store, iroh_sync::store::memory::Store> {
+    let db = iroh_bytes::store::mem::Store::new(rt.clone());
     let store = iroh_sync::store::memory::Store::default();
     Node::builder(db, store).runtime(&rt).bind_addr(addr)
 }
@@ -39,7 +39,7 @@ fn test_node(
 async fn spawn_node(
     rt: runtime::Handle,
     i: usize,
-) -> anyhow::Result<Node<iroh::baomap::mem::Store>> {
+) -> anyhow::Result<Node<iroh_bytes::store::mem::Store>> {
     let node = test_node(rt, "127.0.0.1:0".parse()?);
     let node = node.spawn().await?;
     info!("spawned node {i} {:?}", node.peer_id());
@@ -49,7 +49,7 @@ async fn spawn_node(
 async fn spawn_nodes(
     rt: runtime::Handle,
     n: usize,
-) -> anyhow::Result<Vec<Node<iroh::baomap::mem::Store>>> {
+) -> anyhow::Result<Vec<Node<iroh_bytes::store::mem::Store>>> {
     futures::future::join_all((0..n).map(|i| spawn_node(rt.clone(), i)))
         .await
         .into_iter()
@@ -307,7 +307,7 @@ async fn sync_subscribe_stop() -> Result<()> {
 #[tokio::test]
 async fn doc_delete() -> Result<()> {
     let rt = test_runtime();
-    let db = iroh::baomap::mem::Store::new(rt.clone());
+    let db = iroh_bytes::store::mem::Store::new(rt.clone());
     let store = iroh_sync::store::memory::Store::default();
     let addr = "127.0.0.1:0".parse().unwrap();
     let node = Node::builder(db, store)