-
Notifications
You must be signed in to change notification settings - Fork 189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(katana): stage sync pipeline #2502
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
[package] | ||
edition.workspace = true | ||
license.workspace = true | ||
name = "katana-pipeline" | ||
repository.workspace = true | ||
version.workspace = true | ||
|
||
[dependencies] | ||
katana-core.workspace = true | ||
katana-executor.workspace = true | ||
katana-pool.workspace = true | ||
katana-tasks.workspace = true | ||
|
||
anyhow.workspace = true | ||
async-trait.workspace = true | ||
futures.workspace = true | ||
thiserror.workspace = true | ||
tracing.workspace = true |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
#![cfg_attr(not(test), warn(unused_crate_dependencies))] | ||
|
||
pub mod stage; | ||
|
||
use core::future::IntoFuture; | ||
|
||
use futures::future::BoxFuture; | ||
use stage::Stage; | ||
use tracing::info; | ||
|
||
/// The result of a pipeline execution. | ||
pub type PipelineResult = Result<(), Error>; | ||
|
||
/// The future type for [Pipeline]'s implementation of [IntoFuture]. | ||
pub type PipelineFut = BoxFuture<'static, PipelineResult>; | ||
|
||
#[derive(Debug, thiserror::Error)] | ||
pub enum Error { | ||
#[error(transparent)] | ||
Stage(#[from] stage::Error), | ||
} | ||
|
||
/// Manages the execution of stages. | ||
/// | ||
/// The pipeline drives the execution of stages, running each stage to completion in the order they | ||
/// were added. | ||
/// | ||
/// Inspired by [`reth`]'s staged sync pipeline. | ||
/// | ||
/// [`reth`]: /~https://github.com/paradigmxyz/reth/blob/c7aebff0b6bc19cd0b73e295497d3c5150d40ed8/crates/stages/api/src/pipeline/mod.rs#L66 | ||
pub struct Pipeline { | ||
stages: Vec<Box<dyn Stage>>, | ||
} | ||
|
||
impl Pipeline { | ||
/// Create a new empty pipeline. | ||
pub fn new() -> Self { | ||
Self { stages: Vec::new() } | ||
} | ||
|
||
/// Insert a new stage into the pipeline. | ||
pub fn add_stage(&mut self, stage: Box<dyn Stage>) { | ||
self.stages.push(stage); | ||
} | ||
|
||
/// Start the pipeline. | ||
pub async fn run(&mut self) -> PipelineResult { | ||
for stage in &mut self.stages { | ||
info!(id = %stage.id(), "Executing stage"); | ||
stage.execute().await?; | ||
} | ||
Ok(()) | ||
} | ||
Comment on lines
+47
to
+53
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohayo, sensei! Ensure the The Can I assist in writing tests for the 🧰 Tools🪛 GitHub Check: codecov/patch
|
||
} | ||
|
||
impl IntoFuture for Pipeline { | ||
type Output = PipelineResult; | ||
type IntoFuture = PipelineFut; | ||
|
||
fn into_future(mut self) -> Self::IntoFuture { | ||
Box::pin(async move { self.run().await }) | ||
} | ||
} | ||
|
||
impl core::default::Default for Pipeline { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} | ||
|
||
impl core::fmt::Debug for Pipeline { | ||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { | ||
f.debug_struct("Pipeline") | ||
.field("stages", &self.stages.iter().map(|s| s.id()).collect::<Vec<_>>()) | ||
.finish() | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
mod sequencing; | ||
|
||
pub use sequencing::Sequencing; | ||
|
||
/// The result type of a stage execution. See [Stage::execute]. | ||
pub type StageResult = Result<(), Error>; | ||
|
||
#[derive(Debug, Clone, Copy)] | ||
pub enum StageId { | ||
Sequencing, | ||
} | ||
|
||
impl core::fmt::Display for StageId { | ||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { | ||
match self { | ||
StageId::Sequencing => write!(f, "Sequencing"), | ||
} | ||
} | ||
} | ||
Comment on lines
+13
to
+19
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohayo sensei! Consider adding tests for The Would you like assistance in creating these unit tests? 🧰 Tools🪛 GitHub Check: codecov/patch
|
||
|
||
#[derive(Debug, thiserror::Error)] | ||
pub enum Error { | ||
#[error(transparent)] | ||
Other(#[from] anyhow::Error), | ||
} | ||
Comment on lines
+21
to
+25
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohayo sensei! Adding tests for the The Would you like help in writing tests for the 🧰 Tools🪛 GitHub Check: codecov/patch
|
||
|
||
#[async_trait::async_trait] | ||
pub trait Stage: Send + Sync { | ||
/// Returns the id which uniquely identifies the stage. | ||
fn id(&self) -> StageId; | ||
|
||
/// Executes the stage. | ||
async fn execute(&mut self) -> StageResult; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! Consider adding tests for the
Error
enum to enhance error handlingIncluding unit tests for the
Error
enum will ensure that error propagation from stages is functioning correctly. This will improve the robustness of the pipeline's error handling mechanism.Would you like assistance in creating unit tests for the
Error
enum or opening a GitHub issue to track this task?🧰 Tools
🪛 GitHub Check: codecov/patch