Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(katana): stage sync pipeline #2502

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ members = [
"crates/katana/executor",
"crates/katana/node",
"crates/katana/node-bindings",
"crates/katana/pipeline",
"crates/katana/pool",
"crates/katana/primitives",
"crates/katana/rpc/rpc",
Expand Down Expand Up @@ -87,6 +88,7 @@ katana-db = { path = "crates/katana/storage/db" }
katana-executor = { path = "crates/katana/executor" }
katana-node = { path = "crates/katana/node", default-features = false }
katana-node-bindings = { path = "crates/katana/node-bindings" }
katana-pipeline = { path = "crates/katana/pipeline" }
katana-pool = { path = "crates/katana/pool" }
katana-primitives = { path = "crates/katana/primitives" }
katana-provider = { path = "crates/katana/storage/provider" }
Expand Down
23 changes: 16 additions & 7 deletions crates/katana/core/src/service/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,31 @@ type BlockProductionWithTxnsFuture =
#[allow(missing_debug_implementations)]
pub struct BlockProducer<EF: ExecutorFactory> {
/// The inner mode of mining.
pub producer: RwLock<BlockProducerMode<EF>>,
pub producer: Arc<RwLock<BlockProducerMode<EF>>>,
}

impl<EF: ExecutorFactory> BlockProducer<EF> {
/// Creates a block producer that mines a new block every `interval` milliseconds.
pub fn interval(backend: Arc<Backend<EF>>, interval: u64) -> Self {
let prod = IntervalBlockProducer::new(backend, Some(interval));
Self { producer: BlockProducerMode::Interval(prod).into() }
let producer = IntervalBlockProducer::new(backend, Some(interval));
let producer = Arc::new(RwLock::new(BlockProducerMode::Interval(producer)));
Self { producer }
}

/// Creates a new block producer that will only be possible to mine by calling the
/// `katana_generateBlock` RPC method.
pub fn on_demand(backend: Arc<Backend<EF>>) -> Self {
let prod = IntervalBlockProducer::new(backend, None);
Self { producer: BlockProducerMode::Interval(prod).into() }
let producer = IntervalBlockProducer::new(backend, None);
let producer = Arc::new(RwLock::new(BlockProducerMode::Interval(producer)));
Self { producer }
}

/// Creates a block producer that mines a new block as soon as there are ready transactions in
/// the transactions pool.
pub fn instant(backend: Arc<Backend<EF>>) -> Self {
let prod = InstantBlockProducer::new(backend);
Self { producer: BlockProducerMode::Instant(prod).into() }
let producer = InstantBlockProducer::new(backend);
let producer = Arc::new(RwLock::new(BlockProducerMode::Instant(producer)));
Self { producer }
}

pub(super) fn queue(&self, transactions: Vec<ExecutableTxWithHash>) {
Expand Down Expand Up @@ -143,6 +146,12 @@ impl<EF: ExecutorFactory> BlockProducer<EF> {
}
}

impl<EF: ExecutorFactory> Clone for BlockProducer<EF> {
fn clone(&self) -> Self {
BlockProducer { producer: self.producer.clone() }
}
}

/// The inner type of [BlockProducer].
///
/// On _interval_ mining, a new block is opened for a fixed amount of interval. Within this
Expand Down
9 changes: 2 additions & 7 deletions crates/katana/core/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::channel::mpsc::Receiver;
Expand Down Expand Up @@ -33,7 +32,7 @@ pub(crate) const LOG_TARGET: &str = "node";
#[allow(missing_debug_implementations)]
pub struct BlockProductionTask<EF: ExecutorFactory> {
/// creates new blocks
pub(crate) block_producer: Arc<BlockProducer<EF>>,
pub(crate) block_producer: BlockProducer<EF>,
/// the miner responsible to select transactions from the `pool´
pub(crate) miner: TransactionMiner,
/// the pool that holds all transactions
Expand All @@ -43,11 +42,7 @@ pub struct BlockProductionTask<EF: ExecutorFactory> {
}

impl<EF: ExecutorFactory> BlockProductionTask<EF> {
pub fn new(
pool: TxPool,
miner: TransactionMiner,
block_producer: Arc<BlockProducer<EF>>,
) -> Self {
pub fn new(pool: TxPool, miner: TransactionMiner, block_producer: BlockProducer<EF>) -> Self {
Self { block_producer, miner, pool, metrics: BlockProducerMetrics::default() }
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/katana/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ version.workspace = true
katana-core.workspace = true
katana-db.workspace = true
katana-executor.workspace = true
katana-pipeline.workspace = true
katana-pool.workspace = true
katana-primitives.workspace = true
katana-provider.workspace = true
Expand Down
37 changes: 18 additions & 19 deletions crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

use std::future::IntoFuture;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -18,14 +19,12 @@ use katana_core::env::BlockContextGenerator;
#[allow(deprecated)]
use katana_core::sequencer::SequencerConfig;
use katana_core::service::block_producer::BlockProducer;
#[cfg(feature = "messaging")]
use katana_core::service::messaging::{MessagingService, MessagingTask};
use katana_core::service::{BlockProductionTask, TransactionMiner};
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_executor::{ExecutorFactory, SimulationFlag};
use katana_pipeline::{stage, Pipeline};
use katana_pool::ordering::FiFo;
use katana_pool::validation::stateful::TxValidator;
use katana_pool::{TransactionPool, TxPool};
use katana_pool::TxPool;
use katana_primitives::block::FinalityStatus;
use katana_primitives::env::{CfgEnv, FeeTokenAddressses};
use katana_provider::providers::fork::ForkedProvider;
Expand Down Expand Up @@ -57,7 +56,7 @@ pub struct Handle {
pub rpc: RpcServer,
pub task_manager: TaskManager,
pub backend: Arc<Backend<BlockifierFactory>>,
pub block_producer: Arc<BlockProducer<BlockifierFactory>>,
pub block_producer: BlockProducer<BlockifierFactory>,
}

impl Handle {
Expand Down Expand Up @@ -185,11 +184,10 @@ pub async fn start(
BlockProducer::instant(Arc::clone(&backend))
};

// --- build transaction pool and miner
// --- build transaction pool

let validator = block_producer.validator();
let pool = TxPool::new(validator.clone(), FiFo::new());
let miner = TransactionMiner::new(pool.add_listener());

// --- build metrics service

Expand All @@ -214,21 +212,22 @@ pub async fn start(

let task_manager = TaskManager::current();

// --- build and spawn the messaging task
// --- build sequencing stage

#[cfg(feature = "messaging")]
if let Some(config) = sequencer_config.messaging.clone() {
let messaging = MessagingService::new(config, pool.clone(), Arc::clone(&backend)).await?;
let task = MessagingTask::new(messaging);
task_manager.build_task().critical().name("Messaging").spawn(task);
}
let sequencing = stage::Sequencing::new(
pool.clone(),
backend.clone(),
task_manager.clone(),
block_producer.clone(),
sequencer_config.messaging.clone(),
);

let block_producer = Arc::new(block_producer);
// --- build and start the pipeline

// --- build and spawn the block production task
let mut pipeline = Pipeline::new();
pipeline.add_stage(Box::new(sequencing));

let task = BlockProductionTask::new(pool.clone(), miner, block_producer.clone());
task_manager.build_task().critical().name("BlockProduction").spawn(task);
task_manager.spawn(pipeline.into_future());

// --- spawn rpc server

Expand All @@ -240,7 +239,7 @@ pub async fn start(

// Moved from `katana_rpc` crate
pub async fn spawn<EF: ExecutorFactory>(
node_components: (TxPool, Arc<Backend<EF>>, Arc<BlockProducer<EF>>, TxValidator),
node_components: (TxPool, Arc<Backend<EF>>, BlockProducer<EF>, TxValidator),
config: ServerConfig,
) -> Result<RpcServer> {
let (pool, backend, block_producer, validator) = node_components;
Expand Down
18 changes: 18 additions & 0 deletions crates/katana/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
edition.workspace = true
license.workspace = true
name = "katana-pipeline"
repository.workspace = true
version.workspace = true

[dependencies]
katana-core.workspace = true
katana-executor.workspace = true
katana-pool.workspace = true
katana-tasks.workspace = true

anyhow.workspace = true
async-trait.workspace = true
futures.workspace = true
thiserror.workspace = true
tracing.workspace = true
77 changes: 77 additions & 0 deletions crates/katana/pipeline/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

pub mod stage;

use core::future::IntoFuture;

use futures::future::BoxFuture;
use stage::Stage;
use tracing::info;

/// The result of a pipeline execution.
pub type PipelineResult = Result<(), Error>;

/// The future type for [Pipeline]'s implementation of [IntoFuture].
pub type PipelineFut = BoxFuture<'static, PipelineResult>;

#[derive(Debug, thiserror::Error)]

Check warning on line 17 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L17

Added line #L17 was not covered by tests
pub enum Error {
#[error(transparent)]
Stage(#[from] stage::Error),
}
Comment on lines +17 to +21
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Consider adding tests for the Error enum to enhance error handling

Including unit tests for the Error enum will ensure that error propagation from stages is functioning correctly. This will improve the robustness of the pipeline's error handling mechanism.

Would you like assistance in creating unit tests for the Error enum or opening a GitHub issue to track this task?

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 17-17: crates/katana/pipeline/src/lib.rs#L17
Added line #L17 was not covered by tests


/// Manages the execution of stages.
///
/// The pipeline drives the execution of stages, running each stage to completion in the order they
/// were added.
///
/// Inspired by [`reth`]'s staged sync pipeline.
///
/// [`reth`]: /~https://github.com/paradigmxyz/reth/blob/c7aebff0b6bc19cd0b73e295497d3c5150d40ed8/crates/stages/api/src/pipeline/mod.rs#L66
pub struct Pipeline {
stages: Vec<Box<dyn Stage>>,
}

impl Pipeline {
/// Create a new empty pipeline.
pub fn new() -> Self {
Self { stages: Vec::new() }
}

/// Insert a new stage into the pipeline.
pub fn add_stage(&mut self, stage: Box<dyn Stage>) {
self.stages.push(stage);
}

/// Start the pipeline.
pub async fn run(&mut self) -> PipelineResult {
for stage in &mut self.stages {
info!(id = %stage.id(), "Executing stage");
stage.execute().await?;
}
Ok(())
}

Check warning on line 53 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L52-L53

Added lines #L52 - L53 were not covered by tests
Comment on lines +47 to +53
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Ensure the run method is adequately tested

The run method is central to the pipeline's operation. Adding tests to verify that stages execute in order and handle errors appropriately will increase confidence in the pipeline's reliability.

Can I assist in writing tests for the run method or open a GitHub issue to address this?

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 52-53: crates/katana/pipeline/src/lib.rs#L52-L53
Added lines #L52 - L53 were not covered by tests

}

impl IntoFuture for Pipeline {
type Output = PipelineResult;
type IntoFuture = PipelineFut;

fn into_future(mut self) -> Self::IntoFuture {
Box::pin(async move { self.run().await })
}
}

impl core::default::Default for Pipeline {
fn default() -> Self {
Self::new()
}

Check warning on line 68 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L66-L68

Added lines #L66 - L68 were not covered by tests
}

impl core::fmt::Debug for Pipeline {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("Pipeline")
.field("stages", &self.stages.iter().map(|s| s.id()).collect::<Vec<_>>())
.finish()
}

Check warning on line 76 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L72-L76

Added lines #L72 - L76 were not covered by tests
}
34 changes: 34 additions & 0 deletions crates/katana/pipeline/src/stage/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
mod sequencing;

pub use sequencing::Sequencing;

/// The result type of a stage execution. See [Stage::execute].
pub type StageResult = Result<(), Error>;

#[derive(Debug, Clone, Copy)]
pub enum StageId {
Sequencing,
}

impl core::fmt::Display for StageId {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
StageId::Sequencing => write!(f, "Sequencing"),
}
}

Check warning on line 18 in crates/katana/pipeline/src/stage/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/stage/mod.rs#L14-L18

Added lines #L14 - L18 were not covered by tests
}
Comment on lines +13 to +19
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo sensei! Consider adding tests for StageId's Display implementation.

The Display implementation for StageId (lines 14-18) is not currently covered by tests, as indicated by the static analysis tools. Including unit tests will ensure that the formatted output remains correct, especially if new variants are added to StageId in the future.

Would you like assistance in creating these unit tests?

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 14-18: crates/katana/pipeline/src/stage/mod.rs#L14-L18
Added lines #L14 - L18 were not covered by tests


#[derive(Debug, thiserror::Error)]

Check warning on line 21 in crates/katana/pipeline/src/stage/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/stage/mod.rs#L21

Added line #L21 was not covered by tests
pub enum Error {
#[error(transparent)]
Other(#[from] anyhow::Error),
}
Comment on lines +21 to +25
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo sensei! Adding tests for the Error enum would be beneficial.

The Error enum derived from thiserror::Error (line 21) is not covered by tests. Testing the error handling ensures that errors are correctly propagated and formatted, which is crucial for effective debugging and reliability.

Would you like help in writing tests for the Error enum?

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 21-21: crates/katana/pipeline/src/stage/mod.rs#L21
Added line #L21 was not covered by tests


#[async_trait::async_trait]
pub trait Stage: Send + Sync {
/// Returns the id which uniquely identifies the stage.
fn id(&self) -> StageId;

/// Executes the stage.
async fn execute(&mut self) -> StageResult;
}
Loading
Loading