hotfix(katana): make sure validator state is synced with block producer #2353

Merged · 10 commits · Aug 28, 2024

4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

191 changes: 119 additions & 72 deletions crates/katana/core/src/service/block_producer.rs

Large diffs are not rendered by default.
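
This file carries the bulk of the hotfix, but its diff is hidden. Based on the other diffs on this page (the shared `permit` mutex passed into `TxValidator::new`, and the removal of the explicit `update_validator()` call from `NodeService`), here is a minimal, hedged sketch of the synchronization pattern, using simplified stand-in types rather than Katana's real API:

```rust
// Hedged sketch of the producer/validator sync pattern (stand-in types, not
// Katana's real API): both sides share one "permit" mutex, so validation is
// blocked while a block is being produced, and the validator's state is only
// observed after it has been updated to the post-block state.
use std::sync::{Arc, Mutex};

/// Stand-in for the post-block state snapshot (hypothetical).
#[derive(Clone, Default)]
struct StateSnapshot {
    nonce: u64,
}

struct TxValidator {
    permit: Arc<Mutex<()>>,
    state: Mutex<StateSnapshot>,
}

impl TxValidator {
    fn validate(&self, tx_nonce: u64) -> Result<(), String> {
        // Wait for any in-flight block production to finish first.
        let _permit = self.permit.lock().unwrap();
        let state = self.state.lock().unwrap();
        if tx_nonce == state.nonce {
            Ok(())
        } else {
            Err(format!("invalid nonce: expected {}, got {tx_nonce}", state.nonce))
        }
    }

    /// Called after a block is mined to sync the validator with the new state.
    fn update(&self, new_state: StateSnapshot) {
        *self.state.lock().unwrap() = new_state;
    }
}

struct BlockProducer {
    permit: Arc<Mutex<()>>,
    validator: Arc<TxValidator>,
}

impl BlockProducer {
    fn mine_block(&self) {
        // Holding the permit blocks new validations until the state is synced.
        let _permit = self.permit.lock().unwrap();
        let post_block_state = StateSnapshot { nonce: 1 }; // result of execution
        self.validator.update(post_block_state);
    }
}

fn main() {
    let permit = Arc::new(Mutex::new(()));
    let validator =
        Arc::new(TxValidator { permit: permit.clone(), state: Mutex::default() });
    let producer = BlockProducer { permit, validator: validator.clone() };

    assert!(validator.validate(0).is_ok());
    producer.mine_block();
    // The validator now sees the post-block nonce.
    assert!(validator.validate(1).is_ok());
}
```

The point of the shared permit: while the producer holds it, `validate` cannot run, so a transaction is never validated against a state that is about to be replaced.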

3 changes: 0 additions & 3 deletions crates/katana/core/src/service/mod.rs
@@ -44,7 +44,6 @@ pub struct NodeService<EF: ExecutorFactory> {
pub(crate) messaging: Option<MessagingService<EF>>,
/// Metrics for recording the service operations
metrics: ServiceMetrics,
// validator: StatefulValidator
}

impl<EF: ExecutorFactory> NodeService<EF> {
@@ -100,8 +99,6 @@ impl<EF: ExecutorFactory> Future for NodeService<EF> {
let steps_used = outcome.stats.cairo_steps_used;
metrics.l1_gas_processed_total.increment(gas_used as u64);
metrics.cairo_steps_processed_total.increment(steps_used as u64);

pin.block_producer.update_validator().expect("failed to update validator");
}

Err(err) => {
2 changes: 1 addition & 1 deletion crates/katana/executor/Cargo.toml
@@ -15,7 +15,7 @@ starknet = { workspace = true, optional = true }
thiserror.workspace = true
tracing.workspace = true

blockifier = { git = "/~https://github.com/dojoengine/blockifier", branch = "cairo-2.7-new", features = [ "testing" ], optional = true }
blockifier = { git = "/~https://github.com/dojoengine/blockifier", branch = "cairo-2.7-newer", features = [ "testing" ], optional = true }
katana-cairo = { workspace = true, optional = true }

[dev-dependencies]
19 changes: 12 additions & 7 deletions crates/katana/node/src/lib.rs
@@ -24,6 +24,7 @@ use katana_core::service::{NodeService, TransactionMiner};
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_executor::{ExecutorFactory, SimulationFlag};
use katana_pool::ordering::FiFo;
use katana_pool::validation::stateful::TxValidator;
use katana_pool::{TransactionPool, TxPool};
use katana_primitives::block::FinalityStatus;
use katana_primitives::env::{CfgEnv, FeeTokenAddressses};
@@ -167,8 +168,8 @@ pub async fn start(

// --- build transaction pool and miner

let validator = block_producer.validator().clone();
let pool = TxPool::new(validator, FiFo::new());
let validator = block_producer.validator();
let pool = TxPool::new(validator.clone(), FiFo::new());
let miner = TransactionMiner::new(pool.add_listener());

// --- build metrics service
@@ -212,18 +213,18 @@

// --- spawn rpc server

let node_components = (pool, backend.clone(), block_producer);
let node_components = (pool, backend.clone(), block_producer, validator);
let rpc_handle = spawn(node_components, server_config).await?;

Ok((rpc_handle, backend))
}

// Moved from `katana_rpc` crate
pub async fn spawn<EF: ExecutorFactory>(
node_components: (TxPool, Arc<Backend<EF>>, Arc<BlockProducer<EF>>),
node_components: (TxPool, Arc<Backend<EF>>, Arc<BlockProducer<EF>>, TxValidator),
config: ServerConfig,
) -> Result<NodeHandle> {
let (pool, backend, block_producer) = node_components;
let (pool, backend, block_producer, validator) = node_components;

let mut methods = RpcModule::new(());
methods.register_method("health", |_, _| Ok(serde_json::json!({ "health": true })))?;
@@ -232,8 +233,12 @@ pub async fn spawn<EF: ExecutorFactory>(
match api {
ApiKind::Starknet => {
// TODO: merge these into a single logic.
let server =
StarknetApi::new(backend.clone(), pool.clone(), block_producer.clone());
let server = StarknetApi::new(
backend.clone(),
pool.clone(),
block_producer.clone(),
validator.clone(),
);
methods.merge(StarknetApiServer::into_rpc(server.clone()))?;
methods.merge(StarknetWriteApiServer::into_rpc(server.clone()))?;
methods.merge(StarknetTraceApiServer::into_rpc(server))?;
5 changes: 3 additions & 2 deletions crates/katana/pool/src/pool.rs
@@ -114,14 +114,15 @@
Ok(hash)
}

ValidationOutcome::Invalid { tx, error } => {
warn!(hash = format!("{:#x}", tx.hash()), "Invalid transaction.");
ValidationOutcome::Invalid { error, .. } => {
warn!(hash = format!("{hash:#x}"), "Invalid transaction.");
Err(PoolError::InvalidTransaction(Box::new(error)))
}

// return as error for now but ideally we should keep the tx in a separate
// queue and revalidate it when the parent tx is added to the pool
ValidationOutcome::Dependent { tx, tx_nonce, current_nonce } => {
info!(hash = format!("{hash:#x}"), "Dependent transaction.");
let err = InvalidTransactionError::InvalidNonce {
address: tx.sender(),
current_nonce,
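
The comment above sketches a possible future design: park "dependent" transactions instead of rejecting them. Purely as an illustration (simplified stand-in types, not Katana code), such a queue could look like this:

```rust
// Hypothetical "dependent transaction" queue: txs whose nonce is ahead of the
// account's current nonce are parked, then released once the gap is filled.
use std::collections::{BTreeMap, HashMap};

type Address = u64; // simplified stand-ins for Katana's primitive types
type Nonce = u64;

#[derive(Default)]
struct DependentQueue {
    // address -> (tx nonce -> tx payload)
    parked: HashMap<Address, BTreeMap<Nonce, String>>,
}

impl DependentQueue {
    /// Park a transaction that arrived with a future nonce.
    fn park(&mut self, address: Address, tx_nonce: Nonce, tx: String) {
        self.parked.entry(address).or_default().insert(tx_nonce, tx);
    }

    /// After a tx from `address` enters the pool, drain any parked txs that
    /// are now contiguous with the account's new current nonce.
    fn release_ready(&mut self, address: Address, mut current_nonce: Nonce) -> Vec<String> {
        let mut ready = Vec::new();
        if let Some(queue) = self.parked.get_mut(&address) {
            while let Some(tx) = queue.remove(&current_nonce) {
                ready.push(tx);
                current_nonce += 1;
            }
        }
        ready
    }
}

fn main() {
    let mut q = DependentQueue::default();
    q.park(0xabc, 2, "tx-nonce-2".into());
    q.park(0xabc, 1, "tx-nonce-1".into());
    // The parent tx (nonce 0) lands in the pool; nonces 1 and 2 become ready.
    let ready = q.release_ready(0xabc, 1);
    assert_eq!(ready, vec!["tx-nonce-1".to_string(), "tx-nonce-2".to_string()]);
}
```
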
56 changes: 35 additions & 21 deletions crates/katana/pool/src/validation/stateful.rs
@@ -24,9 +24,14 @@ use crate::tx::PoolTransaction;
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct TxValidator {
inner: Arc<Inner>,
}

struct Inner {
cfg_env: CfgEnv,
execution_flags: SimulationFlag,
validator: Arc<Mutex<StatefulValidatorAdapter>>,
validator: Mutex<StatefulValidatorAdapter>,
permit: Arc<Mutex<()>>,
}

impl TxValidator {
@@ -35,16 +40,28 @@ impl TxValidator {
execution_flags: SimulationFlag,
cfg_env: CfgEnv,
block_env: &BlockEnv,
permit: Arc<Mutex<()>>,
) -> Self {
let inner = StatefulValidatorAdapter::new(state, block_env, &cfg_env);
Self { cfg_env, execution_flags, validator: Arc::new(Mutex::new(inner)) }
let validator = StatefulValidatorAdapter::new(state, block_env, &cfg_env);
Self {
inner: Arc::new(Inner {
permit,
cfg_env,
execution_flags,
validator: Mutex::new(validator),
}),
}
}

/// Reset the state of the validator with the given params. This method is used to update the
/// validator's state with a new state and block env after a block is mined.
pub fn update(&self, state: Box<dyn StateProvider>, block_env: &BlockEnv) {
let updated = StatefulValidatorAdapter::new(state, block_env, &self.cfg_env);
*self.validator.lock() = updated;
pub fn update(&self, new_state: Box<dyn StateProvider>, block_env: &BlockEnv) {
let mut validator = self.inner.validator.lock();

let mut state = validator.inner.tx_executor.block_state.take().unwrap();
state.state = StateProviderDb::new(new_state);

*validator = StatefulValidatorAdapter::new_inner(state, block_env, &self.inner.cfg_env);
}

// NOTE:
@@ -54,7 +71,7 @@
// safety is not guaranteed by TransactionExecutor itself.
pub fn get_nonce(&self, address: ContractAddress) -> Nonce {
let address = to_blk_address(address);
let nonce = self.validator.lock().inner.get_nonce(address).expect("state err");
let nonce = self.inner.validator.lock().inner.get_nonce(address).expect("state err");
nonce.0
}
}
@@ -65,23 +82,19 @@ struct StatefulValidatorAdapter {
}

impl StatefulValidatorAdapter {
fn new(
state: Box<dyn StateProvider>,
block_env: &BlockEnv,
cfg_env: &CfgEnv,
) -> StatefulValidatorAdapter {
let inner = Self::new_inner(state, block_env, cfg_env);
Self { inner }
fn new(state: Box<dyn StateProvider>, block_env: &BlockEnv, cfg_env: &CfgEnv) -> Self {
let state = CachedState::new(StateProviderDb::new(state));
Self::new_inner(state, block_env, cfg_env)
}

fn new_inner(
state: Box<dyn StateProvider>,
state: CachedState<StateProviderDb<'static>>,
block_env: &BlockEnv,
cfg_env: &CfgEnv,
) -> StatefulValidator<StateProviderDb<'static>> {
let state = CachedState::new(StateProviderDb::new(state));
) -> Self {
let block_context = block_context_from_envs(block_env, cfg_env);
StatefulValidator::create(state, block_context)
let inner = StatefulValidator::create(state, block_context, Default::default());
Self { inner }
}

/// Used only in the [`Validator::validate`] trait
@@ -125,7 +138,8 @@ impl Validator for TxValidator {
type Transaction = ExecutableTxWithHash;

fn validate(&self, tx: Self::Transaction) -> ValidationResult<Self::Transaction> {
let this = &mut *self.validator.lock();
let _permit = self.inner.permit.lock();
let this = &mut *self.inner.validator.lock();

// Check if validation of an invoke transaction should be skipped due to deploy_account not
// being processed yet. This feature is used to improve UX for users sending
@@ -145,8 +159,8 @@
StatefulValidatorAdapter::validate(
this,
tx,
self.execution_flags.skip_validate || skip_validate,
self.execution_flags.skip_fee_transfer,
self.inner.execution_flags.skip_validate || skip_validate,
self.inner.execution_flags.skip_fee_transfer,
)
}
}
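
One detail in `TxValidator::update` above is worth calling out: instead of rebuilding the cached state from scratch (as the old code did via `StatefulValidatorAdapter::new`), it takes the executor's existing `CachedState` and swaps only the backing provider, which, if the cache carries pending writes such as nonce updates, keeps them visible across the reset. A rough standalone sketch of that idea, with simplified stand-in types rather than blockifier's real API:

```rust
// Hedged illustration of the provider swap in `TxValidator::update`:
// the cached-state wrapper survives across blocks; only the underlying
// source of truth is replaced with the post-block state.
struct StateProviderDb(String); // stand-in for the real provider wrapper

struct CachedState {
    // Underlying provider; swapped after each mined block.
    state: StateProviderDb,
    // Writes recorded while validating pending transactions.
    cache: Vec<(u64, u64)>,
}

fn update(cached: &mut CachedState, new_state: StateProviderDb) {
    // Mirrors `state.state = StateProviderDb::new(new_state)` in the diff.
    cached.state = new_state;
}

fn main() {
    let mut cached = CachedState {
        state: StateProviderDb("state@block-0".into()),
        cache: vec![(0xabc, 1)], // e.g. a nonce bump seen during validation
    };
    update(&mut cached, StateProviderDb("state@block-1".into()));
    assert_eq!(cached.state.0, "state@block-1");
    assert_eq!(cached.cache.len(), 1); // the cache itself is carried over
}
```
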
8 changes: 5 additions & 3 deletions crates/katana/rpc/rpc/src/starknet/mod.rs
@@ -13,6 +13,7 @@ use anyhow::Result;
use katana_core::backend::Backend;
use katana_core::service::block_producer::{BlockProducer, BlockProducerMode, PendingExecutor};
use katana_executor::{ExecutionResult, ExecutorFactory};
use katana_pool::validation::stateful::TxValidator;
use katana_pool::TxPool;
use katana_primitives::block::{
BlockHash, BlockHashOrNumber, BlockIdOrTag, BlockNumber, BlockTag, FinalityStatus,
@@ -53,6 +54,7 @@ impl<EF: ExecutorFactory> Clone for StarknetApi<EF> {
}

struct Inner<EF: ExecutorFactory> {
validator: TxValidator,
pool: TxPool,
backend: Arc<Backend<EF>>,
block_producer: Arc<BlockProducer<EF>>,
@@ -64,11 +66,12 @@ impl<EF: ExecutorFactory> StarknetApi<EF> {
backend: Arc<Backend<EF>>,
pool: TxPool,
block_producer: Arc<BlockProducer<EF>>,
validator: TxValidator,
) -> Self {
let blocking_task_pool =
BlockingTaskPool::new().expect("failed to create blocking task pool");

let inner = Inner { pool, backend, block_producer, blocking_task_pool };
let inner = Inner { pool, backend, block_producer, blocking_task_pool, validator };

Self { inner: Arc::new(inner) }
}
@@ -296,8 +299,7 @@ impl<EF: ExecutorFactory> StarknetApi<EF> {
// TODO: this is a temporary solution, we should have a better way to handle this.
// perhaps a pending/pool state provider that implements all the state provider traits.
if let BlockIdOrTag::Tag(BlockTag::Pending) = block_id {
let validator = this.inner.block_producer.validator();
let pool_nonce = validator.get_nonce(contract_address);
let pool_nonce = this.inner.validator.get_nonce(contract_address);
return Ok(pool_nonce);
}

3 changes: 2 additions & 1 deletion crates/katana/rpc/rpc/src/starknet/write.rs
@@ -23,7 +23,8 @@ impl<EF: ExecutorFactory> StarknetApi<EF> {

let tx = tx.into_tx_with_chain_id(this.inner.backend.chain_id);
let tx = ExecutableTxWithHash::new(ExecutableTx::Invoke(tx));
let hash = this.inner.pool.add_transaction(tx)?;
let hash =
this.inner.pool.add_transaction(tx).inspect_err(|e| println!("Error: {:?}", e))?;

Ok(hash.into())
})
42 changes: 33 additions & 9 deletions crates/katana/rpc/rpc/tests/starknet.rs
@@ -27,6 +27,7 @@ use starknet::core::utils::{get_contract_address, get_selector_from_name};
use starknet::macros::felt;
use starknet::providers::{Provider, ProviderError};
use starknet::signers::{LocalWallet, SigningKey};
use tokio::sync::Mutex;

mod common;

@@ -230,8 +231,8 @@ async fn estimate_fee() -> Result<()> {
}

#[rstest::rstest]
#[tokio::test]
async fn rapid_transactions_submissions(
#[tokio::test(flavor = "multi_thread")]
async fn concurrent_transactions_submissions(
#[values(None, Some(1000))] block_time: Option<u64>,
) -> Result<()> {
// setup test sequencer with the given configuration
@@ -240,33 +241,56 @@

let sequencer = TestSequencer::start(sequencer_config, starknet_config).await;
let provider = sequencer.provider();
let account = sequencer.account();
let account = Arc::new(sequencer.account());

// setup test contract to interact with.
abigen_legacy!(Contract, "crates/katana/rpc/rpc/tests/test_data/erc20.json");
let contract = Contract::new(DEFAULT_FEE_TOKEN_ADDRESS.into(), &account);

// function call params
let recipient = Felt::ONE;
let amount = Uint256 { low: Felt::ONE, high: Felt::ZERO };

const N: usize = 10;
let mut txs = IndexSet::with_capacity(N);
let initial_nonce =
provider.get_nonce(BlockId::Tag(BlockTag::Pending), sequencer.account().address()).await?;

const N: usize = 100;
let nonce = Arc::new(Mutex::new(initial_nonce));
let txs = Arc::new(Mutex::new(IndexSet::with_capacity(N)));

Comment on lines +244 to +259

Ohayo sensei! Use tokio::sync::RwLock for better read performance.

Consider using tokio::sync::RwLock instead of Mutex for nonce and txs to improve read performance, as multiple reads can occur concurrently without blocking each other.

Apply this diff to use RwLock:

-use tokio::sync::Mutex;
+use tokio::sync::{Mutex, RwLock};

-let nonce = Arc::new(Mutex::new(initial_nonce));
-let txs = Arc::new(Mutex::new(IndexSet::with_capacity(N)));
+let nonce = Arc::new(RwLock::new(initial_nonce));
+let txs = Arc::new(RwLock::new(IndexSet::with_capacity(N)));
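
For context, a small standalone sketch of the `tokio::sync::RwLock` behavior this suggestion relies on (illustration only, not part of the PR): reads are concurrent, writes are exclusive. Note that the read-modify-write on `nonce` in this test would still need the write guard for the whole critical section, so the gain would be limited to pure reads.

```rust
// Illustration of tokio::sync::RwLock semantics: multiple read guards may be
// held at once, while a write guard requires exclusive access.
use std::sync::Arc;
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let nonce = Arc::new(RwLock::new(0u64));

    // Concurrent readers do not block each other.
    let r1 = nonce.read().await;
    let r2 = nonce.read().await;
    assert_eq!(*r1, *r2);
    drop((r1, r2));

    // A read-modify-write (like the nonce bump in the test) still needs the
    // write guard for the whole critical section to stay race-free.
    let mut w = nonce.write().await;
    *w += 1;
    drop(w);

    assert_eq!(*nonce.read().await, 1);
}
```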

let mut handles = Vec::with_capacity(N);

for _ in 0..N {
let res = contract.transfer(&recipient, &amount).send().await?;
txs.insert(res.transaction_hash);
let txs = txs.clone();
let nonce = nonce.clone();
let amount = amount.clone();
let account = account.clone();

let handle = tokio::spawn(async move {
let mut nonce = nonce.lock().await;
let contract = Contract::new(DEFAULT_FEE_TOKEN_ADDRESS.into(), account);
let res = contract.transfer(&recipient, &amount).nonce(*nonce).send().await.unwrap();
txs.lock().await.insert(res.transaction_hash);
*nonce += Felt::ONE;
});

handles.push(handle);
}

// wait for all txs to be submitted
for handle in handles {
handle.await?;
}

// Wait only for the last transaction to be accepted
let txs = txs.lock().await;
let last_tx = txs.last().unwrap();
dojo_utils::TransactionWaiter::new(*last_tx, &provider).await?;

// we should've submitted N transactions
assert_eq!(txs.len(), N);

// check the status of each txs
for hash in txs {
for hash in txs.iter() {
let receipt = provider.get_transaction_receipt(hash).await?;
assert_eq!(receipt.receipt.execution_result(), &ExecutionResult::Succeeded);
assert_eq!(receipt.receipt.finality_status(), &TransactionFinalityStatus::AcceptedOnL2);