Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plumb RPC listener up to caller #5038

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6afa9b4
Plumb RPC listener up to caller
nickvikeras Jul 16, 2024
2a41755
Fix port propagation
nickvikeras Jul 18, 2024
63409c5
Removing duplicated type constraint
nickvikeras Jul 18, 2024
52619cb
Removing accidental auto-format-on-save
nickvikeras Jul 18, 2024
edb42bb
Remove old Server type alias
nickvikeras Aug 5, 2024
1a13c3c
Update substrate/client/rpc-servers/src/lib.rs
nickvikeras Aug 6, 2024
370990a
Update substrate/client/rpc-servers/src/lib.rs
nickvikeras Aug 6, 2024
c360b6d
Removing box
nickvikeras Aug 6, 2024
77cbb97
Adding prdoc
nickvikeras Aug 7, 2024
37472e1
Update prdoc/pr_5038.prdoc
nickvikeras Aug 9, 2024
1f81d96
Update prdoc/pr_5038.prdoc
nickvikeras Aug 9, 2024
c518301
Update prdoc/pr_5038.prdoc
nickvikeras Aug 9, 2024
e71565b
Merge branch 'master' into vikeras-rpc-listener-plumbing
nickvikeras Aug 12, 2024
70c6ff1
Merge branch 'master' into vikeras-rpc-listener-plumbing
nickvikeras Aug 14, 2024
7491090
Merge branch 'master' into vikeras-rpc-listener-plumbing
nickvikeras Aug 15, 2024
ff4bf3d
Merge branch 'master' into vikeras-rpc-listener-plumbing
nickvikeras Aug 23, 2024
50197cb
Merge branch 'master' into vikeras-rpc-listener-plumbing
nickvikeras Sep 5, 2024
a8efb66
Fixing previous merge commit
nickvikeras Sep 5, 2024
5e690bf
Update substrate/client/service/src/lib.rs
nickvikeras Sep 6, 2024
f19f5de
Removing waiting mod as requested in PR
nickvikeras Sep 6, 2024
37ee0ab
Merge branch 'master' into vikeras-rpc-listener-plumbing
niklasad1 Sep 9, 2024
20fc75a
Merge branch 'master' into vikeras-rpc-listener-plumbing
niklasad1 Sep 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions substrate/client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use std::{error::Error as StdError, net::SocketAddr, num::NonZeroU32, sync::Arc,
use jsonrpsee::{
core::BoxError,
server::{
serve_with_graceful_shutdown, stop_channel, ws, PingConfig, StopHandle, TowerServiceBuilder,
serve_with_graceful_shutdown, stop_channel, ws, PingConfig, ServerHandle, StopHandle,
TowerServiceBuilder,
},
Methods, RpcModule,
};
Expand All @@ -49,8 +50,30 @@ pub use middleware::{Metrics, MiddlewareLayer, RpcMetrics};

const MEGABYTE: u32 = 1024 * 1024;

/// Type alias for the JSON-RPC server.
pub type Server = jsonrpsee::server::ServerHandle;
/// Type to encapsulate the server handle and listening address.
pub struct Server {
/// Handle to the rpc server
handle: ServerHandle,
/// Listening address of the server
listen_addr: Option<SocketAddr>,
}

impl Server {
/// Creates a new Server.
pub fn new(handle: ServerHandle, listen_addr: Option<SocketAddr>) -> Server {
Server { handle, listen_addr }
}

/// Returns the `jsonrpsee::server::ServerHandle` for this Server. Can be used to stop the server.
pub fn handle(&self) -> &ServerHandle {
&self.handle
}

/// The listen address for the running RPC service.
pub fn listen_addr(&self) -> Option<&SocketAddr> {
self.listen_addr.as_ref()
}
}

/// RPC server configuration.
#[derive(Debug)]
Expand Down Expand Up @@ -264,5 +287,5 @@ where
format_cors(cors)
);

Ok(server_handle)
Ok(Server::new(server_handle, local_addr))
}
35 changes: 31 additions & 4 deletions substrate/client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::{
build_network_future, build_system_rpc_future,
client::{Client, ClientConfig},
config::{Configuration, KeystoreConfig, PrometheusConfig},
config::{Configuration, KeystoreConfig, Multiaddr, PrometheusConfig},
error::Error,
metrics::MetricsService,
start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle,
Expand All @@ -43,6 +43,7 @@ use sc_executor::{
use sc_keystore::LocalKeystore;
use sc_network::{
config::{FullNetworkConfiguration, SyncMode},
multiaddr::Protocol,
service::{
traits::{PeerStore, RequestResponseConfig},
NotificationMetrics,
Expand Down Expand Up @@ -378,6 +379,19 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
pub telemetry: Option<&'a mut Telemetry>,
}

// Wrapper for HTTP and WS servers that makes sure they are properly shut down.
mod waiting {
nickvikeras marked this conversation as resolved.
Show resolved Hide resolved
pub struct Server(pub Option<jsonrpsee::server::ServerHandle>);

impl Drop for Server {
fn drop(&mut self) {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
if let Some(server) = self.0.take() {
// This doesn't not wait for the server to be stopped but fires the signal.
let _ = server.stop();
}
}
}
}
/// Spawn the tasks that are required to run a node.
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
Expand Down Expand Up @@ -507,8 +521,21 @@ where
)
};

let rpc = start_rpc_servers(&config, gen_rpc_module, rpc_id_provider)?;
let rpc_handlers = RpcHandlers(Arc::new(gen_rpc_module(sc_rpc::DenyUnsafe::No)?.into()));
let server = start_rpc_servers(&config, gen_rpc_module, rpc_id_provider)?;

let listen_addrs = match server.listen_addr() {
Some(socket_addr) => {
let mut multiaddr: Multiaddr = socket_addr.ip().into();
multiaddr.push(Protocol::Tcp(socket_addr.port()));
vec![multiaddr]
},
None => vec![],
};

let rpc_handlers = RpcHandlers {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't makes sense to add listen_addresses to RpcHandlers this just an in-memory way to perform rpc calls, please remove it RpcHandlers.

Why did you add it (maybe I'm missing something)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you add it (maybe I'm missing something)?

Just because the RpcHandlers type is what is getting returned to the caller. Would a new return type that can provide both the Server and the RpcHandlers be preferred?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aight, I see one may want to know the listen addrs on the rpc endpoint after calling spawn_tasks.

It's probably fine to keep the RpcHandlers or rename to the RpcService or something, but it needs to documented that is represent a running RPC server as well with the legacy in-memory rpc calls.

We may want to remove latter at some point, it was for WASM stuff before smoldot was a thing...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some notes with a link to this comment thread.

rpc_module: Arc::new(gen_rpc_module(sc_rpc::DenyUnsafe::No)?.into()),
listen_addresses: listen_addrs,
};

// Spawn informant task
spawn_handle.spawn(
Expand All @@ -522,7 +549,7 @@ where
),
);

task_manager.keep_alive((config.base_path, rpc));
task_manager.keep_alive((config.base_path, waiting::Server(Some(server.handle().to_owned()))));

Ok(rpc_handlers)
}
Expand Down
41 changes: 21 additions & 20 deletions substrate/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mod client;
mod metrics;
mod task_manager;

use crate::config::Multiaddr;
use std::{collections::HashMap, net::SocketAddr};

use codec::{Decode, Encode};
Expand All @@ -47,6 +48,7 @@ use sc_network::{
};
use sc_network_sync::SyncingService;
use sc_network_types::PeerId;
use sc_rpc_server::Server;
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_blockchain::HeaderMetadata;
use sp_consensus::SyncOracle;
Expand Down Expand Up @@ -96,9 +98,17 @@ pub use task_manager::{SpawnTaskHandle, Task, TaskManager, TaskRegistry, DEFAULT

const DEFAULT_PROTOCOL_ID: &str = "sup";

/// RPC handlers that can perform RPC queries.
/// A running RPC service that can perform in-memory RPC queries.
#[derive(Clone)]
pub struct RpcHandlers(Arc<RpcModule<()>>);
pub struct RpcHandlers {
// This is legacy and may be removed at some point, it was for WASM stuff before smoldot was a
// thing. /~https://github.com/paritytech/polkadot-sdk/pull/5038#discussion_r1694971805
rpc_module: Arc<RpcModule<()>>,

// This can be used to introspect the port the RPC server is listening on. SDK consumers are
// depending on this and it should be supported even if in-memory query support is removed.
listen_addresses: Vec<Multiaddr>,
}

impl RpcHandlers {
/// Starts an RPC query.
Expand All @@ -120,12 +130,17 @@ impl RpcHandlers {
// This limit is used to prevent panics and is large enough.
const TOKIO_MPSC_MAX_SIZE: usize = tokio::sync::Semaphore::MAX_PERMITS;

self.0.raw_json_request(json_query, TOKIO_MPSC_MAX_SIZE).await
self.rpc_module.raw_json_request(json_query, TOKIO_MPSC_MAX_SIZE).await
}

/// Provides access to the underlying `RpcModule`
pub fn handle(&self) -> Arc<RpcModule<()>> {
self.0.clone()
self.rpc_module.clone()
}

/// Provides access to listen addresses
pub fn listen_addresses(&self) -> &[Multiaddr] {
&self.listen_addresses[..]
}
}

Expand Down Expand Up @@ -353,26 +368,12 @@ pub async fn build_system_rpc_future<
debug!("`NetworkWorker` has terminated, shutting down the system RPC future.");
}

// Wrapper for HTTP and WS servers that makes sure they are properly shut down.
mod waiting {
pub struct Server(pub Option<sc_rpc_server::Server>);

impl Drop for Server {
fn drop(&mut self) {
if let Some(server) = self.0.take() {
// This doesn't not wait for the server to be stopped but fires the signal.
let _ = server.stop();
}
}
}
}

/// Starts RPC servers.
pub fn start_rpc_servers<R>(
config: &Configuration,
gen_rpc_module: R,
rpc_id_provider: Option<Box<dyn RpcSubscriptionIdProvider>>,
) -> Result<Box<dyn std::any::Any + Send + Sync>, error::Error>
) -> Result<Server, error::Error>
where
R: Fn(sc_rpc::DenyUnsafe) -> Result<RpcModule<()>, Error>,
{
Expand Down Expand Up @@ -419,7 +420,7 @@ where
match tokio::task::block_in_place(|| {
config.tokio_handle.block_on(sc_rpc_server::start_server(server_config))
}) {
Ok(server) => Ok(Box::new(waiting::Server(Some(server)))),
Ok(server) => Ok(server),
nickvikeras marked this conversation as resolved.
Show resolved Hide resolved
Err(e) => Err(Error::Application(e)),
}
}
Expand Down
Loading