Skip to content

Commit

Permalink
refactor(iroh-sync): Add actor to iroh-sync, remove deadlocks (#1612)
Browse files Browse the repository at this point in the history
## Description

* Add an actor to `iroh_sync` that processes async requests for
operations on Replicas
* Major refactoring of how we access the iroh sync API from iroh: Before
this our `live::Actor` would easily deadlock, because inserting entries
from gossip, and processing insert events coming back from the replica,
happened in the same actor loop. Also, because iroh-sync is fully
synchronous, we have to take care to not lock or block the async
executor. So, to solve both of these issues, all operations on replicas
and the iroh sync store are moved into a separate actor which runs in a
loop on a `std::thread` and just blocks while waiting for actions from a
channel. This, in turn, moves some state keeping from iroh to iroh-sync,
which is a good thing I think.
* Remove Clone and all locks from `Replica`. As they are now usually
owned by the `iroh_sync::actor` actor, no need for locks. We can get
unique mut refs instead.

* Change the subscription model on `Replica`s to be a
`Vec<flume::Sender<Event>>`. This now allows to directly subscribe to
replica events from multiple places. This allows us to have subscription
to replica events from the client not go through any actor. The replica
events are merged together with the events from the `sync_engine::live`
actor, and that's it. Should improve latency and ease work on the
actors. Because we merge the channels for replica events and live actor
events for client subscriptions, this means, that closing a replica from
the client does not end the subscription stream, because the
subscription on the live actor will remain active. To end the
subscription, the client would need to drop the receiver. I think this
is fine for now. What we might want to do instead is to split the
subscription from the client to two methods and channels,
`subscribe_insert_events` and `subscribe_sync_events` or so, then we
could close them separately. However the one for sync events would still
stay open indefinitely, because you wouldn't want to drop and recreate,
I think, in case you do `join` / `leave` / `join`. Or do we?

* Track the open state of Replicas in the `iroh_sync::actor`. I opted
for a simple implementation to start: The actor counts calls to `open`
and decrements on calls to `close` and closes the replica once the count
reaches zero. This works fine, however for replicas opened from the RPC
client, because there's no async drop, I spawn a tokio task in drop to
send the close call to the node. This means that if a client is
force-killed, the replica would remain open indefinitely. It would be
better to solve this cleaner - the only idea I had so far was to give
out something like `ReplicaDescriptor`s on `open` and then send require
regular keep-alive calls from the RPC client. I think it's fine to defer
this change to a followup (which will be straightforward with the
architecture in place now) because the impl in this PR is already quite
an improvement over the state in `main` or #1612, where we don't ever
close replicas.
* event subscriptions for replica insert events are now handled directly
in the `iroh_sync::actor`, which is much nicer IMO. For the RPC
subscription this event stream is merged with a subscription for sync
events from the `iroh::sync_engine::live` actor.


## Notes & open questions

Replicas may remain open indefinitely if the RPC client dies without
calling `Doc::close`. This should be fixed in a followup and will need
work in quic-rpc to get a notification once connections close.


## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.

---------

Co-authored-by: Friedel Ziegelmayer <me@dignifiedquire.com>
  • Loading branch information
Frando and dignifiedquire authored Oct 16, 2023
1 parent 615381c commit a70c6f1
Show file tree
Hide file tree
Showing 29 changed files with 3,316 additions and 2,523 deletions.
265 changes: 260 additions & 5 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion iroh-bytes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ bytes = { version = "1.4", features = ["serde"] }
chrono = "0.4.31"
data-encoding = "2.3.3"
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into", "into"] }
flume = "0.10.14"
flume = "0.11"
futures = "0.3.25"
genawaiter = { version = "0.99.1", features = ["futures03"] }
hex = "0.4.3"
Expand Down
70 changes: 42 additions & 28 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/0";
/// Maximum message size is limited to 1024 bytes.
pub const MAX_MESSAGE_SIZE: usize = 1024;

/// Channel capacity for topic subscription broadcast channels (one per topic)
const SUBSCRIBE_ALL_CAP: usize = 64;
/// Channel capacity for all subscription broadcast channels (single)
const SUBSCRIBE_TOPIC_CAP: usize = 64;
const SUBSCRIBE_ALL_CAP: usize = 2048;
/// Channel capacity for topic subscription broadcast channels (one per topic)
const SUBSCRIBE_TOPIC_CAP: usize = 2048;
/// Channel capacity for the send queue (one per connection)
const SEND_QUEUE_CAP: usize = 64;
/// Channel capacity for the ToActor message queue (single)
Expand Down Expand Up @@ -196,24 +196,27 @@ impl Gossip {
///
/// Note that this method takes self by value. Usually you would clone the [`Gossip`] handle.
/// before.
pub fn subscribe_all(self) -> impl Stream<Item = anyhow::Result<(TopicId, Event)>> {
pub fn subscribe_all(
self,
) -> impl Stream<Item = Result<(TopicId, Event), broadcast::error::RecvError>> {
Gen::new(|co| async move {
if let Err(cause) = self.subscribe_all0(&co).await {
co.yield_(Err(cause)).await
if let Err(err) = self.subscribe_all0(&co).await {
warn!("subscribe_all produced error: {err:?}");
co.yield_(Err(broadcast::error::RecvError::Closed)).await
}
})
}

async fn subscribe_all0(
&self,
co: &Co<anyhow::Result<(TopicId, Event)>>,
co: &Co<Result<(TopicId, Event), broadcast::error::RecvError>>,
) -> anyhow::Result<()> {
let (tx, rx) = oneshot::channel();
self.send(ToActor::SubscribeAll(tx)).await?;
let mut res = rx.await.map_err(|_| anyhow!("subscribe_tx dropped"))??;
let mut res = rx.await??;
loop {
let event = res.recv().await?;
co.yield_(Ok(event)).await;
let event = res.recv().await;
co.yield_(event).await;
}
}

Expand Down Expand Up @@ -256,9 +259,9 @@ impl Gossip {
///
/// TODO: Optionally resolve to an error once all connection attempts failed.
#[derive(Debug)]
pub struct JoinTopicFut(oneshot::Receiver<anyhow::Result<()>>);
pub struct JoinTopicFut(oneshot::Receiver<anyhow::Result<TopicId>>);
impl Future for JoinTopicFut {
type Output = anyhow::Result<()>;
type Output = anyhow::Result<TopicId>;

fn poll(
mut self: std::pin::Pin<&mut Self>,
Expand Down Expand Up @@ -290,7 +293,7 @@ enum ToActor {
Join(
TopicId,
Vec<PublicKey>,
#[debug(skip)] oneshot::Sender<anyhow::Result<()>>,
#[debug(skip)] oneshot::Sender<anyhow::Result<TopicId>>,
),
/// Leave a topic, send disconnect messages and drop all state.
Quit(TopicId),
Expand Down Expand Up @@ -345,10 +348,14 @@ struct Actor {

impl Actor {
pub async fn run(mut self) -> anyhow::Result<()> {
let mut i = 0;
loop {
i += 1;
trace!(?i, "tick");
tokio::select! {
biased;
msg = self.to_actor_rx.recv() => {
trace!(?i, "tick: to_actor_rx");
match msg {
Some(msg) => self.handle_to_actor_msg(msg, Instant::now()).await?,
None => {
Expand All @@ -371,6 +378,7 @@ impl Actor {
}
}
(peer_id, res) = self.dialer.next_conn() => {
trace!(?i, "tick: dialer");
match res {
Ok(conn) => {
debug!(peer = ?peer_id, "dial successfull");
Expand All @@ -382,6 +390,7 @@ impl Actor {
}
}
event = self.in_event_rx.recv() => {
trace!(?i, "tick: in_event_rx");
match event {
Some(event) => {
self.handle_in_event(event, Instant::now()).await.context("in_event_rx.recv -> handle_in_event")?;
Expand All @@ -390,6 +399,7 @@ impl Actor {
}
}
drain = self.timers.wait_and_drain() => {
trace!(?i, "tick: timers");
let now = Instant::now();
for (_instant, timer) in drain {
self.handle_in_event(InEvent::TimerExpired(timer), now).await.context("timers.drain_expired -> handle_in_event")?;
Expand All @@ -412,21 +422,24 @@ impl Actor {

// Spawn a task for this connection
let in_event_tx = self.in_event_tx.clone();
tokio::spawn(async move {
debug!(peer = ?peer_id, "connection established");
match connection_loop(peer_id, conn, origin, send_rx, &in_event_tx).await {
Ok(()) => {
debug!(peer = ?peer_id, "connection closed without error")
}
Err(err) => {
debug!(peer = ?peer_id, "connection closed with error {err:?}")
tokio::spawn(
async move {
debug!("connection established");
match connection_loop(peer_id, conn, origin, send_rx, &in_event_tx).await {
Ok(()) => {
debug!("connection closed without error")
}
Err(err) => {
debug!("connection closed with error {err:?}")
}
}
in_event_tx
.send(InEvent::PeerDisconnected(peer_id))
.await
.ok();
}
in_event_tx
.send(InEvent::PeerDisconnected(peer_id))
.await
.ok();
});
.instrument(error_span!("gossip_conn", peer = %peer_id.fmt_short())),
);

// Forward queued pending sends
if let Some(send_queue) = self.pending_sends.remove(&peer_id) {
Expand All @@ -440,12 +453,13 @@ impl Actor {
.await?;
if self.state.has_active_peers(&topic_id) {
// If the active_view contains at least one peer, reply now
reply.send(Ok(())).ok();
reply.send(Ok(topic_id)).ok();
} else {
// Otherwise, wait for any peer to come up as neighbor.
let sub = self.subscribe(topic_id);
tokio::spawn(async move {
let res = wait_for_neighbor_up(sub).await;
let res = res.map(|_| topic_id);
reply.send(res).ok();
});
}
Expand Down Expand Up @@ -575,7 +589,7 @@ async fn wait_for_neighbor_up(mut sub: broadcast::Receiver<Event>) -> anyhow::Re
Ok(Event::NeighborUp(_neighbor)) => break Ok(()),
Ok(_) | Err(broadcast::error::RecvError::Lagged(_)) => {}
Err(broadcast::error::RecvError::Closed) => {
break Err(anyhow!("Failed to join swarm: Gossip actor dropped"))
break Err(anyhow!("Failed to join swarm: channel closed"))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ data-encoding = "2.3.3"
der = { version = "0.7", features = ["alloc", "derive"] }
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into", "deref"] }
ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
flume = "0.10.14"
flume = "0.11"
futures = "0.3.25"
governor = "0.6.0"
hex = "0.4.3"
Expand Down
3 changes: 2 additions & 1 deletion iroh-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ crossbeam = "0.8.2"
data-encoding = "2.4.0"
derive_more = { version = "1.0.0-beta.1", features = ["debug", "deref", "display", "from", "try_into", "into", "as_ref"] }
ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
flume = "0.10"
flume = "0.11"
iroh-bytes = { version = "0.7.0", path = "../iroh-bytes" }
iroh-metrics = { version = "0.7.0", path = "../iroh-metrics", optional = true }
once_cell = "1.18.0"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
rand = "0.8.5"
rand_core = "0.6.4"
serde = { version = "1.0.164", features = ["derive"] }
strum = { version = "0.25", features = ["derive"] }
url = "2.4"
bytes = "1"
parking_lot = "0.12.1"
Expand Down
Loading

0 comments on commit a70c6f1

Please sign in to comment.