Skip to content

Commit

Permalink
refactor(iroh-sync): remove generic from SyncEngine (#1648)
Browse files Browse the repository at this point in the history
## Description

With #1612 we moved all operations on replicas into a dedicated actor,
with all communication happening over a channel. A nice consequence of
this is that the iroh-sync store only lives in the actor thread, and
therefore we don't need a generic on the `SyncEngine` and the `Node`
anymore. This PR removes the generic.

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
  • Loading branch information
Frando authored Oct 17, 2023
1 parent 5813e09 commit 53b0bb0
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 48 deletions.
12 changes: 4 additions & 8 deletions iroh/src/commands/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,11 @@ use iroh::{
rpc_protocol::{ProviderRequest, ProviderResponse, ProviderService},
util::{fs::load_secret_key, path::IrohPaths},
};
use iroh_bytes::{
protocol::RequestToken, store::flat::Store as BaoFsStore, store::Store as BaoStore,
util::runtime,
};
use iroh_bytes::{protocol::RequestToken, util::runtime};
use iroh_net::{
derp::{DerpMap, DerpMode},
key::SecretKey,
};
use iroh_sync::store::{fs::Store as DocFsStore, Store as DocStore};
use quic_rpc::{transport::quinn::QuinnServerEndpoint, ServiceEndpoint};
use tracing::{info_span, Instrument};

Expand Down Expand Up @@ -90,7 +86,7 @@ pub async fn run(rt: &runtime::Handle, opts: StartOptions, add_opts: BlobAddOpti
async fn start_daemon_node(
rt: &runtime::Handle,
opts: StartOptions,
) -> Result<Node<BaoFsStore, DocFsStore>> {
) -> Result<Node<iroh_bytes::store::flat::Store>> {
let blob_dir = path_with_env(IrohPaths::BaoFlatStoreComplete)?;
let partial_blob_dir = path_with_env(IrohPaths::BaoFlatStorePartial)?;
let meta_dir = path_with_env(IrohPaths::BaoFlatStoreMeta)?;
Expand All @@ -106,14 +102,14 @@ async fn start_daemon_node(
spawn_daemon_node(rt, bao_store, doc_store, key, peer_data_path, opts).await
}

async fn spawn_daemon_node<B: BaoStore, D: DocStore>(
async fn spawn_daemon_node<B: iroh_bytes::store::Store, D: iroh_sync::store::Store>(
rt: &runtime::Handle,
bao_store: B,
doc_store: D,
key: Option<PathBuf>,
peers_data_path: PathBuf,
opts: StartOptions,
) -> Result<Node<B, D>> {
) -> Result<Node<B>> {
let secret_key = get_secret_key(key).await?;

let mut builder = Node::builder(bao_store, doc_store)
Expand Down
36 changes: 18 additions & 18 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ where
/// This will create the underlying network server and spawn a tokio task accepting
/// connections. The returned [`Node`] can be used to control the task as well as
/// get information about it.
pub async fn spawn(self) -> Result<Node<D, S>> {
pub async fn spawn(self) -> Result<Node<D>> {
trace!("spawning node");
let rt = self.rt.context("runtime not set")?;
// Initialize the metrics collection.
Expand Down Expand Up @@ -424,7 +424,7 @@ where
server: MagicEndpoint,
callbacks: Callbacks,
mut cb_receiver: mpsc::Receiver<EventCallback>,
handler: RpcHandler<D, S>,
handler: RpcHandler<D>,
rpc: E,
internal_rpc: impl ServiceEndpoint<ProviderService>,
auth_handler: Arc<dyn RequestAuthorizationHandler>,
Expand Down Expand Up @@ -592,12 +592,12 @@ where

// TODO: Restructure this code to not take all these arguments.
#[allow(clippy::too_many_arguments)]
async fn handle_connection<D: BaoStore, S: DocStore>(
async fn handle_connection<D: BaoStore>(
connecting: quinn::Connecting,
alpn: String,
node: Arc<NodeInner<D, S>>,
node: Arc<NodeInner<D>>,
gossip: Gossip,
sync: SyncEngine<S>,
sync: SyncEngine,
auth_handler: Arc<dyn RequestAuthorizationHandler>,
) -> Result<()> {
match alpn.as_bytes() {
Expand Down Expand Up @@ -660,13 +660,13 @@ impl iroh_bytes::provider::EventSender for Callbacks {
/// await the [`Node`] struct directly, it will complete when the task completes. If
/// this is dropped the node task is not stopped but keeps running.
#[derive(Debug, Clone)]
pub struct Node<D: Map, S: DocStore> {
inner: Arc<NodeInner<D, S>>,
pub struct Node<D: Map> {
inner: Arc<NodeInner<D>>,
task: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
}

#[derive(derive_more::Debug)]
struct NodeInner<D, S: DocStore> {
struct NodeInner<D> {
db: D,
endpoint: MagicEndpoint,
secret_key: SecretKey,
Expand All @@ -678,7 +678,7 @@ struct NodeInner<D, S: DocStore> {
#[allow(dead_code)]
gc_task: Option<AbortingJoinHandle<()>>,
rt: runtime::Handle,
pub(crate) sync: SyncEngine<S>,
pub(crate) sync: SyncEngine,
}

/// Events emitted by the [`Node`] informing about the current status.
Expand All @@ -690,11 +690,11 @@ pub enum Event {
Db(iroh_bytes::store::Event),
}

impl<D: ReadableStore, S: DocStore> Node<D, S> {
impl<D: ReadableStore> Node<D> {
/// Returns a new builder for the [`Node`].
///
/// Once the done with the builder call [`Builder::spawn`] to create the node.
pub fn builder(bao_store: D, doc_store: S) -> Builder<D, S> {
pub fn builder<S: DocStore>(bao_store: D, doc_store: S) -> Builder<D, S> {
Builder::with_db_and_store(bao_store, doc_store)
}

Expand Down Expand Up @@ -780,7 +780,7 @@ impl<D: ReadableStore, S: DocStore> Node<D, S> {
}
}

impl<D: Map, S: DocStore> NodeInner<D, S> {
impl<D: Map> NodeInner<D> {
async fn local_endpoints(&self) -> Result<Vec<Endpoint>> {
self.endpoint.local_endpoints().await
}
Expand All @@ -801,7 +801,7 @@ impl<D: Map, S: DocStore> NodeInner<D, S> {
}

/// The future completes when the spawned tokio task finishes.
impl<D: Map, S: DocStore> Future for Node<D, S> {
impl<D: Map> Future for Node<D> {
type Output = Result<(), Arc<JoinError>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
Expand All @@ -810,11 +810,11 @@ impl<D: Map, S: DocStore> Future for Node<D, S> {
}

#[derive(Debug, Clone)]
struct RpcHandler<D, S: DocStore> {
inner: Arc<NodeInner<D, S>>,
struct RpcHandler<D> {
inner: Arc<NodeInner<D>>,
}

impl<D: BaoStore, S: DocStore> RpcHandler<D, S> {
impl<D: BaoStore> RpcHandler<D> {
fn rt(&self) -> runtime::Handle {
self.inner.rt.clone()
}
Expand Down Expand Up @@ -1407,10 +1407,10 @@ impl<D: BaoStore, S: DocStore> RpcHandler<D, S> {
}
}

fn handle_rpc_request<D: BaoStore, S: DocStore, E: ServiceEndpoint<ProviderService>>(
fn handle_rpc_request<D: BaoStore, E: ServiceEndpoint<ProviderService>>(
msg: ProviderRequest,
chan: RpcChannel<ProviderService, E>,
handler: &RpcHandler<D, S>,
handler: &RpcHandler<D>,
rt: &runtime::Handle,
) {
let handler = handler.clone();
Expand Down
17 changes: 4 additions & 13 deletions iroh/src/sync_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use iroh_bytes::{store::EntryStatus, util::runtime::Handle, Hash};
use iroh_gossip::net::Gossip;
use iroh_net::{key::PublicKey, MagicEndpoint, PeerAddr};
use iroh_sync::{
actor::SyncHandle, store::Store, sync::NamespaceId, ContentStatus, ContentStatusCallback,
Entry, InsertOrigin,
actor::SyncHandle, sync::NamespaceId, ContentStatus, ContentStatusCallback, Entry, InsertOrigin,
};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -44,29 +43,22 @@ const SUBSCRIBE_CHANNEL_CAP: usize = 256;
/// The RPC methods dealing with documents and sync operate on the `SyncEngine`, with method
/// implementations in [rpc].
#[derive(derive_more::Debug, Clone)]
pub struct SyncEngine<S: Store> {
pub struct SyncEngine {
pub(crate) rt: Handle,
pub(crate) endpoint: MagicEndpoint,
pub(crate) sync: SyncHandle,
to_live_actor: mpsc::Sender<ToLiveActor>,
tasks_fut: Shared<BoxFuture<'static, ()>>,
#[debug("ContentStatusCallback")]
content_status_cb: ContentStatusCallback,

// TODO:
// After the latest refactoring we don't need the store here anymore because all interactions
// go over the [`SyncHandle`]. Removing the store removes the `S: Store` generic from the
// `SyncEngine`, in turn removing the `S: Store` generic from [`iroh::node::Node`]. Yay!
// As this changes the code in many lines, I'd defer it to a follwup.
_store: S,
}

impl<S: Store> SyncEngine<S> {
impl SyncEngine {
/// Start the sync engine.
///
/// This will spawn two tokio tasks for the live sync coordination and gossip actors, and a
/// thread for the [`iroh_sync::actor::SyncHandle`].
pub fn spawn<B: iroh_bytes::store::Store>(
pub fn spawn<S: iroh_sync::store::Store, B: iroh_bytes::store::Store>(
rt: Handle,
endpoint: MagicEndpoint,
gossip: Gossip,
Expand Down Expand Up @@ -142,7 +134,6 @@ impl<S: Store> SyncEngine<S> {
to_live_actor: live_actor_tx,
tasks_fut,
content_status_cb,
_store: replica_store,
}
}

Expand Down
4 changes: 2 additions & 2 deletions iroh/src/sync_engine/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use anyhow::anyhow;
use futures::Stream;
use iroh_bytes::{store::Store as BaoStore, util::BlobFormat};
use iroh_sync::{store::Store, sync::Namespace, Author};
use iroh_sync::{sync::Namespace, Author};
use tokio_stream::StreamExt;

use crate::{
Expand All @@ -25,7 +25,7 @@ use crate::{
const ITER_CHANNEL_CAP: usize = 64;

#[allow(missing_docs)]
impl<S: Store> SyncEngine<S> {
impl SyncEngine {
pub async fn author_create(
&self,
_req: AuthorCreateRequest,
Expand Down
8 changes: 4 additions & 4 deletions iroh/tests/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn wrap_in_node<S>(
bao_store: S,
rt: iroh_bytes::util::runtime::Handle,
gc_period: Duration,
) -> Node<S, iroh_sync::store::memory::Store>
) -> Node<S>
where
S: iroh_bytes::store::Store,
{
Expand All @@ -43,8 +43,8 @@ where
.unwrap()
}

async fn attach_db_events<D: iroh_bytes::store::Store, S: iroh_sync::store::Store>(
node: &Node<D, S>,
async fn attach_db_events<D: iroh_bytes::store::Store>(
node: &Node<D>,
) -> flume::Receiver<iroh_bytes::store::Event> {
let (db_send, db_recv) = flume::unbounded();
node.subscribe(move |ev| {
Expand All @@ -62,7 +62,7 @@ async fn attach_db_events<D: iroh_bytes::store::Store, S: iroh_sync::store::Stor
}

async fn gc_test_node() -> (
Node<iroh_bytes::store::mem::Store, iroh_sync::store::memory::Store>,
Node<iroh_bytes::store::mem::Store>,
iroh_bytes::store::mem::Store,
flume::Receiver<iroh_bytes::store::Event>,
) {
Expand Down
5 changes: 2 additions & 3 deletions iroh/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ fn spawn_node(
rt: runtime::Handle,
i: usize,
rng: &mut (impl CryptoRng + Rng),
) -> impl Future<Output = anyhow::Result<Node<iroh_bytes::store::mem::Store, store::memory::Store>>>
+ 'static {
) -> impl Future<Output = anyhow::Result<Node<iroh_bytes::store::mem::Store>>> + 'static {
let secret_key = SecretKey::generate_with_rng(rng);
async move {
let node = test_node(rt, "127.0.0.1:0".parse()?, secret_key);
Expand All @@ -71,7 +70,7 @@ async fn spawn_nodes(
rt: runtime::Handle,
n: usize,
mut rng: &mut (impl CryptoRng + Rng),
) -> anyhow::Result<Vec<Node<iroh_bytes::store::mem::Store, store::memory::Store>>> {
) -> anyhow::Result<Vec<Node<iroh_bytes::store::mem::Store>>> {
let mut futs = vec![];
for i in 0..n {
futs.push(spawn_node(rt.clone(), i, &mut rng));
Expand Down

0 comments on commit 53b0bb0

Please sign in to comment.