Skip to content

Commit

Permalink
fix: better cancellation logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Oct 3, 2023
1 parent d9d501d commit 9bd5108
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 8 deletions.
30 changes: 22 additions & 8 deletions iroh-sync/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio_util::sync::CancellationToken;
use tracing::debug;

use crate::{
net::codec::{run_alice, BobState},
net::codec::{abort_alice, run_alice, BobState},
store,
sync::AsyncReplica as Replica,
NamespaceId, SyncOutcome,
Expand Down Expand Up @@ -37,16 +37,30 @@ pub async fn connect_and_sync<S: store::Store>(
let peer_id = peer.peer_id;
debug!(?peer_id, "sync[dial]: connect");
let namespace = doc.namespace();
let connection = tokio::select! {
biased;
_ = cancel.cancelled() => return Err(ConnectError::Cancelled),
res = endpoint.connect(peer, SYNC_ALPN) => {
res.map_err(ConnectError::connect)?
}
};
let connection = endpoint
.connect(peer, SYNC_ALPN)
.await
.map_err(ConnectError::connect)?;

debug!(?peer_id, ?namespace, "sync[dial]: connected");
let (mut send_stream, mut recv_stream) =
connection.open_bi().await.map_err(ConnectError::connect)?;

if cancel.is_cancelled() {
abort_alice(&mut send_stream)
.await
.map_err(|_| ConnectError::Cancelled)?;
send_stream
.finish()
.await
.map_err(|_| ConnectError::Cancelled)?;
recv_stream
.read_to_end(0)
.await
.map_err(|_| ConnectError::Cancelled)?;
return Err(ConnectError::Cancelled);
}

let res = run_alice::<S, _, _>(&mut send_stream, &mut recv_stream, doc, peer_id).await;

send_stream.finish().await.map_err(ConnectError::close)?;
Expand Down
9 changes: 9 additions & 0 deletions iroh-sync/src/net/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ enum Message {
Abort { reason: AbortReason },
}

pub(super) async fn abort_alice<W: AsyncWrite + Unpin>(writer: &mut W) -> Result<(), ConnectError> {
let mut writer = FramedWrite::new(writer, SyncCodec);
let message = Message::Abort {
reason: AbortReason::AlreadySyncing,
};
writer.send(message).await.map_err(ConnectError::sync)?;
Ok(())
}

/// Runs the initiator side of the sync protocol.
pub(super) async fn run_alice<S: store::Store, R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
writer: &mut W,
Expand Down

0 comments on commit 9bd5108

Please sign in to comment.