Skip to content

Commit

Permalink
refactor: rename some things
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Oct 3, 2023
1 parent e45e12f commit 76b345e
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 47 deletions.
34 changes: 15 additions & 19 deletions iroh-sync/src/state_vector.rs → iroh-sync/src/heads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,13 @@ use serde::{Deserialize, Serialize};

use crate::AuthorId;

/// State vector for a replica
///
/// Contains the timestamp for the latest entry for each author.
///
// TODO: compressed binary storage if many authors, eg by hashing "old" authors.
/// Timestamps of the latest entry for each author.
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq, Default)]
pub struct StateVector {
pub struct AuthorHeads {
heads: BTreeMap<AuthorId, u64>,
}

impl StateVector {
impl AuthorHeads {
/// Insert a new timestamp.
pub fn insert(&mut self, author: AuthorId, timestamp: u64) {
self.heads
Expand All @@ -26,8 +22,8 @@ impl StateVector {
}
}

impl StateVector {
/// Can this state vector offer newer stuff to `other`?
impl AuthorHeads {
/// Can this state offer newer stuff to `other`?
pub fn has_news_for(&self, other: &Self) -> bool {
for (a, t) in self.heads.iter() {
match other.heads.get(a) {
Expand All @@ -42,26 +38,26 @@ impl StateVector {
false
}

/// Merge another state vector into this one.
/// Merge another author head state into this one.
pub fn merge(&mut self, other: &Self) {
for (a, t) in other.iter() {
self.insert(*a, *t);
}
}

/// Create an iterator over the entries in this state vector.
/// Create an iterator over the entries in this state.
pub fn iter(&self) -> std::collections::btree_map::Iter<AuthorId, u64> {
self.heads.iter()
}
}

/// Progress tracker for sync runs.
/// Outcome of a sync operation.
#[derive(Debug, Clone, Default)]
pub struct SyncProgress {
///
pub state_vector: StateVector,
///
pub entries_recv: usize,
///
pub entries_sent: usize,
pub struct SyncOutcome {
/// Timestamp of the latest entry for each author in the set we received.
pub heads_received: AuthorHeads,
/// Number of entries we received.
pub num_recv: usize,
/// Number of entries we sent.
pub num_sent: usize,
}
4 changes: 2 additions & 2 deletions iroh-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ pub mod metrics;
#[cfg(feature = "net")]
pub mod net;
mod ranger;
mod state_vector;
mod heads;
pub mod store;
pub mod sync;

pub use keys::*;
pub use state_vector::*;
pub use heads::*;
pub use sync::*;
4 changes: 2 additions & 2 deletions iroh-sync/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
net::codec::{run_alice, BobState},
store,
sync::AsyncReplica as Replica,
NamespaceId, SyncProgress,
NamespaceId, SyncOutcome,
};

#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -141,7 +141,7 @@ pub struct SyncFinished {
///
pub peer: PublicKey,
///
pub progress: SyncProgress,
pub progress: SyncOutcome,
}

/// Errors that may occur on handling incoming sync connections.
Expand Down
14 changes: 7 additions & 7 deletions iroh-sync/src/net/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tracing::trace;

use crate::{
net::{AbortReason, AcceptError, AcceptOutcome, ConnectError},
store, AsyncReplica as Replica, NamespaceId, SyncProgress,
store, AsyncReplica as Replica, NamespaceId, SyncOutcome,
};

#[derive(Debug, Default)]
Expand Down Expand Up @@ -93,12 +93,12 @@ pub(super) async fn run_alice<S: store::Store, R: AsyncRead + Unpin, W: AsyncWri
reader: &mut R,
alice: &Replica<S::Instance>,
peer: PublicKey,
) -> Result<SyncProgress, ConnectError> {
) -> Result<SyncOutcome, ConnectError> {
let peer = *peer.as_bytes();
let mut reader = FramedRead::new(reader, SyncCodec);
let mut writer = FramedWrite::new(writer, SyncCodec);

let mut progress = SyncProgress::default();
let mut progress = SyncOutcome::default();

// Init message

Expand Down Expand Up @@ -154,7 +154,7 @@ pub(super) async fn run_bob<S, R, W, F, Fut>(
reader: &mut R,
accept_cb: F,
other_peer_id: PublicKey,
) -> Result<(NamespaceId, SyncProgress), AcceptError>
) -> Result<(NamespaceId, SyncOutcome), AcceptError>
where
S: store::Store,
R: AsyncRead + Unpin,
Expand All @@ -171,7 +171,7 @@ where
pub struct BobState<S: store::Store> {
replica: Option<Replica<S::Instance>>,
peer: PublicKey,
progress: SyncProgress,
progress: SyncOutcome,
}

impl<S: store::Store> BobState<S> {
Expand Down Expand Up @@ -270,8 +270,8 @@ impl<S: store::Store> BobState<S> {
self.replica.as_ref().map(|r| r.namespace()).to_owned()
}

/// Consume self and get the [`SyncProgress`] for this connection.
pub fn into_progress(self) -> SyncProgress {
/// Consume self and get the [`SyncOutcome`] for this connection.
pub fn into_progress(self) -> SyncOutcome {
self.progress
}
}
Expand Down
6 changes: 3 additions & 3 deletions iroh-sync/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};

use crate::{
ranger,
state_vector::StateVector,
heads::AuthorHeads,
sync::{Author, Namespace, Replica, SignedEntry},
AuthorId, NamespaceId,
};
Expand Down Expand Up @@ -97,9 +97,9 @@ pub trait Store: std::fmt::Debug + Clone + Send + Sync + 'static {

/// Check if a state vector contains pointers that we do not have locally.
// TODO: This default impl is horrifyingly inefficient. Remove.
fn has_news_for_us(&self, namespace: NamespaceId, state_vector: &StateVector) -> Result<bool> {
fn has_news_for_us(&self, namespace: NamespaceId, state_vector: &AuthorHeads) -> Result<bool> {
let our_state_vector = {
let mut sv = StateVector::default();
let mut sv = AuthorHeads::default();
let all = self.get_many(namespace, GetFilter::All)?;
for e in all {
let e = e?;
Expand Down
20 changes: 11 additions & 9 deletions iroh-sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
ranger::{self, Fingerprint, Peer, RangeEntry, RangeKey},
store::PublicKeyStore,
};
use crate::{state_vector::SyncProgress, store};
use crate::{heads::SyncOutcome, store};

pub use crate::keys::*;

Expand Down Expand Up @@ -137,7 +137,7 @@ impl<S: ranger::Store<SignedEntry> + PublicKeyStore + 'static + Clone + Send + S
&self,
message: crate::ranger::Message<SignedEntry>,
from_peer: PeerIdBytes,
state: &mut SyncProgress,
state: &mut SyncOutcome,
) -> Result<Option<crate::ranger::Message<SignedEntry>>, S::Error> {
// TODO: no clones
let mut state2 = state.clone();
Expand Down Expand Up @@ -374,15 +374,17 @@ impl<S: ranger::Store<SignedEntry> + PublicKeyStore + 'static> Replica<S> {
&self,
message: crate::ranger::Message<SignedEntry>,
from_peer: PeerIdBytes,
state: &mut SyncProgress,
state: &mut SyncOutcome,
) -> Result<Option<crate::ranger::Message<SignedEntry>>, S::Error> {
let expected_namespace = self.namespace();
let now = system_time_now();

// update state with incoming data.
state.entries_recv += message.value_count();
state.num_recv += message.value_count();
for (entry, _content_status) in message.values() {
state.state_vector.insert(entry.author(), entry.timestamp());
state
.heads_received
.insert(entry.author(), entry.timestamp());
}

let reply = self.inner.write().peer.process_message(
Expand Down Expand Up @@ -414,7 +416,7 @@ impl<S: ranger::Store<SignedEntry> + PublicKeyStore + 'static> Replica<S> {

// update state with outgoing data.
if let Some(ref reply) = reply {
state.entries_sent += reply.value_count();
state.num_sent += reply.value_count();
}

Ok(reply)
Expand Down Expand Up @@ -919,7 +921,7 @@ mod tests {

use crate::{
ranger::{Range, Store as _},
state_vector::StateVector,
state_vector::AuthorHeads,
store::{self, GetFilter, Store},
};

Expand Down Expand Up @@ -1451,8 +1453,8 @@ mod tests {
) -> Result<()> {
let alice_peer_id = [1u8; 32];
let bob_peer_id = [2u8; 32];
let mut alice_state = SyncProgress::default();
let mut bob_state = SyncProgress::default();
let mut alice_state = SyncOutcome::default();
let mut bob_state = SyncOutcome::default();
// Sync alice - bob
let mut next_to_bob = Some(alice.sync_initial_message().map_err(Into::into)?);
let mut rounds = 0;
Expand Down
10 changes: 5 additions & 5 deletions iroh/src/sync_engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use iroh_sync::{
},
store,
sync::{AsyncReplica as Replica, Entry, InsertOrigin, NamespaceId, SignedEntry},
StateVector,
AuthorHeads,
};
use serde::{Deserialize, Serialize};
use tokio::{
Expand Down Expand Up @@ -61,7 +61,7 @@ pub enum Op {
pub struct SyncReport {
peer: PublicKey,
namespace: NamespaceId,
heads: StateVector,
heads: AuthorHeads,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -834,11 +834,11 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {

// Broadcast a sync report to our neighbors, but only if we received new entries.
if let Ok(state) = &result {
if state.progress.entries_recv > 0 {
if state.progress.num_recv > 0 {
let report = SyncReport {
peer,
namespace,
heads: state.progress.state_vector.clone(),
heads: state.progress.heads_received.clone(),
};
let op = Op::SyncReport(report);
debug!(
Expand Down Expand Up @@ -1079,7 +1079,7 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
}
}

/// Outcome of a sync operation
/// Event emitted when a sync operation completes
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncEvent {
/// Namespace that was synced
Expand Down

0 comments on commit 76b345e

Please sign in to comment.