From ef8c64b79c480ee6f133f1bb0386aea3c1686576 Mon Sep 17 00:00:00 2001 From: Franz Heinzmann Date: Thu, 12 Oct 2023 19:51:49 +0200 Subject: [PATCH] fix: actually transfer newer entries for identical keys (#1630) ## Description We had a bug that was simply never tested for: when both peers have an entry with identical ranger keys (i.e. identical namespace, author and key) and peer A received such an entry from peer B, peer A would never send its own matching entry back, even if peer A's entry had a higher timestamp. This PR fixes this! The actual fix is a single line: https://github.com/n0-computer/iroh/pull/1630/files#diff-be755583e5a892a8b9a6329e59cfc17e9633447fb7992db74eef670ba1508ccbR378 In addition, the PR adds both a manual test for this and a proptest strategy, both of which fail without the change, as expected. It also renames the prefix methods on the ranger store trait that were introduced in #1535 to have clearer names (which helped me while tracking this down), and improves naming and logic around the prefix in other places. ## Notes & open questions ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. --- iroh-sync/proptest-regressions/ranger.txt | 1 + iroh-sync/src/ranger.rs | 185 +++++++++++++++------- iroh-sync/src/store/fs.rs | 7 +- iroh-sync/src/store/memory.rs | 7 +- iroh-sync/src/sync.rs | 2 +- 5 files changed, 134 insertions(+), 68 deletions(-) diff --git a/iroh-sync/proptest-regressions/ranger.txt b/iroh-sync/proptest-regressions/ranger.txt index 5986c689de..8e8177c0e4 100644 --- a/iroh-sync/proptest-regressions/ranger.txt +++ b/iroh-sync/proptest-regressions/ranger.txt @@ -6,3 +6,4 @@ # everyone who runs the test benefits from these saved cases. 
cc 797e83179f8684388880e25a6fac7b4047eb15b03c55c1fb725b82bdbd0a4369 # shrinks to a = {TestKey("3"): ()}, b = {TestKey(""): (), TestKey("3"): (), TestKey("4"): (), TestKey("5"): (), TestKey("a"): (), TestKey("b"): (), TestKey("c"): ()} cc f5b7604319ead6181c2ff42e53f05e2c6f0298adf0b38ea4ae4710c43abb7663 # shrinks to input = _SimpleStoreSyncArgs { alice: [(3, ()), (a, ())], bob: [(, ()), (0, ()), (b, ())] } +cc 41d9d33f002235dfe4bed83621fe79348725bbe00931451782025d98c1b81522 # shrinks to input = _SimpleStoreSyncU8Args { alice: [("", 58)], bob: [("", 0)] } diff --git a/iroh-sync/src/ranger.rs b/iroh-sync/src/ranger.rs index 96d0fade14..446b7613bc 100644 --- a/iroh-sync/src/ranger.rs +++ b/iroh-sync/src/ranger.rs @@ -44,7 +44,7 @@ pub trait RangeKey: Sized + Debug + Ord + PartialEq + Clone + 'static { fn is_prefix_of(&self, other: &Self) -> bool; /// Returns true if `other` is a prefix of `self`. - fn has_prefix(&self, other: &Self) -> bool { + fn is_prefixed_by(&self, other: &Self) -> bool { other.is_prefix_of(self) } } @@ -249,10 +249,10 @@ pub trait Store: Sized { fn get_range(&self, range: Range) -> Result, Self::Error>; /// Returns all entries whose key starts with the given `prefix`. - fn get_prefix(&self, prefix: &E::Key) -> Result, Self::Error>; + fn prefixed_by(&self, prefix: &E::Key) -> Result, Self::Error>; /// Returns all entries that share a prefix with `key`, including the entry for `key` itself. - fn get_with_parents(&self, key: &E::Key) -> Result, Self::Error>; + fn prefixes_of(&self, key: &E::Key) -> Result, Self::Error>; /// Get all entries in the store fn all(&self) -> Result, Self::Error>; @@ -366,15 +366,18 @@ where None } else { Some( + // We get the range of the item from our store. From this set, we remove all + // entries whose key is contained in the peer's set and where our value is + // lower than or equal to the peer entry's value. self.store .get_range(range.clone())? 
- .filter_map(|existing| match existing { - Ok(existing) => { - if !values - .iter() - .any(|(entry, _)| existing.key() == entry.key()) - { - Some(Ok(existing)) + .filter_map(|our_entry| match our_entry { + Ok(our_entry) => { + if !values.iter().any(|(their_entry, _)| { + our_entry.key() == their_entry.key() + && their_entry.value() >= our_entry.value() + }) { + Some(Ok(our_entry)) } else { None } @@ -563,15 +566,15 @@ where /// Returns `true` if the entry was inserted. /// Returns `false` if it was not inserted. pub fn put(&mut self, entry: E) -> Result { - let parents = self.store.get_with_parents(entry.key())?; + let prefix_entry = self.store.prefixes_of(entry.key())?; // First we check if our entry is strictly greater than all parent elements. // From the willow spec: // "Remove all entries whose timestamp is strictly less than the timestamp of any other entry [..] // whose path is a prefix of p." and then "remove all but those whose record has the greatest hash component". // This is the contract of the `Ord` impl for `E::Value`. - for e in parents { - let e = e?; - if entry.value() < e.value() { + for prefix_entry in prefix_entry { + let prefix_entry = prefix_entry?; + if entry.value() <= prefix_entry.value() { return Ok(InsertOutcome::NotInserted); } } @@ -579,7 +582,7 @@ where // Now we remove all entries that have our key as a prefix and are older than our entry. let removed = self .store - .remove_prefix_filtered(entry.key(), |value| value < entry.value())?; + .remove_prefix_filtered(entry.key(), |value| entry.value() >= value)?; // Insert our new entry. 
self.store.put(entry)?; @@ -659,17 +662,18 @@ mod tests { impl RangeKey for &'static str { fn is_prefix_of(&self, other: &Self) -> bool { - self.starts_with(other) + other.starts_with(self) } } impl RangeKey for String { fn is_prefix_of(&self, other: &Self) -> bool { - self.starts_with(other) + other.starts_with(self) } } impl RangeValue for &'static [u8] {} impl RangeValue for i32 {} + impl RangeValue for u8 {} impl RangeValue for () {} impl Store<(K, V)> for SimpleStore @@ -745,7 +749,7 @@ mod tests { } // TODO: Not horrible. - fn get_with_parents(&self, key: &K) -> Result, Self::Error> { + fn prefixes_of(&self, key: &K) -> Result, Self::Error> { let mut res = vec![]; for (k, v) in self.data.iter() { if k.is_prefix_of(key) { @@ -755,7 +759,7 @@ mod tests { Ok(res.into_iter()) } - fn get_prefix(&self, prefix: &K) -> Result, Self::Error> { + fn prefixed_by(&self, prefix: &K) -> Result, Self::Error> { let iter = self.data.iter(); Ok(SimpleRangeIterator { iter, @@ -841,7 +845,6 @@ mod tests { assert_eq!(res.bob_to_alice[0].parts.len(), 2); assert!(res.bob_to_alice[0].parts[0].is_range_fingerprint()); assert!(res.bob_to_alice[0].parts[1].is_range_fingerprint()); - // Last response from Alice assert_eq!(res.alice_to_bob[1].parts.len(), 3); assert!(res.alice_to_bob[1].parts[0].is_range_fingerprint()); @@ -940,6 +943,16 @@ mod tests { assert_eq!(res.bob_to_alice.len(), 1, "B -> A message count"); } + #[test] + fn test_equal_key_higher_value() { + let alice_set = [("foo", 2)]; + let bob_set = [("foo", 1)]; + + let res = sync(&alice_set, &bob_set); + assert_eq!(res.alice_to_bob.len(), 2, "A -> B message count"); + assert_eq!(res.bob_to_alice.len(), 1, "B -> A message count"); + } + #[test] fn test_multikey() { /// Uses the blanket impl of [`RangeKey]` for `T: AsRef<[u8]>` in this module. 
@@ -1176,13 +1189,23 @@ mod tests { sync_with_validate_cb_and_assert(alice_set, bob_set, &alice_validate_cb, &bob_validate_cb) } - fn insert_if_larger(map: &mut BTreeMap, k: K, v: V) { - let insert = match map.get(&k) { - None => true, - Some(v2) => v > *v2, - }; + fn insert_if_larger(map: &mut BTreeMap, key: K, value: V) { + let mut insert = true; + for (k, v) in map.iter() { + if k.is_prefix_of(&key) && v >= &value { + insert = false; + } + } if insert { - map.insert(k, v); + #[allow(clippy::needless_bool)] + map.retain(|k, v| { + if key.is_prefix_of(k) && value >= *v { + false + } else { + true + } + }); + map.insert(key, value); } } @@ -1198,37 +1221,58 @@ mod tests { F1: Fn(&SimpleStore, &(K, V), ContentStatus) -> bool, F2: Fn(&SimpleStore, &(K, V), ContentStatus) -> bool, { - let mut expected_set = BTreeMap::new(); - let mut alice = Peer::<(K, V), SimpleStore>::default(); - for e in alice_set { - alice.put(e.clone()).unwrap(); - insert_if_larger(&mut expected_set, e.0.clone(), e.1.clone()); - } - let mut bob = Peer::<(K, V), SimpleStore>::default(); - for e in bob_set { - bob.put(e.clone()).unwrap(); - insert_if_larger(&mut expected_set, e.0.clone(), e.1.clone()); - } - let res = sync_exchange_messages(alice, bob, alice_validate_cb, bob_validate_cb, 100); + let expected_set = { + let mut expected_set = BTreeMap::new(); + let mut alice_expected = BTreeMap::new(); + for e in alice_set { + alice.put(e.clone()).unwrap(); + insert_if_larger(&mut expected_set, e.0.clone(), e.1.clone()); + insert_if_larger(&mut alice_expected, e.0.clone(), e.1.clone()); + } + let alice_expected = alice_expected.into_iter().collect::>(); + let alice_now: Vec<_> = alice.all().unwrap().collect::>().unwrap(); + assert_eq!( + alice_expected, alice_now, + "alice initial set does not match" + ); - res.print_messages(); + let mut bob_expected = BTreeMap::new(); + for e in bob_set { + bob.put(e.clone()).unwrap(); + insert_if_larger(&mut expected_set, e.0.clone(), e.1.clone()); + 
insert_if_larger(&mut bob_expected, e.0.clone(), e.1.clone()); + } + let bob_expected = bob_expected.into_iter().collect::>(); + let bob_now: Vec<_> = bob.all().unwrap().collect::>().unwrap(); + assert_eq!(bob_expected, bob_now, "bob initial set does not match"); + + expected_set.into_iter().collect::>() + }; + + let res = sync_exchange_messages(alice, bob, alice_validate_cb, bob_validate_cb, 100); let alice_now: Vec<_> = res.alice.all().unwrap().collect::>().unwrap(); - assert_eq!( - alice_now.into_iter().collect::>(), - expected_set.clone().into_iter().collect::>(), - "alice" - ); + if alice_now != expected_set { + res.print_messages(); + println!("alice_init: {alice_set:?}"); + println!("bob_init: {bob_set:?}"); + println!("expected: {expected_set:?}"); + println!("alice_now: {alice_now:?}"); + panic!("alice_now does not match expected"); + } let bob_now: Vec<_> = res.bob.all().unwrap().collect::>().unwrap(); - assert_eq!( - bob_now.into_iter().collect::>(), - expected_set.into_iter().collect::>(), - "bob" - ); + if bob_now != expected_set { + res.print_messages(); + println!("alice_init: {alice_set:?}"); + println!("bob_init: {bob_set:?}"); + println!("expected: {expected_set:?}"); + println!("bob_now: {bob_now:?}"); + panic!("bob_now does not match expected"); + } // Check that values were never sent twice let mut alice_sent = BTreeMap::new(); @@ -1374,18 +1418,30 @@ mod tests { assert_eq!(excluded[3].0, "hog"); } - type TestSet = BTreeMap; + type TestSetStringUnit = BTreeMap; + type TestSetStringU8 = BTreeMap; fn test_key() -> impl Strategy { "[a-z0-9]{0,5}" } - fn test_set() -> impl Strategy { + fn test_set_string_unit() -> impl Strategy { prop::collection::btree_map(test_key(), Just(()), 0..10) } - fn test_vec() -> impl Strategy> { - test_set().prop_map(|m| m.into_iter().collect::>()) + fn test_set_string_u8() -> impl Strategy { + prop::collection::btree_map(test_key(), test_value_u8(), 0..10) + } + + fn test_value_u8() -> impl Strategy { + 0u8..u8::MAX + } 
+ + fn test_vec_string_unit() -> impl Strategy> { + test_set_string_unit().prop_map(|m| m.into_iter().collect::>()) + } + fn test_vec_string_u8() -> impl Strategy> { + test_set_string_u8().prop_map(|m| m.into_iter().collect::>()) } fn test_range() -> impl Strategy> { @@ -1393,7 +1449,7 @@ mod tests { (test_key(), test_key()).prop_map(|(x, y)| Range::new(x, y)) } - fn mk_test_set(values: impl IntoIterator>) -> TestSet { + fn mk_test_set(values: impl IntoIterator>) -> TestSetStringUnit { values .into_iter() .map(|v| v.as_ref().to_string()) @@ -1412,6 +1468,13 @@ mod tests { let _res = sync(&alice, &bob); } + #[test] + fn simple_store_sync_x() { + let alice = mk_test_vec(["1", "3"]); + let bob = mk_test_vec(["2"]); + let _res = sync(&alice, &bob); + } + #[test] fn simple_store_sync_2() { let alice = mk_test_vec(["1", "3"]); @@ -1428,8 +1491,16 @@ mod tests { #[proptest] fn simple_store_sync( - #[strategy(test_vec())] alice: Vec<(String, ())>, - #[strategy(test_vec())] bob: Vec<(String, ())>, + #[strategy(test_vec_string_unit())] alice: Vec<(String, ())>, + #[strategy(test_vec_string_unit())] bob: Vec<(String, ())>, + ) { + let _res = sync(&alice, &bob); + } + + #[proptest] + fn simple_store_sync_u8( + #[strategy(test_vec_string_u8())] alice: Vec<(String, u8)>, + #[strategy(test_vec_string_u8())] bob: Vec<(String, u8)>, ) { let _res = sync(&alice, &bob); } @@ -1466,7 +1537,7 @@ mod tests { #[proptest] fn simple_store_get_ranges( - #[strategy(test_set())] contents: BTreeMap, + #[strategy(test_set_string_unit())] contents: BTreeMap, #[strategy(test_range())] range: Range, ) { let (expected, actual) = store_get_ranges_test::, _>(contents, range); diff --git a/iroh-sync/src/store/fs.rs b/iroh-sync/src/store/fs.rs index bae2c3f751..e608080449 100644 --- a/iroh-sync/src/store/fs.rs +++ b/iroh-sync/src/store/fs.rs @@ -648,10 +648,7 @@ impl crate::ranger::Store for StoreInstance { } type ParentIterator<'a> = ParentIterator<'a>; - fn get_with_parents( - &self, - id: 
&RecordIdentifier, - ) -> Result, Self::Error> { + fn prefixes_of(&self, id: &RecordIdentifier) -> Result, Self::Error> { ParentIterator::create( &self.store.db, id.namespace(), @@ -660,7 +657,7 @@ impl crate::ranger::Store for StoreInstance { ) } - fn get_prefix(&self, prefix: &RecordIdentifier) -> Result> { + fn prefixed_by(&self, prefix: &RecordIdentifier) -> Result> { let start = prefix.as_byte_tuple(); let end = prefix_range_end(&start); let iter = RangeIterator::with_range( diff --git a/iroh-sync/src/store/memory.rs b/iroh-sync/src/store/memory.rs index 7a1b183dd9..baf6742528 100644 --- a/iroh-sync/src/store/memory.rs +++ b/iroh-sync/src/store/memory.rs @@ -516,10 +516,7 @@ impl crate::ranger::Store for ReplicaStoreInstance { // TODO: Not horrible. type ParentIterator<'a> = std::vec::IntoIter>; - fn get_with_parents( - &self, - id: &RecordIdentifier, - ) -> Result, Self::Error> { + fn prefixes_of(&self, id: &RecordIdentifier) -> Result, Self::Error> { let mut entries = vec![]; let mut key = id.key().to_vec(); while !key.is_empty() { @@ -534,7 +531,7 @@ impl crate::ranger::Store for ReplicaStoreInstance { Ok(entries.into_iter()) } - fn get_prefix( + fn prefixed_by( &self, prefix: &RecordIdentifier, ) -> std::result::Result, Self::Error> { diff --git a/iroh-sync/src/sync.rs b/iroh-sync/src/sync.rs index bbd5092c85..e85c4626fa 100644 --- a/iroh-sync/src/sync.rs +++ b/iroh-sync/src/sync.rs @@ -775,7 +775,7 @@ impl Debug for RecordIdentifier { impl RangeKey for RecordIdentifier { fn is_prefix_of(&self, other: &Self) -> bool { - self.as_bytes().starts_with(other.as_bytes()) + other.as_bytes().starts_with(self.as_bytes()) } }