Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(katana-tasks): separate tasks spawning responsibility from TaskManager #2514

Merged
merged 2 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions bin/katana/src/cli/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ impl NodeArgs {
let node = katana_node::build(config).await.context("failed to build node")?;

if !self.silent {
#[allow(deprecated)]
let genesis = &node.backend.chain_spec.genesis;
print_intro(&self, genesis);
}
Expand All @@ -256,9 +255,10 @@ impl NodeArgs {
}

fn init_logging(&self) -> Result<()> {
const DEFAULT_LOG_FILTER: &str = "info,executor=trace,forking::backend=trace,server=debug,\
katana_core=trace,blockifier=off,jsonrpsee_server=off,\
hyper=off,messaging=debug,node=error";
const DEFAULT_LOG_FILTER: &str = "tasks=debug,info,executor=trace,forking::backend=trace,\
server=debug,katana_core=trace,blockifier=off,\
jsonrpsee_server=off,hyper=off,messaging=debug,\
node=error";

LogTracer::init()?;

Expand Down
7 changes: 4 additions & 3 deletions crates/katana/node-bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,15 +622,16 @@ mod tests {
async fn can_launch_katana() {
// this will launch katana with random ports
let katana = Katana::new().spawn();

// assert some default values
assert_eq!(katana.accounts().len(), 10);
assert_eq!(katana.chain_id(), short_string!("KATANA"));
// assert that all accounts have private key
assert!(katana.accounts().iter().all(|a| a.private_key.is_some()));

let provider = JsonRpcClient::new(HttpTransport::new(katana.endpoint_url()));
let result = provider.chain_id().await;
assert!(result.is_ok());
// try to connect as a provider
let provider = JsonRpcClient::new(HttpTransport::new(dbg!(katana.endpoint_url())));
assert!(provider.chain_id().await.is_ok())
Comment on lines +632 to +634
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Consider removing the dbg! macro for cleaner test output.

The dbg! macro is useful for debugging but can clutter test outputs with unnecessary information. It's best to remove it before merging to keep the test output clean.

Apply this diff to remove the dbg! macro:

-let provider = JsonRpcClient::new(HttpTransport::new(dbg!(katana.endpoint_url())));
+let provider = JsonRpcClient::new(HttpTransport::new(katana.endpoint_url()));
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// try to connect as a provider
let provider = JsonRpcClient::new(HttpTransport::new(dbg!(katana.endpoint_url())));
assert!(provider.chain_id().await.is_ok())
// try to connect as a provider
let provider = JsonRpcClient::new(HttpTransport::new(katana.endpoint_url()));
assert!(provider.chain_id().await.is_ok())

}

#[test]
Expand Down
10 changes: 7 additions & 3 deletions crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,10 @@ impl Node {

// --- build sequencing stage

#[allow(deprecated)]
let sequencing = stage::Sequencing::new(
pool.clone(),
backend.clone(),
self.task_manager.clone(),
self.task_manager.task_spawner(),
block_producer.clone(),
self.messaging_config.clone(),
);
Expand All @@ -143,7 +142,12 @@ impl Node {
let mut pipeline = Pipeline::new();
pipeline.add_stage(Box::new(sequencing));

self.task_manager.spawn(pipeline.into_future());
self.task_manager
.task_spawner()
.build_task()
.critical()
.name("Pipeline")
.spawn(pipeline.into_future());

let node_components = (pool, backend, block_producer, validator);
let rpc = spawn(node_components, self.rpc_config.clone()).await?;
Expand Down
1 change: 1 addition & 0 deletions crates/katana/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ anyhow.workspace = true
async-trait.workspace = true
futures.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
11 changes: 8 additions & 3 deletions crates/katana/pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use core::future::IntoFuture;

use futures::future::BoxFuture;
use stage::Stage;
use tracing::info;
use tracing::{error, info};

/// The result of a pipeline execution.
pub type PipelineResult = Result<(), Error>;
Expand Down Expand Up @@ -46,9 +46,10 @@ impl Pipeline {
/// Start the pipeline.
pub async fn run(&mut self) -> PipelineResult {
for stage in &mut self.stages {
info!(id = %stage.id(), "Executing stage");
info!(target: "pipeline", id = %stage.id(), "Executing stage.");
stage.execute().await?;
}
info!(target: "pipeline", "Pipeline finished.");
Ok(())
}
}
Expand All @@ -58,7 +59,11 @@ impl IntoFuture for Pipeline {
type IntoFuture = PipelineFut;

fn into_future(mut self) -> Self::IntoFuture {
Box::pin(async move { self.run().await })
Box::pin(async move {
self.run().await.inspect_err(|error| {
error!(target: "pipeline", %error, "Pipeline failed.");
})
})
}
}

Expand Down
51 changes: 32 additions & 19 deletions crates/katana/pipeline/src/stage/sequencing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use katana_core::service::messaging::{MessagingConfig, MessagingService, Messagi
use katana_core::service::{BlockProductionTask, TransactionMiner};
use katana_executor::ExecutorFactory;
use katana_pool::{TransactionPool, TxPool};
use katana_tasks::TaskManager;
use katana_tasks::{TaskHandle, TaskSpawner};
use tracing::error;

use super::{StageId, StageResult};
use crate::Stage;
Expand All @@ -18,7 +19,7 @@ use crate::Stage;
pub struct Sequencing<EF: ExecutorFactory> {
pool: TxPool,
backend: Arc<Backend<EF>>,
task_manager: TaskManager,
task_spawner: TaskSpawner,
block_producer: BlockProducer<EF>,
messaging_config: Option<MessagingConfig>,
}
Expand All @@ -27,41 +28,37 @@ impl<EF: ExecutorFactory> Sequencing<EF> {
pub fn new(
pool: TxPool,
backend: Arc<Backend<EF>>,
task_manager: TaskManager,
task_spawner: TaskSpawner,
block_producer: BlockProducer<EF>,
messaging_config: Option<MessagingConfig>,
) -> Self {
Self { pool, backend, task_manager, block_producer, messaging_config }
Self { pool, backend, task_spawner, block_producer, messaging_config }
}

async fn run_messaging(&self) -> Result<()> {
async fn run_messaging(&self) -> Result<TaskHandle<()>> {
if let Some(config) = &self.messaging_config {
let config = config.clone();
let pool = self.pool.clone();
let backend = self.backend.clone();

let service = MessagingService::new(config, pool, backend).await?;
let task = MessagingTask::new(service);
self.task_manager.build_task().critical().name("Messaging").spawn(task);

let handle = self.task_spawner.build_task().name("Messaging").spawn(task);
Ok(handle)
} else {
// this will create a future that will never resolve
self.task_manager
.build_task()
.critical()
.name("Messaging")
.spawn(future::pending::<()>());
let handle = self.task_spawner.build_task().spawn(future::pending::<()>());
Ok(handle)
}

Ok(())
}

async fn run_block_production(&self) {
fn run_block_production(&self) -> TaskHandle<()> {
let pool = self.pool.clone();
let miner = TransactionMiner::new(pool.add_listener());
let block_producer = self.block_producer.clone();

let service = BlockProductionTask::new(pool, miner, block_producer);
self.task_manager.build_task().critical().name("Block production").spawn(service);
self.task_spawner.build_task().name("Block production").spawn(service)
}
}

Expand All @@ -71,9 +68,25 @@ impl<EF: ExecutorFactory> Stage for Sequencing<EF> {
StageId::Sequencing
}

#[tracing::instrument(skip(self), name = "Stage", fields(id = %self.id()))]
async fn execute(&mut self) -> StageResult {
let _ = self.run_messaging().await?;
let _ = self.run_block_production().await;
future::pending::<StageResult>().await
// Build the messaging and block production tasks.
let messaging = self.run_messaging().await?;
let block_production = self.run_block_production();

// Neither of these tasks should complete as they are meant to be run forever,
// but if either of them do complete, the sequencing stage should return.
//
// Select on the tasks completion to prevent the task from failing silently (if any).
tokio::select! {
res = messaging => {
error!(target: "pipeline", reason = ?res, "Messaging task finished unexpectedly.");
},
res = block_production => {
error!(target: "pipeline", reason = ?res, "Block production task finished unexpectedly.");
}
}

Ok(())
}
}
1 change: 1 addition & 0 deletions crates/katana/tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tokio::task::JoinHandle;
#[error("Failed to initialize task spawner: {0}")]
pub struct TaskSpawnerInitError(tokio::runtime::TryCurrentError);

// TODO: replace this with TaskSpawner in manager.rs
/// A task spawner for spawning tasks on a tokio runtime. This is simple wrapper around a tokio's
/// runtime [Handle] to easily spawn tasks on the runtime.
///
Expand Down
Loading
Loading