From c600956113033e140c20e01a01eacd5cca498785 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Mon, 7 Oct 2024 17:25:47 -0400 Subject: [PATCH] pipeline abstraction --- Cargo.lock | 16 ++++ Cargo.toml | 2 + .../katana/core/src/service/block_producer.rs | 23 ++++-- crates/katana/core/src/service/mod.rs | 9 +-- crates/katana/node/Cargo.toml | 1 + crates/katana/node/src/lib.rs | 37 +++++---- crates/katana/pipeline/Cargo.toml | 18 +++++ crates/katana/pipeline/src/lib.rs | 77 ++++++++++++++++++ crates/katana/pipeline/src/stage/mod.rs | 34 ++++++++ .../katana/pipeline/src/stage/sequencing.rs | 79 +++++++++++++++++++ crates/katana/pool/src/validation/stateful.rs | 4 +- crates/katana/rpc/rpc/src/dev.rs | 4 +- crates/katana/rpc/rpc/src/saya.rs | 9 +-- crates/katana/rpc/rpc/src/starknet/mod.rs | 4 +- crates/katana/rpc/rpc/src/torii.rs | 10 +-- 15 files changed, 275 insertions(+), 52 deletions(-) create mode 100644 crates/katana/pipeline/Cargo.toml create mode 100644 crates/katana/pipeline/src/lib.rs create mode 100644 crates/katana/pipeline/src/stage/mod.rs create mode 100644 crates/katana/pipeline/src/stage/sequencing.rs diff --git a/Cargo.lock b/Cargo.lock index 8aac815ca6..7aa378e2e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8029,6 +8029,7 @@ dependencies = [ "katana-core", "katana-db", "katana-executor", + "katana-pipeline", "katana-pool", "katana-primitives", "katana-provider", @@ -8057,6 +8058,21 @@ dependencies = [ "url", ] +[[package]] +name = "katana-pipeline" +version = "1.0.0-alpha.14" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "katana-core", + "katana-executor", + "katana-pool", + "katana-tasks", + "thiserror", + "tracing", +] + [[package]] name = "katana-pool" version = "1.0.0-alpha.14" diff --git a/Cargo.toml b/Cargo.toml index ad4707a630..987e8b51be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "crates/katana/executor", "crates/katana/node", "crates/katana/node-bindings", + "crates/katana/pipeline", "crates/katana/pool", "crates/katana/primitives", "crates/katana/rpc/rpc", @@ -87,6 +88,7 @@ katana-db = { path = "crates/katana/storage/db" } katana-executor = { path = "crates/katana/executor" } katana-node = { path = "crates/katana/node", default-features = false } katana-node-bindings = { path = "crates/katana/node-bindings" } +katana-pipeline = { path = "crates/katana/pipeline" } katana-pool = { path = "crates/katana/pool" } katana-primitives = { path = "crates/katana/primitives" } katana-provider = { path = "crates/katana/storage/provider" } diff --git a/crates/katana/core/src/service/block_producer.rs b/crates/katana/core/src/service/block_producer.rs index 618d0f2c03..ef0829828d 100644 --- a/crates/katana/core/src/service/block_producer.rs +++ b/crates/katana/core/src/service/block_producer.rs @@ -74,28 +74,31 @@ type BlockProductionWithTxnsFuture = #[allow(missing_debug_implementations)] pub struct BlockProducer { /// The inner mode of mining. - pub producer: RwLock>, + pub producer: Arc>>, } impl BlockProducer { /// Creates a block producer that mines a new block every `interval` milliseconds. pub fn interval(backend: Arc>, interval: u64) -> Self { - let prod = IntervalBlockProducer::new(backend, Some(interval)); - Self { producer: BlockProducerMode::Interval(prod).into() } + let producer = IntervalBlockProducer::new(backend, Some(interval)); + let producer = Arc::new(RwLock::new(BlockProducerMode::Interval(producer))); + Self { producer } } /// Creates a new block producer that will only be possible to mine by calling the /// `katana_generateBlock` RPC method. pub fn on_demand(backend: Arc>) -> Self { - let prod = IntervalBlockProducer::new(backend, None); - Self { producer: BlockProducerMode::Interval(prod).into() } + let producer = IntervalBlockProducer::new(backend, None); + let producer = Arc::new(RwLock::new(BlockProducerMode::Interval(producer))); + Self { producer } } /// Creates a block producer that mines a new block as soon as there are ready transactions in /// the transactions pool. pub fn instant(backend: Arc>) -> Self { - let prod = InstantBlockProducer::new(backend); - Self { producer: BlockProducerMode::Instant(prod).into() } + let producer = InstantBlockProducer::new(backend); + let producer = Arc::new(RwLock::new(BlockProducerMode::Instant(producer))); + Self { producer } } pub(super) fn queue(&self, transactions: Vec) { @@ -143,6 +146,12 @@ impl BlockProducer { } } +impl Clone for BlockProducer { + fn clone(&self) -> Self { + BlockProducer { producer: self.producer.clone() } + } +} + /// The inner type of [BlockProducer]. /// /// On _interval_ mining, a new block is opened for a fixed amount of interval. Within this diff --git a/crates/katana/core/src/service/mod.rs b/crates/katana/core/src/service/mod.rs index 2ff904f733..dabd76363a 100644 --- a/crates/katana/core/src/service/mod.rs +++ b/crates/katana/core/src/service/mod.rs @@ -3,7 +3,6 @@ use std::future::Future; use std::pin::Pin; -use std::sync::Arc; use std::task::{Context, Poll}; use futures::channel::mpsc::Receiver; @@ -33,7 +32,7 @@ pub(crate) const LOG_TARGET: &str = "node"; #[allow(missing_debug_implementations)] pub struct BlockProductionTask { /// creates new blocks - pub(crate) block_producer: Arc>, + pub(crate) block_producer: BlockProducer, /// the miner responsible to select transactions from the `pool´ pub(crate) miner: TransactionMiner, /// the pool that holds all transactions @@ -43,11 +42,7 @@ pub struct BlockProductionTask { } impl BlockProductionTask { - pub fn new( - pool: TxPool, - miner: TransactionMiner, - block_producer: Arc>, - ) -> Self { + pub fn new(pool: TxPool, miner: TransactionMiner, block_producer: BlockProducer) -> Self { Self { block_producer, miner, pool, metrics: BlockProducerMetrics::default() } } } diff --git a/crates/katana/node/Cargo.toml b/crates/katana/node/Cargo.toml index c74b8e1380..e0973df04f 100644 --- a/crates/katana/node/Cargo.toml +++ b/crates/katana/node/Cargo.toml @@ -9,6 +9,7 @@ version.workspace = true katana-core.workspace = true katana-db.workspace = true katana-executor.workspace = true +katana-pipeline.workspace = true katana-pool.workspace = true katana-primitives.workspace = true katana-provider.workspace = true diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 55dab949f0..452a12307d 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -1,5 +1,6 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] +use std::future::IntoFuture; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -18,14 +19,12 @@ use katana_core::env::BlockContextGenerator; #[allow(deprecated)] use katana_core::sequencer::SequencerConfig; use katana_core::service::block_producer::BlockProducer; -#[cfg(feature = "messaging")] -use katana_core::service::messaging::{MessagingService, MessagingTask}; -use katana_core::service::{BlockProductionTask, TransactionMiner}; use katana_executor::implementation::blockifier::BlockifierFactory; use katana_executor::{ExecutorFactory, SimulationFlag}; +use katana_pipeline::{stage, Pipeline}; use katana_pool::ordering::FiFo; use katana_pool::validation::stateful::TxValidator; -use katana_pool::{TransactionPool, TxPool}; +use katana_pool::TxPool; use katana_primitives::block::FinalityStatus; use katana_primitives::env::{CfgEnv, FeeTokenAddressses}; use katana_provider::providers::fork::ForkedProvider; @@ -57,7 +56,7 @@ pub struct Handle { pub rpc: RpcServer, pub task_manager: TaskManager, pub backend: Arc>, - pub block_producer: Arc>, + pub block_producer: BlockProducer, } impl Handle { @@ -185,11 +184,10 @@ pub async fn start( BlockProducer::instant(Arc::clone(&backend)) }; - // --- build transaction pool and miner + // --- build transaction pool let validator = block_producer.validator(); let pool = TxPool::new(validator.clone(), FiFo::new()); - let miner = TransactionMiner::new(pool.add_listener()); // --- build metrics service @@ -214,21 +212,22 @@ pub async fn start( let task_manager = TaskManager::current(); - // --- build and spawn the messaging task + // --- build sequencing stage - #[cfg(feature = "messaging")] - if let Some(config) = sequencer_config.messaging.clone() { - let messaging = MessagingService::new(config, pool.clone(), Arc::clone(&backend)).await?; - let task = MessagingTask::new(messaging); - task_manager.build_task().critical().name("Messaging").spawn(task); - } + let sequencing = stage::Sequencing::new( + pool.clone(), + backend.clone(), + task_manager.clone(), + block_producer.clone(), + sequencer_config.messaging.clone(), + ); - let block_producer = Arc::new(block_producer); + // --- build and start the pipeline - // --- build and spawn the block production task + let mut pipeline = Pipeline::new(); + pipeline.add_stage(Box::new(sequencing)); - let task = BlockProductionTask::new(pool.clone(), miner, block_producer.clone()); - task_manager.build_task().critical().name("BlockProduction").spawn(task); + task_manager.spawn(pipeline.into_future()); // --- spawn rpc server @@ -240,7 +239,7 @@ pub async fn start( // Moved from `katana_rpc` crate pub async fn spawn( - node_components: (TxPool, Arc>, Arc>, TxValidator), + node_components: (TxPool, Arc>, BlockProducer, TxValidator), config: ServerConfig, ) -> Result { let (pool, backend, block_producer, validator) = node_components; diff --git a/crates/katana/pipeline/Cargo.toml b/crates/katana/pipeline/Cargo.toml new file mode 100644 index 0000000000..fc8894e28b --- /dev/null +++ b/crates/katana/pipeline/Cargo.toml @@ -0,0 +1,18 @@ +[package] +edition.workspace = true +license.workspace = true +name = "katana-pipeline" +repository.workspace = true +version.workspace = true + +[dependencies] +katana-core.workspace = true +katana-executor.workspace = true +katana-pool.workspace = true +katana-tasks.workspace = true + +anyhow.workspace = true +async-trait.workspace = true +futures.workspace = true +thiserror.workspace = true +tracing.workspace = true diff --git a/crates/katana/pipeline/src/lib.rs b/crates/katana/pipeline/src/lib.rs new file mode 100644 index 0000000000..287275d581 --- /dev/null +++ b/crates/katana/pipeline/src/lib.rs @@ -0,0 +1,77 @@ +#![cfg_attr(not(test), warn(unused_crate_dependencies))] + +pub mod stage; + +use core::future::IntoFuture; + +use futures::future::BoxFuture; +use stage::Stage; +use tracing::info; + +/// The result of a pipeline execution. +pub type PipelineResult = Result<(), Error>; + +/// The future type for [Pipeline]'s implementation of [IntoFuture]. +pub type PipelineFut = BoxFuture<'static, PipelineResult>; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + Stage(#[from] stage::Error), +} + +/// Manages the execution of stages. +/// +/// The pipeline drives the execution of stages, running each stage to completion in the order they +/// were added. +/// +/// Inspired by [`reth`]'s staged sync pipeline. +/// +/// [`reth`]: /~https://github.com/paradigmxyz/reth/blob/c7aebff0b6bc19cd0b73e295497d3c5150d40ed8/crates/stages/api/src/pipeline/mod.rs#L66 +pub struct Pipeline { + stages: Vec>, +} + +impl Pipeline { + /// Create a new empty pipeline. + pub fn new() -> Self { + Self { stages: Vec::new() } + } + + /// Insert a new stage into the pipeline. + pub fn add_stage(&mut self, stage: Box) { + self.stages.push(stage); + } + + /// Start the pipeline. + pub async fn run(&mut self) -> PipelineResult { + for stage in &mut self.stages { + info!(id = %stage.id(), "Executing stage"); + stage.execute().await?; + } + Ok(()) + } +} + +impl IntoFuture for Pipeline { + type Output = PipelineResult; + type IntoFuture = PipelineFut; + + fn into_future(mut self) -> Self::IntoFuture { + Box::pin(async move { self.run().await }) + } +} + +impl core::default::Default for Pipeline { + fn default() -> Self { + Self::new() + } +} + +impl core::fmt::Debug for Pipeline { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("Pipeline") + .field("stages", &self.stages.iter().map(|s| s.id()).collect::>()) + .finish() + } +} diff --git a/crates/katana/pipeline/src/stage/mod.rs b/crates/katana/pipeline/src/stage/mod.rs new file mode 100644 index 0000000000..6d50761ffc --- /dev/null +++ b/crates/katana/pipeline/src/stage/mod.rs @@ -0,0 +1,34 @@ +mod sequencing; + +pub use sequencing::Sequencing; + +/// The result type of a stage execution. See [Stage::execute]. +pub type StageResult = Result<(), Error>; + +#[derive(Debug, Clone, Copy)] +pub enum StageId { + Sequencing, +} + +impl core::fmt::Display for StageId { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + StageId::Sequencing => write!(f, "Sequencing"), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +#[async_trait::async_trait] +pub trait Stage: Send + Sync { + /// Returns the id which uniquely identifies the stage. + fn id(&self) -> StageId; + + /// Executes the stage. + async fn execute(&mut self) -> StageResult; +} diff --git a/crates/katana/pipeline/src/stage/sequencing.rs b/crates/katana/pipeline/src/stage/sequencing.rs new file mode 100644 index 0000000000..958e21792a --- /dev/null +++ b/crates/katana/pipeline/src/stage/sequencing.rs @@ -0,0 +1,79 @@ +use std::sync::Arc; + +use anyhow::Result; +use futures::future; +use katana_core::backend::Backend; +use katana_core::service::block_producer::BlockProducer; +use katana_core::service::messaging::{MessagingConfig, MessagingService, MessagingTask}; +use katana_core::service::{BlockProductionTask, TransactionMiner}; +use katana_executor::ExecutorFactory; +use katana_pool::{TransactionPool, TxPool}; +use katana_tasks::TaskManager; + +use super::{StageId, StageResult}; +use crate::Stage; + +/// The sequencing stage is responsible for advancing the chain state. +#[allow(missing_debug_implementations)] +pub struct Sequencing { + pool: TxPool, + backend: Arc>, + task_manager: TaskManager, + block_producer: BlockProducer, + messaging_config: Option, +} + +impl Sequencing { + pub fn new( + pool: TxPool, + backend: Arc>, + task_manager: TaskManager, + block_producer: BlockProducer, + messaging_config: Option, + ) -> Self { + Self { pool, backend, task_manager, block_producer, messaging_config } + } + + async fn run_messaging(&self) -> Result<()> { + if let Some(config) = &self.messaging_config { + let config = config.clone(); + let pool = self.pool.clone(); + let backend = self.backend.clone(); + + let service = MessagingService::new(config, pool, backend).await?; + let task = MessagingTask::new(service); + self.task_manager.build_task().critical().name("Messaging").spawn(task); + } else { + // this will create a future that will never resolve + self.task_manager + .build_task() + .critical() + .name("Messaging") + .spawn(future::pending::<()>()); + } + + Ok(()) + } + + async fn run_block_production(&self) { + let pool = self.pool.clone(); + let miner = TransactionMiner::new(pool.add_listener()); + let block_producer = self.block_producer.clone(); + + let service = BlockProductionTask::new(pool, miner, block_producer); + self.task_manager.build_task().critical().name("Block production").spawn(service); + } +} + +#[async_trait::async_trait] +impl Stage for Sequencing { + fn id(&self) -> StageId { + StageId::Sequencing + } + + async fn execute(&mut self) -> StageResult { + let _ = self.run_messaging().await?; + let _ = self.run_block_production().await; + future::pending::().await + } +} diff --git a/crates/katana/pool/src/validation/stateful.rs b/crates/katana/pool/src/validation/stateful.rs index 88052435da..911ac242e2 100644 --- a/crates/katana/pool/src/validation/stateful.rs +++ b/crates/katana/pool/src/validation/stateful.rs @@ -24,13 +24,13 @@ use parking_lot::Mutex; use super::{Error, InvalidTransactionError, ValidationOutcome, ValidationResult, Validator}; use crate::tx::PoolTransaction; -#[allow(missing_debug_implementations)] -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct TxValidator { inner: Arc>, permit: Arc>, } +#[derive(Debug)] struct Inner { // execution context cfg_env: CfgEnv, diff --git a/crates/katana/rpc/rpc/src/dev.rs b/crates/katana/rpc/rpc/src/dev.rs index f2d039bcdf..2790762801 100644 --- a/crates/katana/rpc/rpc/src/dev.rs +++ b/crates/katana/rpc/rpc/src/dev.rs @@ -12,11 +12,11 @@ use katana_rpc_types::error::dev::DevApiError; #[allow(missing_debug_implementations)] pub struct DevApi { backend: Arc>, - block_producer: Arc>, + block_producer: BlockProducer, } impl DevApi { - pub fn new(backend: Arc>, block_producer: Arc>) -> Self { + pub fn new(backend: Arc>, block_producer: BlockProducer) -> Self { Self { backend, block_producer } } diff --git a/crates/katana/rpc/rpc/src/saya.rs b/crates/katana/rpc/rpc/src/saya.rs index c93d4188ae..c779fa2dcc 100644 --- a/crates/katana/rpc/rpc/src/saya.rs +++ b/crates/katana/rpc/rpc/src/saya.rs @@ -16,20 +16,17 @@ use katana_tasks::TokioTaskSpawner; #[allow(missing_debug_implementations)] pub struct SayaApi { backend: Arc>, - block_producer: Arc>, + block_producer: BlockProducer, } impl Clone for SayaApi { fn clone(&self) -> Self { - Self { - backend: Arc::clone(&self.backend), - block_producer: Arc::clone(&self.block_producer), - } + Self { backend: Arc::clone(&self.backend), block_producer: self.block_producer.clone() } } } impl SayaApi { - pub fn new(backend: Arc>, block_producer: Arc>) -> Self { + pub fn new(backend: Arc>, block_producer: BlockProducer) -> Self { Self { backend, block_producer } } diff --git a/crates/katana/rpc/rpc/src/starknet/mod.rs b/crates/katana/rpc/rpc/src/starknet/mod.rs index a5ec301e89..6f5b39063b 100644 --- a/crates/katana/rpc/rpc/src/starknet/mod.rs +++ b/crates/katana/rpc/rpc/src/starknet/mod.rs @@ -53,7 +53,7 @@ struct Inner { validator: TxValidator, pool: TxPool, backend: Arc>, - block_producer: Arc>, + block_producer: BlockProducer, blocking_task_pool: BlockingTaskPool, } @@ -61,7 +61,7 @@ impl StarknetApi { pub fn new( backend: Arc>, pool: TxPool, - block_producer: Arc>, + block_producer: BlockProducer, validator: TxValidator, ) -> Self { let blocking_task_pool = diff --git a/crates/katana/rpc/rpc/src/torii.rs b/crates/katana/rpc/rpc/src/torii.rs index e8db499384..99b4c32f5a 100644 --- a/crates/katana/rpc/rpc/src/torii.rs +++ b/crates/katana/rpc/rpc/src/torii.rs @@ -22,7 +22,7 @@ const MAX_PAGE_SIZE: usize = 100; pub struct ToriiApi { backend: Arc>, pool: TxPool, - block_producer: Arc>, + block_producer: BlockProducer, } impl Clone for ToriiApi { @@ -30,17 +30,13 @@ impl Clone for ToriiApi { Self { pool: self.pool.clone(), backend: Arc::clone(&self.backend), - block_producer: Arc::clone(&self.block_producer), + block_producer: self.block_producer.clone(), } } } impl ToriiApi { - pub fn new( - backend: Arc>, - pool: TxPool, - block_producer: Arc>, - ) -> Self { + pub fn new(backend: Arc>, pool: TxPool, block_producer: BlockProducer) -> Self { Self { pool, backend, block_producer } }