Skip to content

Commit

Permalink
rpc-v2: Implement archive_unstable_storageDiff (paritytech#5997)
Browse files Browse the repository at this point in the history
This PR implements the `archive_unstable_storageDiff`.

The implementation follows the rpc-v2 spec from:
-  paritytech/json-rpc-interface-spec#159.
- builds on top of
paritytech/json-rpc-interface-spec#161

cc @paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: James Wilson <james@jsdw.me>
  • Loading branch information
2 people authored and Krayt78 committed Dec 18, 2024
1 parent b8f05bf commit a7bc763
Show file tree
Hide file tree
Showing 10 changed files with 1,449 additions and 18 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions prdoc/pr_5997.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Implement archive_unstable_storageDiff method

doc:
- audience: Node Dev
description: |
This PR implements the `archive_unstable_storageDiff` rpc-v2 method.
Developers can use this method to fetch the storage differences
between two blocks. This is useful for oracles and archive nodes.
For more details see: /~https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/archive_unstable_storageDiff.md.

crates:
- name: sc-rpc-spec-v2
bump: major
- name: sc-service
bump: patch
1 change: 1 addition & 0 deletions substrate/client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ log = { workspace = true, default-features = true }
futures-util = { workspace = true }
rand = { workspace = true, default-features = true }
schnellru = { workspace = true }
itertools = { workspace = true }

[dev-dependencies]
async-trait = { workspace = true }
Expand Down
22 changes: 21 additions & 1 deletion substrate/client/rpc-spec-v2/src/archive/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
//! API trait of the archive methods.
use crate::{
common::events::{ArchiveStorageResult, PaginatedStorageQuery},
common::events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
PaginatedStorageQuery,
},
MethodResult,
};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
Expand Down Expand Up @@ -104,4 +107,21 @@ pub trait ArchiveApi<Hash> {
items: Vec<PaginatedStorageQuery<String>>,
child_trie: Option<String>,
) -> RpcResult<ArchiveStorageResult>;

/// Returns the storage difference between two blocks.
///
/// # Unstable
///
/// This method is unstable and can change in minor or patch releases.
#[subscription(
name = "archive_unstable_storageDiff" => "archive_unstable_storageDiffEvent",
unsubscribe = "archive_unstable_storageDiff_stopStorageDiff",
item = ArchiveStorageDiffEvent,
)]
fn archive_unstable_storage_diff(
&self,
hash: Hash,
items: Vec<ArchiveStorageDiffItem<String>>,
previous_hash: Option<Hash>,
);
}
89 changes: 84 additions & 5 deletions substrate/client/rpc-spec-v2/src/archive/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,29 @@
//! API implementation for `archive`.
use crate::{
archive::{error::Error as ArchiveError, ArchiveApiServer},
common::events::{ArchiveStorageResult, PaginatedStorageQuery},
hex_string, MethodResult,
archive::{
archive_storage::{ArchiveStorage, ArchiveStorageDiff},
error::Error as ArchiveError,
ArchiveApiServer,
},
common::events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
PaginatedStorageQuery,
},
hex_string, MethodResult, SubscriptionTaskExecutor,
};

use codec::Encode;
use jsonrpsee::core::{async_trait, RpcResult};
use futures::FutureExt;
use jsonrpsee::{
core::{async_trait, RpcResult},
PendingSubscriptionSink,
};
use sc_client_api::{
Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
StorageProvider,
};
use sc_rpc::utils::Subscription;
use sp_api::{CallApiAt, CallContext};
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
Expand All @@ -41,7 +53,9 @@ use sp_runtime::{
};
use std::{collections::HashSet, marker::PhantomData, sync::Arc};

use super::archive_storage::ArchiveStorage;
use tokio::sync::mpsc;

pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive";

/// The configuration of [`Archive`].
pub struct ArchiveConfig {
Expand All @@ -64,6 +78,12 @@ const MAX_DESCENDANT_RESPONSES: usize = 5;
/// `MAX_DESCENDANT_RESPONSES`.
const MAX_QUERIED_ITEMS: usize = 8;

/// The buffer capacity for each storage query.
///
/// This is small because the underlying JSON-RPC server has
/// its down buffer capacity per connection as well.
const STORAGE_QUERY_BUF: usize = 16;

impl Default for ArchiveConfig {
fn default() -> Self {
Self {
Expand All @@ -79,6 +99,8 @@ pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
client: Arc<Client>,
/// Backend of the chain.
backend: Arc<BE>,
/// Executor to spawn subscriptions.
executor: SubscriptionTaskExecutor,
/// The hexadecimal encoded hash of the genesis block.
genesis_hash: String,
/// The maximum number of items the `archive_storage` can return for a descendant query before
Expand All @@ -96,12 +118,14 @@ impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
client: Arc<Client>,
backend: Arc<BE>,
genesis_hash: GenesisHash,
executor: SubscriptionTaskExecutor,
config: ArchiveConfig,
) -> Self {
let genesis_hash = hex_string(&genesis_hash.as_ref());
Self {
client,
backend,
executor,
genesis_hash,
storage_max_descendant_responses: config.max_descendant_responses,
storage_max_queried_items: config.max_queried_items,
Expand Down Expand Up @@ -278,4 +302,59 @@ where

Ok(storage_client.handle_query(hash, items, child_trie))
}

fn archive_unstable_storage_diff(
&self,
pending: PendingSubscriptionSink,
hash: Block::Hash,
items: Vec<ArchiveStorageDiffItem<String>>,
previous_hash: Option<Block::Hash>,
) {
let storage_client = ArchiveStorageDiff::new(self.client.clone());
let client = self.client.clone();

log::trace!(target: LOG_TARGET, "Storage diff subscription started");

let fut = async move {
let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };

let previous_hash = if let Some(previous_hash) = previous_hash {
previous_hash
} else {
let Ok(Some(current_header)) = client.header(hash) else {
let message = format!("Block header is not present: {hash}");
let _ = sink.send(&ArchiveStorageDiffEvent::err(message)).await;
return
};
*current_header.parent_hash()
};

let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
let storage_fut =
storage_client.handle_trie_queries(hash, items, previous_hash, tx.clone());

// We don't care about the return value of this join:
// - process_events might encounter an error (if the client disconnected)
// - storage_fut might encounter an error while processing a trie queries and
// the error is propagated via the sink.
let _ = futures::future::join(storage_fut, process_events(&mut rx, &mut sink)).await;
};

self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}
}

/// Sends all the events to the sink.
async fn process_events(rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>, sink: &mut Subscription) {
while let Some(event) = rx.recv().await {
if event.is_done() {
log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
} else if event.is_err() {
log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query");
}

if sink.send(&event).await.is_err() {
return
}
}
}
Loading

0 comments on commit a7bc763

Please sign in to comment.