Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Transactionpool: Make ready_at return earlier (#8995)
Browse files Browse the repository at this point in the history
`ready_at` returns when we have processed the requested block. However,
on startup we already have processed the best block and there
are no transactions in the pool on startup anyway. So, we can set `updated_at`
to the best block on startup.

Besides that `ready_at` now returns early when there are no ready nor
any future transactions in the pool.
  • Loading branch information
bkchr authored Jun 2, 2021
1 parent 4652f9e commit 538b15f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 18 deletions.
10 changes: 9 additions & 1 deletion client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,13 @@ mod tests {
let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new();
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(), true.into(), api(), None, RevalidationType::Full, spawner.clone(),
Options::default(),
true.into(),
api(),
None,
RevalidationType::Full,
spawner.clone(),
0,
));
let env = ProposerFactory::new(
spawner.clone(),
Expand Down Expand Up @@ -373,6 +379,7 @@ mod tests {
None,
RevalidationType::Full,
spawner.clone(),
0,
));
let env = ProposerFactory::new(
spawner.clone(),
Expand Down Expand Up @@ -453,6 +460,7 @@ mod tests {
None,
RevalidationType::Full,
spawner.clone(),
0,
));
let env = ProposerFactory::new(
spawner.clone(),
Expand Down
66 changes: 49 additions & 17 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ impl<T, Block: BlockT> Default for ReadyPoll<T, Block> {
}

impl<T, Block: BlockT> ReadyPoll<T, Block> {
fn new(best_block_number: NumberFor<Block>) -> Self {
Self {
updated_at: best_block_number,
pollers: Default::default(),
}
}

fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) {
self.updated_at = number;

Expand Down Expand Up @@ -189,6 +196,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
prometheus: Option<&PrometheusRegistry>,
revalidation_type: RevalidationType,
spawner: impl SpawnNamed,
best_block_number: NumberFor<Block>,
) -> Self {
let pool = Arc::new(sc_transaction_graph::Pool::new(options, is_validator, pool_api.clone()));
let (revalidation_queue, background_task) = match revalidation_type {
Expand All @@ -213,7 +221,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
RevalidationType::Full => RevalidationStrategy::Always,
}
)),
ready_poll: Default::default(),
ready_poll: Arc::new(Mutex::new(ReadyPoll::new(best_block_number))),
metrics: PrometheusMetrics::new(prometheus),
}
}
Expand Down Expand Up @@ -309,21 +317,29 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
}

fn ready_at(&self, at: NumberFor<Self::Block>) -> PolledIterator<PoolApi> {
let status = self.status();
// If there are no transactions in the pool, it is fine to return early.
//
// There could be transaction being added because of some re-org happening at the relevant
// block, but this is relative unlikely.
if status.ready == 0 && status.future == 0 {
return async { Box::new(std::iter::empty()) as Box<_> }.boxed()
}

if self.ready_poll.lock().updated_at() >= at {
log::trace!(target: "txpool", "Transaction pool already processed block #{}", at);
let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
return Box::pin(futures::future::ready(iterator));
return async move { iterator }.boxed();
}

Box::pin(
self.ready_poll
.lock()
.add(at)
.map(|received| received.unwrap_or_else(|e| {
log::warn!("Error receiving pending set: {:?}", e);
Box::new(vec![].into_iter())
}))
)
self.ready_poll
.lock()
.add(at)
.map(|received| received.unwrap_or_else(|e| {
log::warn!("Error receiving pending set: {:?}", e);
Box::new(std::iter::empty())
}))
.boxed()
}

fn ready(&self) -> ReadyIteratorFor<PoolApi> {
Expand All @@ -334,7 +350,7 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
impl<Block, Client, Fetcher> LightPool<Block, Client, Fetcher>
where
Block: BlockT,
Client: sp_blockchain::HeaderBackend<Block> + 'static,
Client: sp_blockchain::HeaderBackend<Block> + sc_client_api::UsageProvider<Block> + 'static,
Fetcher: sc_client_api::Fetcher<Block> + 'static,
{
/// Create new basic transaction pool for a light node with the provided api.
Expand All @@ -345,9 +361,15 @@ where
client: Arc<Client>,
fetcher: Arc<Fetcher>,
) -> Self {
let pool_api = Arc::new(LightChainApi::new(client, fetcher));
let pool_api = Arc::new(LightChainApi::new(client.clone(), fetcher));
Self::with_revalidation_type(
options, false.into(), pool_api, prometheus, RevalidationType::Light, spawner,
options,
false.into(),
pool_api,
prometheus,
RevalidationType::Light,
spawner,
client.usage_info().chain.best_number,
)
}
}
Expand All @@ -357,8 +379,12 @@ where
Block: BlockT,
Client: sp_api::ProvideRuntimeApi<Block>
+ sc_client_api::BlockBackend<Block>
+ sp_runtime::traits::BlockIdTo<Block>,
Client: sc_client_api::ExecutorProvider<Block> + Send + Sync + 'static,
+ sp_runtime::traits::BlockIdTo<Block>
+ sc_client_api::ExecutorProvider<Block>
+ sc_client_api::UsageProvider<Block>
+ Send
+ Sync
+ 'static,
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
{
/// Create new basic transaction pool for a full node with the provided api.
Expand All @@ -371,7 +397,13 @@ where
) -> Arc<Self> {
let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus));
let pool = Arc::new(Self::with_revalidation_type(
options, is_validator, pool_api, prometheus, RevalidationType::Full, spawner
options,
is_validator,
pool_api,
prometheus,
RevalidationType::Full,
spawner,
client.usage_info().chain.best_number,
));

// make transaction pool available for off-chain runtime calls.
Expand Down

0 comments on commit 538b15f

Please sign in to comment.