Skip to content

Commit

Permalink
pipeline abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
kariy committed Oct 7, 2024
1 parent dc08191 commit c600956
Show file tree
Hide file tree
Showing 15 changed files with 275 additions and 52 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ members = [
"crates/katana/executor",
"crates/katana/node",
"crates/katana/node-bindings",
"crates/katana/pipeline",
"crates/katana/pool",
"crates/katana/primitives",
"crates/katana/rpc/rpc",
Expand Down Expand Up @@ -87,6 +88,7 @@ katana-db = { path = "crates/katana/storage/db" }
katana-executor = { path = "crates/katana/executor" }
katana-node = { path = "crates/katana/node", default-features = false }
katana-node-bindings = { path = "crates/katana/node-bindings" }
katana-pipeline = { path = "crates/katana/pipeline" }
katana-pool = { path = "crates/katana/pool" }
katana-primitives = { path = "crates/katana/primitives" }
katana-provider = { path = "crates/katana/storage/provider" }
Expand Down
23 changes: 16 additions & 7 deletions crates/katana/core/src/service/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,31 @@ type BlockProductionWithTxnsFuture =
#[allow(missing_debug_implementations)]
pub struct BlockProducer<EF: ExecutorFactory> {
/// The inner mode of mining.
pub producer: RwLock<BlockProducerMode<EF>>,
pub producer: Arc<RwLock<BlockProducerMode<EF>>>,
}

impl<EF: ExecutorFactory> BlockProducer<EF> {
/// Creates a block producer that mines a new block every `interval` milliseconds.
pub fn interval(backend: Arc<Backend<EF>>, interval: u64) -> Self {
let prod = IntervalBlockProducer::new(backend, Some(interval));
Self { producer: BlockProducerMode::Interval(prod).into() }
let producer = IntervalBlockProducer::new(backend, Some(interval));
let producer = Arc::new(RwLock::new(BlockProducerMode::Interval(producer)));
Self { producer }
}

/// Creates a new block producer that will only be possible to mine by calling the
/// `katana_generateBlock` RPC method.
pub fn on_demand(backend: Arc<Backend<EF>>) -> Self {
let prod = IntervalBlockProducer::new(backend, None);
Self { producer: BlockProducerMode::Interval(prod).into() }
let producer = IntervalBlockProducer::new(backend, None);
let producer = Arc::new(RwLock::new(BlockProducerMode::Interval(producer)));
Self { producer }
}

/// Creates a block producer that mines a new block as soon as there are ready transactions in
/// the transactions pool.
pub fn instant(backend: Arc<Backend<EF>>) -> Self {
let prod = InstantBlockProducer::new(backend);
Self { producer: BlockProducerMode::Instant(prod).into() }
let producer = InstantBlockProducer::new(backend);
let producer = Arc::new(RwLock::new(BlockProducerMode::Instant(producer)));
Self { producer }
}

pub(super) fn queue(&self, transactions: Vec<ExecutableTxWithHash>) {
Expand Down Expand Up @@ -143,6 +146,12 @@ impl<EF: ExecutorFactory> BlockProducer<EF> {
}
}

impl<EF: ExecutorFactory> Clone for BlockProducer<EF> {
fn clone(&self) -> Self {
BlockProducer { producer: self.producer.clone() }
}
}

/// The inner type of [BlockProducer].
///
/// On _interval_ mining, a new block is opened for a fixed amount of interval. Within this
Expand Down
9 changes: 2 additions & 7 deletions crates/katana/core/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::channel::mpsc::Receiver;
Expand Down Expand Up @@ -33,7 +32,7 @@ pub(crate) const LOG_TARGET: &str = "node";
#[allow(missing_debug_implementations)]
pub struct BlockProductionTask<EF: ExecutorFactory> {
/// creates new blocks
pub(crate) block_producer: Arc<BlockProducer<EF>>,
pub(crate) block_producer: BlockProducer<EF>,
/// the miner responsible to select transactions from the `pool´
pub(crate) miner: TransactionMiner,
/// the pool that holds all transactions
Expand All @@ -43,11 +42,7 @@ pub struct BlockProductionTask<EF: ExecutorFactory> {
}

impl<EF: ExecutorFactory> BlockProductionTask<EF> {
pub fn new(
pool: TxPool,
miner: TransactionMiner,
block_producer: Arc<BlockProducer<EF>>,
) -> Self {
pub fn new(pool: TxPool, miner: TransactionMiner, block_producer: BlockProducer<EF>) -> Self {
Self { block_producer, miner, pool, metrics: BlockProducerMetrics::default() }
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/katana/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ version.workspace = true
katana-core.workspace = true
katana-db.workspace = true
katana-executor.workspace = true
katana-pipeline.workspace = true
katana-pool.workspace = true
katana-primitives.workspace = true
katana-provider.workspace = true
Expand Down
37 changes: 18 additions & 19 deletions crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

use std::future::IntoFuture;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -18,14 +19,12 @@ use katana_core::env::BlockContextGenerator;
#[allow(deprecated)]
use katana_core::sequencer::SequencerConfig;
use katana_core::service::block_producer::BlockProducer;
#[cfg(feature = "messaging")]
use katana_core::service::messaging::{MessagingService, MessagingTask};
use katana_core::service::{BlockProductionTask, TransactionMiner};
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_executor::{ExecutorFactory, SimulationFlag};
use katana_pipeline::{stage, Pipeline};
use katana_pool::ordering::FiFo;
use katana_pool::validation::stateful::TxValidator;
use katana_pool::{TransactionPool, TxPool};
use katana_pool::TxPool;
use katana_primitives::block::FinalityStatus;
use katana_primitives::env::{CfgEnv, FeeTokenAddressses};
use katana_provider::providers::fork::ForkedProvider;
Expand Down Expand Up @@ -57,7 +56,7 @@ pub struct Handle {
pub rpc: RpcServer,
pub task_manager: TaskManager,
pub backend: Arc<Backend<BlockifierFactory>>,
pub block_producer: Arc<BlockProducer<BlockifierFactory>>,
pub block_producer: BlockProducer<BlockifierFactory>,
}

impl Handle {
Expand Down Expand Up @@ -185,11 +184,10 @@ pub async fn start(
BlockProducer::instant(Arc::clone(&backend))
};

// --- build transaction pool and miner
// --- build transaction pool

let validator = block_producer.validator();
let pool = TxPool::new(validator.clone(), FiFo::new());
let miner = TransactionMiner::new(pool.add_listener());

// --- build metrics service

Expand All @@ -214,21 +212,22 @@ pub async fn start(

let task_manager = TaskManager::current();

// --- build and spawn the messaging task
// --- build sequencing stage

#[cfg(feature = "messaging")]
if let Some(config) = sequencer_config.messaging.clone() {
let messaging = MessagingService::new(config, pool.clone(), Arc::clone(&backend)).await?;
let task = MessagingTask::new(messaging);
task_manager.build_task().critical().name("Messaging").spawn(task);
}
let sequencing = stage::Sequencing::new(
pool.clone(),
backend.clone(),
task_manager.clone(),
block_producer.clone(),
sequencer_config.messaging.clone(),
);

let block_producer = Arc::new(block_producer);
// --- build and start the pipeline

// --- build and spawn the block production task
let mut pipeline = Pipeline::new();
pipeline.add_stage(Box::new(sequencing));

let task = BlockProductionTask::new(pool.clone(), miner, block_producer.clone());
task_manager.build_task().critical().name("BlockProduction").spawn(task);
task_manager.spawn(pipeline.into_future());

// --- spawn rpc server

Expand All @@ -240,7 +239,7 @@ pub async fn start(

// Moved from `katana_rpc` crate
pub async fn spawn<EF: ExecutorFactory>(
node_components: (TxPool, Arc<Backend<EF>>, Arc<BlockProducer<EF>>, TxValidator),
node_components: (TxPool, Arc<Backend<EF>>, BlockProducer<EF>, TxValidator),
config: ServerConfig,
) -> Result<RpcServer> {
let (pool, backend, block_producer, validator) = node_components;
Expand Down
18 changes: 18 additions & 0 deletions crates/katana/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
edition.workspace = true
license.workspace = true
name = "katana-pipeline"
repository.workspace = true
version.workspace = true

[dependencies]
katana-core.workspace = true
katana-executor.workspace = true
katana-pool.workspace = true
katana-tasks.workspace = true

anyhow.workspace = true
async-trait.workspace = true
futures.workspace = true
thiserror.workspace = true
tracing.workspace = true
77 changes: 77 additions & 0 deletions crates/katana/pipeline/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

pub mod stage;

use core::future::IntoFuture;

use futures::future::BoxFuture;
use stage::Stage;
use tracing::info;

/// The result of a pipeline execution.
pub type PipelineResult = Result<(), Error>;

/// The future type for [Pipeline]'s implementation of [IntoFuture].
pub type PipelineFut = BoxFuture<'static, PipelineResult>;

#[derive(Debug, thiserror::Error)]

Check warning on line 17 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L17

Added line #L17 was not covered by tests
pub enum Error {
#[error(transparent)]
Stage(#[from] stage::Error),
}

/// Manages the execution of stages.
///
/// The pipeline drives the execution of stages, running each stage to completion in the order they
/// were added.
///
/// Inspired by [`reth`]'s staged sync pipeline.
///
/// [`reth`]: /~https://github.com/paradigmxyz/reth/blob/c7aebff0b6bc19cd0b73e295497d3c5150d40ed8/crates/stages/api/src/pipeline/mod.rs#L66
pub struct Pipeline {
stages: Vec<Box<dyn Stage>>,
}

impl Pipeline {
/// Create a new empty pipeline.
pub fn new() -> Self {
Self { stages: Vec::new() }
}

/// Insert a new stage into the pipeline.
pub fn add_stage(&mut self, stage: Box<dyn Stage>) {
self.stages.push(stage);
}

/// Start the pipeline.
pub async fn run(&mut self) -> PipelineResult {
for stage in &mut self.stages {
info!(id = %stage.id(), "Executing stage");
stage.execute().await?;
}
Ok(())
}

Check warning on line 53 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L52-L53

Added lines #L52 - L53 were not covered by tests
}

impl IntoFuture for Pipeline {
type Output = PipelineResult;
type IntoFuture = PipelineFut;

fn into_future(mut self) -> Self::IntoFuture {
Box::pin(async move { self.run().await })
}
}

impl core::default::Default for Pipeline {
fn default() -> Self {
Self::new()
}

Check warning on line 68 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L66-L68

Added lines #L66 - L68 were not covered by tests
}

impl core::fmt::Debug for Pipeline {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("Pipeline")
.field("stages", &self.stages.iter().map(|s| s.id()).collect::<Vec<_>>())
.finish()
}

Check warning on line 76 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L72-L76

Added lines #L72 - L76 were not covered by tests
}
34 changes: 34 additions & 0 deletions crates/katana/pipeline/src/stage/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
mod sequencing;

pub use sequencing::Sequencing;

/// The result type of a stage execution. See [Stage::execute].
pub type StageResult = Result<(), Error>;

#[derive(Debug, Clone, Copy)]
pub enum StageId {
Sequencing,
}

impl core::fmt::Display for StageId {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
StageId::Sequencing => write!(f, "Sequencing"),
}
}

Check warning on line 18 in crates/katana/pipeline/src/stage/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/stage/mod.rs#L14-L18

Added lines #L14 - L18 were not covered by tests
}

#[derive(Debug, thiserror::Error)]

Check warning on line 21 in crates/katana/pipeline/src/stage/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/stage/mod.rs#L21

Added line #L21 was not covered by tests
pub enum Error {
#[error(transparent)]
Other(#[from] anyhow::Error),
}

#[async_trait::async_trait]
pub trait Stage: Send + Sync {
/// Returns the id which uniquely identifies the stage.
fn id(&self) -> StageId;

/// Executes the stage.
async fn execute(&mut self) -> StageResult;
}
Loading

0 comments on commit c600956

Please sign in to comment.