Skip to content

Commit

Permalink
rpc-v2/tx/tests: Add transaction broadcast tests and check propagated…
Browse files Browse the repository at this point in the history
… tx status (paritytech#3193)

This PR adds tests for the `transaction_broadcast` method.


The testing needs to coordinate the following components:
- The `TestApi` marks transactions as invalid and implements
`ChainApi::validate_transaction`
- this is what dictates if a transaction is valid or not and is called
from within the `BasicPool`
- The `BasicPool` which maintains the transactions and implements
`submit_and_watch` needed by the tx broadcast to submit the transaction
- The status of the transaction pool is exposed by mocking the BasicPool
- The `ChainHeadMockClient` which mocks the
`BlockchainEvents::import_notification_stream` needed by the tx
broadcast to know to which blocks the transaction is submitted

The following changes have been added to the substrate testing to
accommodate this:
- `TestApi` gets ` remove_invalid`, counterpart to `add_invalid` to
ensure an invalid transaction can become valid again; as well as a
priority setter for extrinsics
- `BasicPool` test constructor is extended with options for the
`PoolRotator`
- this mechanism is needed because transactions are banned for 30mins
(default) after they are declared invalid
  - testing bypasses this by providing a `Duration::ZERO`

### Testing Scenarios

- Capture the status of the transaction as it is normally broadcasted
- `transaction_stop` is valid while the transaction is in progress
- A future transaction is handled when the dependencies are completed
- Try to resubmit the transaction at a later block (currently invalid)
- An invalid transaction status is propagated; the transaction is marked
as temporarily banned; then the ban expires and transaction is
resubmitted
  
This builds on top of:
paritytech#3079
Part of: paritytech#3084

cc @paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: James Wilson <james@jsdw.me>
  • Loading branch information
lexnv and jsdw authored Feb 28, 2024
1 parent 0cf9a38 commit ec119e2
Show file tree
Hide file tree
Showing 10 changed files with 985 additions and 244 deletions.
4 changes: 2 additions & 2 deletions substrate/client/rpc-spec-v2/src/chain_head/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl<Client> ChainHeadMockClient<Client> {
BlockImportNotification::new(header.hash(), BlockOrigin::Own, header, true, None, sink);

for sink in self.import_sinks.lock().iter_mut() {
sink.unbounded_send(notification.clone()).unwrap();
let _ = sink.unbounded_send(notification.clone());
}
}

Expand All @@ -83,7 +83,7 @@ impl<Client> ChainHeadMockClient<Client> {
let notification = FinalityNotification::from_summary(summary, sink);

for sink in self.finality_sinks.lock().iter_mut() {
sink.unbounded_send(notification.clone()).unwrap();
let _ = sink.unbounded_send(notification.clone());
}
}
}
Expand Down
238 changes: 0 additions & 238 deletions substrate/client/rpc-spec-v2/src/transaction/tests.rs

This file was deleted.

100 changes: 100 additions & 0 deletions substrate/client/rpc-spec-v2/src/transaction/tests/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
use std::sync::{atomic::AtomicUsize, Arc};
use tokio::sync::mpsc;

/// Wrap the `TaskExecutor` to know when the broadcast future is dropped.
#[derive(Clone)]
pub struct TaskExecutorBroadcast {
executor: TaskExecutor,
sender: mpsc::UnboundedSender<()>,
num_tasks: Arc<AtomicUsize>,
}

/// The channel that receives events when the broadcast futures are dropped.
pub type TaskExecutorRecv = mpsc::UnboundedReceiver<()>;

/// The state of the `TaskExecutorBroadcast`.
pub struct TaskExecutorState {
pub recv: TaskExecutorRecv,
pub num_tasks: Arc<AtomicUsize>,
}

impl TaskExecutorState {
pub fn num_tasks(&self) -> usize {
self.num_tasks.load(std::sync::atomic::Ordering::Acquire)
}
}

impl TaskExecutorBroadcast {
/// Construct a new `TaskExecutorBroadcast` and a receiver to know when the broadcast futures
/// are dropped.
pub fn new() -> (Self, TaskExecutorState) {
let (sender, recv) = mpsc::unbounded_channel();
let num_tasks = Arc::new(AtomicUsize::new(0));

(
Self { executor: TaskExecutor::new(), sender, num_tasks: num_tasks.clone() },
TaskExecutorState { recv, num_tasks },
)
}
}

impl SpawnNamed for TaskExecutorBroadcast {
fn spawn(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
let sender = self.sender.clone();
let num_tasks = self.num_tasks.clone();

let future = Box::pin(async move {
num_tasks.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
future.await;
num_tasks.fetch_sub(1, std::sync::atomic::Ordering::AcqRel);

let _ = sender.send(());
});

self.executor.spawn(name, group, future)
}

fn spawn_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
let sender = self.sender.clone();
let num_tasks = self.num_tasks.clone();

let future = Box::pin(async move {
num_tasks.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
future.await;
num_tasks.fetch_sub(1, std::sync::atomic::Ordering::AcqRel);

let _ = sender.send(());
});

self.executor.spawn_blocking(name, group, future)
}
}
Loading

0 comments on commit ec119e2

Please sign in to comment.