Skip to content

Commit

Permalink
fix: actually transfer newer entries for identical keys (#1630)
Browse files Browse the repository at this point in the history
## Description

We had a bug that just was never tested for: When both peers have an
entry with identical ranger keys (i.e. identical namespace, author and
key), when peer A received an entry from peer B, it would never send
their matching entry, even if their entry has a higher timestamp.

This PR fixes this! The actual fix is a single line:
/~https://github.com/n0-computer/iroh/pull/1630/files#diff-be755583e5a892a8b9a6329e59cfc17e9633447fb7992db74eef670ba1508ccbR378

In addition, the PR adds both a manual test for this and a proptest
strategy, which are both properly failing without the change.

It also renames the prefix methods on the ranger store trait that were
introduced in #1535 to have a clearer naming (which helped me while
tracking this down), and improves naming and logic around the prefix in
other places.

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
  • Loading branch information
Frando authored Oct 12, 2023
1 parent d187827 commit ef8c64b
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 68 deletions.
1 change: 1 addition & 0 deletions iroh-sync/proptest-regressions/ranger.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
# everyone who runs the test benefits from these saved cases.
cc 797e83179f8684388880e25a6fac7b4047eb15b03c55c1fb725b82bdbd0a4369 # shrinks to a = {TestKey("3"): ()}, b = {TestKey(""): (), TestKey("3"): (), TestKey("4"): (), TestKey("5"): (), TestKey("a"): (), TestKey("b"): (), TestKey("c"): ()}
cc f5b7604319ead6181c2ff42e53f05e2c6f0298adf0b38ea4ae4710c43abb7663 # shrinks to input = _SimpleStoreSyncArgs { alice: [(3, ()), (a, ())], bob: [(, ()), (0, ()), (b, ())] }
cc 41d9d33f002235dfe4bed83621fe79348725bbe00931451782025d98c1b81522 # shrinks to input = _SimpleStoreSyncU8Args { alice: [("", 58)], bob: [("", 0)] }
185 changes: 128 additions & 57 deletions iroh-sync/src/ranger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub trait RangeKey: Sized + Debug + Ord + PartialEq + Clone + 'static {
fn is_prefix_of(&self, other: &Self) -> bool;

/// Returns true if `other` is a prefix of `self`.
fn has_prefix(&self, other: &Self) -> bool {
fn is_prefixed_by(&self, other: &Self) -> bool {
other.is_prefix_of(self)
}
}
Expand Down Expand Up @@ -249,10 +249,10 @@ pub trait Store<E: RangeEntry>: Sized {
fn get_range(&self, range: Range<E::Key>) -> Result<Self::RangeIterator<'_>, Self::Error>;

/// Returns all entries whose key starts with the given `prefix`.
fn get_prefix(&self, prefix: &E::Key) -> Result<Self::RangeIterator<'_>, Self::Error>;
fn prefixed_by(&self, prefix: &E::Key) -> Result<Self::RangeIterator<'_>, Self::Error>;

/// Returns all entries that share a prefix with `key`, including the entry for `key` itself.
fn get_with_parents(&self, key: &E::Key) -> Result<Self::ParentIterator<'_>, Self::Error>;
fn prefixes_of(&self, key: &E::Key) -> Result<Self::ParentIterator<'_>, Self::Error>;

/// Get all entries in the store
fn all(&self) -> Result<Self::RangeIterator<'_>, Self::Error>;
Expand Down Expand Up @@ -366,15 +366,18 @@ where
None
} else {
Some(
// we get the range of the item form our store. from this set, we remove all
// entries that whose key is contained in the peer's set and where our value is
// lower than the peer entry's value.
self.store
.get_range(range.clone())?
.filter_map(|existing| match existing {
Ok(existing) => {
if !values
.iter()
.any(|(entry, _)| existing.key() == entry.key())
{
Some(Ok(existing))
.filter_map(|our_entry| match our_entry {
Ok(our_entry) => {
if !values.iter().any(|(their_entry, _)| {
our_entry.key() == their_entry.key()
&& their_entry.value() >= our_entry.value()
}) {
Some(Ok(our_entry))
} else {
None
}
Expand Down Expand Up @@ -563,23 +566,23 @@ where
/// Returns `true` if the entry was inserted.
/// Returns `false` if it was not inserted.
pub fn put(&mut self, entry: E) -> Result<InsertOutcome, S::Error> {
let parents = self.store.get_with_parents(entry.key())?;
let prefix_entry = self.store.prefixes_of(entry.key())?;
// First we check if our entry is strictly greater than all parent elements.
// From the willow spec:
// "Remove all entries whose timestamp is strictly less than the timestamp of any other entry [..]
// whose path is a prefix of p." and then "remove all but those whose record has the greatest hash component".
// This is the contract of the `Ord` impl for `E::Value`.
for e in parents {
let e = e?;
if entry.value() < e.value() {
for prefix_entry in prefix_entry {
let prefix_entry = prefix_entry?;
if entry.value() <= prefix_entry.value() {
return Ok(InsertOutcome::NotInserted);
}
}

// Now we remove all entries that have our key as a prefix and are older than our entry.
let removed = self
.store
.remove_prefix_filtered(entry.key(), |value| value < entry.value())?;
.remove_prefix_filtered(entry.key(), |value| entry.value() >= value)?;

// Insert our new entry.
self.store.put(entry)?;
Expand Down Expand Up @@ -659,17 +662,18 @@ mod tests {

impl RangeKey for &'static str {
fn is_prefix_of(&self, other: &Self) -> bool {
self.starts_with(other)
other.starts_with(self)
}
}
impl RangeKey for String {
fn is_prefix_of(&self, other: &Self) -> bool {
self.starts_with(other)
other.starts_with(self)
}
}

impl RangeValue for &'static [u8] {}
impl RangeValue for i32 {}
impl RangeValue for u8 {}
impl RangeValue for () {}

impl<K, V> Store<(K, V)> for SimpleStore<K, V>
Expand Down Expand Up @@ -745,7 +749,7 @@ mod tests {
}

// TODO: Not horrible.
fn get_with_parents(&self, key: &K) -> Result<Self::ParentIterator<'_>, Self::Error> {
fn prefixes_of(&self, key: &K) -> Result<Self::ParentIterator<'_>, Self::Error> {
let mut res = vec![];
for (k, v) in self.data.iter() {
if k.is_prefix_of(key) {
Expand All @@ -755,7 +759,7 @@ mod tests {
Ok(res.into_iter())
}

fn get_prefix(&self, prefix: &K) -> Result<Self::RangeIterator<'_>, Self::Error> {
fn prefixed_by(&self, prefix: &K) -> Result<Self::RangeIterator<'_>, Self::Error> {
let iter = self.data.iter();
Ok(SimpleRangeIterator {
iter,
Expand Down Expand Up @@ -841,7 +845,6 @@ mod tests {
assert_eq!(res.bob_to_alice[0].parts.len(), 2);
assert!(res.bob_to_alice[0].parts[0].is_range_fingerprint());
assert!(res.bob_to_alice[0].parts[1].is_range_fingerprint());

// Last response from Alice
assert_eq!(res.alice_to_bob[1].parts.len(), 3);
assert!(res.alice_to_bob[1].parts[0].is_range_fingerprint());
Expand Down Expand Up @@ -940,6 +943,16 @@ mod tests {
assert_eq!(res.bob_to_alice.len(), 1, "B -> A message count");
}

#[test]
fn test_equal_key_higher_value() {
let alice_set = [("foo", 2)];
let bob_set = [("foo", 1)];

let res = sync(&alice_set, &bob_set);
assert_eq!(res.alice_to_bob.len(), 2, "A -> B message count");
assert_eq!(res.bob_to_alice.len(), 1, "B -> A message count");
}

#[test]
fn test_multikey() {
/// Uses the blanket impl of [`RangeKey]` for `T: AsRef<[u8]>` in this module.
Expand Down Expand Up @@ -1176,13 +1189,23 @@ mod tests {
sync_with_validate_cb_and_assert(alice_set, bob_set, &alice_validate_cb, &bob_validate_cb)
}

fn insert_if_larger<K: Ord, V: Ord>(map: &mut BTreeMap<K, V>, k: K, v: V) {
let insert = match map.get(&k) {
None => true,
Some(v2) => v > *v2,
};
fn insert_if_larger<K: RangeKey, V: RangeValue>(map: &mut BTreeMap<K, V>, key: K, value: V) {
let mut insert = true;
for (k, v) in map.iter() {
if k.is_prefix_of(&key) && v >= &value {
insert = false;
}
}
if insert {
map.insert(k, v);
#[allow(clippy::needless_bool)]
map.retain(|k, v| {
if key.is_prefix_of(k) && value >= *v {
false
} else {
true
}
});
map.insert(key, value);
}
}

Expand All @@ -1198,37 +1221,58 @@ mod tests {
F1: Fn(&SimpleStore<K, V>, &(K, V), ContentStatus) -> bool,
F2: Fn(&SimpleStore<K, V>, &(K, V), ContentStatus) -> bool,
{
let mut expected_set = BTreeMap::new();

let mut alice = Peer::<(K, V), SimpleStore<K, V>>::default();
for e in alice_set {
alice.put(e.clone()).unwrap();
insert_if_larger(&mut expected_set, e.0.clone(), e.1.clone());
}

let mut bob = Peer::<(K, V), SimpleStore<K, V>>::default();
for e in bob_set {
bob.put(e.clone()).unwrap();
insert_if_larger(&mut expected_set, e.0.clone(), e.1.clone());
}

let res = sync_exchange_messages(alice, bob, alice_validate_cb, bob_validate_cb, 100);
let expected_set = {
let mut expected_set = BTreeMap::new();
let mut alice_expected = BTreeMap::new();
for e in alice_set {
alice.put(e.clone()).unwrap();
insert_if_larger(&mut expected_set, e.0.clone(), e.1.clone());
insert_if_larger(&mut alice_expected, e.0.clone(), e.1.clone());
}
let alice_expected = alice_expected.into_iter().collect::<Vec<_>>();
let alice_now: Vec<_> = alice.all().unwrap().collect::<Result<_, _>>().unwrap();
assert_eq!(
alice_expected, alice_now,
"alice initial set does not match"
);

res.print_messages();
let mut bob_expected = BTreeMap::new();
for e in bob_set {
bob.put(e.clone()).unwrap();
insert_if_larger(&mut expected_set, e.0.clone(), e.1.clone());
insert_if_larger(&mut bob_expected, e.0.clone(), e.1.clone());
}
let bob_expected = bob_expected.into_iter().collect::<Vec<_>>();
let bob_now: Vec<_> = bob.all().unwrap().collect::<Result<_, _>>().unwrap();
assert_eq!(bob_expected, bob_now, "bob initial set does not match");

expected_set.into_iter().collect::<Vec<_>>()
};

let res = sync_exchange_messages(alice, bob, alice_validate_cb, bob_validate_cb, 100);

let alice_now: Vec<_> = res.alice.all().unwrap().collect::<Result<_, _>>().unwrap();
assert_eq!(
alice_now.into_iter().collect::<Vec<_>>(),
expected_set.clone().into_iter().collect::<Vec<_>>(),
"alice"
);
if alice_now != expected_set {
res.print_messages();
println!("alice_init: {alice_set:?}");
println!("bob_init: {bob_set:?}");
println!("expected: {expected_set:?}");
println!("alice_now: {alice_now:?}");
panic!("alice_now does not match expected");
}

let bob_now: Vec<_> = res.bob.all().unwrap().collect::<Result<_, _>>().unwrap();
assert_eq!(
bob_now.into_iter().collect::<Vec<_>>(),
expected_set.into_iter().collect::<Vec<_>>(),
"bob"
);
if bob_now != expected_set {
res.print_messages();
println!("alice_init: {alice_set:?}");
println!("bob_init: {bob_set:?}");
println!("expected: {expected_set:?}");
println!("bob_now: {bob_now:?}");
panic!("bob_now does not match expected");
}

// Check that values were never sent twice
let mut alice_sent = BTreeMap::new();
Expand Down Expand Up @@ -1374,26 +1418,38 @@ mod tests {
assert_eq!(excluded[3].0, "hog");
}

type TestSet = BTreeMap<String, ()>;
type TestSetStringUnit = BTreeMap<String, ()>;
type TestSetStringU8 = BTreeMap<String, u8>;

fn test_key() -> impl Strategy<Value = String> {
"[a-z0-9]{0,5}"
}

fn test_set() -> impl Strategy<Value = TestSet> {
fn test_set_string_unit() -> impl Strategy<Value = TestSetStringUnit> {
prop::collection::btree_map(test_key(), Just(()), 0..10)
}

fn test_vec() -> impl Strategy<Value = Vec<(String, ())>> {
test_set().prop_map(|m| m.into_iter().collect::<Vec<_>>())
fn test_set_string_u8() -> impl Strategy<Value = TestSetStringU8> {
prop::collection::btree_map(test_key(), test_value_u8(), 0..10)
}

fn test_value_u8() -> impl Strategy<Value = u8> {
0u8..u8::MAX
}

fn test_vec_string_unit() -> impl Strategy<Value = Vec<(String, ())>> {
test_set_string_unit().prop_map(|m| m.into_iter().collect::<Vec<_>>())
}
fn test_vec_string_u8() -> impl Strategy<Value = Vec<(String, u8)>> {
test_set_string_u8().prop_map(|m| m.into_iter().collect::<Vec<_>>())
}

fn test_range() -> impl Strategy<Value = Range<String>> {
// ranges with x > y are explicitly allowed - they wrap around
(test_key(), test_key()).prop_map(|(x, y)| Range::new(x, y))
}

fn mk_test_set(values: impl IntoIterator<Item = impl AsRef<str>>) -> TestSet {
fn mk_test_set(values: impl IntoIterator<Item = impl AsRef<str>>) -> TestSetStringUnit {
values
.into_iter()
.map(|v| v.as_ref().to_string())
Expand All @@ -1412,6 +1468,13 @@ mod tests {
let _res = sync(&alice, &bob);
}

#[test]
fn simple_store_sync_x() {
let alice = mk_test_vec(["1", "3"]);
let bob = mk_test_vec(["2"]);
let _res = sync(&alice, &bob);
}

#[test]
fn simple_store_sync_2() {
let alice = mk_test_vec(["1", "3"]);
Expand All @@ -1428,8 +1491,16 @@ mod tests {

#[proptest]
fn simple_store_sync(
#[strategy(test_vec())] alice: Vec<(String, ())>,
#[strategy(test_vec())] bob: Vec<(String, ())>,
#[strategy(test_vec_string_unit())] alice: Vec<(String, ())>,
#[strategy(test_vec_string_unit())] bob: Vec<(String, ())>,
) {
let _res = sync(&alice, &bob);
}

#[proptest]
fn simple_store_sync_u8(
#[strategy(test_vec_string_u8())] alice: Vec<(String, u8)>,
#[strategy(test_vec_string_u8())] bob: Vec<(String, u8)>,
) {
let _res = sync(&alice, &bob);
}
Expand Down Expand Up @@ -1466,7 +1537,7 @@ mod tests {

#[proptest]
fn simple_store_get_ranges(
#[strategy(test_set())] contents: BTreeMap<String, ()>,
#[strategy(test_set_string_unit())] contents: BTreeMap<String, ()>,
#[strategy(test_range())] range: Range<String>,
) {
let (expected, actual) = store_get_ranges_test::<SimpleStore<_, _>, _>(contents, range);
Expand Down
7 changes: 2 additions & 5 deletions iroh-sync/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,10 +648,7 @@ impl crate::ranger::Store<SignedEntry> for StoreInstance {
}

type ParentIterator<'a> = ParentIterator<'a>;
fn get_with_parents(
&self,
id: &RecordIdentifier,
) -> Result<Self::ParentIterator<'_>, Self::Error> {
fn prefixes_of(&self, id: &RecordIdentifier) -> Result<Self::ParentIterator<'_>, Self::Error> {
ParentIterator::create(
&self.store.db,
id.namespace(),
Expand All @@ -660,7 +657,7 @@ impl crate::ranger::Store<SignedEntry> for StoreInstance {
)
}

fn get_prefix(&self, prefix: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
fn prefixed_by(&self, prefix: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
let start = prefix.as_byte_tuple();
let end = prefix_range_end(&start);
let iter = RangeIterator::with_range(
Expand Down
7 changes: 2 additions & 5 deletions iroh-sync/src/store/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,10 +516,7 @@ impl crate::ranger::Store<SignedEntry> for ReplicaStoreInstance {

// TODO: Not horrible.
type ParentIterator<'a> = std::vec::IntoIter<Result<SignedEntry, Infallible>>;
fn get_with_parents(
&self,
id: &RecordIdentifier,
) -> Result<Self::ParentIterator<'_>, Self::Error> {
fn prefixes_of(&self, id: &RecordIdentifier) -> Result<Self::ParentIterator<'_>, Self::Error> {
let mut entries = vec![];
let mut key = id.key().to_vec();
while !key.is_empty() {
Expand All @@ -534,7 +531,7 @@ impl crate::ranger::Store<SignedEntry> for ReplicaStoreInstance {
Ok(entries.into_iter())
}

fn get_prefix(
fn prefixed_by(
&self,
prefix: &RecordIdentifier,
) -> std::result::Result<Self::RangeIterator<'_>, Self::Error> {
Expand Down
2 changes: 1 addition & 1 deletion iroh-sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ impl Debug for RecordIdentifier {

impl RangeKey for RecordIdentifier {
fn is_prefix_of(&self, other: &Self) -> bool {
self.as_bytes().starts_with(other.as_bytes())
other.as_bytes().starts_with(self.as_bytes())
}
}

Expand Down

0 comments on commit ef8c64b

Please sign in to comment.