Skip to content

Commit

Permalink
make the debug logs enjoyable 😆
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Oct 11, 2023
1 parent dd63dfd commit b6de02b
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 113 deletions.
13 changes: 9 additions & 4 deletions iroh-sync/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,36 @@ use crate::{
Namespace, NamespaceId, PeerIdBytes, Replica, SignedEntry, SyncOutcome,
};

#[derive(derive_more::Debug, strum::Display)]
#[derive(derive_more::Debug, derive_more::Display)]
pub enum Action {
#[display("NewAuthor")]
NewAuthor {
author: Author,
#[debug("reply")]
reply: oneshot::Sender<Result<AuthorId>>,
},
#[display("NewReplica")]
NewReplica {
namespace: Namespace,
#[debug("reply")]
reply: oneshot::Sender<Result<NamespaceId>>,
},
#[display("ListAuthors")]
ListAuthors {
#[debug("reply")]
reply: flume::Sender<Result<Author>>,
},
#[display("ListReplicas")]
ListReplicas {
#[debug("reply")]
reply: flume::Sender<Result<NamespaceId>>,
},
#[display("Replica({}, {})", namespace.fmt_short(), action)]
Replica {
namespace: NamespaceId,
action: ReplicaAction,
},
#[display("Shutdown")]
Shutdown,
}

Expand Down Expand Up @@ -133,7 +139,7 @@ impl SyncHandle {
content_status_callback,
};
std::thread::spawn(move || {
let span = error_span!("sync_actor", %me);
let span = error_span!("sync", %me);
let _enter = span.enter();

if let Err(err) = actor.run() {
Expand Down Expand Up @@ -263,7 +269,7 @@ impl<S: store::Store> Actor<S> {
let Ok(action) = self.action_rx.recv() else {
break;
};
trace!("tick: {action}");
trace!(%action, "tick");
let is_shutdown = matches!(action, Action::Shutdown);
if let Err(err) = self.on_action(action) {
warn!("failed to send reply: {err}");
Expand Down Expand Up @@ -299,7 +305,6 @@ impl<S: store::Store> Actor<S> {
}

fn on_replica_action(&mut self, namespace: NamespaceId, action: ReplicaAction) -> Result<()> {
trace!(namespace = %namespace.fmt_short(), "tick replica: {action}");
match action {
ReplicaAction::UpdateState { state, reply } => {
self.update_state(namespace, state)?;
Expand Down
71 changes: 39 additions & 32 deletions iroh-sync/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{

use iroh_net::{key::PublicKey, magic_endpoint::get_peer_id, MagicEndpoint, PeerAddr};
use serde::{Deserialize, Serialize};
use tracing::debug;
use tracing::{debug, error_span, trace, Instrument};

use crate::{
actor::SyncHandle,
Expand All @@ -34,7 +34,7 @@ pub async fn connect_and_sync(
) -> Result<SyncFinished, ConnectError> {
let t_start = Instant::now();
let peer_id = peer.peer_id;
debug!(peer = %peer_id.fmt_short(), "sync[dial]: connect");
trace!("connect");
let connection = endpoint
.connect(peer, SYNC_ALPN)
.await
Expand All @@ -44,7 +44,7 @@ pub async fn connect_and_sync(
connection.open_bi().await.map_err(ConnectError::connect)?;

let t_connect = t_start.elapsed();
debug!(peer = %peer_id.fmt_short(), namespace = %namespace.fmt_short(), ?t_connect, "sync[dial]: connected");
debug!(?t_connect, "connected");

let res = run_alice(&mut send_stream, &mut recv_stream, sync, namespace, peer_id).await;

Expand All @@ -65,24 +65,15 @@ pub async fn connect_and_sync(
match &res {
Ok(res) => {
debug!(
peer = %peer_id.fmt_short(),
namespace = %namespace.fmt_short(),
?t_connect,
?t_process,
sent = %res.num_sent,
recv = %res.num_recv,
"sync[dial]: done, sucess"
"done, ok"
);
}
Err(err) => {
debug!(
peer = %peer_id.fmt_short(),
namespace = %namespace.fmt_short(),
?t_connect,
?t_process,
?err,
"sync[dial]: done, failed"
);
debug!(?t_connect, ?t_process, ?err, "done, failed");
}
}

Expand All @@ -103,8 +94,14 @@ pub async fn connect_and_sync(
Ok(res)
}

/// What to do with incoming sync requests
pub type AcceptOutcome = Result<(), AbortReason>;
/// Whether we want to accept or reject an incoming sync request.
#[derive(Debug, Clone)]
pub enum AcceptOutcome {
/// Accept the sync request.
Allow,
/// Decline the sync request
Reject(AbortReason),
}

/// Handle an iroh-sync connection and sync all shared documents in the replica store.
pub async fn handle_connection<F, Fut>(
Expand All @@ -114,7 +111,7 @@ pub async fn handle_connection<F, Fut>(
) -> Result<SyncFinished, AcceptError>
where
F: Fn(NamespaceId, PublicKey) -> Fut,
Fut: Future<Output = anyhow::Result<AcceptOutcome>>,
Fut: Future<Output = AcceptOutcome>,
{
let t_start = Instant::now();
let connection = connecting.await.map_err(AcceptError::connect)?;
Expand All @@ -127,11 +124,15 @@ where
.map_err(|e| AcceptError::open(peer, e))?;

let t_connect = t_start.elapsed();
debug!(?peer, ?t_connect, "sync[accept]: handle");
let span = error_span!("accept", peer = %peer.fmt_short(), namespace = tracing::field::Empty);
span.in_scope(|| {
debug!(?t_connect, "connection established");
});

let mut state = BobState::new(peer);
let res = state
.run(&mut send_stream, &mut recv_stream, sync, accept_cb)
.instrument(span.clone())
.await;

#[cfg(feature = "metrics")]
Expand All @@ -142,7 +143,7 @@ where
}

let namespace = state.namespace();
let progress = state.into_outcome();
let outcome = state.into_outcome();

send_stream
.finish()
Expand All @@ -154,26 +155,30 @@ where
.map_err(|error| AcceptError::close(peer, namespace, error))?;

let t_process = t_start.elapsed() - t_connect;
let namespace = res?;
span.in_scope(|| match &res {
Ok(_res) => {
debug!(
?t_connect,
?t_process,
sent = %outcome.num_sent,
recv = %outcome.num_recv,
"done, ok"
);
}
Err(err) => {
debug!(?t_connect, ?t_process, ?err, "done, failed");
}
});

debug!(
peer = %peer.fmt_short(),
namespace = %namespace.fmt_short(),
?t_process,
?t_connect,
sent = %progress.num_sent,
recv = %progress.num_recv,
"sync[accept]: done, success"
);
let namespace = res?;

let timings = Timings {
connect: t_connect,
process: t_process,
};

let res = SyncFinished {
namespace,
outcome: progress,
outcome,
peer,
timings,
};
Expand Down Expand Up @@ -276,9 +281,11 @@ pub enum ConnectError {
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum AbortReason {
/// Namespace is not avaiable.
NotAvailable,
NotFound,
/// We are already syncing this namespace.
AlreadySyncing,
/// We experienced an error while trying to provide the requested resource
InternalServerError,
}

impl AcceptError {
Expand Down
69 changes: 34 additions & 35 deletions iroh-sync/src/net/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::StreamExt;
use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite};
use tracing::trace;
use tracing::{debug, trace, Span};

use crate::{
actor::SyncHandle,
Expand Down Expand Up @@ -109,7 +109,7 @@ pub(super) async fn run_alice<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
.await
.map_err(ConnectError::sync)?;
let init_message = Message::Init { namespace, message };
trace!(peer = %peer.fmt_short(), "run_alice: send init message");
trace!("send init message");
writer
.send(init_message)
.await
Expand All @@ -123,16 +123,15 @@ pub(super) async fn run_alice<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
return Err(ConnectError::sync(anyhow!("unexpected init message")));
}
Message::Sync(msg) => {
trace!(peer = %peer.fmt_short(), "run_alice: recv process message");
trace!("recv process message");
let current_progress = progress.take().unwrap();
let (reply, next_progress) = handle
.sync_process_message(namespace, msg, peer_bytes, current_progress)
.await
.map_err(ConnectError::sync)?;
trace!("SEND REPLY {reply:?}{next_progress:?}");
progress = Some(next_progress);
if let Some(msg) = reply {
trace!(peer = %peer.fmt_short(), "run_alice: send process message");
trace!("send process message");
writer
.send(Message::Sync(msg))
.await
Expand All @@ -147,7 +146,7 @@ pub(super) async fn run_alice<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
}
}

trace!(peer = %peer.fmt_short(), "run_alice: finished");
trace!("done");
Ok(progress.unwrap())
}

Expand All @@ -158,15 +157,15 @@ pub(super) async fn run_bob<R, W, F, Fut>(
reader: &mut R,
handle: SyncHandle,
accept_cb: F,
other_peer_id: PublicKey,
peer: PublicKey,
) -> Result<(NamespaceId, SyncOutcome), AcceptError>
where
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
F: Fn(NamespaceId, PublicKey) -> Fut,
Fut: Future<Output = anyhow::Result<AcceptOutcome>>,
Fut: Future<Output = AcceptOutcome>,
{
let mut state = BobState::new(other_peer_id);
let mut state = BobState::new(peer);
let namespace = state.run(writer, reader, handle, accept_cb).await?;
Ok((namespace, state.into_outcome()))
}
Expand Down Expand Up @@ -204,28 +203,35 @@ impl BobState {
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
F: Fn(NamespaceId, PublicKey) -> Fut,
Fut: Future<Output = anyhow::Result<AcceptOutcome>>,
Fut: Future<Output = AcceptOutcome>,
{
let mut reader = FramedRead::new(reader, SyncCodec);
let mut writer = FramedWrite::new(writer, SyncCodec);
while let Some(msg) = reader.next().await {
let msg = msg.map_err(|e| self.fail(e))?;
let next = match (msg, self.namespace.as_ref()) {
(Message::Init { namespace, message }, None) => {
Span::current()
.record("namespace", tracing::field::display(&namespace.fmt_short()));
trace!("recv init message");
let accept = accept_cb(namespace, self.peer).await;
let accept = accept.map_err(|e| self.fail(e))?;
if let Err(reason) = accept {
writer
.send(Message::Abort { reason })
.await
.map_err(|e| self.fail(e))?;
return Err(AcceptError::Abort {
namespace,
peer: self.peer,
reason,
});
};
trace!(namespace = %namespace.fmt_short(), peer = ?self.peer, "run_bob: recv init message");
match accept {
AcceptOutcome::Allow => {
trace!("allow request");
}
AcceptOutcome::Reject(reason) => {
debug!(?reason, "reject request");
writer
.send(Message::Abort { reason })
.await
.map_err(|e| self.fail(e))?;
return Err(AcceptError::Abort {
namespace,
peer: self.peer,
reason,
});
}
}
let last_progress = self.progress.take().unwrap();
let next = sync
.sync_process_message(
Expand All @@ -239,7 +245,7 @@ impl BobState {
next
}
(Message::Sync(msg), Some(namespace)) => {
trace!(namespace = %namespace.fmt_short(), peer = %self.peer.fmt_short(), "run_bob: recv process message");
trace!("recv process message");
let last_progress = self.progress.take().unwrap();
sync.sync_process_message(*namespace, msg, *self.peer.as_bytes(), last_progress)
.await
Expand All @@ -258,7 +264,7 @@ impl BobState {
self.progress = Some(progress);
match reply {
Some(msg) => {
trace!(namespace = %self.fmt_namespace(), peer = %self.peer.fmt_short(), "run_bob: send process message");
trace!("send process message");
writer
.send(Message::Sync(msg))
.await
Expand All @@ -268,7 +274,7 @@ impl BobState {
}
}

trace!(namespace = %self.fmt_namespace(), peer = %self.peer.fmt_short(), "run_bob: finished");
trace!("done");

self.namespace()
.ok_or_else(|| self.fail(anyhow!("Stream closed before init message")))
Expand All @@ -279,13 +285,6 @@ impl BobState {
self.namespace.clone()
}

pub fn fmt_namespace(&self) -> String {
match self.namespace() {
Some(ns) => ns.fmt_short(),
None => "none".to_string(),
}
}

/// Consume self and get the [`SyncOutcome`] for this connection.
pub fn into_outcome(self) -> SyncOutcome {
self.progress.unwrap()
Expand Down Expand Up @@ -381,7 +380,7 @@ mod tests {
&mut bob_writer,
&mut bob_reader,
bob_handle2,
|_namespace, _peer| futures::future::ready(Ok(Ok(()))),
|_namespace, _peer| futures::future::ready(AcceptOutcome::Allow),
alice_peer_id,
)
.await
Expand Down Expand Up @@ -595,7 +594,7 @@ mod tests {
&mut bob_writer,
&mut bob_reader,
bob_handle,
|_namespace, _| futures::future::ready(Ok(Ok(()))),
|_namespace, _peer| futures::future::ready(AcceptOutcome::Allow),
alice_node_pubkey,
)
.await
Expand Down
Loading

0 comments on commit b6de02b

Please sign in to comment.