From b5fdaf8d4eb5eed17a641dd9451c8432a444e87d Mon Sep 17 00:00:00 2001 From: Nasr Date: Thu, 19 Sep 2024 11:14:43 -0400 Subject: [PATCH 1/3] feat(torii-core): bitflags for indexing --- Cargo.lock | 2 + bin/torii/src/main.rs | 19 ++++- crates/torii/core/Cargo.toml | 2 + crates/torii/core/src/engine.rs | 119 +++++++++++++++++++------------- 4 files changed, 92 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb7394efc4..a2ee0b410f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14727,9 +14727,11 @@ dependencies = [ "anyhow", "async-trait", "base64 0.21.7", + "bitflags 2.6.0", "cainome 0.2.3 (git+/~https://github.com/cartridge-gg/cainome?rev=0d29bb0)", "camino", "chrono", + "clap", "crypto-bigint", "dojo-test-utils", "dojo-types", diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index b1447970de..f4e985efec 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -30,7 +30,7 @@ use starknet::providers::JsonRpcClient; use tokio::sync::broadcast; use tokio::sync::broadcast::Sender; use tokio_stream::StreamExt; -use torii_core::engine::{Engine, EngineConfig, Processors}; +use torii_core::engine::{Engine, EngineConfig, IndexingFlags, Processors}; use torii_core::processors::event_message::EventMessageProcessor; use torii_core::processors::generate_event_processors_map; use torii_core::processors::metadata_update::MetadataUpdateProcessor; @@ -132,6 +132,14 @@ struct Args { /// Max concurrent tasks #[arg(long, default_value = "100")] max_concurrent_tasks: usize, + + /// Whether or not to index world transactions + #[arg(long, action = ArgAction::Set, default_value_t = true)] + index_transactions: bool, + + /// Whether or not to index raw events + #[arg(long, action = ArgAction::Set, default_value_t = true)] + index_raw_events: bool, } #[tokio::main] @@ -195,6 +203,14 @@ async fn main() -> anyhow::Result<()> { let (block_tx, block_rx) = tokio::sync::mpsc::channel(100); + let mut flags = IndexingFlags::empty(); + if args.index_transactions { + flags.insert(IndexingFlags::TRANSACTIONS); + } + if args.index_raw_events { + flags.insert(IndexingFlags::RAW_EVENTS); + } + let mut engine = Engine::new( world, db.clone(), @@ -206,6 +222,7 @@ async fn main() -> anyhow::Result<()> { events_chunk_size: args.events_chunk_size, index_pending: args.index_pending, polling_interval: Duration::from_millis(args.polling_interval), + flags, }, shutdown_tx.clone(), Some(block_tx), diff --git a/crates/torii/core/Cargo.toml b/crates/torii/core/Cargo.toml index a22ccfcc9c..d21c4c06b0 100644 --- a/crates/torii/core/Cargo.toml +++ b/crates/torii/core/Cargo.toml @@ -39,6 +39,8 @@ tokio = { version = "1.32.0", features = [ "sync" ], default-features = true } tokio-stream = "0.1.11" tokio-util = "0.7.7" tracing.workspace = true +clap.workspace = true +bitflags = "2.6.0" [dev-dependencies] camino.workspace = true diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 2330091a4d..f24180ad44 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -5,12 +5,13 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Result; +use bitflags::bitflags; use dojo_world::contracts::world::WorldContractReader; use hashlink::LinkedHashMap; use starknet::core::types::{ BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts, - MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, ReceiptBlock, TransactionReceipt, - TransactionReceiptWithBlockInfo, TransactionWithReceipt, + MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, ReceiptBlock, Transaction, + TransactionReceipt, TransactionReceiptWithBlockInfo, TransactionWithReceipt, }; use starknet::providers::Provider; use tokio::sync::broadcast::Sender; @@ -46,6 +47,14 @@ impl Default for Processo pub(crate) const LOG_TARGET: &str = "torii_core::engine"; pub const QUERY_QUEUE_BATCH_SIZE: usize = 1000; +bitflags! { + #[derive(Debug, Clone)] + pub struct IndexingFlags: u32 { + const TRANSACTIONS = 0b00000001; + const RAW_EVENTS = 0b00000010; + } +} + #[derive(Debug)] pub struct EngineConfig { pub polling_interval: Duration, @@ -53,6 +62,7 @@ pub struct EngineConfig { pub events_chunk_size: u64, pub index_pending: bool, pub max_concurrent_tasks: usize, + pub flags: IndexingFlags, } impl Default for EngineConfig { @@ -63,6 +73,7 @@ impl Default for EngineConfig { events_chunk_size: 1024, index_pending: true, max_concurrent_tasks: 100, + flags: IndexingFlags::empty(), } } } @@ -447,13 +458,18 @@ impl Engine

{ for ((block_number, transaction_hash), events) in data.transactions { debug!("Processing transaction hash: {:#x}", transaction_hash); // Process transaction - // let transaction = self.provider.get_transaction_by_hash(transaction_hash).await?; + let transaction = if self.config.flags.contains(IndexingFlags::TRANSACTIONS) { + Some(self.provider.get_transaction_by_hash(transaction_hash).await?) + } else { + None + }; self.process_transaction_with_events( transaction_hash, events.as_slice(), block_number, data.blocks[&block_number], + transaction, ) .await?; @@ -537,6 +553,7 @@ impl Engine

{ events: &[EmittedEvent], block_number: u64, block_timestamp: u64, + transaction: Option, ) -> Result<()> { for (event_idx, event) in events.iter().enumerate() { let event_id = @@ -553,24 +570,25 @@ impl Engine

{ block_timestamp, &event_id, &event, - // transaction_hash, + transaction_hash, ) .await?; } - // Commented out this transaction processor because it requires an RPC call for each - // transaction which is slowing down the sync process by alot. - // Self::process_transaction( - // self, - // block_number, - // block_timestamp, - // transaction_hash, - // transaction, - // ) - // .await?; + if let Some(ref transaction) = transaction { + Self::process_transaction( + self, + block_number, + block_timestamp, + transaction_hash, + transaction, + ) + .await?; + } Ok(()) } + // Process a transaction and events from its receipt. // Returns whether the transaction has a world event. async fn process_transaction_with_receipt( @@ -603,21 +621,21 @@ impl Engine

{ block_timestamp, &event_id, event, - // *transaction_hash, + *transaction_hash, ) .await?; } - // if world_event { - // Self::process_transaction( - // self, - // block_number, - // block_timestamp, - // transaction_hash, - // transaction, - // ) - // .await?; - // } + if world_event && self.config.flags.contains(IndexingFlags::TRANSACTIONS) { + Self::process_transaction( + self, + block_number, + block_timestamp, + *transaction_hash, + &transaction_with_receipt.transaction, + ) + .await?; + } } Ok(world_event) @@ -634,28 +652,28 @@ impl Engine

{ Ok(()) } - // async fn process_transaction( - // &mut self, - // block_number: u64, - // block_timestamp: u64, - // transaction_hash: Felt, - // transaction: &Transaction, - // ) -> Result<()> { - // for processor in &self.processors.transaction { - // processor - // .process( - // &mut self.db, - // self.provider.as_ref(), - // block_number, - // block_timestamp, - // transaction_hash, - // transaction, - // ) - // .await? - // } - - // Ok(()) - // } + async fn process_transaction( + &mut self, + block_number: u64, + block_timestamp: u64, + transaction_hash: Felt, + transaction: &Transaction, + ) -> Result<()> { + for processor in &self.processors.transaction { + processor + .process( + &mut self.db, + self.provider.as_ref(), + block_number, + block_timestamp, + transaction_hash, + transaction, + ) + .await? + } + + Ok(()) + } async fn process_event( &mut self, @@ -663,9 +681,12 @@ impl Engine

{ block_timestamp: u64, event_id: &str, event: &Event, - // transaction_hash: Felt, + transaction_hash: Felt, ) -> Result<()> { - // self.db.store_event(event_id, event, transaction_hash, block_timestamp); + if self.config.flags.contains(IndexingFlags::RAW_EVENTS) { + self.db.store_event(event_id, event, transaction_hash, block_timestamp); + } + let event_key = event.keys[0]; let Some(processor) = self.processors.event.get(&event_key) else { From 7b54e381b8d84bb6432e8bce09d792c5b5491715 Mon Sep 17 00:00:00 2001 From: Nasr Date: Thu, 19 Sep 2024 11:59:04 -0400 Subject: [PATCH 2/3] Disabled by default --- bin/torii/src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index f4e985efec..3747bc9cf1 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -134,11 +134,11 @@ struct Args { max_concurrent_tasks: usize, /// Whether or not to index world transactions - #[arg(long, action = ArgAction::Set, default_value_t = true)] + #[arg(long, action = ArgAction::Set, default_value_t = false)] index_transactions: bool, /// Whether or not to index raw events - #[arg(long, action = ArgAction::Set, default_value_t = true)] + #[arg(long, action = ArgAction::Set, default_value_t = false)] index_raw_events: bool, } From cc6c6b30745031422b0eda4af6468b632b84d95d Mon Sep 17 00:00:00 2001 From: Nasr Date: Thu, 19 Sep 2024 12:12:01 -0400 Subject: [PATCH 3/3] index raw events by default --- bin/torii/src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 3747bc9cf1..cf568429ce 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -138,7 +138,7 @@ struct Args { index_transactions: bool, /// Whether or not to index raw events - #[arg(long, action = ArgAction::Set, default_value_t = false)] + #[arg(long, action = ArgAction::Set, default_value_t = true)] index_raw_events: bool, } @@ -211,7 +211,7 @@ async fn main() -> anyhow::Result<()> { flags.insert(IndexingFlags::RAW_EVENTS); } - let mut engine = Engine::new( + let mut engine: Engine>> = Engine::new( world, db.clone(), provider.clone(),