Skip to content

Commit

Permalink
better logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Oct 9, 2023
1 parent 4f3c573 commit bd8198b
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 46 deletions.
13 changes: 12 additions & 1 deletion iroh-sync/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,6 @@ pub(super) mod base32 {
let len = bytes.as_ref().len().min(10);
let mut text = data_encoding::BASE32_NOPAD.encode(&bytes.as_ref()[..len]);
text.make_ascii_lowercase();
text.push('…');
text
}
/// Parse from a base32 string into a byte array
Expand Down Expand Up @@ -413,6 +412,12 @@ impl AuthorId {
pub fn into_public_key<S: PublicKeyStore>(&self) -> Result<AuthorPublicKey, SignatureError> {
// Build the public key from the bytes wrapped by this id (`self.0`); whatever
// `AuthorPublicKey::from_bytes` returns is handed back to the caller unchanged.
// NOTE(review): the type parameter `S` is not used in the visible body — confirm
// it is still required by callers.
AuthorPublicKey::from_bytes(&self.0)
}

/// Render an abbreviated, human-friendly form of this author id.
///
/// The wrapped bytes (`self.0`) are handed to `base32::fmt_short`, which yields a
/// base32 string built from at most the first 10 bytes of the key.
pub fn fmt_short(&self) -> String {
    let raw = &self.0;
    base32::fmt_short(raw)
}
}

impl NamespaceId {
Expand Down Expand Up @@ -442,6 +447,12 @@ impl NamespaceId {
pub fn into_public_key<S: PublicKeyStore>(&self) -> Result<NamespacePublicKey, SignatureError> {
// Build the public key from the bytes wrapped by this id (`self.0`); whatever
// `NamespacePublicKey::from_bytes` returns is handed back to the caller unchanged.
// NOTE(review): the type parameter `S` is not used in the visible body — confirm
// it is still required by callers.
NamespacePublicKey::from_bytes(&self.0)
}

/// Render an abbreviated, human-friendly form of this namespace id.
///
/// The wrapped bytes (`self.0`) are handed to `base32::fmt_short`, which yields a
/// base32 string built from at most the first 10 bytes of the key.
pub fn fmt_short(&self) -> String {
    let raw = &self.0;
    base32::fmt_short(raw)
}
}

impl From<&[u8; 32]> for NamespaceId {
Expand Down
46 changes: 31 additions & 15 deletions iroh-sync/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub async fn connect_and_sync<S: store::Store>(
) -> Result<SyncFinished, ConnectError> {
let t_start = Instant::now();
let peer_id = peer.peer_id;
debug!(?peer_id, "sync[dial]: connect");
debug!(peer = %peer_id.fmt_short(), "sync[dial]: connect");
let namespace = replica.namespace();
let connection = endpoint
.connect(peer, SYNC_ALPN)
Expand All @@ -45,7 +45,7 @@ pub async fn connect_and_sync<S: store::Store>(
connection.open_bi().await.map_err(ConnectError::connect)?;

let t_connect = t_start.elapsed();
debug!(?peer_id, ?namespace, ?t_connect, "sync[dial]: connected");
debug!(peer = %peer_id.fmt_short(), namespace = %namespace.fmt_short(), ?t_connect, "sync[dial]: connected");

let res = run_alice::<S, _, _>(&mut send_stream, &mut recv_stream, replica, peer_id).await;

Expand All @@ -63,14 +63,29 @@ pub async fn connect_and_sync<S: store::Store>(
}

let t_process = t_start.elapsed() - t_connect;
debug!(
?peer_id,
?namespace,
?res,
?t_connect,
?t_process,
"sync[dial]: done"
);
// Log how the dial-side sync ended. `res` is borrowed here so it can still be
// propagated with `?` afterwards. Success and failure are logged separately so the
// success line carries the sent/received counters and the failure line the error.
match &res {
    Ok(res) => {
        debug!(
            peer = %peer_id.fmt_short(),
            namespace = %namespace.fmt_short(),
            ?t_connect,
            ?t_process,
            sent = %res.num_sent,
            recv = %res.num_recv,
            // Spelling fixed ("sucess" -> "success") so the message matches the
            // accept side's "done, success" and can be grepped consistently.
            "sync[dial]: done, success"
        );
    }
    Err(err) => {
        debug!(
            peer = %peer_id.fmt_short(),
            namespace = %namespace.fmt_short(),
            ?t_connect,
            ?t_process,
            ?err,
            "sync[dial]: done, failed"
        );
    }
}

let outcome = res?;

Expand Down Expand Up @@ -139,16 +154,17 @@ where
.await
.map_err(|error| AcceptError::close(peer, namespace, error))?;

let namespace = res?;

let t_process = t_start.elapsed() - t_connect;
let namespace = res?;

debug!(
?peer,
?namespace,
peer = %peer.fmt_short(),
namespace = %namespace.fmt_short(),
?t_process,
?t_connect,
"sync[accept]: done"
sent = %progress.num_sent,
recv = %progress.num_recv,
"sync[accept]: done, success"
);

let timings = Timings {
Expand Down
25 changes: 16 additions & 9 deletions iroh-sync/src/net/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub(super) async fn run_alice<S: store::Store, R: AsyncRead + Unpin, W: AsyncWri
alice: &Replica<S::Instance>,
peer: PublicKey,
) -> Result<SyncOutcome, ConnectError> {
let peer = *peer.as_bytes();
let peer_bytes = *peer.as_bytes();
let mut reader = FramedRead::new(reader, SyncCodec);
let mut writer = FramedWrite::new(writer, SyncCodec);

Expand All @@ -109,7 +109,7 @@ pub(super) async fn run_alice<S: store::Store, R: AsyncRead + Unpin, W: AsyncWri
.await
.map_err(ConnectError::sync)?,
};
trace!(?peer, "run_alice: send init message");
trace!(peer = %peer.fmt_short(), "run_alice: send init message");
writer
.send(init_message)
.await
Expand All @@ -123,13 +123,13 @@ pub(super) async fn run_alice<S: store::Store, R: AsyncRead + Unpin, W: AsyncWri
return Err(ConnectError::sync(anyhow!("unexpected init message")));
}
Message::Sync(msg) => {
trace!(?peer, "run_alice: recv process message");
trace!(peer = %peer.fmt_short(), "run_alice: recv process message");
let reply = alice
.sync_process_message(msg, peer, &mut progress)
.sync_process_message(msg, peer_bytes, &mut progress)
.await
.map_err(ConnectError::sync)?;
if let Some(msg) = reply {
trace!(?peer, "run_alice: send process message");
trace!(peer = %peer.fmt_short(), "run_alice: send process message");
writer
.send(Message::Sync(msg))
.await
Expand Down Expand Up @@ -223,15 +223,15 @@ impl<S: store::Store> BobState<S> {
});
}
};
trace!(?namespace, peer = ?self.peer, "run_bob: recv init message");
trace!(namespace = %namespace.fmt_short(), peer = ?self.peer, "run_bob: recv init message");
let next = replica
.sync_process_message(message, *self.peer.as_bytes(), &mut self.progress)
.await;
self.replica = Some(replica);
next
}
(Message::Sync(msg), Some(replica)) => {
trace!(namespace = ?replica.namespace(), peer = ?self.peer, "run_bob: recv process message");
trace!(namespace = %replica.namespace().fmt_short(), peer = %self.peer.fmt_short(), "run_bob: recv process message");
replica
.sync_process_message(msg, *self.peer.as_bytes(), &mut self.progress)
.await
Expand All @@ -249,7 +249,7 @@ impl<S: store::Store> BobState<S> {
let next = next.map_err(|e| self.fail(e))?;
match next {
Some(msg) => {
trace!(namespace = ?self.namespace(), peer = ?self.peer, "run_bob: send process message");
trace!(namespace = %self.fmt_namespace(), peer = %self.peer.fmt_short(), "run_bob: send process message");
writer
.send(Message::Sync(msg))
.await
Expand All @@ -259,7 +259,7 @@ impl<S: store::Store> BobState<S> {
}
}

trace!(namespace = ?self.namespace().unwrap(), peer = ?self.peer, "run_bob: finished");
trace!(namespace = %self.fmt_namespace(), peer = %self.peer.fmt_short(), "run_bob: finished");

self.namespace()
.ok_or_else(|| self.fail(anyhow!("Stream closed before init message")))
Expand All @@ -270,6 +270,13 @@ impl<S: store::Store> BobState<S> {
self.replica.as_ref().map(|r| r.namespace()).to_owned()
}

/// Short, log-friendly rendering of the namespace this state is attached to.
///
/// Returns the namespace's `fmt_short()` string when [`Self::namespace`] yields
/// `Some`, and the literal `"none"` otherwise.
pub fn fmt_namespace(&self) -> String {
    self.namespace()
        .map(|ns| ns.fmt_short())
        .unwrap_or_else(|| "none".to_string())
}

/// Consume self and get the [`SyncOutcome`] for this connection.
pub fn into_outcome(self) -> SyncOutcome {
self.progress
Expand Down
50 changes: 29 additions & 21 deletions iroh/src/sync_engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,22 +467,22 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
let (topic, event) = event.context("gossip_events closed")??;
if let Err(err) = self.on_gossip_event(topic, event).await {
let namespace: NamespaceId = topic.as_bytes().into();
error!(?namespace, ?err, "Failed to process gossip event");
error!(namespace = %namespace.fmt_short(), ?err, "Failed to process gossip event");
}
},
event = self.replica_events.next(), if !self.replica_events.is_empty() => {
trace!(?i, "tick: replica_event");
let (origin, entry) = event.context("replica_events closed")?;
let namespace = entry.namespace();
if let Err(err) = self.on_replica_event(origin, entry).await {
error!(?namespace, ?err, "Failed to process replica event");
error!(namespace = %namespace.fmt_short(), ?err, "Failed to process replica event");
}
}
res = self.running_sync_connect.next(), if !self.running_sync_connect.is_empty() => {
trace!(?i, "tick: on_sync_via_connect_finished");
let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
if let Err(err) = self.on_sync_via_connect_finished(namespace, peer, reason, res).await {
error!(?namespace, ?err, "Failed to process outgoing sync request");
error!(namespace = %namespace.fmt_short(), ?err, "Failed to process outgoing sync request");
}

}
Expand All @@ -495,11 +495,11 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
}
res = self.pending_joins.next(), if !self.pending_joins.is_empty() => {
trace!(?i, "tick: pending_joins");
let (namespace, res )= res.context("pending_joins closed")?;
let (namespace, res) = res.context("pending_joins closed")?;
if let Err(err) = res {
error!(?namespace, %err, "failed to join gossip");
error!(namespace = %namespace.fmt_short(), %err, "failed to join gossip");
} else {
debug!(?namespace, "joined gossip");
debug!(namespace = %namespace.fmt_short(), "joined gossip");
}
// TODO: maintain some join state
}
Expand Down Expand Up @@ -567,7 +567,7 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
_ => return,
},
};
debug!(?namespace, peer = %peer.fmt_short(), ?reason, last_state = ?self.get_sync_state(namespace, peer), "sync[dial]: start");
debug!(namespace = %namespace.fmt_short(), peer = %peer.fmt_short(), ?reason, last_state = ?self.get_sync_state(namespace, peer), "sync[dial]: start");

self.set_sync_state(namespace, peer, SyncState::Dialing);
let endpoint = self.endpoint.clone();
Expand Down Expand Up @@ -742,9 +742,13 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
let peer_ids = peer_ids.clone();
let gossip = self.gossip.clone();
async move {
match gossip.join(namespace.into(), peer_ids).await {
match gossip.join(namespace.into(), peer_ids.clone()).await {
Err(err) => (namespace, Err(err)),
Ok(fut) => (namespace, fut.await),
Ok(fut) => {
let res = (namespace, fut.await);
debug!(?res, ?peer_ids, namespace = %namespace.fmt_short(), "gossip join");
res
}
}
}
.boxed()
Expand All @@ -768,7 +772,7 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
debug!(
peer = %peer.fmt_short(),
?namespace,
namespace = %namespace.fmt_short(),
?reason,
"sync[dial]: remote abort, already syncing"
);
Expand Down Expand Up @@ -801,7 +805,7 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
reason,
}) if reason == AbortReason::AlreadySyncing => {
// In case we aborted the sync: do nothing (our outgoing sync is in progress)
debug!(peer = %peer.fmt_short(), ?namespace, ?reason, "sync[accept]: aborted by us");
debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), ?reason, "sync[accept]: aborted by us");
Ok(())
}
Err(err) => {
Expand Down Expand Up @@ -833,7 +837,9 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
// debug log the result, warn in case of errors
match &result {
Ok(res) => log_finished(&origin, res),
Err(err) => warn!(?peer, ?namespace, ?err, ?origin, "sync failed"),
Err(err) => {
warn!(?peer, namespace = %namespace.fmt_short(), ?err, ?origin, "sync failed")
}
}
let state = match result {
Ok(_) => {
Expand All @@ -860,8 +866,9 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
};
let op = Op::SyncReport(report);
debug!(
?namespace,
"broadcast to neighbors: sync report from {peer:?})"
namespace = %namespace.fmt_short(),
from_peer = %peer.fmt_short(),
"broadcast sync report to neighbors"
);
let msg = postcard::to_stdvec(&op)?;
// TODO: We should debounce and merge these neighbor announcements likely.
Expand All @@ -871,7 +878,8 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
.await
{
error!(
?namespace,
namespace = %namespace.fmt_short(),
from_peer = %peer.fmt_short(),
?err,
"Failed to broadcast SyncReport to neighbors"
);
Expand Down Expand Up @@ -907,7 +915,7 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
let op: Op = postcard::from_bytes(&msg.content)?;
match op {
Op::Put(entry) => {
debug!(peer = ?msg.delivered_from, ?namespace, "received entry via gossip");
debug!(peer = ?msg.delivered_from, namespace = %namespace.fmt_short(), "received entry via gossip");
// Insert the entry into our replica.
// If the message was broadcast with neighbor scope, or is received
// directly from the author, we assume that the content is available at
Expand Down Expand Up @@ -939,14 +947,14 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
.replica_store
.has_news_for_us(report.namespace, &report.heads)?
{
debug!(?namespace, peer = %peer.fmt_short(), "recv sync report: have news, sync now");
debug!(namespace = %namespace.fmt_short(), peer = %peer.fmt_short(), "recv sync report: have news, sync now");
self.sync_with_peer(
report.namespace,
peer,
SyncReason::ResyncAfterReport,
);
} else {
debug!(?namespace, peer = %peer.fmt_short(), "recv sync report: no news");
debug!(namespace = %namespace.fmt_short(), peer = %peer.fmt_short(), "recv sync report: no news");
}
}
}
Expand All @@ -955,14 +963,14 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
// [Self::sync_with_peer] will check to not resync with peers synced previously in the
// same session. TODO: Maybe this is too broad and leads to too many sync requests.
Event::NeighborUp(peer) => {
debug!(peer = %peer.fmt_short(), ?namespace, "neighbor up");
debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
if let Some(subs) = self.event_subscriptions.get_mut(&namespace) {
notify_all(subs, LiveEvent::NeighborUp(peer)).await;
}
}
Event::NeighborDown(peer) => {
debug!(peer = %peer.fmt_short(), ?namespace, "neighbor down");
debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
if let Some(subs) = self.event_subscriptions.get_mut(&namespace) {
notify_all(subs, LiveEvent::NeighborDown(peer)).await;
}
Expand All @@ -986,7 +994,7 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
// A new entry was inserted locally. Broadcast a gossip message.
let op = Op::Put(signed_entry);
let message = postcard::to_stdvec(&op)?.into();
debug!(?namespace, "broadcast new entry");
debug!(namespace = %namespace.fmt_short(), "broadcast new entry");
self.gossip.broadcast(topic, message).await?;

// Notify subscribers about the event
Expand Down

0 comments on commit bd8198b

Please sign in to comment.