Skip to content

Commit

Permalink
[eth-indexer] subscribe to finalize blocks instead of best blocks (#7260
Browse files Browse the repository at this point in the history
)

For eth-indexer, it's probably safer to use `subscribe_finalized` and
index these blocks into the DB rather than `subscribe_best`

---------

Co-authored-by: command-bot <>
  • Loading branch information
pgherveou authored Jan 20, 2025
1 parent 2c4cecc commit cbf3925
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 16 deletions.
10 changes: 10 additions & 0 deletions prdoc/pr_7260.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
title: '[eth-indexer] subscribe to finalize blocks instead of best blocks'
doc:
- audience: Runtime Dev
description: 'For eth-indexer, it''s probably safer to use `subscribe_finalized`
and index these blocks into the DB rather than `subscribe_best`

'
crates:
- name: pallet-revive-eth-rpc
bump: minor
45 changes: 29 additions & 16 deletions substrate/frame/revive/rpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ pub type Shared<T> = Arc<RwLock<T>>;
/// The runtime balance type.
pub type Balance = u128;

/// The subscription type used to listen to new blocks.
pub enum SubscriptionType {
/// Subscribe to the best blocks.
BestBlocks,
/// Subscribe to the finalized blocks.
FinalizedBlocks,
}

/// Unwrap the original `jsonrpsee::core::client::Error::Call` error.
fn unwrap_call_err(err: &subxt::error::RpcError) -> Option<ErrorObjectOwned> {
use subxt::backend::rpc::reconnecting_rpc_client;
Expand Down Expand Up @@ -278,19 +286,23 @@ impl Client {

/// Subscribe to new best blocks, and execute the async closure with
/// the extracted block and ethereum transactions
async fn subscribe_new_blocks<F, Fut>(&self, callback: F) -> Result<(), ClientError>
async fn subscribe_new_blocks<F, Fut>(
&self,
subscription_type: SubscriptionType,
callback: F,
) -> Result<(), ClientError>
where
F: Fn(SubstrateBlock) -> Fut + Send + Sync,
Fut: std::future::Future<Output = Result<(), ClientError>> + Send,
{
log::info!(target: LOG_TARGET, "Subscribing to new blocks");
let mut block_stream = match self.api.blocks().subscribe_best().await {
Ok(s) => s,
Err(err) => {
log::error!(target: LOG_TARGET, "Failed to subscribe to blocks: {err:?}");
return Err(err.into());
},
};
let mut block_stream = match subscription_type {
SubscriptionType::BestBlocks => self.api.blocks().subscribe_best().await,
SubscriptionType::FinalizedBlocks => self.api.blocks().subscribe_finalized().await,
}
.inspect_err(|err| {
log::error!(target: LOG_TARGET, "Failed to subscribe to blocks: {err:?}");
})?;

while let Some(block) = block_stream.next().await {
let block = match block {
Expand Down Expand Up @@ -324,7 +336,7 @@ impl Client {
let client = self.clone();
spawn_handle.spawn("subscribe-blocks", None, async move {
let res = client
.subscribe_new_blocks(|block| async {
.subscribe_new_blocks(SubscriptionType::BestBlocks, |block| async {
let receipts = extract_receipts_from_block(&block).await?;

client.receipt_provider.insert(&block.hash(), &receipts).await;
Expand All @@ -347,13 +359,14 @@ impl Client {
&self,
oldest_block: Option<SubstrateBlockNumber>,
) -> Result<(), ClientError> {
let new_blocks_fut = self.subscribe_new_blocks(|block| async move {
let receipts = extract_receipts_from_block(&block).await.inspect_err(|err| {
log::error!(target: LOG_TARGET, "Failed to extract receipts from block: {err:?}");
})?;
self.receipt_provider.insert(&block.hash(), &receipts).await;
Ok(())
});
let new_blocks_fut =
self.subscribe_new_blocks(SubscriptionType::FinalizedBlocks, |block| async move {
let receipts = extract_receipts_from_block(&block).await.inspect_err(|err| {
log::error!(target: LOG_TARGET, "Failed to extract receipts from block: {err:?}");
})?;
self.receipt_provider.insert(&block.hash(), &receipts).await;
Ok(())
});

let Some(oldest_block) = oldest_block else { return new_blocks_fut.await };

Expand Down

0 comments on commit cbf3925

Please sign in to comment.