diff --git a/Cargo.lock b/Cargo.lock
index 2e4e4bff74..318b8ce138 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2697,8 +2697,7 @@ dependencies = [
 [[package]]
 name = "iroh-docs"
 version = "0.27.0"
-source = "registry+/~https://github.com/rust-lang/crates.io-index"
-checksum = "fb2c8bc1fe680549dd9776031be30e68663884026f71a914aab7108e0b8f8ce4"
+source = "git+/~https://github.com/n0-computer/iroh-docs?branch=main#16bc7fe4c7dee1b1b88f54390856c3fafa3d7656"
 dependencies = [
  "anyhow",
  "async-channel",
@@ -2715,6 +2714,7 @@ dependencies = [
  "iroh-gossip",
  "iroh-metrics",
  "iroh-net",
+ "iroh-router",
 "lru",
 "num_enum",
 "postcard",
diff --git a/Cargo.toml b/Cargo.toml
index eb7ea71768..1cb098e60b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -54,3 +54,4 @@
 iroh-router = { path = "./iroh-router" }
 iroh-blobs = { git = "/~https://github.com/n0-computer/iroh-blobs", branch = "main" }
 iroh-gossip = { git = "/~https://github.com/n0-computer/iroh-gossip", branch = "main" }
+iroh-docs = { git = "/~https://github.com/n0-computer/iroh-docs", branch = "main" }
diff --git a/deny.toml b/deny.toml
index c7fb823266..f62c19d428 100644
--- a/deny.toml
+++ b/deny.toml
@@ -39,4 +39,5 @@ ignore = [
 allow-git = [
     "/~https://github.com/n0-computer/iroh-blobs.git",
     "/~https://github.com/n0-computer/iroh-gossip.git",
+    "/~https://github.com/n0-computer/iroh-docs.git",
 ]
diff --git a/iroh-router/src/protocol.rs b/iroh-router/src/protocol.rs
index 22affa67d9..6ced048992 100644
--- a/iroh-router/src/protocol.rs
+++ b/iroh-router/src/protocol.rs
@@ -13,7 +13,7 @@ use iroh_net::endpoint::Connecting;
 ///
 /// Implement this trait on a struct that should handle incoming connections.
 /// The protocol handler must then be registered on the node for an ALPN protocol with
-/// [`crate::node::builder::ProtocolBuilder::accept`].
+/// [`crate::RouterBuilder::accept`].
 pub trait ProtocolHandler: Send + Sync + IntoArcAny + std::fmt::Debug + 'static {
     /// Handle an incoming connection.
     ///
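Note on the doc-link fix above: for readers coming to this diff cold, here is a minimal sketch of what implementing `ProtocolHandler` and registering it via the router builder looks like. The trait shapes (`accept`, `shutdown`, the `futures_lite` `BoxedFuture` alias) are taken from the `DocsProtocol` impl deleted later in this diff; the echo logic, the `ECHO_ALPN` constant, and the exact registration call are illustrative assumptions, not code from this PR.

```rust
use std::sync::Arc;

use anyhow::Result;
use futures_lite::future::Boxed as BoxedFuture;
use iroh_net::endpoint::Connecting;
use iroh_router::ProtocolHandler;

// Hypothetical ALPN, for illustration only.
const ECHO_ALPN: &[u8] = b"example/echo/0";

#[derive(Debug)]
struct Echo;

impl ProtocolHandler for Echo {
    fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
        Box::pin(async move {
            let connection = conn.await?;
            let (mut send, mut recv) = connection.accept_bi().await?;
            // Echo every incoming byte back to the sender.
            tokio::io::copy(&mut recv, &mut send).await?;
            Ok(())
        })
    }
}

// Registration would then look roughly like this (assumed builder API shape):
//
//     let router = iroh_router::Router::builder(endpoint)
//         .accept(ECHO_ALPN.to_vec(), Arc::new(Echo))
//         .spawn()
//         .await?;
```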
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index 482a4da2a8..bd2df13944 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -54,7 +54,7 @@ use iroh_blobs::{
     store::Store as BaoStore,
     util::local_pool::{LocalPool, LocalPoolHandle},
 };
-use iroh_docs::net::DOCS_ALPN;
+use iroh_docs::{engine::Engine, net::DOCS_ALPN};
 use iroh_net::{
     endpoint::{DirectAddrsStream, RemoteInfo},
     AddrInfo, Endpoint, NodeAddr,
@@ -65,11 +65,10 @@ use tokio::task::{JoinError, JoinSet};
 use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle};
 use tracing::{debug, error, info, info_span, trace, warn, Instrument};
 
-use crate::node::{nodes_storage::store_node_addrs, protocol::docs::DocsProtocol};
+use crate::node::nodes_storage::store_node_addrs;
 
 mod builder;
 mod nodes_storage;
-mod protocol;
 mod rpc;
 mod rpc_status;
 
@@ -294,7 +293,7 @@ impl<D: BaoStore> NodeInner<D> {
         if let GcPolicy::Interval(gc_period) = gc_policy {
             let router = router.clone();
             let handle = local_pool.spawn(move || async move {
-                let docs_engine = router.get_protocol::<DocsProtocol>(DOCS_ALPN);
+                let docs_engine = router.get_protocol::<Engine>(DOCS_ALPN);
                 let blobs = router
                     .get_protocol::<BlobsProtocol<D>>(iroh_blobs::protocol::ALPN)
                     .expect("missing blobs");
diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs
index 9599591e02..4d0d08e5f1 100644
--- a/iroh/src/node/builder.rs
+++ b/iroh/src/node/builder.rs
@@ -16,7 +16,10 @@ use iroh_blobs::{
     store::{Map, Store as BaoStore},
     util::local_pool::{self, LocalPool, LocalPoolHandle, PanicMode},
 };
-use iroh_docs::{engine::DefaultAuthorStorage, net::DOCS_ALPN};
+use iroh_docs::{
+    engine::{DefaultAuthorStorage, Engine},
+    net::DOCS_ALPN,
+};
 use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
 #[cfg(not(test))]
 use iroh_net::discovery::local_swarm_discovery::LocalSwarmDiscovery;
@@ -37,7 +40,7 @@ use tracing::{debug, error_span, trace, Instrument};
 use super::{rpc_status::RpcStatus, IrohServerEndpoint, JoinErrToStr, Node, NodeInner};
 use crate::{
     client::RPC_ALPN,
-    node::{nodes_storage::load_node_addrs, protocol::docs::DocsProtocol},
+    node::nodes_storage::load_node_addrs,
     rpc_protocol::RpcService,
     util::{fs::load_secret_key, path::IrohPaths},
 };
@@ -71,6 +74,32 @@ pub enum DocsStorage {
     Persistent(PathBuf),
 }
 
+/// Start the engine, and prepare the selected storage version.
+async fn spawn_docs<S: iroh_blobs::store::Store>(
+    storage: DocsStorage,
+    blobs_store: S,
+    default_author_storage: DefaultAuthorStorage,
+    endpoint: Endpoint,
+    gossip: Gossip,
+    downloader: Downloader,
+) -> anyhow::Result<Option<Engine>> {
+    let docs_store = match storage {
+        DocsStorage::Disabled => return Ok(None),
+        DocsStorage::Memory => iroh_docs::store::fs::Store::memory(),
+        DocsStorage::Persistent(path) => iroh_docs::store::fs::Store::persistent(path)?,
+    };
+    let engine = Engine::spawn(
+        endpoint,
+        gossip,
+        docs_store,
+        blobs_store,
+        downloader,
+        default_author_storage,
+    )
+    .await?;
+    Ok(Some(engine))
+}
+
 /// Builder for the [`Node`].
 ///
 /// You must supply a blob store and a document store.
@@ -651,7 +680,7 @@ where
 
         // Spawn the docs engine, if enabled.
         // This returns None for DocsStorage::Disabled, otherwise Some(DocsProtocol).
-        let docs = DocsProtocol::spawn(
+        let docs = spawn_docs(
             self.docs_storage,
             self.blobs_store.clone(),
             self.storage.default_author_storage(),
@@ -809,7 +838,7 @@ impl<D: BaoStore> ProtocolBuilder<D> {
         store: D,
         gossip: Gossip,
         downloader: Downloader,
-        docs: Option<DocsProtocol>,
+        docs: Option<Engine>,
     ) -> Self {
         // Register blobs.
         let blobs_proto = BlobsProtocol::new_with_events(
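With `spawn_docs` folding what used to be `DocsProtocol::spawn` into a free function, enabling docs from a data directory reduces to a single call. A sketch under stated assumptions, written as if it lived next to `spawn_docs` in builder.rs: the `docs.redb` and `default-author` file names are illustrative, and `DefaultAuthorStorage::Persistent` is assumed as the on-disk counterpart to the in-memory variant; everything else follows the signature added above.

```rust
use std::path::PathBuf;

use anyhow::Result;
use iroh_blobs::downloader::Downloader;
use iroh_docs::engine::{DefaultAuthorStorage, Engine};
use iroh_gossip::net::Gossip;
use iroh_net::Endpoint;

/// Hypothetical helper: turn a data directory into a persistent docs engine,
/// reusing components the node builder already owns.
async fn docs_for_data_dir<S: iroh_blobs::store::Store>(
    data_dir: PathBuf,
    blobs_store: S,
    endpoint: Endpoint,
    gossip: Gossip,
    downloader: Downloader,
) -> Result<Option<Engine>> {
    spawn_docs(
        // File names below are illustrative, not iroh's actual layout.
        DocsStorage::Persistent(data_dir.join("docs.redb")),
        blobs_store,
        DefaultAuthorStorage::Persistent(data_dir.join("default-author")),
        endpoint,
        gossip,
        downloader,
    )
    .await
    // DocsStorage::Disabled would short-circuit to Ok(None): no engine is
    // spawned and no DOCS_ALPN handler gets registered.
}
```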
diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs
deleted file mode 100644
index 7ad25a44ff..0000000000
--- a/iroh/src/node/protocol.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub(crate) mod docs;
diff --git a/iroh/src/node/protocol/docs.rs b/iroh/src/node/protocol/docs.rs
deleted file mode 100644
index 07884f3e99..0000000000
--- a/iroh/src/node/protocol/docs.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-use std::{ops::Deref, sync::Arc};
-
-use anyhow::Result;
-use futures_lite::future::Boxed as BoxedFuture;
-use iroh_blobs::downloader::Downloader;
-use iroh_docs::engine::{DefaultAuthorStorage, Engine};
-use iroh_gossip::net::Gossip;
-use iroh_net::{endpoint::Connecting, Endpoint};
-use iroh_router::ProtocolHandler;
-
-use crate::node::DocsStorage;
-
-/// Wrapper around [`Engine`] so that we can implement our RPC methods directly.
-#[derive(Debug, Clone)]
-pub(crate) struct DocsProtocol(Engine);
-
-impl DocsProtocol {
-    pub async fn spawn<S: iroh_blobs::store::Store>(
-        storage: DocsStorage,
-        blobs_store: S,
-        default_author_storage: DefaultAuthorStorage,
-        endpoint: Endpoint,
-        gossip: Gossip,
-        downloader: Downloader,
-    ) -> anyhow::Result<Option<Self>> {
-        let docs_store = match storage {
-            DocsStorage::Disabled => return Ok(None),
-            DocsStorage::Memory => iroh_docs::store::fs::Store::memory(),
-            DocsStorage::Persistent(path) => iroh_docs::store::fs::Store::persistent(path)?,
-        };
-        let engine = Engine::spawn(
-            endpoint,
-            gossip,
-            docs_store,
-            blobs_store,
-            downloader,
-            default_author_storage,
-        )
-        .await?;
-        Ok(Some(DocsProtocol(engine)))
-    }
-}
-
-impl Deref for DocsProtocol {
-    type Target = Engine;
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl ProtocolHandler for DocsProtocol {
-    fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
-        Box::pin(async move { self.handle_connection(conn).await })
-    }
-
-    fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
-        Box::pin(async move {
-            let this: &Self = &self;
-            if let Err(err) = this.shutdown().await {
-                tracing::warn!("shutdown error: {:?}", err);
-            }
-        })
-    }
-}
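With this wrapper gone, the `ProtocolHandler` registration for `DOCS_ALPN` has to come from `Engine` itself, which matches the new `iroh-router` dependency that `iroh-docs` gains in the Cargo.lock hunk above. The impl below is not part of this diff; it is a sketch of what the equivalent code inside iroh-docs presumably looks like, reconstructed from the deleted wrapper.

```rust
// Sketch, as it would appear inside iroh-docs (assumed, not shown in this diff).
use std::sync::Arc;

use anyhow::Result;
use futures_lite::future::Boxed as BoxedFuture;
use iroh_net::endpoint::Connecting;
use iroh_router::ProtocolHandler;

use crate::engine::Engine;

impl ProtocolHandler for Engine {
    fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
        Box::pin(async move { self.handle_connection(conn).await })
    }

    fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
        Box::pin(async move {
            // Bind through a plain reference so this resolves to the inherent
            // `Engine::shutdown`, not `ProtocolHandler::shutdown` again (the
            // same trick the deleted wrapper used).
            let this: &Self = &self;
            if let Err(err) = this.shutdown().await {
                tracing::warn!("shutdown error: {:?}", err);
            }
        })
    }
}
```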
diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs
index a4f32fb070..5599a6da5e 100644
--- a/iroh/src/node/rpc.rs
+++ b/iroh/src/node/rpc.rs
@@ -27,7 +27,7 @@ use iroh_blobs::{
     },
     BlobFormat, HashAndFormat, Tag,
 };
-use iroh_docs::net::DOCS_ALPN;
+use iroh_docs::{engine::Engine, net::DOCS_ALPN};
 use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
 use iroh_io::AsyncSliceReader;
 use iroh_net::{relay::RelayUrl, NodeAddr, NodeId};
@@ -44,7 +44,7 @@ use crate::{
         tags::TagInfo,
         NodeStatus,
     },
-    node::{protocol::docs::DocsProtocol, NodeInner},
+    node::NodeInner,
     rpc_protocol::{
         authors,
         blobs::{
@@ -96,8 +96,8 @@ impl<D: BaoStore> Handler<D> {
 }
 
 impl<D: BaoStore> Handler<D> {
-    fn docs(&self) -> Option<Arc<DocsProtocol>> {
-        self.router.get_protocol::<DocsProtocol>(DOCS_ALPN)
+    fn docs(&self) -> Option<Arc<Engine>> {
+        self.router.get_protocol::<Engine>(DOCS_ALPN)
     }
 
     fn blobs(&self) -> Arc<BlobsProtocol<D>> {
@@ -113,7 +113,7 @@ impl<D: BaoStore> Handler<D> {
     async fn with_docs<T, F, Fut>(self, f: F) -> RpcResult<T>
     where
         T: Send + 'static,
-        F: FnOnce(Arc<DocsProtocol>) -> Fut,
+        F: FnOnce(Arc<Engine>) -> Fut,
         Fut: std::future::Future<Output = RpcResult<T>>,
     {
         if let Some(docs) = self.docs() {
@@ -126,7 +126,7 @@ impl<D: BaoStore> Handler<D> {
     fn with_docs_stream<T, F, S>(self, f: F) -> impl Stream<Item = RpcResult<T>>
     where
         T: Send + 'static,
-        F: FnOnce(Arc<DocsProtocol>) -> S,
+        F: FnOnce(Arc<Engine>) -> S,
         S: Stream<Item = RpcResult<T>>,
     {
         if let Some(docs) = self.docs() {
@@ -279,48 +279,13 @@ impl<D: BaoStore> Handler<D> {
     ) -> Result<(), RpcServerError> {
         use authors::Request::*;
         match msg {
-            List(msg) => {
-                chan.server_streaming(msg, self, |handler, req: authors::ListRequest| {
-                    handler.with_docs_stream(|docs| docs.author_list(req))
-                })
-                .await
-            }
-            Create(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.author_create(req).await })
-                })
-                .await
-            }
-            Import(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.author_import(req).await })
-                })
-                .await
-            }
-            Export(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.author_export(req).await })
-                })
-                .await
-            }
-            Delete(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.author_delete(req).await })
-                })
-                .await
-            }
-            GetDefault(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { Ok(docs.author_default(req)) })
-                })
-                .await
-            }
-            SetDefault(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.author_set_default(req).await })
-                })
-                .await
-            }
+            List(msg) => chan.server_streaming(msg, self, Self::author_list).await,
+            Create(msg) => chan.rpc(msg, self, Self::author_create).await,
+            Import(msg) => chan.rpc(msg, self, Self::author_import).await,
+            Export(msg) => chan.rpc(msg, self, Self::author_export).await,
+            Delete(msg) => chan.rpc(msg, self, Self::author_delete).await,
+            GetDefault(msg) => chan.rpc(msg, self, Self::author_default).await,
+            SetDefault(msg) => chan.rpc(msg, self, Self::author_set_default).await,
         }
     }
 
@@ -331,55 +296,14 @@ impl<D: BaoStore> Handler<D> {
     ) -> Result<(), RpcServerError> {
         use DocsRequest::*;
         match msg {
-            Open(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_open(req).await })
-                })
-                .await
-            }
-            Close(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_close(req).await })
-                })
-                .await
-            }
-            Status(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_status(req).await })
-                })
-                .await
-            }
-            List(msg) => {
-                chan.server_streaming(msg, self, |handler, req| {
-                    handler.with_docs_stream(|docs| docs.doc_list(req))
-                })
-                .await
-            }
-            Create(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_create(req).await })
-                })
-                .await
-            }
-            Drop(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_drop(req).await })
-                })
-                .await
-            }
-            Import(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_import(req).await })
-                })
-                .await
-            }
-            Set(msg) => {
-                let blobs_store = self.blobs_store();
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_set(&blobs_store, req).await })
-                })
-                .await
-            }
+            Open(msg) => chan.rpc(msg, self, Self::doc_open).await,
+            Close(msg) => chan.rpc(msg, self, Self::doc_close).await,
+            Status(msg) => chan.rpc(msg, self, Self::doc_status).await,
+            List(msg) => chan.server_streaming(msg, self, Self::doc_list).await,
+            Create(msg) => chan.rpc(msg, self, Self::doc_create).await,
+            Drop(msg) => chan.rpc(msg, self, Self::doc_drop).await,
+            Import(msg) => chan.rpc(msg, self, Self::doc_import).await,
+            Set(msg) => chan.rpc(msg, self, Self::doc_set).await,
             ImportFile(msg) => {
                 chan.server_streaming(msg, self, Self::doc_import_file)
                     .await
             }
             ExportFile(msg) => {
                 chan.server_streaming(msg, self, Self::doc_export_file)
                     .await
             }
-            Del(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_del(req).await })
-                })
-                .await
-            }
-            SetHash(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_set_hash(req).await })
-                })
-                .await
-            }
-            Get(msg) => {
-                chan.server_streaming(msg, self, |handler, req| {
-                    handler.with_docs_stream(|docs| docs.doc_get_many(req))
-                })
-                .await
-            }
-            GetExact(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_get_exact(req).await })
-                })
-                .await
-            }
-            StartSync(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_start_sync(req).await })
-                })
-                .await
-            }
-            Leave(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_leave(req).await })
-                })
-                .await
-            }
-            Share(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_share(req).await })
-                })
-                .await
-            }
+            Del(msg) => chan.rpc(msg, self, Self::doc_del).await,
+            SetHash(msg) => chan.rpc(msg, self, Self::doc_set_hash).await,
+            Get(msg) => chan.server_streaming(msg, self, Self::doc_get_many).await,
+            GetExact(msg) => chan.rpc(msg, self, Self::doc_get_exact).await,
+            StartSync(msg) => chan.rpc(msg, self, Self::doc_start_sync).await,
+            Leave(msg) => chan.rpc(msg, self, Self::doc_leave).await,
+            Share(msg) => chan.rpc(msg, self, Self::doc_share).await,
             Subscribe(msg) => {
-                chan.try_server_streaming(msg, self, |handler, req| async move {
-                    handler
-                        .with_docs(|docs| async move { docs.doc_subscribe(req).await })
-                        .await
-                })
-                .await
-            }
-            SetDownloadPolicy(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_set_download_policy(req).await })
-                })
-                .await
-            }
-            GetDownloadPolicy(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_get_download_policy(req).await })
-                })
-                .await
-            }
-            GetSyncPeers(msg) => {
-                chan.rpc(msg, self, |handler, req| {
-                    handler.with_docs(|docs| async move { docs.doc_get_sync_peers(req).await })
-                })
-                .await
+                chan.try_server_streaming(msg, self, Self::doc_subscribe)
+                    .await
             }
+            SetDownloadPolicy(msg) => chan.rpc(msg, self, Self::doc_set_download_policy).await,
+            GetDownloadPolicy(msg) => chan.rpc(msg, self, Self::doc_get_download_policy).await,
+            GetSyncPeers(msg) => chan.rpc(msg, self, Self::doc_get_sync_peers).await,
         }
     }
 
@@ -677,7 +547,6 @@ impl<D: BaoStore> Handler<D> {
         msg: ImportFileRequest,
         progress: async_channel::Sender<DocImportProgress>,
     ) -> anyhow::Result<()> {
-        let docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?;
         use std::collections::BTreeMap;
 
         use iroh_blobs::store::ImportMode;
@@ -732,7 +601,7 @@ impl<D: BaoStore> Handler<D> {
 
         let hash_and_format = temp_tag.inner();
         let HashAndFormat { hash, .. } = *hash_and_format;
-        docs.doc_set_hash(SetHashRequest {
+        self.doc_set_hash(SetHashRequest {
             doc_id,
             author_id,
             key: key.clone(),
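The match arms above collapse from nested closures to bare method paths because the handler methods now take the `Handler` itself by value and do the `with_docs` wrapping internally, so they already have the `FnOnce(Handler, Request) -> Future` shape the dispatch functions expect. A minimal, self-contained model of why `chan.rpc(msg, self, Self::doc_open)` type-checks; the `rpc` function below is a hypothetical stand-in for quic-rpc's dispatch, not its real signature:

```rust
use std::future::Future;

struct Handler;
struct OpenRequest;
struct OpenResponse;

// Stand-in for the RPC dispatch: any `FnOnce(Handler, Req) -> Fut` works.
async fn rpc<Req, Res, F, Fut>(req: Req, handler: Handler, f: F) -> Res
where
    F: FnOnce(Handler, Req) -> Fut,
    Fut: Future<Output = Res>,
{
    f(handler, req).await
}

impl Handler {
    // Taking `self` by value (not `&self`) is what lets the bare path
    // `Handler::doc_open` be passed where a callback is expected.
    async fn doc_open(self, _req: OpenRequest) -> OpenResponse {
        OpenResponse
    }
}

#[tokio::main]
async fn main() {
    let _res: OpenResponse = rpc(OpenRequest, Handler, Handler::doc_open).await;
}
```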
diff --git a/iroh/src/node/rpc/docs.rs b/iroh/src/node/rpc/docs.rs
index 0ab0c019f0..e8411aeb17 100644
--- a/iroh/src/node/rpc/docs.rs
+++ b/iroh/src/node/rpc/docs.rs
@@ -1,14 +1,13 @@
-//! This module contains an impl block on [`DocsProtocol`] with handlers for RPC requests
+//! This module contains an impl block on [`Handler`] to handle docs related requests.
 
 use anyhow::anyhow;
 use futures_lite::{Stream, StreamExt};
 use iroh_blobs::{store::Store as BaoStore, BlobFormat};
 use iroh_docs::{Author, DocTicket, NamespaceSecret};
 
-use super::{RpcError, RpcResult};
+use super::{Handler, RpcError, RpcResult};
 use crate::{
     client::docs::ShareMode,
-    node::protocol::docs::DocsProtocol,
     rpc_protocol::{
         authors::{
             CreateRequest, CreateResponse, DeleteRequest, DeleteResponse, ExportRequest,
@@ -34,363 +33,441 @@ use crate::{
 /// Capacity for the flume channels to forward sync store iterators to async RPC streams.
 const ITER_CHANNEL_CAP: usize = 64;
 
-#[allow(missing_docs)]
-impl DocsProtocol {
-    pub async fn author_create(&self, _req: CreateRequest) -> RpcResult<CreateResponse> {
-        // TODO: pass rng
-        let author = Author::new(&mut rand::rngs::OsRng {});
-        self.sync
-            .import_author(author.clone())
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(CreateResponse {
-            author_id: author.id(),
+impl<D: BaoStore> Handler<D> {
+    pub(super) async fn author_create(self, _req: CreateRequest) -> RpcResult<CreateResponse> {
+        self.with_docs(|docs| async move {
+            // TODO: pass rng
+            let author = Author::new(&mut rand::rngs::OsRng {});
+            docs.sync
+                .import_author(author.clone())
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(CreateResponse {
+                author_id: author.id(),
+            })
         })
+        .await
     }
 
-    pub fn author_default(&self, _req: GetDefaultRequest) -> GetDefaultResponse {
-        let author_id = self.default_author.get();
-        GetDefaultResponse { author_id }
+    pub(super) async fn author_default(
+        self,
+        _req: GetDefaultRequest,
+    ) -> RpcResult<GetDefaultResponse> {
+        self.with_docs(|docs| async move {
+            let author_id = docs.default_author.get();
+            Ok(GetDefaultResponse { author_id })
+        })
+        .await
     }
 
-    pub async fn author_set_default(
-        &self,
+    pub(super) async fn author_set_default(
+        self,
         req: SetDefaultRequest,
     ) -> RpcResult<SetDefaultResponse> {
-        self.default_author
-            .set(req.author_id, &self.sync)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(SetDefaultResponse)
+        self.with_docs(|docs| async move {
+            docs.default_author
+                .set(req.author_id, &docs.sync)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(SetDefaultResponse)
+        })
+        .await
     }
 
-    pub fn author_list(
-        &self,
+    pub(super) fn author_list(
+        self,
         _req: AuthorListRequest,
     ) -> impl Stream<Item = RpcResult<AuthorListResponse>> + Unpin {
-        let (tx, rx) = async_channel::bounded(ITER_CHANNEL_CAP);
-        let sync = self.sync.clone();
-        // we need to spawn a task to send our request to the sync handle, because the method
-        // itself must be sync.
-        tokio::task::spawn(async move {
-            let tx2 = tx.clone();
-            if let Err(err) = sync.list_authors(tx).await {
-                tx2.send(Err(err)).await.ok();
-            }
-        });
-        rx.boxed().map(|r| {
-            r.map(|author_id| AuthorListResponse { author_id })
-                .map_err(|e| RpcError::new(&*e))
+        self.with_docs_stream(|docs| {
+            let (tx, rx) = async_channel::bounded(ITER_CHANNEL_CAP);
+            let sync = docs.sync.clone();
+            // we need to spawn a task to send our request to the sync handle, because the method
+            // itself must be sync.
+            tokio::task::spawn(async move {
+                let tx2 = tx.clone();
+                if let Err(err) = sync.list_authors(tx).await {
+                    tx2.send(Err(err)).await.ok();
+                }
+            });
+            rx.boxed().map(|r| {
+                r.map(|author_id| AuthorListResponse { author_id })
+                    .map_err(|e| RpcError::new(&*e))
+            })
         })
     }
 
-    pub async fn author_import(&self, req: ImportRequest) -> RpcResult<ImportResponse> {
-        let author_id = self
-            .sync
-            .import_author(req.author)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(ImportResponse { author_id })
+    pub(super) async fn author_import(self, req: ImportRequest) -> RpcResult<ImportResponse> {
+        self.with_docs(|docs| async move {
+            let author_id = docs
+                .sync
+                .import_author(req.author)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(ImportResponse { author_id })
+        })
+        .await
     }
 
-    pub async fn author_export(&self, req: ExportRequest) -> RpcResult<ExportResponse> {
-        let author = self
-            .sync
-            .export_author(req.author)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-
-        Ok(ExportResponse { author })
+    pub(super) async fn author_export(self, req: ExportRequest) -> RpcResult<ExportResponse> {
+        self.with_docs(|docs| async move {
+            let author = docs
+                .sync
+                .export_author(req.author)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+
+            Ok(ExportResponse { author })
+        })
+        .await
     }
 
-    pub async fn author_delete(&self, req: DeleteRequest) -> RpcResult<DeleteResponse> {
-        if req.author == self.default_author.get() {
-            return Err(RpcError::new(&*anyhow!(
-                "Deleting the default author is not supported"
-            )));
-        }
-        self.sync
-            .delete_author(req.author)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(DeleteResponse)
+    pub(super) async fn author_delete(self, req: DeleteRequest) -> RpcResult<DeleteResponse> {
+        self.with_docs(|docs| async move {
+            if req.author == docs.default_author.get() {
+                return Err(RpcError::new(&*anyhow!(
+                    "Deleting the default author is not supported"
+                )));
+            }
+            docs.sync
+                .delete_author(req.author)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(DeleteResponse)
+        })
+        .await
     }
 
-    pub async fn doc_create(&self, _req: DocCreateRequest) -> RpcResult<DocCreateResponse> {
-        let namespace = NamespaceSecret::new(&mut rand::rngs::OsRng {});
-        let id = namespace.id();
-        self.sync
-            .import_namespace(namespace.into())
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        self.sync
-            .open(id, Default::default())
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(DocCreateResponse { id })
+    pub(super) async fn doc_create(self, _req: DocCreateRequest) -> RpcResult<DocCreateResponse> {
+        self.with_docs(|docs| async move {
+            let namespace = NamespaceSecret::new(&mut rand::rngs::OsRng {});
+            let id = namespace.id();
+            docs.sync
+                .import_namespace(namespace.into())
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            docs.sync
+                .open(id, Default::default())
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(DocCreateResponse { id })
+        })
+        .await
     }
 
-    pub async fn doc_drop(&self, req: DropRequest) -> RpcResult<DropResponse> {
-        let DropRequest { doc_id } = req;
-        self.leave(doc_id, true)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        self.sync
-            .drop_replica(doc_id)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(DropResponse {})
+    pub(super) async fn doc_drop(self, req: DropRequest) -> RpcResult<DropResponse> {
+        self.with_docs(|docs| async move {
+            let DropRequest { doc_id } = req;
+            docs.leave(doc_id, true)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            docs.sync
+                .drop_replica(doc_id)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(DropResponse {})
+        })
+        .await
     }
 
-    pub fn doc_list(
-        &self,
+    pub(super) fn doc_list(
+        self,
         _req: DocListRequest,
     ) -> impl Stream<Item = RpcResult<DocListResponse>> + Unpin {
-        let (tx, rx) = async_channel::bounded(ITER_CHANNEL_CAP);
-        let sync = self.sync.clone();
-        // we need to spawn a task to send our request to the sync handle, because the method
-        // itself must be sync.
-        tokio::task::spawn(async move {
-            let tx2 = tx.clone();
-            if let Err(err) = sync.list_replicas(tx).await {
-                tx2.send(Err(err)).await.ok();
-            }
-        });
-        rx.boxed().map(|r| {
-            r.map(|(id, capability)| DocListResponse { id, capability })
-                .map_err(|e| RpcError::new(&*e))
+        self.with_docs_stream(|docs| {
+            let (tx, rx) = async_channel::bounded(ITER_CHANNEL_CAP);
+            let sync = docs.sync.clone();
+            // we need to spawn a task to send our request to the sync handle, because the method
+            // itself must be sync.
+            tokio::task::spawn(async move {
+                let tx2 = tx.clone();
+                if let Err(err) = sync.list_replicas(tx).await {
+                    tx2.send(Err(err)).await.ok();
+                }
+            });
+            rx.boxed().map(|r| {
+                r.map(|(id, capability)| DocListResponse { id, capability })
+                    .map_err(|e| RpcError::new(&*e))
+            })
         })
     }
 
-    pub async fn doc_open(&self, req: OpenRequest) -> RpcResult<OpenResponse> {
-        self.sync
-            .open(req.doc_id, Default::default())
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(OpenResponse {})
+    pub(super) async fn doc_open(self, req: OpenRequest) -> RpcResult<OpenResponse> {
+        self.with_docs(|docs| async move {
+            docs.sync
+                .open(req.doc_id, Default::default())
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(OpenResponse {})
+        })
+        .await
     }
 
-    pub async fn doc_close(&self, req: CloseRequest) -> RpcResult<CloseResponse> {
-        self.sync
-            .close(req.doc_id)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(CloseResponse {})
+    pub(super) async fn doc_close(self, req: CloseRequest) -> RpcResult<CloseResponse> {
+        self.with_docs(|docs| async move {
+            docs.sync
+                .close(req.doc_id)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(CloseResponse {})
+        })
+        .await
     }
 
-    pub async fn doc_status(&self, req: StatusRequest) -> RpcResult<StatusResponse> {
-        let status = self
-            .sync
-            .get_state(req.doc_id)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(StatusResponse { status })
+    pub(super) async fn doc_status(self, req: StatusRequest) -> RpcResult<StatusResponse> {
+        self.with_docs(|docs| async move {
+            let status = docs
+                .sync
+                .get_state(req.doc_id)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(StatusResponse { status })
+        })
+        .await
     }
 
-    pub async fn doc_share(&self, req: ShareRequest) -> RpcResult<ShareResponse> {
-        let ShareRequest {
-            doc_id,
-            mode,
-            addr_options,
-        } = req;
-        let mut me = self
-            .endpoint
-            .node_addr()
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        me.apply_options(addr_options);
-
-        let capability = match mode {
-            ShareMode::Read => iroh_docs::Capability::Read(doc_id),
-            ShareMode::Write => {
-                let secret = self
-                    .sync
-                    .export_secret_key(doc_id)
-                    .await
-                    .map_err(|e| RpcError::new(&*e))?;
-                iroh_docs::Capability::Write(secret)
-            }
-        };
-        self.start_sync(doc_id, vec![])
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-
-        Ok(ShareResponse(DocTicket {
-            capability,
-            nodes: vec![me],
-        }))
+    pub(super) async fn doc_share(self, req: ShareRequest) -> RpcResult<ShareResponse> {
+        self.with_docs(|docs| async move {
+            let ShareRequest {
+                doc_id,
+                mode,
+                addr_options,
+            } = req;
+            let mut me = docs
+                .endpoint
+                .node_addr()
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            me.apply_options(addr_options);
+
+            let capability = match mode {
+                ShareMode::Read => iroh_docs::Capability::Read(doc_id),
+                ShareMode::Write => {
+                    let secret = docs
+                        .sync
+                        .export_secret_key(doc_id)
+                        .await
+                        .map_err(|e| RpcError::new(&*e))?;
+                    iroh_docs::Capability::Write(secret)
+                }
+            };
+            docs.start_sync(doc_id, vec![])
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+
+            Ok(ShareResponse(DocTicket {
+                capability,
+                nodes: vec![me],
+            }))
+        })
+        .await
     }
 
-    pub async fn doc_subscribe(
-        &self,
+    pub(super) async fn doc_subscribe(
+        self,
         req: DocSubscribeRequest,
     ) -> RpcResult<impl Stream<Item = RpcResult<DocSubscribeResponse>>> {
-        let stream = self
-            .subscribe(req.doc_id)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-
-        Ok(stream.map(|el| {
-            el.map(|event| DocSubscribeResponse { event })
-                .map_err(|e| RpcError::new(&*e))
-        }))
+        self.with_docs(|docs| async move {
+            let stream = docs
+                .subscribe(req.doc_id)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+
+            Ok(stream.map(|el| {
+                el.map(|event| DocSubscribeResponse { event })
+                    .map_err(|e| RpcError::new(&*e))
+            }))
+        })
+        .await
     }
 
-    pub async fn doc_import(&self, req: DocImportRequest) -> RpcResult<DocImportResponse> {
-        let DocImportRequest { capability } = req;
-        let doc_id = self
-            .sync
-            .import_namespace(capability)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        self.sync
-            .open(doc_id, Default::default())
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(DocImportResponse { doc_id })
+    pub(super) async fn doc_import(self, req: DocImportRequest) -> RpcResult<DocImportResponse> {
+        self.with_docs(|docs| async move {
+            let DocImportRequest { capability } = req;
+            let doc_id = docs
+                .sync
+                .import_namespace(capability)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            docs.sync
+                .open(doc_id, Default::default())
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(DocImportResponse { doc_id })
+        })
+        .await
     }
 
-    pub async fn doc_start_sync(&self, req: StartSyncRequest) -> RpcResult<StartSyncResponse> {
-        let StartSyncRequest { doc_id, peers } = req;
-        self.start_sync(doc_id, peers)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(StartSyncResponse {})
+    pub(super) async fn doc_start_sync(
+        self,
+        req: StartSyncRequest,
+    ) -> RpcResult<StartSyncResponse> {
+        self.with_docs(|docs| async move {
+            let StartSyncRequest { doc_id, peers } = req;
+            docs.start_sync(doc_id, peers)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(StartSyncResponse {})
+        })
+        .await
     }
 
-    pub async fn doc_leave(&self, req: LeaveRequest) -> RpcResult<LeaveResponse> {
-        let LeaveRequest { doc_id } = req;
-        self.leave(doc_id, false)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(LeaveResponse {})
+    pub(super) async fn doc_leave(self, req: LeaveRequest) -> RpcResult<LeaveResponse> {
+        self.with_docs(|docs| async move {
+            let LeaveRequest { doc_id } = req;
+            docs.leave(doc_id, false)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(LeaveResponse {})
+        })
+        .await
     }
 
-    pub async fn doc_set<B: BaoStore>(
-        &self,
-        bao_store: &B,
-        req: SetRequest,
-    ) -> RpcResult<SetResponse> {
-        let SetRequest {
-            doc_id,
-            author_id,
-            key,
-            value,
-        } = req;
-        let len = value.len();
-        let tag = bao_store
-            .import_bytes(value, BlobFormat::Raw)
-            .await
-            .map_err(|e| RpcError::new(&e))?;
-        self.sync
-            .insert_local(doc_id, author_id, key.clone(), *tag.hash(), len as u64)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        let entry = self
-            .sync
-            .get_exact(doc_id, author_id, key, false)
-            .await
-            .map_err(|e| RpcError::new(&*e))?
-            .ok_or_else(|| RpcError::new(&*anyhow!("failed to get entry after insertion")))?;
-        Ok(SetResponse { entry })
+    pub(super) async fn doc_set(self, req: SetRequest) -> RpcResult<SetResponse> {
+        let blobs_store = self.blobs_store();
+        self.with_docs(|docs| async move {
+            let SetRequest {
+                doc_id,
+                author_id,
+                key,
+                value,
+            } = req;
+            let len = value.len();
+            let tag = blobs_store
+                .import_bytes(value, BlobFormat::Raw)
+                .await
+                .map_err(|e| RpcError::new(&e))?;
+            docs.sync
+                .insert_local(doc_id, author_id, key.clone(), *tag.hash(), len as u64)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            let entry = docs
+                .sync
+                .get_exact(doc_id, author_id, key, false)
+                .await
+                .map_err(|e| RpcError::new(&*e))?
+                .ok_or_else(|| RpcError::new(&*anyhow!("failed to get entry after insertion")))?;
+            Ok(SetResponse { entry })
+        })
+        .await
     }
 
-    pub async fn doc_del(&self, req: DelRequest) -> RpcResult<DelResponse> {
-        let DelRequest {
-            doc_id,
-            author_id,
-            prefix,
-        } = req;
-        let removed = self
-            .sync
-            .delete_prefix(doc_id, author_id, prefix)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(DelResponse { removed })
+    pub(super) async fn doc_del(self, req: DelRequest) -> RpcResult<DelResponse> {
+        self.with_docs(|docs| async move {
+            let DelRequest {
+                doc_id,
+                author_id,
+                prefix,
+            } = req;
+            let removed = docs
+                .sync
+                .delete_prefix(doc_id, author_id, prefix)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(DelResponse { removed })
+        })
+        .await
    }
 
-    pub async fn doc_set_hash(&self, req: SetHashRequest) -> RpcResult<SetHashResponse> {
-        let SetHashRequest {
-            doc_id,
-            author_id,
-            key,
-            hash,
-            size,
-        } = req;
-        self.sync
-            .insert_local(doc_id, author_id, key.clone(), hash, size)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(SetHashResponse {})
+    pub(super) async fn doc_set_hash(self, req: SetHashRequest) -> RpcResult<SetHashResponse> {
+        self.with_docs(|docs| async move {
+            let SetHashRequest {
+                doc_id,
+                author_id,
+                key,
+                hash,
+                size,
+            } = req;
+            docs.sync
+                .insert_local(doc_id, author_id, key.clone(), hash, size)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(SetHashResponse {})
+        })
+        .await
     }
 
-    pub fn doc_get_many(
-        &self,
+    pub(super) fn doc_get_many(
+        self,
         req: GetManyRequest,
     ) -> impl Stream<Item = RpcResult<GetManyResponse>> + Unpin {
         let GetManyRequest { doc_id, query } = req;
-        let (tx, rx) = async_channel::bounded(ITER_CHANNEL_CAP);
-        let sync = self.sync.clone();
-        // we need to spawn a task to send our request to the sync handle, because the method
-        // itself must be sync.
-        tokio::task::spawn(async move {
-            let tx2 = tx.clone();
-            if let Err(err) = sync.get_many(doc_id, query, tx).await {
-                tx2.send(Err(err)).await.ok();
-            }
-        });
-        rx.boxed().map(|r| {
-            r.map(|entry| GetManyResponse { entry })
-                .map_err(|e| RpcError::new(&*e))
+        self.with_docs_stream(move |docs| {
+            let (tx, rx) = async_channel::bounded(ITER_CHANNEL_CAP);
+            let sync = docs.sync.clone();
+            // we need to spawn a task to send our request to the sync handle, because the method
+            // itself must be sync.
+            tokio::task::spawn(async move {
+                let tx2 = tx.clone();
+                if let Err(err) = sync.get_many(doc_id, query, tx).await {
+                    tx2.send(Err(err)).await.ok();
+                }
+            });
+            rx.boxed().map(|r| {
+                r.map(|entry| GetManyResponse { entry })
+                    .map_err(|e| RpcError::new(&*e))
+            })
         })
     }
 
-    pub async fn doc_get_exact(&self, req: GetExactRequest) -> RpcResult<GetExactResponse> {
-        let GetExactRequest {
-            doc_id,
-            author,
-            key,
-            include_empty,
-        } = req;
-        let entry = self
-            .sync
-            .get_exact(doc_id, author, key, include_empty)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(GetExactResponse { entry })
+    pub(super) async fn doc_get_exact(self, req: GetExactRequest) -> RpcResult<GetExactResponse> {
+        self.with_docs(|docs| async move {
+            let GetExactRequest {
+                doc_id,
+                author,
+                key,
+                include_empty,
+            } = req;
+            let entry = docs
+                .sync
+                .get_exact(doc_id, author, key, include_empty)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(GetExactResponse { entry })
+        })
+        .await
     }
 
-    pub async fn doc_set_download_policy(
-        &self,
+    pub(super) async fn doc_set_download_policy(
+        self,
         req: SetDownloadPolicyRequest,
     ) -> RpcResult<SetDownloadPolicyResponse> {
-        self.sync
-            .set_download_policy(req.doc_id, req.policy)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(SetDownloadPolicyResponse {})
+        self.with_docs(|docs| async move {
+            docs.sync
+                .set_download_policy(req.doc_id, req.policy)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(SetDownloadPolicyResponse {})
+        })
+        .await
     }
-    pub async fn doc_get_download_policy(
-        &self,
+
+    pub(super) async fn doc_get_download_policy(
+        self,
         req: GetDownloadPolicyRequest,
     ) -> RpcResult<GetDownloadPolicyResponse> {
-        let policy = self
-            .sync
-            .get_download_policy(req.doc_id)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(GetDownloadPolicyResponse { policy })
+        self.with_docs(|docs| async move {
+            let policy = docs
+                .sync
+                .get_download_policy(req.doc_id)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(GetDownloadPolicyResponse { policy })
+        })
+        .await
     }
 
-    pub async fn doc_get_sync_peers(
-        &self,
+    pub(super) async fn doc_get_sync_peers(
+        self,
         req: GetSyncPeersRequest,
     ) -> RpcResult<GetSyncPeersResponse> {
-        let peers = self
-            .sync
-            .get_sync_peers(req.doc_id)
-            .await
-            .map_err(|e| RpcError::new(&*e))?;
-        Ok(GetSyncPeersResponse { peers })
+        self.with_docs(|docs| async move {
+            let peers = docs
+                .sync
+                .get_sync_peers(req.doc_id)
+                .await
+                .map_err(|e| RpcError::new(&*e))?;
+            Ok(GetSyncPeersResponse { peers })
+        })
+        .await
     }
 }
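One pattern survives the move into `with_docs_stream` unchanged and is worth isolating: the sync-to-stream bridge used by `author_list`, `doc_list`, and `doc_get_many`. The sync handle's listing methods want an `async_channel` sender and must be driven by a spawned task, while the RPC layer wants an `Unpin` stream, so the code spawns the producer and funnels any overall failure into the same channel as the final item. A standalone sketch of that technique; the names and the `u64` payload are illustrative, not iroh types:

```rust
use anyhow::Result;
use futures_lite::{Stream, StreamExt};

/// Stand-in for a store call like `sync.list_authors(tx)`: it feeds results
/// into the channel and can fail as a whole.
async fn produce(tx: async_channel::Sender<Result<u64>>) -> Result<()> {
    for n in 0..10u64 {
        if tx.send(Ok(n)).await.is_err() {
            break; // receiver dropped; stop producing
        }
    }
    Ok(())
}

/// The bridge: spawn the producer on a task, and if the producer itself
/// fails, forward that error into the same channel so the consumer sees it
/// as the last stream item. Bounded to apply backpressure to the producer
/// (ITER_CHANNEL_CAP = 64 in the real code).
fn list_numbers() -> impl Stream<Item = Result<u64>> + Unpin {
    let (tx, rx) = async_channel::bounded(64);
    tokio::task::spawn(async move {
        let tx2 = tx.clone();
        if let Err(err) = produce(tx).await {
            tx2.send(Err(err)).await.ok();
        }
    });
    rx.boxed()
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut stream = list_numbers();
    while let Some(item) = stream.next().await {
        println!("{}", item?);
    }
    Ok(())
}
```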