From bd8198be9e7c02717940a439fce9b713a900ff0e Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Mon, 9 Oct 2023 11:25:34 +0200 Subject: [PATCH] better logging --- iroh-sync/src/keys.rs | 13 +++++++++- iroh-sync/src/net.rs | 46 ++++++++++++++++++++++----------- iroh-sync/src/net/codec.rs | 25 +++++++++++------- iroh/src/sync_engine/live.rs | 50 +++++++++++++++++++++--------------- 4 files changed, 88 insertions(+), 46 deletions(-) diff --git a/iroh-sync/src/keys.rs b/iroh-sync/src/keys.rs index 3fbb52218d..70be5c1e70 100644 --- a/iroh-sync/src/keys.rs +++ b/iroh-sync/src/keys.rs @@ -338,7 +338,6 @@ pub(super) mod base32 { let len = bytes.as_ref().len().min(10); let mut text = data_encoding::BASE32_NOPAD.encode(&bytes.as_ref()[..len]); text.make_ascii_lowercase(); - text.push('…'); text } /// Parse from a base32 string into a byte array @@ -413,6 +412,12 @@ impl AuthorId { pub fn into_public_key(&self) -> Result { AuthorPublicKey::from_bytes(&self.0) } + + /// Convert to a base32 string limited to the first 10 bytes for a friendly string + /// representation of the key. + pub fn fmt_short(&self) -> String { + base32::fmt_short(&self.0) + } } impl NamespaceId { @@ -442,6 +447,12 @@ impl NamespaceId { pub fn into_public_key(&self) -> Result { NamespacePublicKey::from_bytes(&self.0) } + + /// Convert to a base32 string limited to the first 10 bytes for a friendly string + /// representation of the key. 
+ pub fn fmt_short(&self) -> String { + base32::fmt_short(&self.0) + } } impl From<&[u8; 32]> for NamespaceId { diff --git a/iroh-sync/src/net.rs b/iroh-sync/src/net.rs index e06506a94e..3552b77a76 100644 --- a/iroh-sync/src/net.rs +++ b/iroh-sync/src/net.rs @@ -34,7 +34,7 @@ pub async fn connect_and_sync( ) -> Result { let t_start = Instant::now(); let peer_id = peer.peer_id; - debug!(?peer_id, "sync[dial]: connect"); + debug!(peer = %peer_id.fmt_short(), "sync[dial]: connect"); let namespace = replica.namespace(); let connection = endpoint .connect(peer, SYNC_ALPN) @@ -45,7 +45,7 @@ pub async fn connect_and_sync( connection.open_bi().await.map_err(ConnectError::connect)?; let t_connect = t_start.elapsed(); - debug!(?peer_id, ?namespace, ?t_connect, "sync[dial]: connected"); + debug!(peer = %peer_id.fmt_short(), namespace = %namespace.fmt_short(), ?t_connect, "sync[dial]: connected"); let res = run_alice::(&mut send_stream, &mut recv_stream, replica, peer_id).await; @@ -63,14 +63,29 @@ pub async fn connect_and_sync( } let t_process = t_start.elapsed() - t_connect; - debug!( - ?peer_id, - ?namespace, - ?res, - ?t_connect, - ?t_process, - "sync[dial]: done" - ); + match &res { + Ok(res) => { + debug!( + peer = %peer_id.fmt_short(), + namespace = %namespace.fmt_short(), + ?t_connect, + ?t_process, + sent = %res.num_sent, + recv = %res.num_recv, + "sync[dial]: done, success" + ); + } + Err(err) => { + debug!( + peer = %peer_id.fmt_short(), + namespace = %namespace.fmt_short(), + ?t_connect, + ?t_process, + ?err, + "sync[dial]: done, failed" + ); + } + } let outcome = res?; @@ -139,16 +154,17 @@ where .await .map_err(|error| AcceptError::close(peer, namespace, error))?; - let namespace = res?; - let t_process = t_start.elapsed() - t_connect; + let namespace = res?; debug!( - ?peer, - ?namespace, + peer = %peer.fmt_short(), + namespace = %namespace.fmt_short(), ?t_process, ?t_connect, - "sync[accept]: done" + sent = %progress.num_sent, + recv = %progress.num_recv, + 
"sync[accept]: done, success" ); let timings = Timings { diff --git a/iroh-sync/src/net/codec.rs b/iroh-sync/src/net/codec.rs index bf265d6fb7..39ea1bf68b 100644 --- a/iroh-sync/src/net/codec.rs +++ b/iroh-sync/src/net/codec.rs @@ -94,7 +94,7 @@ pub(super) async fn run_alice, peer: PublicKey, ) -> Result { - let peer = *peer.as_bytes(); + let peer_bytes = *peer.as_bytes(); let mut reader = FramedRead::new(reader, SyncCodec); let mut writer = FramedWrite::new(writer, SyncCodec); @@ -109,7 +109,7 @@ pub(super) async fn run_alice { - trace!(?peer, "run_alice: recv process message"); + trace!(peer = %peer.fmt_short(), "run_alice: recv process message"); let reply = alice - .sync_process_message(msg, peer, &mut progress) + .sync_process_message(msg, peer_bytes, &mut progress) .await .map_err(ConnectError::sync)?; if let Some(msg) = reply { - trace!(?peer, "run_alice: send process message"); + trace!(peer = %peer.fmt_short(), "run_alice: send process message"); writer .send(Message::Sync(msg)) .await @@ -223,7 +223,7 @@ impl BobState { }); } }; - trace!(?namespace, peer = ?self.peer, "run_bob: recv init message"); + trace!(namespace = %namespace.fmt_short(), peer = ?self.peer, "run_bob: recv init message"); let next = replica .sync_process_message(message, *self.peer.as_bytes(), &mut self.progress) .await; @@ -231,7 +231,7 @@ impl BobState { next } (Message::Sync(msg), Some(replica)) => { - trace!(namespace = ?replica.namespace(), peer = ?self.peer, "run_bob: recv process message"); + trace!(namespace = %replica.namespace().fmt_short(), peer = %self.peer.fmt_short(), "run_bob: recv process message"); replica .sync_process_message(msg, *self.peer.as_bytes(), &mut self.progress) .await @@ -249,7 +249,7 @@ impl BobState { let next = next.map_err(|e| self.fail(e))?; match next { Some(msg) => { - trace!(namespace = ?self.namespace(), peer = ?self.peer, "run_bob: send process message"); + trace!(namespace = %self.fmt_namespace(), peer = %self.peer.fmt_short(), "run_bob: send 
process message"); writer .send(Message::Sync(msg)) .await @@ -259,7 +259,7 @@ impl BobState { } } - trace!(namespace = ?self.namespace().unwrap(), peer = ?self.peer, "run_bob: finished"); + trace!(namespace = %self.fmt_namespace(), peer = %self.peer.fmt_short(), "run_bob: finished"); self.namespace() .ok_or_else(|| self.fail(anyhow!("Stream closed before init message"))) @@ -270,6 +270,13 @@ impl BobState { self.replica.as_ref().map(|r| r.namespace()).to_owned() } + pub fn fmt_namespace(&self) -> String { + match self.namespace() { + Some(ns) => ns.fmt_short(), + None => "none".to_string(), + } + } + /// Consume self and get the [`SyncOutcome`] for this connection. pub fn into_outcome(self) -> SyncOutcome { self.progress diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index efea5fbeec..8ec119a442 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -467,7 +467,7 @@ impl Actor { let (topic, event) = event.context("gossip_events closed")??; if let Err(err) = self.on_gossip_event(topic, event).await { let namespace: NamespaceId = topic.as_bytes().into(); - error!(?namespace, ?err, "Failed to process gossip event"); + error!(namespace = %namespace.fmt_short(), ?err, "Failed to process gossip event"); } }, event = self.replica_events.next(), if !self.replica_events.is_empty() => { @@ -475,14 +475,14 @@ impl Actor { let (origin, entry) = event.context("replica_events closed")?; let namespace = entry.namespace(); if let Err(err) = self.on_replica_event(origin, entry).await { - error!(?namespace, ?err, "Failed to process replica event"); + error!(namespace = %namespace.fmt_short(), ?err, "Failed to process replica event"); } } res = self.running_sync_connect.next(), if !self.running_sync_connect.is_empty() => { trace!(?i, "tick: on_sync_via_connect_finished"); let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?; if let Err(err) = self.on_sync_via_connect_finished(namespace, peer, reason, 
res).await { - error!(?namespace, ?err, "Failed to process outgoing sync request"); + error!(namespace = %namespace.fmt_short(), ?err, "Failed to process outgoing sync request"); } } @@ -495,11 +495,11 @@ impl Actor { } res = self.pending_joins.next(), if !self.pending_joins.is_empty() => { trace!(?i, "tick: pending_joins"); - let (namespace, res )= res.context("pending_joins closed")?; + let (namespace, res) = res.context("pending_joins closed")?; if let Err(err) = res { - error!(?namespace, %err, "failed to join gossip"); + error!(namespace = %namespace.fmt_short(), %err, "failed to join gossip"); } else { - debug!(?namespace, "joined gossip"); + debug!(namespace = %namespace.fmt_short(), "joined gossip"); } // TODO: maintain some join state } @@ -567,7 +567,7 @@ impl Actor { _ => return, }, }; - debug!(?namespace, peer = %peer.fmt_short(), ?reason, last_state = ?self.get_sync_state(namespace, peer), "sync[dial]: start"); + debug!(namespace = %namespace.fmt_short(), peer = %peer.fmt_short(), ?reason, last_state = ?self.get_sync_state(namespace, peer), "sync[dial]: start"); self.set_sync_state(namespace, peer, SyncState::Dialing); let endpoint = self.endpoint.clone(); @@ -742,9 +742,13 @@ impl Actor { let peer_ids = peer_ids.clone(); let gossip = self.gossip.clone(); async move { - match gossip.join(namespace.into(), peer_ids).await { + match gossip.join(namespace.into(), peer_ids.clone()).await { Err(err) => (namespace, Err(err)), - Ok(fut) => (namespace, fut.await), + Ok(fut) => { + let res = (namespace, fut.await); + debug!(?res, ?peer_ids, namespace = %namespace.fmt_short(), "gossip join"); + res + } } } .boxed() @@ -768,7 +772,7 @@ impl Actor { Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => { debug!( peer = %peer.fmt_short(), - ?namespace, + namespace = %namespace.fmt_short(), ?reason, "sync[dial]: remote abort, already syncing" ); @@ -801,7 +805,7 @@ impl Actor { reason, }) if reason == AbortReason::AlreadySyncing => { // In case we aborted 
the sync: do nothing (our outgoing sync is in progress) - debug!(peer = %peer.fmt_short(), ?namespace, ?reason, "sync[accept]: aborted by us"); + debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), ?reason, "sync[accept]: aborted by us"); Ok(()) } Err(err) => { @@ -833,7 +837,9 @@ impl Actor { // debug log the result, warn in case of errors match &result { Ok(res) => log_finished(&origin, res), - Err(err) => warn!(?peer, ?namespace, ?err, ?origin, "sync failed"), + Err(err) => { + warn!(?peer, namespace = %namespace.fmt_short(), ?err, ?origin, "sync failed") + } } let state = match result { Ok(_) => { @@ -860,8 +866,9 @@ impl Actor { }; let op = Op::SyncReport(report); debug!( - ?namespace, - "broadcast to neighbors: sync report from {peer:?})" + namespace = %namespace.fmt_short(), + from_peer = %peer.fmt_short(), + "broadcast sync report to neighbors" ); let msg = postcard::to_stdvec(&op)?; // TODO: We should debounce and merge these neighbor announcements likely. @@ -871,7 +878,8 @@ impl Actor { .await { error!( - ?namespace, + namespace = %namespace.fmt_short(), + from_peer = %peer.fmt_short(), ?err, "Failed to broadcast SyncReport to neighbors" ); @@ -907,7 +915,7 @@ impl Actor { let op: Op = postcard::from_bytes(&msg.content)?; match op { Op::Put(entry) => { - debug!(peer = ?msg.delivered_from, ?namespace, "received entry via gossip"); + debug!(peer = ?msg.delivered_from, namespace = %namespace.fmt_short(), "received entry via gossip"); // Insert the entry into our replica. // If the message was broadcast with neighbor scope, or is received // directly from the author, we assume that the content is available at @@ -939,14 +947,14 @@ impl Actor { .replica_store .has_news_for_us(report.namespace, &report.heads)? 
{ - debug!(?namespace, peer = %peer.fmt_short(), "recv sync report: have news, sync now"); + debug!(namespace = %namespace.fmt_short(), peer = %peer.fmt_short(), "recv sync report: have news, sync now"); self.sync_with_peer( report.namespace, peer, SyncReason::ResyncAfterReport, ); } else { - debug!(?namespace, peer = %peer.fmt_short(), "recv sync report: no news"); + debug!(namespace = %namespace.fmt_short(), peer = %peer.fmt_short(), "recv sync report: no news"); } } } @@ -955,14 +963,14 @@ impl Actor { // [Self::sync_with_peer] will check to not resync with peers synced previously in the // same session. TODO: Maybe this is too broad and leads to too many sync requests. Event::NeighborUp(peer) => { - debug!(peer = %peer.fmt_short(), ?namespace, "neighbor up"); + debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up"); self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor); if let Some(subs) = self.event_subscriptions.get_mut(&namespace) { notify_all(subs, LiveEvent::NeighborUp(peer)).await; } } Event::NeighborDown(peer) => { - debug!(peer = %peer.fmt_short(), ?namespace, "neighbor down"); + debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down"); if let Some(subs) = self.event_subscriptions.get_mut(&namespace) { notify_all(subs, LiveEvent::NeighborDown(peer)).await; } @@ -986,7 +994,7 @@ impl Actor { // A new entry was inserted locally. Broadcast a gossip message. let op = Op::Put(signed_entry); let message = postcard::to_stdvec(&op)?.into(); - debug!(?namespace, "broadcast new entry"); + debug!(namespace = %namespace.fmt_short(), "broadcast new entry"); self.gossip.broadcast(topic, message).await?; // Notify subscribers about the event