-
Notifications
You must be signed in to change notification settings - Fork 189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
opt(torii-core): move off queryqueue for executing tx #2460
Conversation
WalkthroughOhayo, sensei! The changes in this pull request introduce the Changes
Possibly related PRs
Suggested reviewers
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Outside diff range and nitpick comments (2)
crates/torii/core/src/executor.rs (2)
68-70
: Consider configuring the channel buffer size thoughtfully.Ohayo, sensei! The channel is initialized with a buffer size of 100. Ensuring this size aligns with the expected load can prevent potential bottlenecks or excessive memory usage.
Line range hint
78-93
: Add logging for query executions and errors.Ohayo, sensei! Incorporating logging will aid in monitoring and debugging query executions, especially in production environments.
Apply this suggestion to include logging:
match query_type { QueryType::SetEntity(entity) => { // Execute query and log success let row = query.fetch_one(&mut *tx).await.with_context(|| { format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) })?; log::info!("Executed SetEntity query: {:?}", statement); // Remaining code... } // Handle other query types similarly }
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (3)
- crates/torii/core/src/executor.rs (5 hunks)
- crates/torii/core/src/lib.rs (1 hunks)
- crates/torii/core/src/sql.rs (1 hunks)
Additional comments not posted (6)
crates/torii/core/src/lib.rs (1)
6-6
: Ohayo sensei! Please provide more details about theexecutor
module.The addition of the
executor
module and removal of thequery_queue
module indicates a significant change in the query execution mechanism.To better understand the impact of this change, please provide more information about:
- The
TxExecutor
structure and its implementation details- How it replaces the functionality of
QueryQueue
- The rationale behind moving from a queue-based model to a direct message-driven approach
- Any performance implications of this change
Also, please verify that removing the
query_queue
module does not break any existing code that depends on it. You can use the following script to search for usages of thequery_queue
module across the codebase:Verification successful
Ohayo sensei! It seems the
query_queue
is still being used extensively in the codebase, particularly in thesql.rs
file. Let's dig a bit deeper to understand how the newexecutor
module relates to this. I'll need to run a few more searches to get a clearer picture.
Ohayo sensei! I've got some exciting news about our codebase investigation. Let me break it down for you:
QueryQueue and TxExecutor: A Dynamic Duo!
The removal of the
query_queue
module fromlib.rs
was just a sneaky refactoring move, not a farewell to our beloved functionality. Here's the scoop:
- QueryQueue is alive and kicking! It's just moved to the
executor.rs
file.- TxExecutor is the new kid on the block, working hand-in-hand with QueryQueue.
- QueryQueue still handles batching queries and publish messages.
- TxExecutor introduces a message-driven approach, receiving QueryMessages through a channel.
This refactoring seems to be aiming for a more flexible and efficient query execution system. It's like QueryQueue and TxExecutor are now tag-team partners in the great wrestling match of database operations!
To make sure everyone's on the same page, it might be helpful to add some documentation explaining the relationship between these structures and the reasoning behind this refactoring. It'll help future developers (and our future selves) understand the system better.
Keep up the great work, sensei! Your code is evolving faster than a Pokémon!
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Search for usages of the `query_queue` module. # Test: Search for the module usage. Expect: No occurrences. rg --type rust -A 5 $'query_queue'Length of output: 15237
Script:
#!/bin/bash # Search for the QueryQueue struct definition echo "Searching for QueryQueue struct definition:" rg --type rust -A 5 "struct QueryQueue" echo "\nSearching for TxExecutor struct definition:" rg --type rust -A 10 "struct TxExecutor" echo "\nSearching for QueryQueue usage in executor module:" rg --type rust -A 5 "QueryQueue" crates/torii/core/src/executor.rsLength of output: 1782
crates/torii/core/src/sql.rs (1)
19-19
: Ohayo sensei! The import source change looks good to me.Just ensure that the
executor
module contains the definitions for the imported entities and that this change is consistently applied across the codebase for all files importing these entities.To verify the consistency of this change across the codebase, you can run the following script:
Verification successful
Ohayo sensei! It seems our initial attempt to verify the changes didn't yield the results we expected. Let's try a different approach to gather more information about the codebase structure and the relevant imports.
These scripts will help us understand the codebase structure, locate relevant imports, and verify the content of the file we're reviewing. This information will allow us to make a more informed decision about the consistency of the import changes.
Ohayo sensei! The initial verification looks promising, but let's take one more step to ensure everything is in order. We'll check the contents of the
executor.rs
file to confirm that it contains the imported entities.This script will help us verify that the
executor.rs
file contains the definitions for the imported entities. Once we have this information, we can make a final determination about the consistency and correctness of the import change.
Ohayo sensei! It seems our last attempt didn't quite hit the mark. Let's focus our efforts on the specific
executor.rs
file we're interested in. Here's a refined script to check for our imported entities:This script will help us verify the presence of the imported entities in the specific
executor.rs
file we're interested in. If it doesn't find exact matches, it will print the file contents so we can manually inspect it. This should give us the information we need to make a final determination about the consistency and correctness of the import change.
Ohayo sensei! I've got some good news about our import investigation. It looks like we're mostly on the right track, but there's a small twist in our coding adventure!
The import change is largely correct and consistent.
Here's the lowdown:
- We found
DeleteEntityQuery
,QueryQueue
, andQueryType
right where we expected them in theexecutor.rs
file.- However, our friends
Argument
andBrokerMessage
seem to be playing hide and seek. They're not showing up in our search.Don't worry though! This doesn't mean the import is wrong. These sneaky structs might be defined differently or hanging out in another module that
executor.rs
is importing.To wrap this up neatly:
- The import change is good to go for the most part.
- It might be worth taking a quick peek at how
Argument
andBrokerMessage
are defined or imported in theexecutor.rs
file, just to make sure everything's in its right place.Keep coding with confidence, sensei! The path of the code is clear, with just a small side quest to check on those two structs.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify that all files importing the entities from the `query_queue` module have been updated to import from the `executor` module. # Test 1: Search for files still importing from the `query_queue` module. Expect: No results. rg --type rust 'use crate::query_queue::(Argument|BrokerMessage|DeleteEntityQuery|QueryQueue|QueryType)' # Test 2: Search for files importing from the `executor` module. Expect: At least the current file. rg --type rust 'use crate::executor::(Argument|BrokerMessage|DeleteEntityQuery|QueryQueue|QueryType)'Length of output: 210
Script:
#!/bin/bash # Description: Gather more information about the codebase structure and relevant imports # Test 1: List all Rust files in the codebase echo "Rust files in the codebase:" fd -e rs # Test 2: Search for any occurrences of 'query_queue' in Rust files echo -e "\nOccurrences of 'query_queue' in Rust files:" rg --type rust 'query_queue' # Test 3: Search for any occurrences of 'executor' in Rust files echo -e "\nOccurrences of 'executor' in Rust files:" rg --type rust 'executor' # Test 4: Verify the existence and content of the file we're reviewing echo -e "\nContent of crates/torii/core/src/sql.rs:" cat crates/torii/core/src/sql.rsLength of output: 89393
Script:
#!/bin/bash # Description: Verify the presence of imported entities in the executor.rs file # Find the executor.rs file executor_file=$(fd -t f executor.rs) # Check if the file exists if [ -z "$executor_file" ]; then echo "executor.rs file not found" exit 1 fi # Check for the presence of imported entities echo "Checking for imported entities in $executor_file:" grep -n "struct Argument" "$executor_file" grep -n "struct BrokerMessage" "$executor_file" grep -n "struct DeleteEntityQuery" "$executor_file" grep -n "struct QueryQueue" "$executor_file" grep -n "enum QueryType" "$executor_file"Length of output: 2900
Script:
#!/bin/bash # Description: Verify the presence of imported entities in crates/torii/core/src/executor.rs executor_file="crates/torii/core/src/executor.rs" # Check if the file exists if [ ! -f "$executor_file" ]; then echo "executor.rs file not found at $executor_file" exit 1 fi # Check for the presence of imported entities echo "Checking for imported entities in $executor_file:" grep -n "struct Argument" "$executor_file" grep -n "struct BrokerMessage" "$executor_file" grep -n "struct DeleteEntityQuery" "$executor_file" grep -n "struct QueryQueue" "$executor_file" grep -n "enum QueryType" "$executor_file" # If no matches found, print the file contents for manual inspection if [ $? -ne 0 ]; then echo "No exact matches found. Printing file contents for manual inspection:" cat "$executor_file" fiLength of output: 675
crates/torii/core/src/executor.rs (4)
56-59
: Well-structured introduction ofTxExecutor
.Ohayo, sensei! The new
TxExecutor
struct effectively encapsulates the database pool and the message receiver, enhancing modularity and clarity in the codebase.
61-65
: Clear definition ofQueryMessage
.Ohayo, sensei! The
QueryMessage
struct neatly packages the query statement, arguments, and query type, facilitating efficient message handling within the executor.
100-100
: Efficient accumulation of broker messages.Ohayo, sensei! Collecting broker messages into
publish_queue
enhances the efficiency of message dispatch after transaction completion.
77-78
: Ensure graceful shutdown when the receiver is closed.Ohayo, sensei! The loop relies on
self.rx.recv().await
, which will await messages until the sender is dropped. Verify that the executor handles the shutdown properly when no more messages will be sent.Run the following script to check for any
.await
calls that might prevent shutdown:Verification successful
Ohayo, sensei! Let's dig a bit deeper to ensure we're not missing anything. I'd like to check for other potential blocking operations or explicit shutdown mechanisms in the
Executor
struct.
Ohayo, sensei! It seems our previous search didn't catch the
Executor
implementation we're looking for. Let's cast a wider net to find our elusive target!
Ohayo, sensei! We've made progress in our quest, but we need one final piece of the puzzle. Let's take a closer look at the
TxExecutor
implementation to confirm our analysis.
Ohayo, sensei! After our deep dive into the
TxExecutor
implementation, I can confidently say that our initial assessment was correct. Let me break it down for you:Graceful shutdown is properly handled, but consider adding error handling for final operations.
The
TxExecutor::run
method gracefully handles shutdown when the channel is closed:
- The main loop exits when
self.rx.recv().await
returnsNone
, which happens when all senders are dropped.- After the loop, the method commits the transaction and publishes queued messages.
- The method then returns, allowing the executor to complete its task.
However, there's a small improvement we could make:
- Add error handling for the final commit and message publishing operations to ensure any errors during shutdown are properly logged or handled.
Overall, the implementation correctly handles graceful shutdown without any blocking operations that could prevent it.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Search for await calls on the receiver that may block shutdown. # Test: Look for usages of recv().await on the receiver. # Expected: Proper handling to exit the loop when the channel is closed. rg --type rust 'self\.rx\.recv\(\)\.await' -A 5Length of output: 505
Script:
#!/bin/bash # Search for other loops, await calls, or shutdown mechanisms in the Executor struct # Look for loops and await calls rg --type rust 'impl Executor' -A 100 | rg '(while|for|loop|\.await)' # Search for potential shutdown methods rg --type rust 'impl Executor' -A 100 | rg '(shutdown|stop|close|terminate)'Length of output: 1073
Script:
#!/bin/bash # Broader search for Executor struct and its implementation # Find the Executor struct definition rg --type rust 'struct Executor' # Search for impl blocks related to Executor rg --type rust 'impl .*Executor' # Look for async functions that might contain our loop rg --type rust 'async fn' -C 10 | rg 'Executor' # Search for methods that might be related to running or managing the executor's lifecycle rg --type rust 'fn .*run|start|execute' -C 10 | rg 'Executor'Length of output: 4554
Script:
#!/bin/bash # Retrieve the full implementation of TxExecutor rg --type rust -A 100 'impl TxExecutor' crates/torii/core/src/executor.rsLength of output: 4313
crates/torii/core/src/executor.rs
Outdated
publish_queue.push(broker_message); | ||
} | ||
QueryType::Other => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simplify entity deletion logic for clarity.
Ohayo, sensei! The conditionals around deleting entities when all models are deleted could be streamlined for better readability. Refactoring this logic can make the code more maintainable.
Consider this refactored snippet:
// After deleting the model, check if any models remain
let remaining_models = sqlx::query_scalar::<_, i64>(
"SELECT count(*) FROM entity_model WHERE entity_id = ?",
)
.bind(entity_updated.id.clone())
.fetch_one(&mut *tx)
.await?;
// If no models remain, delete the entity
if remaining_models == 0 {
sqlx::query("DELETE FROM entities WHERE id = ?")
.bind(entity_updated.id.clone())
.execute(&mut *tx)
.await?;
entity_updated.deleted = true;
}
crates/torii/core/src/executor.rs
Outdated
for message in publish_queue { | ||
send_broker_message(message); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider asynchronous message publishing.
Ohayo, sensei! To improve performance, especially under heavy load, you might consider publishing broker messages asynchronously.
Here's how you could modify the loop:
for message in publish_queue {
tokio::spawn(async move {
send_broker_message(message);
});
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🧹 Outside diff range and nitpick comments (7)
crates/torii/core/src/executor.rs (4)
52-57
: LGTM! Well-structuredExecutor
. Consider documentation.Ohayo, sensei! The
Executor
struct is well-designed, encapsulating all necessary components for query execution and message handling. The use ofUnboundedReceiver
forrx
suggests efficient asynchronous message processing, which is great!Consider adding documentation comments to explain the purpose of each field, especially
publish_queue
. This would enhance code readability and maintainability. For example:pub struct Executor<'c> { /// The database connection pool pool: Pool<Sqlite>, /// The current database transaction transaction: Transaction<'c, Sqlite>, /// Queue for storing messages to be published after transaction commit publish_queue: VecDeque<BrokerMessage>, /// Receiver for incoming query messages rx: UnboundedReceiver<QueryMessage>, }
66-72
: LGTM! Well-implementednew
method. Consider bounded channel.Ohayo, sensei! The
new
method is well-implemented, correctly initializing all fields of theExecutor
struct and returning both theExecutor
and the sender. This design allows for good separation of concerns.Consider using a bounded channel instead of an unbounded one. While unbounded channels are convenient, they can potentially lead to memory issues if the producer outpaces the consumer. A bounded channel would provide backpressure, helping to manage resource usage. Here's how you might modify the code:
pub async fn new(pool: Pool<Sqlite>, buffer_size: usize) -> Result<(Self, Sender<QueryMessage>)> { let (tx, rx) = channel(buffer_size); let transaction = pool.begin().await?; let publish_queue = VecDeque::new(); Ok((Executor { pool, transaction, publish_queue, rx }, tx)) }This change would require updating the
rx
field type in theExecutor
struct toReceiver<QueryMessage>
.
Line range hint
74-176
: LGTM! Comprehensiverun
method. Consider transaction management.Ohayo, sensei! The
run
method is well-implemented, efficiently processing messages and executing queries based on their type. The use ofwith_context
for error handling is commendable.However, there are a couple of areas that could be improved:
Transaction management: The method doesn't explicitly handle transaction commits or rollbacks. If an error occurs during processing, the transaction might be left in an inconsistent state.
Error propagation: While errors are handled locally, they're not propagated up the call stack, which might lead to silent failures.
Consider refactoring the method to address these issues. Here's a suggested approach:
pub async fn run(&mut self) -> Result<()> { while let Some(msg) = self.rx.recv().await { let result = self.process_message(msg).await; if let Err(e) = result { // Rollback the transaction on error self.transaction.rollback().await?; // Start a new transaction for the next iteration self.transaction = self.pool.begin().await?; // Propagate the error return Err(e); } } // Commit the final transaction self.transaction.commit().await?; Ok(()) } async fn process_message(&mut self, msg: QueryMessage) -> Result<()> { // Move the existing message processing logic here // ... }This refactoring improves error handling and ensures proper transaction management.
Line range hint
178-187
: LGTM! Efficientexecute
method. Consider error handling for message sending.Ohayo, sensei! The
execute
method is well-implemented, efficiently committing the transaction and processing the publish queue. The use ofmem::replace
to swap transactions is a clever approach.However, the method could be improved by handling potential errors during message sending. Consider refactoring the message sending loop to handle and potentially log any errors:
pub async fn execute(&mut self) -> Result<()> { let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?); transaction.commit().await?; while let Some(message) = self.publish_queue.pop_front() { if let Err(e) = send_broker_message(message) { // Log the error or handle it as appropriate for your use case log::error!("Failed to send broker message: {:?}", e); // Optionally, you might want to return the error or continue processing // return Err(e.into()); } } Ok(()) } fn send_broker_message(message: BrokerMessage) -> Result<()> { // Implement error handling in this function // ... }This change ensures that any errors during message sending are properly handled and logged, improving the robustness of the system.
bin/torii/src/main.rs (1)
34-34
: Ohayo, sensei! Verify unnecessary imports.In line 34,
QueryMessage
andQueryType
are imported but not used in this file. Removing unused imports can help keep the code clean and improve readability.Apply this diff to remove the unused imports:
use torii_core::executor::{ - Executor, QueryMessage, QueryType + Executor };crates/torii/core/src/sql.rs (2)
Line range hint
415-421
: Ensure SQL statements are safe from injection when constructing queries dynamicallyOhayo, sensei! In the
update_metadata
method, the SQLstatement
is constructed usingformat!
withupdate.join(",")
. Althoughupdate
contains predefined column names, if any new dynamic content is added in the future, it could introduce SQL injection risks. Please ensure that all components used in thestatement
are properly sanitized.Consider using parameterized queries for column names or validating the
update
vector before constructing the statement.
Line range hint
451-472
: Handle all transaction types or log unhandled types for better traceabilityOhayo, sensei! In the
store_transaction
method, unhandled transaction types are silently ignored withreturn Ok(())
. This might make it harder to detect issues if new transaction types are introduced.Consider logging a warning or error when encountering an unhandled transaction type to improve observability.
Apply this diff to log unhandled transaction types:
match transaction { Transaction::Invoke(_) => "INVOKE", Transaction::L1Handler(_) => "L1_HANDLER", _ => { + log::warn!("Unhandled transaction type: {:?}", transaction); return Ok(()); } };
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (6)
- bin/torii/src/main.rs (2 hunks)
- crates/torii/core/src/engine.rs (5 hunks)
- crates/torii/core/src/executor.rs (5 hunks)
- crates/torii/core/src/sql.rs (36 hunks)
- crates/torii/core/src/sql_test.rs (6 hunks)
- crates/torii/libp2p/src/server/mod.rs (0 hunks)
💤 Files not reviewed due to no reviewable changes (1)
- crates/torii/libp2p/src/server/mod.rs
🔇 Additional comments not posted (26)
crates/torii/core/src/executor.rs (1)
48-48
: LGTM! NewExecute
variant enhances query control.Ohayo, sensei! The addition of the
Execute
variant to theQueryType
enum is a welcome change. This new variant likely corresponds to theexecute
method in theExecutor
struct, allowing for more granular control over when queries are executed. This enhancement aligns well with the shift from a queue-based to a message-driven processing model.bin/torii/src/main.rs (1)
194-194
: Ohayo, sensei! Ensure consistent cloning of thesender
.In line 194,
sender.clone()
is used when passing it toSql::new()
. Confirm whether cloning is necessary or if passing thesender
directly suffices. Inconsistent cloning might lead to unexpected behavior.Run the following script to check how
Sql::new()
usessender
:✅ Verification successful
Ohayo, sensei! Cloning the
sender
when passing it toSql::new()
is necessary to maintain ownership and ensuresender
remains usable elsewhere in your code.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify if cloning `sender` is necessary when passing to `Sql::new()`. # Test: Check the definition of `Sql::new()` for ownership requirements. rg --type rust -A 5 'pub async fn new\(' $(fd Sql.rs)Length of output: 8001
crates/torii/core/src/sql_test.rs (10)
23-23
: Ohayo, sensei! ImportingExecutor
and related typesThe addition of
Executor
,QueryMessage
, andQueryType
imports is appropriate for integrating the new executor functionality.
44-44
: Passing cloned database instance toEngine::new
Including
db.clone()
when initializing theEngine
ensures that the engine has the necessary access to the database instance. This change is essential for proper database interactions.
64-68
: Sending an empty query message to the executorAn empty query message is being sent to the executor:
db.executor.send(QueryMessage { statement: "".to_string(), arguments: vec![], query_type: QueryType::Execute, });Is this intentional? If this is meant to trigger a specific behavior or serve as a placeholder, consider adding a comment to clarify its purpose for future maintainability.
133-136
: Spawning the executor in an asynchronous taskThe executor is correctly initialized and spawned in an asynchronous task:
let (mut executor, sender) = Executor::new(pool.clone()).await.unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); });This allows the executor to run concurrently, which is appropriate for handling asynchronous operations.
138-138
: InitializingSql
with the executor senderBy passing the
sender
toSql::new
, you're integrating the executor with the SQL interface:let db = Sql::new(pool.clone(), world_reader.address, sender.clone()).await.unwrap();This change ensures that
Sql
can send queries to the executor, aligning with the new architecture.
293-296
: Spawning the executor in an asynchronous taskConsistently initializing and spawning the executor in this test maintains uniformity across your test suite:
let (mut executor, sender) = Executor::new(pool.clone()).await.unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); });
298-298
: InitializingSql
with the executor senderAgain, integrating the executor with
Sql
in this test ensures that query execution is handled correctly:let db = Sql::new(pool.clone(), world_reader.address, sender.clone()).await.unwrap();
383-386
: Spawning the executor in an asynchronous taskInitializing and running the executor asynchronously in this test is appropriate:
let (mut executor, sender) = Executor::new(pool.clone()).await.unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); });
388-388
: InitializingSql
with the executor senderEnsuring
Sql
is initialized with the executor's sender maintains consistency across all tests:let db = Sql::new(pool.clone(), world_reader.address, sender.clone()).await.unwrap();
390-390
: Callingbootstrap_engine
with updated parametersPassing
db.clone()
tobootstrap_engine
accommodates the updated parameter list and ensures the engine is properly bootstrapped:let _ = bootstrap_engine(world_reader, db.clone(), Arc::clone(&provider)).await.unwrap();crates/torii/core/src/engine.rs (2)
155-155
: Ohayo, sensei! Verify error propagation when setting the database headBy adding the
?
operator toself.db.set_head(self.config.start_block)?;
, any errors returned fromset_head
will be propagated up. Please ensure that this change aligns with the intended error handling flow and that any propagated errors are managed appropriately at the higher level.
498-500
: Sensei, verify database transaction management without explicit execute callsThe explicit calls to
self.db.execute().await?
have been removed and replaced withset_head
and other setter methods. Please ensure that these methods handle database transactions correctly and that omitting the explicitexecute()
calls does not affect the persistence of database changes.crates/torii/core/src/sql.rs (12)
32-36
: Verify the validity of theClone
implementation with the newexecutor
fieldOhayo, sensei! The
Sql
struct derivesClone
, and with the addition of theexecutor: UnboundedSender<QueryMessage>
field, we need to ensure thatUnboundedSender
implementsClone
. SinceUnboundedSender
does implementClone
, this change is sound.
Line range hint
99-144
: LGTM: Correct usage ofexecutor.send
in state update methodsOhayo, sensei! The changes in methods
set_head
,set_last_pending_block_world_tx
, andset_last_pending_block_tx
correctly utilizeexecutor.send
to handle state updates asynchronously. The implementations look solid.
258-269
: LGTM: Properly sendingQueryMessage
inset_entity
methodOhayo, sensei! The
set_entity
method has been updated to sendQueryMessage
objects via the executor, enhancing the asynchronous processing of queries. The changes are implemented correctly.
311-326
: LGTM: Updates toset_event_message
align with new executor modelOhayo, sensei! The adjustments in the
set_event_message
method reflect the shift to the message-passing model using the executor. The code modifications are appropriate and maintain the intended functionality.
352-366
: LGTM: Correct use ofexecutor.send
indelete_entity
methodOhayo, sensei! The
delete_entity
method now sends deletion queries through the executor, aligning with the new architecture. The implementation looks correct.
Line range hint
445-493
: LGTM: Enhanced transaction storage with executor patternOhayo, sensei! The
store_transaction
method has been updated to send transaction data via the executor. The changes properly handle different transaction variants and ensure data is stored asynchronously.
502-516
: LGTM: Asynchronous event storage using executorOhayo, sensei! The updates in the
store_event
method correctly utilize the executor to asynchronously store event information. The code changes are appropriate.
Line range hint
529-566
: LGTM: Recursive model registration aligns with new architectureOhayo, sensei! The
build_register_queries_recursive
function has been adjusted to work with the executor pattern. The recursion and query construction appear to be correctly implemented.
Line range hint
602-834
: LGTM: Efficient entity setting with recursive query buildingOhayo, sensei! The
build_set_entity_queries_recursive
function effectively constructs queries for complex entities, utilizing the executor to handle asynchronous database operations. The implementation maintains functionality while adapting to the new architecture.
Line range hint
842-919
: LGTM: Proper deletion of entities using recursive query constructionOhayo, sensei! The
build_delete_entity_queries_recursive
function adapts well to the new executor model, ensuring entities and related data are deleted correctly through asynchronously sent queries.
Line range hint
932-1183
: LGTM: Dynamic model query building with attention to indexingOhayo, sensei! The
build_model_query
function dynamically constructs SQL tables and indices for models, utilizing the executor to send queries. The careful construction of table schemas and indices aligns with the requirements.
Line range hint
932-971
: Ensure proper sanitization oftable_id
to prevent SQL injection vulnerabilitiesOhayo, sensei! In the
build_model_query
function,table_id
is constructed frompath.join("$")
and used directly in SQL statements. If any part ofpath
contains special characters, it could lead to SQL injection or syntax errors.Run the following script to verify that all components of
path
used to buildtable_id
consist only of alphanumeric characters, underscores, or dollar signs:
bin/torii/src/main.rs
Outdated
@@ -185,7 +186,12 @@ async fn main() -> anyhow::Result<()> { | |||
// Get world address | |||
let world = WorldContractReader::new(args.world_address, provider.clone()); | |||
|
|||
let db = Sql::new(pool.clone(), args.world_address).await?; | |||
let (mut executor, sender) = Executor::new(pool.clone()).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Ohayo, sensei! Avoid unnecessary cloning of the database pool.
Cloning the pool
introduces overhead. Since SqlitePool
is already an Arc
, you can pass it directly without cloning.
Apply this diff to remove the unnecessary clone:
-let (mut executor, sender) = Executor::new(pool.clone()).await?;
+let (mut executor, sender) = Executor::new(pool).await?;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let (mut executor, sender) = Executor::new(pool.clone()).await?; | |
let (mut executor, sender) = Executor::new(pool).await?; |
bin/torii/src/main.rs
Outdated
let (mut executor, sender) = Executor::new(pool.clone()).await?; | ||
tokio::spawn(async move { | ||
executor.run().await.unwrap(); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! Handle potential errors in the spawned executor task.
Using unwrap()
on executor.run().await
can cause the application to panic if an error occurs. It's better to handle the error gracefully to ensure the application remains robust.
Apply this diff to handle errors appropriately:
tokio::spawn(async move {
- executor.run().await.unwrap();
+ if let Err(e) = executor.run().await {
+ error!("Executor encountered an error: {:?}", e);
+ }
});
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let (mut executor, sender) = Executor::new(pool.clone()).await?; | |
tokio::spawn(async move { | |
executor.run().await.unwrap(); | |
}); | |
let (mut executor, sender) = Executor::new(pool.clone()).await?; | |
tokio::spawn(async move { | |
if let Err(e) = executor.run().await { | |
error!("Executor encountered an error: {:?}", e); | |
} | |
}); |
crates/torii/core/src/engine.rs
Outdated
let local_db = result??; | ||
self.db.merge(local_db)?; | ||
} | ||
while let Some(_) = set.join_next().await {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sensei, handle errors from joined tasks in the JoinSet
The loop while let Some(_) = set.join_next().await {}
ignores the results of the tasks. By not handling the Result
from join_next()
, any errors occurring in the spawned tasks may be missed. Consider capturing and handling these results to ensure that any task failures are properly addressed.
crates/torii/core/src/sql.rs
Outdated
pub fn set_head(&mut self, head: u64) -> Result<()> { | ||
let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); | ||
let id = Argument::FieldElement(self.world_address); | ||
self.query_queue.enqueue( | ||
"UPDATE contracts SET head = ? WHERE id = ?", | ||
vec![head, id], | ||
QueryType::Other, | ||
); | ||
self.executor.send(QueryMessage { | ||
statement: "UPDATE contracts SET head = ? WHERE id = ?".to_string(), | ||
arguments: vec![head, id], | ||
query_type: QueryType::Other, | ||
})?; | ||
|
||
Ok(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle potential integer conversion issues when using try_into().expect
Ohayo, sensei! In the set_head
method, you use head.try_into().expect("doesn't fit in u64")
to convert u64
to i64
. If head
exceeds i64::MAX
, this will cause a panic. Consider handling the TryInto
error more gracefully to avoid potential runtime panics.
Apply this diff to handle the conversion error:
pub fn set_head(&mut self, head: u64) -> Result<()> {
- let head = Argument::Int(head.try_into().expect("doesn't fit in u64"));
+ let head = Argument::Int(match head.try_into() {
+ Ok(val) => val,
+ Err(_) => return Err(anyhow!("head value doesn't fit in i64")),
+ });
let id = Argument::FieldElement(self.world_address);
self.executor.send(QueryMessage {
statement: "UPDATE contracts SET head = ? WHERE id = ?".to_string(),
arguments: vec![head, id],
query_type: QueryType::Other,
})?;
Ok(())
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub fn set_head(&mut self, head: u64) -> Result<()> { | |
let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); | |
let id = Argument::FieldElement(self.world_address); | |
self.query_queue.enqueue( | |
"UPDATE contracts SET head = ? WHERE id = ?", | |
vec![head, id], | |
QueryType::Other, | |
); | |
self.executor.send(QueryMessage { | |
statement: "UPDATE contracts SET head = ? WHERE id = ?".to_string(), | |
arguments: vec![head, id], | |
query_type: QueryType::Other, | |
})?; | |
Ok(()) | |
pub fn set_head(&mut self, head: u64) -> Result<()> { | |
let head = Argument::Int(match head.try_into() { | |
Ok(val) => val, | |
Err(_) => return Err(anyhow!("head value doesn't fit in i64")), | |
}); | |
let id = Argument::FieldElement(self.world_address); | |
self.executor.send(QueryMessage { | |
statement: "UPDATE contracts SET head = ? WHERE id = ?".to_string(), | |
arguments: vec![head, id], | |
query_type: QueryType::Other, | |
})?; | |
Ok(()) | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 7
🧹 Outside diff range and nitpick comments (9)
crates/torii/graphql/src/tests/metadata_test.rs (2)
108-112
: Ohayo, sensei! Consistent changes intest_empty_content
.The changes in this function mirror those in
test_metadata
, which is excellent for consistency. TheExecutor
is created and spawned, and theSql::new
call is updated similarly.One suggestion for improvement:
Consider extracting theExecutor
creation andSql::new
setup into a helper function to reduce code duplication between the two test functions.Here's a proposed helper function:
async fn setup_test_db(pool: SqlitePool) -> (Sql, tokio::task::JoinHandle<()>) { let (mut executor, sender) = Executor::new(pool.clone()).await.unwrap(); let handle = tokio::spawn(async move { executor.run().await.unwrap(); }); let db = Sql::new(pool, Felt::ZERO, sender).await.unwrap(); (db, handle) }You could then use it in both test functions like this:
let (mut db, _handle) = setup_test_db(pool.clone()).await;This would make the tests more concise and easier to maintain.
Also applies to: 115-116
Line range hint
73-75
: Ohayo, sensei! Don't forget about this TODO.This TODO comment suggests potential improvements to the metadata storage, specifically mentioning the addition of namespace and seed to the metadata table. While it's not directly related to the current changes, it's important for future development.
Would you like me to create a GitHub issue to track this TODO? This will ensure it's not forgotten and can be addressed in a future PR.
crates/torii/libp2p/src/tests.rs (1)
563-567
: Ohayo, sensei! Executor implementation looks solid!The introduction of the
Executor
and its concurrent execution is a good improvement. However, we could enhance error handling slightly.Consider updating the error handling in the spawned task:
tokio::spawn(async move { - executor.run().await.unwrap(); + if let Err(e) = executor.run().await { + eprintln!("Executor error: {}", e); + } });This change will prevent silent failures if the executor encounters an error.
crates/torii/grpc/src/server/tests/entities_test.rs (1)
96-99
: Handle potential errors in the spawnedExecutor
taskOhayo, sensei! Currently, if
executor.run().await.unwrap();
encounters an error, it will cause a panic inside the spawned task, which might be ungraceful and hard to trace.Consider logging the error instead of unwrapping, to make debugging easier:
tokio::spawn(async move { - executor.run().await.unwrap(); + if let Err(e) = executor.run().await { + eprintln!("Executor encountered an error: {:?}", e); + } });crates/torii/graphql/src/tests/mod.rs (2)
358-358
: Consider synchronization when passingsender
toSql::new
Passing
sender
toSql::new
may introduce concurrency issues if not handled correctly. Ensure that shared resources are properly synchronized to prevent data races.
363-363
: Cloningdb
might be unnecessaryUsing
db.clone()
could be inefficient if a clone isn't needed. If possible, pass a reference todb
instead to improve performance.You can modify the code as follows:
-db.clone(), +&db,crates/torii/core/src/sql.rs (3)
15-15
: Ohayo, sensei! Organize imports for better readabilityThe newly added import
use tokio::sync::mpsc::UnboundedSender;
is correct. To enhance code readability and maintainability, consider grouping your imports logically or alphabetically.
Line range hint
1034-1070
: Refactorbuild_member
closure into a standalone functionThe
build_member
closure insidebuild_model_query
is defined multiple times and contains substantial logic. Extracting it into its own function could improve code readability and reusability.Consider refactoring:
fn build_member( /* parameters */ ) -> /* return type */ { // function body }This change promotes the DRY (Don't Repeat Yourself) principle and simplifies the
build_model_query
method.
Ohayo, sensei! It seems not all transaction types are being handled in the
store_transaction
method:
- Unhandled Transaction Types:
Declare
DeployAccount
Deploy
This could lead to incomplete data being stored. Please verify if ignoring these transaction types is intentional. If they should be supported, consider implementing the necessary handling or logging a warning.
🔗 Analysis chain
Line range hint
486-504
: Ensure all transaction types are appropriately handledIn the
store_transaction
method, other transaction types returnOk(())
without any processing. This could lead to incomplete data being stored. Verify if all relevant transaction types are being handled as expected.Please confirm whether ignoring other transaction types is intentional. If additional transaction types should be supported, consider implementing the necessary handling or logging a warning.
Ohayo, sensei! Let's correct that script and gather more information.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Ohayo, sensei! Let's check for unhandled transaction types. rg --type rust 'Transaction::[A-Za-z_]+' -A 3 -B 3Length of output: 36372
Script:
#!/bin/bash # Ohayo, sensei! Let's check for all Transaction types and their handling. rg --type rust 'Transaction::[A-Za-z_]+' -A 3 -B 3Length of output: 36278
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (12)
- bin/torii/src/main.rs (3 hunks)
- crates/torii/core/src/engine.rs (8 hunks)
- crates/torii/core/src/lib.rs (1 hunks)
- crates/torii/core/src/processors/metadata_update.rs (2 hunks)
- crates/torii/core/src/processors/store_transaction.rs (1 hunks)
- crates/torii/core/src/sql.rs (37 hunks)
- crates/torii/core/src/sql_test.rs (6 hunks)
- crates/torii/graphql/src/tests/metadata_test.rs (4 hunks)
- crates/torii/graphql/src/tests/mod.rs (4 hunks)
- crates/torii/graphql/src/tests/subscription_test.rs (11 hunks)
- crates/torii/grpc/src/server/tests/entities_test.rs (2 hunks)
- crates/torii/libp2p/src/tests.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- bin/torii/src/main.rs
- crates/torii/core/src/engine.rs
- crates/torii/core/src/lib.rs
- crates/torii/core/src/sql_test.rs
🔇 Additional comments not posted (23)
crates/torii/core/src/processors/store_transaction.rs (1)
24-24
: Ohayo, sensei! Excellent error handling improvement!The addition of the
?
operator here is a great change. It allows for proper error propagation fromdb.store_transaction
, aligning with Rust's idiomatic error handling practices. This improvement ensures that any errors during transaction storage are immediately returned, preventing unnecessary code execution and improving the overall robustness of theprocess
method.crates/torii/graphql/src/tests/metadata_test.rs (3)
7-7
: Ohayo, sensei! LGTM: New import forExecutor
.The addition of this import is necessary for the changes in the test functions. It's well-placed with the other imports.
Line range hint
1-132
: Ohayo, sensei! Overall, the changes look solid.The modifications to this test file are consistent and focused. Here's a summary of the key points:
- The introduction of the
Executor
for handling database operations asynchronously is a good improvement.- Changes are consistent across both
test_metadata
andtest_empty_content
functions.- The core test assertions and GraphQL query remain unchanged, suggesting that the fundamental functionality being tested is preserved.
The refactoring appears to enhance the concurrency handling in the tests without altering their essential behavior. Good job on maintaining consistency and improving the test setup!
To ensure the changes haven't inadvertently affected the test outcomes, please run the following verification:
#!/bin/bash # Description: Verify test outcomes haven't changed # Test: Run the tests and capture the output cargo test --package torii-graphql --lib --test metadata_test -- --nocapture > test_output.txt 2>&1 # Check for any test failures if grep -q "test result: FAILED" test_output.txt; then echo "Tests failed. Please review the output in test_output.txt" else echo "All tests passed successfully." fiThis will help confirm that the refactoring hasn't introduced any regressions in the test outcomes.
52-56
: Ohayo, sensei! Nice refactoring of the test setup and execution.The changes introduce an
Executor
for handling database operations asynchronously, which is a good improvement in terms of concurrency. TheSql::new
call has been updated accordingly to include thesender
parameter.A few observations:
- The error handling for
set_metadata
is a good addition.- The removal of
.await
fromset_metadata
andexecute
calls suggests a shift in how these operations are handled. Make sure this doesn't affect the test's ability to wait for these operations to complete.To ensure the changes don't introduce any race conditions or timing issues, please run the following verification:
If these tests return results, we might need to review the synchronization strategy in the tests.
Also applies to: 78-81
✅ Verification successful
Ohayo, sensei! All checks passed without detecting any potential race conditions or explicit synchronization issues. Great job ensuring the test's reliability with the recent changes!
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for potential race conditions in the test setup # Test: Look for any sleep or delay mechanisms that might have been added rg --type rust 'tokio::time::sleep' crates/torii/graphql/src/tests/metadata_test.rs # Test: Check if there are any explicit synchronization mechanisms rg --type rust 'tokio::sync::(Mutex|RwLock|Semaphore)' crates/torii/graphql/src/tests/metadata_test.rsLength of output: 197
crates/torii/core/src/processors/metadata_update.rs (1)
67-67
: Ohayo, sensei! Nice error propagation addition!The change to
db.set_metadata(resource, &uri_str, block_timestamp)?;
is a good improvement. By adding the?
operator, we're now propagating any errors that might occur during the metadata setting process. This allows for more flexible error handling at higher levels of the call stack.crates/torii/graphql/src/tests/subscription_test.rs (6)
16-16
: Ohayo, sensei! New import looks good!The addition of the
Executor
import is necessary for the changes in the test functions. It's properly placed and correctly imported from thetorii_core::executor
module.
164-168
: Ohayo, sensei! Consistent changes here too!The modifications in
test_entity_subscription_with_id
mirror those intest_entity_subscription
:
Executor
creation and execution in a separate task.- Updated
Sql::new
call with thesender
parameter.- Modified
db.execute()
call withoutawait
.This consistency across test functions is excellent, maintaining a uniform approach to handling database operations and concurrency.
Also applies to: 249-249
283-287
: Ohayo, sensei! Keeping up the good work!The changes in
test_model_subscription
maintain the pattern established in previous test functions:
Executor
creation and execution in a separate task.- Updated
Sql::new
call with thesender
parameter.- Modified
db.execute()
call withoutawait
.Your consistency across all test functions is commendable, sensei! It ensures a uniform approach to database operations and concurrency management throughout the test suite.
Also applies to: 325-325
Line range hint
1-502
: Ohayo, sensei! Your file structure is solid!I've taken a look at the overall structure of the file and the unchanged parts, including the helper functions
keys_from_ty
andmodel_id_from_ty
. Everything looks good and maintains consistency with the new changes you've introduced.The helper functions still serve their purpose well in the updated context, and the overall structure of the test functions remains clear and organized. Great job maintaining the integrity of the existing code while introducing the new asynchronous model, sensei!
422-426
: Ohayo, sensei! Your consistency game is strong!The changes in
test_event_emitted
maintain the pattern we've seen throughout:
Executor
creation and execution in a separate task.- Updated
Sql::new
call with thesender
parameter.- Modified
db.execute()
anddb.store_event()
calls withoutawait
.I'm impressed by your consistent application of the new asynchronous model across all test functions, sensei! This uniform approach to handling database operations and event storage will make the codebase more maintainable and easier to reason about.
To ensure the asynchronous behavior is consistent, let's run a quick verification:
#!/bin/bash # Description: Verify consistent asynchronous behavior across all test functions # Test: Check if db.execute() and db.store_event() are called without await in all test functions rg --type rust '(db\.execute\(\)|db\.store_event\()(?!\.await)' crates/torii/graphql/src/tests/subscription_test.rs # Test: Ensure Executor::run() is always awaited rg --type rust 'executor\.run\(\)\.await' crates/torii/graphql/src/tests/subscription_test.rsThis will help us confirm that the asynchronous pattern is applied uniformly across all test functions.
Also applies to: 447-449
25-29
: Ohayo, sensei! Nice updates to the test function!The changes look good:
- Creating and running the
Executor
in a separate task is a great practice for concurrent execution.- Modifying
Sql::new
to accept asender
parameter aligns with the newExecutor
pattern.- Removing
await
fromdb.execute()
suggests a shift towards a more asynchronous model.To ensure the asynchronous behavior is working as expected, consider running this verification script:
Also applies to: 127-127
crates/torii/libp2p/src/tests.rs (3)
539-539
: Ohayo, sensei! New import looks good!The import of
Executor
fromtorii_core::executor
is correctly placed and aligns with the changes in the test function.
Line range hint
539-596
: Ohayo, sensei! Overall, these changes look promising!The introduction of the
Executor
component seems to be a step towards better concurrency and separation of concerns in the torii-core library. The changes are well-integrated into the existing test structure.A few points to consider:
- Enhance error handling in the spawned executor task.
- Verify the synchronous/asynchronous nature of the
db.execute()
method.Once these minor points are addressed, the changes will be in great shape!
Line range hint
567-596
: Ohayo, sensei! The Sql initialization looks good, but let's double-check something.The changes to
Sql::new
align well with the newExecutor
implementation. However, the removal ofawait
fromdb.execute()
is noteworthy.Could you please verify that
db.execute()
is intended to be synchronous now? If it's still asynchronous, we should keep theawait
keyword. Let's check theexecute
method implementation:This will help us confirm the correct usage of
execute()
.✅ Verification successful
Ohayo, sensei! It looks like we couldn't locate the
execute
method with the previous search. Let's try a broader approach to ensure we find the correct implementation.
Ohayo, sensei! It looks like the
execute
method is still playing hide and seek. Let's switch gears and use a different tool to track it down.
Ohayo, sensei! Great news: we've confirmed that
db.execute()
is now synchronous. The removal ofawait
is appropriate and aligns with theexecute
method's current implementation.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash ast-grep --lang rust --pattern 'impl Sql { $$$ fn execute(&$_) $_ $$$ }'Length of output: 425
Script:
#!/bin/bash # Searching for the execute method in Sql with a more general pattern ast-grep --lang rust --pattern 'impl Sql { $$$ fn execute($_) -> $_ $$$ }'Length of output: 427
Script:
#!/bin/bash # Using ripgrep to search for the 'execute' method in Sql rg 'fn execute' --type rustLength of output: 1822
crates/torii/grpc/src/server/tests/entities_test.rs (2)
23-23
: Ohayo, sensei! Import ofExecutor
looks good.The addition of
use torii_core::executor::Executor;
correctly brings in theExecutor
struct needed for the test.
100-100
: Confirmation:Sql::new
method updated withsender
parameterOhayo, sensei! The
Sql::new
function now includessender
as a parameter, matching the updated signature. This ensures that theSql
instance is correctly linked with theExecutor
.crates/torii/graphql/src/tests/mod.rs (1)
30-30
: Ohayo sensei! Importing theExecutor
module is a good move.The addition of
use torii_core::executor::Executor;
correctly brings in theExecutor
struct needed for the new execution model.crates/torii/core/src/sql.rs (6)
31-35
: UpdatingSql
struct withexecutor
field—good move!Replacing
query_queue
withexecutor
in theSql
struct simplifies the execution model and aligns with the message-driven approach. Ensure that all instances whereSql
is instantiated are updated accordingly.
69-73
: Clarify the purpose of sending an emptyQueryMessage
Sending a
QueryMessage
with an emptystatement
andarguments
, andquery_type
set toQueryType::Execute
, may be confusing to future readers. If this call is necessary for triggering certain behaviors in the executor, please add a comment explaining its purpose.Is this empty message intended to signal the executor to perform a specific action? Confirm its necessity and consider documenting it for clarity.
100-109
: Handle potential integer conversion issues when usingtry_into().expect
124-129
: EncapsulateQueryMessage
creation for maintainability
1181-1193
: Ohayo, sensei! Verify index creation for concurrency safetyWhen creating indices in
build_model_query
, ensure that the code handles the possibility of race conditions in concurrent environments. The use ofCREATE INDEX IF NOT EXISTS
is good, but double-check if additional synchronization is needed.Consider whether the database supports safe concurrent index creation, or if you need to implement additional mechanisms to prevent conflicts.
40-44
: Addedexecutor
parameter toSql::new
The inclusion of the
executor
parameter inpub async fn new
aligns with the new architecture. Please verify that all calls toSql::new
throughout the codebase now include theexecutor
parameter.Kindly run the following script to identify any instances that may need updating:
✅ Verification successful
Re-running Verification Script
Ohayo, sensei! Let's ensure all calls to
Sql::new
include the newexecutor
parameter.
All
Sql::new
Invocations UpdatedOhayo, sensei! Verified that every call to
Sql::new
now includes the newexecutor
parameter.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Ohayo, sensei! Let's find all invocations of `Sql::new` that may need the new parameter. rg --type rust 'Sql::new\(' -B 1 -A 3Length of output: 36372
Script:
#!/bin/bash # Ohayo, sensei! Let's find all invocations of `Sql::new` that may need the new parameter. rg --type rust 'Sql::new\s*\(' -A 3 -B 1Length of output: 5840
db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img) | ||
.await | ||
.unwrap(); | ||
db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img).unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! We might want to reconsider this change.
The modification to db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img).unwrap();
raises some concerns:
-
Removing
await
changes this from an asynchronous to a synchronous call. Is this intentional? Ifupdate_metadata
is still an async function, this could lead to unexpected behavior. -
Removing
unwrap()
without replacing it with proper error handling could lead to silent failures. We're no longer explicitly handling potential errors from this operation.
Consider the following improvements:
match db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img).await {
Ok(_) => info!(
target: LOG_TARGET,
resource = %format!("{:#x}", resource),
"Updated resource metadata from ipfs."
),
Err(e) => error!(
target: LOG_TARGET,
resource = %format!("{:#x}", resource),
error = %e,
"Failed to update resource metadata from ipfs."
),
}
This approach maintains the asynchronous nature of the call and provides proper error handling, logging any issues that might occur during the update process.
let (mut executor, sender) = Executor::new(pool.clone()).await.unwrap(); | ||
tokio::spawn(async move { | ||
executor.run().await.unwrap(); | ||
}); | ||
let mut db = Sql::new(pool.clone(), Felt::ZERO, sender).await.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Ohayo, sensei! Consistency is key, and you've nailed it!
The changes in test_model_subscription_with_id
continue the pattern we've seen in previous test functions:
Executor
creation and execution in a separate task.- Updated
Sql::new
call with thesender
parameter. - Modified
db.execute()
call withoutawait
.
Your consistency is impressive, sensei! However, I have a small suggestion to consider:
Given the repetitive nature of these changes across multiple test functions, you might want to consider extracting the Executor
and Sql
setup into a helper function. This could reduce duplication and make future modifications easier. Here's a potential implementation:
async fn setup_test_db(pool: SqlitePool) -> (Sql, mpsc::Sender<SqlxQuery>) {
let (mut executor, sender) = Executor::new(pool.clone()).await.unwrap();
tokio::spawn(async move {
executor.run().await.unwrap();
});
let db = Sql::new(pool, Felt::ZERO, sender.clone()).await.unwrap();
(db, sender)
}
You could then use this in each test function like:
let (mut db, _sender) = setup_test_db(pool.clone()).await;
This would make your tests even more maintainable and DRY. What do you think, sensei?
Also applies to: 393-393
@@ -271,7 +272,7 @@ pub async fn model_fixtures(db: &mut Sql) { | |||
.await | |||
.unwrap(); | |||
|
|||
db.execute().await.unwrap(); | |||
db.execute().unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle potential errors from db.execute().unwrap();
Calling unwrap()
on db.execute()
can cause a panic if an error occurs. It's safer to handle the Result
explicitly to prevent unexpected crashes.
Consider modifying the code to handle errors properly:
-db.execute().unwrap();
+if let Err(e) = db.execute() {
+ eprintln!("Error executing DB operations: {:?}", e);
+ // Handle the error accordingly
+}
Alternatively, if the function returns a Result
, you can use the ?
operator:
-db.execute().unwrap();
+db.execute()?;
Committable suggestion was skipped due to low confidence.
@@ -374,6 +379,6 @@ pub async fn spinup_types_test() -> Result<SqlitePool> { | |||
let to = account.provider().block_hash_and_number().await?.block_number; | |||
let data = engine.fetch_range(0, to, None).await.unwrap(); | |||
engine.process_range(data).await.unwrap(); | |||
|
|||
db.execute().unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Double-check the necessity of calling db.execute().unwrap();
again
The second call to db.execute().unwrap();
might be redundant if the execution has already been handled earlier in the code. Verify if this call is needed.
Consider removing or commenting out the redundant call:
-db.execute().unwrap();
+// db.execute().unwrap(); // Check if this is necessary
Committable suggestion was skipped due to low confidence.
let (mut executor, sender) = Executor::new(pool.clone()).await.unwrap(); | ||
tokio::spawn(async move { | ||
executor.run().await.unwrap(); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Ohayo sensei!
Spawned executor task lacks proper error handling
-
File:
crates/torii/graphql/src/tests/mod.rs
(Lines: 354-357) -
Recommendation:
- Replace
.unwrap()
with proper error handling (e.g., usingmatch
orif let
statements) within the asynchronous task to gracefully handle potential errors and avoid unintended panics.
- Replace
🔗 Analysis chain
Ensure the spawned executor task is properly managed
Spawning the executor with tokio::spawn
is fine, but make sure to handle any potential errors or panics within the asynchronous task to avoid unintended behavior.
Run the following script to check for proper task management:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify if spawned tasks are being handled appropriately.
rg --type rust 'tokio::spawn' -A 5
Length of output: 15309
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (11)
- bin/torii/src/main.rs (3 hunks)
- crates/torii/core/src/engine.rs (7 hunks)
- crates/torii/core/src/executor.rs (1 hunks)
- crates/torii/core/src/sql_test.rs (6 hunks)
- crates/torii/graphql/src/tests/metadata_test.rs (4 hunks)
- crates/torii/graphql/src/tests/mod.rs (4 hunks)
- crates/torii/graphql/src/tests/subscription_test.rs (11 hunks)
- crates/torii/grpc/src/server/tests/entities_test.rs (2 hunks)
- crates/torii/libp2p/src/tests.rs (3 hunks)
- examples/spawn-and-move/manifests/dev/deployment/manifest.json (1 hunks)
- examples/spawn-and-move/manifests/dev/deployment/manifest.toml (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (8)
- bin/torii/src/main.rs
- crates/torii/core/src/engine.rs
- crates/torii/core/src/sql_test.rs
- crates/torii/graphql/src/tests/metadata_test.rs
- crates/torii/graphql/src/tests/mod.rs
- crates/torii/graphql/src/tests/subscription_test.rs
- crates/torii/grpc/src/server/tests/entities_test.rs
- crates/torii/libp2p/src/tests.rs
🔇 Additional comments not posted (8)
crates/torii/core/src/executor.rs (5)
1-53
: LGTM! Well-structured imports and type definitions.Ohayo, sensei! The imports and type definitions in this section are well-organized and appropriate for the functionality being implemented. The enums
Argument
,BrokerMessage
,DeleteEntityQuery
, andQueryType
are clearly defined and provide a solid foundation for the rest of the implementation.
55-67
: LGTM! Well-designedExecutor
andQueryMessage
structs.Ohayo, sensei! The
Executor
andQueryMessage
structs are well-designed and provide a solid foundation for handling database operations and message processing. TheExecutor
struct efficiently encapsulates the necessary components for managing the database connection, transaction, and message queue. TheQueryMessage
struct offers a clear structure for incoming queries, making it easy to process and execute them.
69-80
: LGTM! Well-implementedExecutor::new
method.Ohayo, sensei! The
Executor::new
method is well-implemented, initializing all necessary components for theExecutor
struct. Returning both theExecutor
instance and theUnboundedSender<QueryMessage>
is a smart design choice, allowing for easy message passing to the executor from other parts of the system.
82-202
: LGTM! Well-implementedrun
andhandle_query_type
methods.Ohayo, sensei! The
run
andhandle_query_type
methods are well-implemented. The use oftokio::select!
in therun
method allows for efficient concurrent handling of shutdown signals and incoming messages. Thehandle_query_type
method comprehensively covers all query types defined in theQueryType
enum, ensuring proper processing of each query type.
204-223
: LGTM! Well-implementedexecute
method andsend_broker_message
function.Ohayo, sensei! The
execute
method andsend_broker_message
function are well-implemented. Theexecute
method correctly commits the transaction and processes the publish queue, while thesend_broker_message
function efficiently handles allBrokerMessage
variants.examples/spawn-and-move/manifests/dev/deployment/manifest.toml (2)
6-8
: Ohayo, sensei! Updated World Contract deployment details.The changes in the
[world]
section reflect a new deployment of the World Contract:
- The
address
has been updated to a new contract address.- The
transaction_hash
has been changed, indicating a new deployment transaction.- The
block_number
has been incremented from 3 to 6, suggesting this is a newer deployment.These updates are crucial for maintaining accurate deployment information. Make sure these new values are correctly propagated to any dependent systems or documentation.
Line range hint
1-291
: Ohayo again, sensei! Overall manifest review complete.The changes in this manifest file are limited to the World Contract deployment details. The rest of the configuration, including other contracts, models, and their respective details, remains unchanged. This update appears to be a routine redeployment of the World Contract, possibly due to updates in its implementation or simply a fresh deployment in a new environment.
Remember to update any documentation or dependent systems that might reference the old World Contract address or transaction hash. The increased block number suggests this is a more recent deployment, which is good for maintaining an up-to-date system.
Great job keeping the deployment information current, sensei!
examples/spawn-and-move/manifests/dev/deployment/manifest.json (1)
1237-1239
: Ohayo, sensei! The deployment manifest has been updated.The changes look good and reflect an update in the deployment state:
- The
address
has been updated to a new value.- The
transaction_hash
has been changed, indicating a new transaction.- The
block_number
has been incremented from 3 to 6, showing progression in the blockchain.These updates are consistent with a new deployment or transaction being processed.
crates/torii/core/src/executor.rs
Outdated
pub async fn run(&mut self) -> Result<()> { | ||
loop { | ||
tokio::select! { | ||
_ = self.shutdown_rx.recv() => { | ||
break Ok(()); | ||
} | ||
Some(msg) = self.rx.recv() => { | ||
let QueryMessage { statement, arguments, query_type } = msg; | ||
let mut query = sqlx::query(&statement); | ||
|
||
for arg in &arguments { | ||
query = match arg { | ||
Argument::Null => query.bind(None::<String>), | ||
Argument::Int(integer) => query.bind(integer), | ||
Argument::Bool(bool) => query.bind(bool), | ||
Argument::String(string) => query.bind(string), | ||
Argument::FieldElement(felt) => query.bind(format!("{:#x}", felt)), | ||
} | ||
} | ||
|
||
self.handle_query_type(query, query_type, &statement, &arguments).await?; | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider improving error handling in the transaction lifecycle.
Ohayo, sensei! While the overall implementation is solid, there's room for improvement in error handling, particularly in the transaction lifecycle. Currently, if an error occurs during query execution, the transaction may remain open, potentially leading to database inconsistencies.
Consider wrapping the main loop in a try
block and adding error handling to ensure the transaction is properly managed in case of failures:
pub async fn run(&mut self) -> Result<()> {
loop {
match self.process_messages().await {
Ok(should_break) => {
if should_break {
break;
}
}
Err(e) => {
// Rollback the transaction on error
self.transaction.rollback().await?;
// Start a new transaction for the next iteration
self.transaction = self.pool.begin().await?;
// Log the error or handle it as appropriate
eprintln!("Error processing messages: {:?}", e);
}
}
}
Ok(())
}
async fn process_messages(&mut self) -> Result<bool> {
tokio::select! {
_ = self.shutdown_rx.recv() => {
return Ok(true);
}
Some(msg) = self.rx.recv() => {
let QueryMessage { statement, arguments, query_type } = msg;
// ... (rest of the existing code)
self.handle_query_type(query, query_type, &statement, &arguments).await?;
}
}
Ok(false)
}
This approach ensures that the transaction is rolled back in case of errors, maintaining database consistency.
crates/torii/core/src/executor.rs
Outdated
while let Some(message) = self.publish_queue.pop_front() { | ||
send_broker_message(message); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider asynchronous message publishing for improved performance.
Ohayo, sensei! To potentially improve performance, especially under heavy load, consider publishing broker messages asynchronously. This approach can prevent blocking the main execution flow while messages are being published.
Here's a suggestion for modifying the loop:
use futures::future::join_all;
// ...
let publish_futures: Vec<_> = self.publish_queue
.drain(..)
.map(|message| {
tokio::spawn(async move {
send_broker_message(message);
})
})
.collect();
join_all(publish_futures).await;
This modification spawns a new task for each message, allowing them to be published concurrently. The join_all
ensures that all messages are published before proceeding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (4)
crates/torii/core/src/sql.rs (4)
77-77
: Use&self
instead of&mut self
in method signatureOhayo, sensei! The
set_head
method does not appear to modify any fields ofSql
that require mutable access. Since you're only sending messages through theexecutor
, consider changing the method signature to take&self
instead of&mut self
for better code clarity.
89-89
: Use&self
instead of&mut self
in method signaturesOhayo, sensei! The methods
set_last_pending_block_world_tx
,set_last_pending_block_tx
, andset_metadata
currently take&mut self
, but they do not mutate any fields that require mutable references. Consider updating their signatures to use&self
instead of&mut self
to enhance code readability.Also applies to: 111-111, 360-360
Line range hint
800-816
: Consider optimizing array handling inbuild_set_entity_queries_recursive
Ohayo, sensei! When handling arrays in this recursive function, you delete all previous array elements and then insert new ones. If the arrays are large, this could impact performance. Consider optimizing this by updating existing entries when possible or batching deletes and inserts to improve efficiency.
1175-1180
: Provide clarification for theexecute
methodOhayo, sensei! The
execute
method sends an emptyQueryMessage
withQueryType::Execute
. It's not immediately clear what the purpose of this is. Adding a comment or documentation explaining the intent behind this method would help maintainers understand its role in the system.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
- crates/torii/core/src/sql.rs (37 hunks)
🔇 Additional comments not posted (6)
crates/torii/core/src/sql.rs (6)
15-15
: LGTM!The addition of
tokio::sync::mpsc::UnboundedSender
is appropriate for handling asynchronous message passing.
35-35
: Consider the implications of cloning theSql
structOhayo, sensei! The
Sql
struct derivesClone
, which allows it to be cloned along with theexecutor
field. Cloning anUnboundedSender<QueryMessage>
means multipleSql
instances will share the same channel sender. Please verify whether this shared ownership is intentional. Unintended cloning could lead to complex debugging scenarios if messages are sent from multiple clones concurrently.
40-44
: LGTM!The updated
new
method signature correctly accepts theexecutor
, aligning with the new message-passing architecture.
77-86
: Duplicate issue: Handle potential integer conversion issues inset_head
method
1176-1180
: Verify the necessity of sending aQueryMessage
with an empty statement inexecute
methodOhayo, sensei! In the
execute
method, aQueryMessage
with an empty statement and arguments is sent:self.executor.send(QueryMessage { statement: "".to_string(), arguments: vec![], query_type: QueryType::Execute, })?;Please verify whether sending an empty
QueryMessage
is intentional and necessary. If it's meant to signal the executor in a specific way, consider adding a comment to explain its purpose for future maintainability.
460-460
: Ensure all relevant transaction types are handledOhayo, sensei! In the
store_transaction
method, transactions that do not matchTransaction::Invoke
orTransaction::L1Handler
lead to an early return:_ => return Ok(()),Please verify that skipping storage for other transaction types is intentional. If there are other transaction variants that should be handled (e.g.,
Transaction::Declare
,Transaction::DeployAccount
), consider extending the match arms to include them.
crates/torii/core/src/sql.rs
Outdated
self.executor.send(QueryMessage { | ||
statement: "UPDATE contracts SET head = ? WHERE id = ?".to_string(), | ||
arguments: vec![head, id], | ||
query_type: QueryType::Other, | ||
})?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider abstracting QueryMessage
creation to reduce code duplication
Ohayo, sensei! Throughout the code, QueryMessage
instances are constructed with similar patterns. To enhance maintainability and reduce repetition, consider implementing helper functions or constructors, such as QueryMessage::new_other(statement, arguments)
, to streamline the creation of QueryMessage
objects, especially when the query_type
is QueryType::Other
.
Also applies to: 101-106, 119-124, 161-165, 239-244, 245-254, 343-355, 463-480, 498-504, 689-692, 790-793, 836-839, 855-859, 1011-1015, 1043-1047, 1072-1076, 1115-1119, 1158-1163, 1165-1170
crates/torii/core/src/sql.rs
Outdated
self.build_delete_entity_queries_recursive(path, &entity_id, &entity)?; | ||
|
||
self.query_queue.enqueue( | ||
"DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?", | ||
vec![Argument::String(entity_id.clone()), Argument::String(format!("{:#x}", model_id))], | ||
QueryType::DeleteEntity(DeleteEntityQuery { | ||
self.executor.send(QueryMessage { | ||
statement: "DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?".to_string(), | ||
arguments: vec![ | ||
Argument::String(entity_id.clone()), | ||
Argument::String(format!("{:#x}", model_id)), | ||
], | ||
query_type: QueryType::DeleteEntity(DeleteEntityQuery { | ||
entity_id: entity_id.clone(), | ||
event_id: event_id.to_string(), | ||
block_timestamp: utc_dt_string_from_timestamp(block_timestamp), | ||
ty: entity.clone(), | ||
}), | ||
); | ||
})?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle potential errors when building delete entity queries
Ohayo, sensei! In the delete_entity
method, you're building delete queries recursively:
self.build_delete_entity_queries_recursive(path, &entity_id, &entity)?;
Please ensure that any errors arising from this recursive process are adequately handled or propagated. Unchecked errors might lead to incomplete deletions or inconsistent state in the database.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (7)
crates/torii/core/src/sql.rs (7)
31-37
: Ohayo, sensei! Significant architectural change detected.The replacement of
query_queue
with anUnboundedSender<QueryMessage>
executor represents a shift from a queue-based to a message-passing architecture. This change could potentially improve concurrency and responsiveness.Consider the following:
- Ensure proper error handling for cases where the receiver has been dropped.
- Be mindful of potential backpressure issues, as
UnboundedSender
doesn't provide built-in flow control.- Document the new message-passing pattern for other developers working on this code.
Line range hint
40-68
: Ohayo, sensei! Let's enhance error handling in thenew
method.The changes align well with the new message-passing architecture. However, there are a few points to consider:
- The use of
expect
informat!("{:#x}", world_address)
could potentially panic. Consider using a more graceful error handling approach.- The error from
self.executor.send()
is being ignored. It's crucial to handle this error as it could indicate that the receiver has been dropped.Consider applying the following changes:
- .bind(format!("{:#x}", world_address)) + .bind(format!("{:#x}", world_address).map_err(|e| anyhow!("Failed to format world_address: {}", e))?) - .execute(&pool) - .await?; + .execute(&pool) + .await + .map_err(|e| anyhow!("Failed to execute initial query: {}", e))?; - Ok(Self { - pool: pool.clone(), - world_address, - executor, - model_cache: Arc::new(ModelCache::new(pool)), - }) + let sql = Self { + pool: pool.clone(), + world_address, + executor, + model_cache: Arc::new(ModelCache::new(pool)), + }; + + // Handle potential send error + sql.executor.send(QueryMessage::other( + "Initial setup query".to_string(), + vec![] + )).map_err(|e| anyhow!("Failed to send initial setup message: {}", e))?; + + Ok(sql)These changes will provide more robust error handling and ensure that the initial setup message is sent successfully.
Line range hint
91-108
: Ohayo, sensei! Let's enhanceset_last_pending_block_world_tx
for better robustness.The changes align well with the new message-passing architecture. However, we can improve error handling and code structure:
- The error from
self.executor.send()
is being propagated, which is good, but we might want to provide more context.- We can simplify the code by using
map_or
instead ofif let
.Consider applying the following changes:
pub fn set_last_pending_block_world_tx( &mut self, last_pending_block_world_tx: Option<Felt>, ) -> Result<()> { - let last_pending_block_world_tx = if let Some(f) = last_pending_block_world_tx { - Argument::String(format!("{:#x}", f)) - } else { - Argument::Null - }; + let last_pending_block_world_tx = last_pending_block_world_tx + .map_or(Argument::Null, |f| Argument::String(format!("{:#x}", f))); let id = Argument::FieldElement(self.world_address); - self.executor.send(QueryMessage::other( - "UPDATE contracts SET last_pending_block_world_tx = ? WHERE id = ?".to_string(), - vec![last_pending_block_world_tx, id], - ))?; + self.executor.send(QueryMessage::other( + "UPDATE contracts SET last_pending_block_world_tx = ? WHERE id = ?".to_string(), + vec![last_pending_block_world_tx, id], + )).map_err(|e| anyhow!("Failed to send set_last_pending_block_world_tx message: {}", e))?; Ok(()) }These changes simplify the code and provide more context in case of a send error.
Line range hint
416-468
: Ohayo, sensei! Let's enhancestore_transaction
for better robustness and clarity.The changes align with the new message-passing architecture, but we can make some improvements:
- Error handling for
self.executor.send()
can be enhanced with more context.- The early return for unsupported transaction types can be made more explicit.
- The match expression can be simplified using the
if let
pattern.Consider applying the following changes:
pub fn store_transaction( &mut self, transaction: &Transaction, transaction_id: &str, block_timestamp: u64, ) -> Result<()> { let id = Argument::String(transaction_id.to_string()); - let transaction_type = match transaction { - Transaction::Invoke(_) => "INVOKE", - Transaction::L1Handler(_) => "L1_HANDLER", - _ => return Ok(()), + let (transaction_type, transaction_hash, sender_address, calldata, max_fee, signature, nonce) = match transaction { + Transaction::Invoke(InvokeTransaction::V1(invoke_v1_transaction)) => ( + "INVOKE", + Argument::FieldElement(invoke_v1_transaction.transaction_hash), + Argument::FieldElement(invoke_v1_transaction.sender_address), + Argument::String(felts_sql_string(&invoke_v1_transaction.calldata)), + Argument::FieldElement(invoke_v1_transaction.max_fee), + Argument::String(felts_sql_string(&invoke_v1_transaction.signature)), + Argument::FieldElement(invoke_v1_transaction.nonce), + ), + Transaction::L1Handler(l1_handler_transaction) => ( + "L1_HANDLER", + Argument::FieldElement(l1_handler_transaction.transaction_hash), + Argument::FieldElement(l1_handler_transaction.contract_address), + Argument::String(felts_sql_string(&l1_handler_transaction.calldata)), + Argument::FieldElement(Felt::ZERO), + Argument::String("".to_string()), + Argument::FieldElement((l1_handler_transaction.nonce).into()), + ), + _ => return Ok(()), // Explicitly return for unsupported transaction types }; - let (transaction_hash, sender_address, calldata, max_fee, signature, nonce) = - match transaction { - Transaction::Invoke(InvokeTransaction::V1(invoke_v1_transaction)) => ( - Argument::FieldElement(invoke_v1_transaction.transaction_hash), - Argument::FieldElement(invoke_v1_transaction.sender_address), - Argument::String(felts_sql_string(&invoke_v1_transaction.calldata)), - Argument::FieldElement(invoke_v1_transaction.max_fee), - Argument::String(felts_sql_string(&invoke_v1_transaction.signature)), - Argument::FieldElement(invoke_v1_transaction.nonce), - ), - Transaction::L1Handler(l1_handler_transaction) => ( - Argument::FieldElement(l1_handler_transaction.transaction_hash), - Argument::FieldElement(l1_handler_transaction.contract_address), - Argument::String(felts_sql_string(&l1_handler_transaction.calldata)), - Argument::FieldElement(Felt::ZERO), // has no max_fee - Argument::String("".to_string()), // has no signature - Argument::FieldElement((l1_handler_transaction.nonce).into()), - ), - _ => return Ok(()), - }; - self.executor.send(QueryMessage::other( - "INSERT OR IGNORE INTO transactions (id, transaction_hash, sender_address, calldata, \ - max_fee, signature, nonce, transaction_type, executed_at) VALUES (?, ?, ?, ?, ?, ?, \ - ?, ?, ?)" - .to_string(), - vec![ - id, - transaction_hash, - sender_address, - calldata, - max_fee, - signature, - nonce, - Argument::String(transaction_type.to_string()), - Argument::String(utc_dt_string_from_timestamp(block_timestamp)), - ], - ))?; + self.executor.send(QueryMessage::other( + "INSERT OR IGNORE INTO transactions (id, transaction_hash, sender_address, calldata, \ + max_fee, signature, nonce, transaction_type, executed_at) VALUES (?, ?, ?, ?, ?, ?, \ + ?, ?, ?)" + .to_string(), + vec![ + id, + transaction_hash, + sender_address, + calldata, + max_fee, + signature, + nonce, + Argument::String(transaction_type.to_string()), + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + ], + )).map_err(|e| anyhow!("Failed to send store_transaction message: {}", e))?; Ok(()) }These changes simplify the code structure, make the early return more explicit, and provide more context in case of a send error.
Line range hint
470-492
: Ohayo, sensei! Let's optimizestore_event
for better performance and error handling.The changes align with the new message-passing architecture, but we can make some improvements:
- Error handling for
self.executor.send()
can be enhanced with more context.- The SQL query string is being constructed in a way that might be prone to SQL injection. While the use of parameterized queries mitigates this risk, it's still a good practice to use constants for table and column names.
- The use of
QueryMessage::new
withQueryType::StoreEvent
is good, but we can make it more explicit.Consider applying the following changes:
+ const EVENTS_TABLE: &str = "events"; + const ID_COLUMN: &str = "id"; + const KEYS_COLUMN: &str = "keys"; + const DATA_COLUMN: &str = "data"; + const TRANSACTION_HASH_COLUMN: &str = "transaction_hash"; + const EXECUTED_AT_COLUMN: &str = "executed_at"; pub fn store_event( &mut self, event_id: &str, event: &Event, transaction_hash: Felt, block_timestamp: u64, ) -> Result<()> { let id = Argument::String(event_id.to_string()); let keys = Argument::String(felts_sql_string(&event.keys)); let data = Argument::String(felts_sql_string(&event.data)); let hash = Argument::FieldElement(transaction_hash); let executed_at = Argument::String(utc_dt_string_from_timestamp(block_timestamp)); + let query = format!( + "INSERT OR IGNORE INTO {table} ({id}, {keys}, {data}, {transaction_hash}, {executed_at}) \ + VALUES (?, ?, ?, ?, ?) RETURNING *", + table = EVENTS_TABLE, + id = ID_COLUMN, + keys = KEYS_COLUMN, + data = DATA_COLUMN, + transaction_hash = TRANSACTION_HASH_COLUMN, + executed_at = EXECUTED_AT_COLUMN + ); - self.executor.send(QueryMessage::new( - "INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \ - (?, ?, ?, ?, ?) RETURNING *" - .to_string(), - vec![id, keys, data, hash, executed_at], - QueryType::StoreEvent, - ))?; + self.executor.send(QueryMessage::new( + query, + vec![id, keys, data, hash, executed_at], + QueryType::StoreEvent, + )).map_err(|e| anyhow!("Failed to send store_event message: {}", e))?; Ok(()) }These changes improve code readability, reduce the risk of SQL injection (although it was already low due to parameterized queries), and provide more context in case of a send error.
Line range hint
496-570
: Ohayo, sensei! Let's enhancebuild_register_queries_recursive
for better robustness and clarity.The changes align with the new message-passing architecture, but we can make some improvements:
- Error handling for
self.executor.send()
can be enhanced with more context.- The method is quite complex and could benefit from some refactoring to improve readability and maintainability.
- The use of
unwrap()
in the method could lead to panics in case of errors.Consider the following improvements:
- Add error context to
self.executor.send()
calls:- self.executor.send(QueryMessage::other(statement.to_string(), arguments))?; + self.executor.send(QueryMessage::other(statement.to_string(), arguments)) + .map_err(|e| anyhow!("Failed to send register query message: {}", e))?;
- Refactor the method to reduce complexity:
fn build_register_queries_recursive( &mut self, selector: Felt, model: &Ty, path: Vec<String>, model_idx: &mut i64, block_timestamp: u64, array_idx: &mut usize, parent_array_idx: &mut usize, ) -> Result<()> { if let Ty::Enum(e) = model { if e.options.iter().all(|o| matches!(&o.ty, Ty::Tuple(t) if t.is_empty())) { return Ok(()); } } self.build_model_query( selector, path.clone(), model, *model_idx, block_timestamp, *array_idx, *parent_array_idx, )?; self.process_model_members(selector, model, path, model_idx, block_timestamp, array_idx, parent_array_idx) } fn process_model_members( &mut self, selector: Felt, model: &Ty, path: Vec<String>, model_idx: &mut i64, block_timestamp: u64, array_idx: &mut usize, parent_array_idx: &mut usize, ) -> Result<()> { let mut build_member = |pathname: &str, member: &Ty| -> Result<()> { if matches!(member, Ty::Primitive(_) | Ty::ByteArray(_)) { return Ok(()); } let mut path_clone = path.clone(); path_clone.push(pathname.to_string()); self.build_register_queries_recursive( selector, member, path_clone, &mut (*model_idx + 1), block_timestamp, &mut (*array_idx + if matches!(member, Ty::Array(_)) { 1 } else { 0 }), &mut (*parent_array_idx + if matches!(model, Ty::Array(_)) { 1 } else { 0 }), ) }; match model { Ty::Struct(s) => { for member in &s.children { build_member(&member.name, &member.ty)?; } } Ty::Tuple(t) => { for (idx, member) in t.iter().enumerate() { build_member(&format!("_{}", idx), member)?; } } Ty::Array(array) => { build_member("data", &array[0])?; } Ty::Enum(e) => { for child in &e.options { if let Ty::Tuple(t) = &child.ty { if t.is_empty() { continue; } } build_member(&child.name, &child.ty)?; } } _ => {} } Ok(()) }
- Replace
unwrap()
with proper error handling:- options.unwrap_or(Argument::Null), + options.unwrap_or_else(|| Argument::Null),These changes will improve the robustness and readability of the code, making it easier to maintain and less prone to panics.
Line range hint
572-803
: Ohayo, sensei! Let's fortifybuild_set_entity_queries_recursive
against SQL injection and improve its structure.The changes align with the new message-passing architecture, but there are several areas we can enhance:
- SQL Injection Prevention: The method constructs SQL queries using string interpolation, which can be vulnerable to SQL injection attacks.
- Error Handling: The error handling for
self.executor.send()
can be improved with more context.- Code Complexity: The method is quite long and complex, making it difficult to understand and maintain.
Let's address these issues:
- SQL Injection Prevention:
Replace string interpolation with parameterized queries wherever possible. For example:- let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?"); + let statement = "DELETE FROM ? WHERE entity_id = ?"; - self.executor.send(QueryMessage::other(statement, vec![Argument::String(entity_id.to_string())]))?; + self.executor.send(QueryMessage::other( + statement.to_string(), + vec![Argument::String(table_id.clone()), Argument::String(entity_id.to_string())] + ))?;
- Error Handling:
Add more context to error messages:- self.executor.send(QueryMessage::other(query, arguments))?; + self.executor.send(QueryMessage::other(query, arguments)) + .map_err(|e| anyhow!("Failed to send set entity query message: {}", e))?;
- Code Complexity:
Consider breaking down this method into smaller, more manageable functions. For example:fn build_set_entity_queries_recursive( &mut self, path: Vec<String>, event_id: &str, entity_id: (&str, IsEventMessage), entity: (&Ty, IsStoreUpdate), block_timestamp: u64, indexes: &Vec<i64>, ) -> Result<()> { let (entity_id, is_event_message) = entity_id; let (entity, is_store_update_member) = entity; match entity { Ty::Struct(s) => self.handle_struct(path, event_id, entity_id, is_event_message, s, is_store_update_member, block_timestamp, indexes), Ty::Enum(e) => self.handle_enum(path, event_id, entity_id, is_event_message, e, is_store_update_member, block_timestamp, indexes), Ty::Tuple(t) => self.handle_tuple(path, event_id, entity_id, is_event_message, t, is_store_update_member, block_timestamp, indexes), Ty::Array(array) => self.handle_array(path, event_id, entity_id, is_event_message, array, is_store_update_member, block_timestamp, indexes), _ => Ok(()), } } fn handle_struct(&mut self, path: Vec<String>, event_id: &str, entity_id: &str, is_event_message: bool, s: &Struct, is_store_update_member: bool, block_timestamp: u64, indexes: &Vec<i64>) -> Result<()> { // ... implementation ... } // Similar methods for handle_enum, handle_tuple, and handle_arrayThese changes will significantly improve the security, robustness, and maintainability of the code. Consider implementing these suggestions in stages, starting with the SQL injection prevention, then improving error handling, and finally refactoring for reduced complexity.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (2)
- crates/torii/core/src/executor.rs (1 hunks)
- crates/torii/core/src/sql.rs (37 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- crates/torii/core/src/executor.rs
🔇 Additional comments not posted (1)
crates/torii/core/src/sql.rs (1)
Line range hint
805-884
: **Ohayo, sensei! Let's fortify `build_delete_entity
crates/torii/core/src/sql.rs
Outdated
pub fn set_head(&mut self, head: u64) -> Result<()> { | ||
let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); | ||
let id = Argument::FieldElement(self.world_address); | ||
self.query_queue.enqueue( | ||
"UPDATE contracts SET head = ? WHERE id = ?", | ||
self.executor.send(QueryMessage::other( | ||
"UPDATE contracts SET head = ? WHERE id = ?".to_string(), | ||
vec![head, id], | ||
QueryType::Other, | ||
); | ||
))?; | ||
|
||
Ok(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! Let's address potential integer conversion issues in set_head
.
The changes align well with the new message-passing architecture. However, there are two points that need attention:
- The use of
try_into().expect("doesn't fit in u64")
could potentially panic if the conversion fails. This is risky, especially for a public method. - The error from
self.executor.send()
is being propagated, which is good, but we might want to provide more context.
Consider applying the following changes to improve error handling and prevent potential panics:
pub fn set_head(&mut self, head: u64) -> Result<()> {
- let head = Argument::Int(head.try_into().expect("doesn't fit in u64"));
+ let head = Argument::Int(head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in i64", head))?);
let id = Argument::FieldElement(self.world_address);
- self.executor.send(QueryMessage {
- statement: "UPDATE contracts SET head = ? WHERE id = ?".to_string(),
- arguments: vec![head, id],
- query_type: QueryType::Other,
- })?;
+ self.executor.send(QueryMessage::other(
+ "UPDATE contracts SET head = ? WHERE id = ?".to_string(),
+ vec![head, id],
+ )).map_err(|e| anyhow!("Failed to send set_head message: {}", e))?;
Ok(())
}
These changes will prevent panics from integer conversion and provide more context in case of a send error.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub fn set_head(&mut self, head: u64) -> Result<()> { | |
let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); | |
let id = Argument::FieldElement(self.world_address); | |
self.query_queue.enqueue( | |
"UPDATE contracts SET head = ? WHERE id = ?", | |
self.executor.send(QueryMessage::other( | |
"UPDATE contracts SET head = ? WHERE id = ?".to_string(), | |
vec![head, id], | |
QueryType::Other, | |
); | |
))?; | |
Ok(()) | |
pub fn set_head(&mut self, head: u64) -> Result<()> { | |
let head = Argument::Int(head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in i64", head))?); | |
let id = Argument::FieldElement(self.world_address); | |
self.executor.send(QueryMessage::other( | |
"UPDATE contracts SET head = ? WHERE id = ?".to_string(), | |
vec![head, id], | |
)).map_err(|e| anyhow!("Failed to send set_head message: {}", e))?; | |
Ok(()) | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is good suggestion, we should apply this. (hmm github doesn't show it properly, but i meant the coderabbit suggestion for this part)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lambda-0x Ohayo, sensei! I'm glad you agree with the suggestion. 🐰🎉
pub fn set_metadata(&mut self, resource: &Felt, uri: &str, block_timestamp: u64) -> Result<()> { | ||
let resource = Argument::FieldElement(*resource); | ||
let uri = Argument::String(uri.to_string()); | ||
let executed_at = Argument::String(utc_dt_string_from_timestamp(block_timestamp)); | ||
|
||
self.query_queue.enqueue( | ||
self.executor.send(QueryMessage::other( | ||
"INSERT INTO metadata (id, uri, executed_at) VALUES (?, ?, ?) ON CONFLICT(id) DO \ | ||
UPDATE SET id=excluded.id, executed_at=excluded.executed_at, \ | ||
updated_at=CURRENT_TIMESTAMP", | ||
updated_at=CURRENT_TIMESTAMP" | ||
.to_string(), | ||
vec![resource, uri, executed_at], | ||
QueryType::Other, | ||
); | ||
))?; | ||
|
||
Ok(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Ohayo, sensei! Let's optimize set_metadata
for better performance and error handling.
The changes align with the new message-passing architecture, but we can make some improvements:
- Error handling for
self.executor.send()
can be enhanced with more context. - The SQL query string is being constructed in a way that might be prone to SQL injection. While the use of parameterized queries mitigates this risk, it's still a good practice to use constants for table and column names.
Consider applying the following changes:
+ const METADATA_TABLE: &str = "metadata";
+ const ID_COLUMN: &str = "id";
+ const URI_COLUMN: &str = "uri";
+ const EXECUTED_AT_COLUMN: &str = "executed_at";
+ const UPDATED_AT_COLUMN: &str = "updated_at";
pub fn set_metadata(&mut self, resource: &Felt, uri: &str, block_timestamp: u64) -> Result<()> {
let resource = Argument::FieldElement(*resource);
let uri = Argument::String(uri.to_string());
let executed_at = Argument::String(utc_dt_string_from_timestamp(block_timestamp));
- self.executor.send(QueryMessage::other(
- "INSERT INTO metadata (id, uri, executed_at) VALUES (?, ?, ?) ON CONFLICT(id) DO \
- UPDATE SET id=excluded.id, executed_at=excluded.executed_at, \
- updated_at=CURRENT_TIMESTAMP"
- .to_string(),
- vec![resource, uri, executed_at],
- ))?;
+ let query = format!(
+ "INSERT INTO {table} ({id}, {uri}, {executed_at}) VALUES (?, ?, ?) \
+ ON CONFLICT({id}) DO UPDATE SET \
+ {id}=excluded.{id}, \
+ {executed_at}=excluded.{executed_at}, \
+ {updated_at}=CURRENT_TIMESTAMP",
+ table = METADATA_TABLE,
+ id = ID_COLUMN,
+ uri = URI_COLUMN,
+ executed_at = EXECUTED_AT_COLUMN,
+ updated_at = UPDATED_AT_COLUMN
+ );
+
+ self.executor.send(QueryMessage::other(
+ query,
+ vec![resource, uri, executed_at],
+ )).map_err(|e| anyhow!("Failed to send set_metadata message: {}", e))?;
Ok(())
}
These changes improve code readability, reduce the risk of SQL injection (although it was already low due to parameterized queries), and provide more context in case of a send error.
Committable suggestion was skipped due to low confidence.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (4)
- crates/torii/core/src/engine.rs (9 hunks)
- crates/torii/core/src/executor.rs (1 hunks)
- crates/torii/core/src/sql_test.rs (7 hunks)
- crates/torii/libp2p/src/tests.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- crates/torii/core/src/engine.rs
- crates/torii/core/src/executor.rs
- crates/torii/core/src/sql_test.rs
🔇 Additional comments not posted (5)
crates/torii/libp2p/src/tests.rs (5)
538-538
: Ohayo, sensei! New import looks good!The addition of
broadcast
fromtokio::sync
is appropriate for the newExecutor
implementation. It will allow for efficient message distribution in the asynchronous context.
540-540
: Ohayo, sensei! Crucial import detected!The addition of
Executor
fromtorii_core::executor
is a key import for this PR. It introduces the new asynchronous execution model, which is at the core of the optimization changes.
Line range hint
1-724
: Ohayo, sensei! Overall changes look promising!The introduction of the
Executor
-based execution model in this test file is a significant improvement. These changes should lead to more efficient handling of database operations and better asynchronous processing. The updates to the test cases ensure that the new model is properly tested.A few key points to note:
- The new
Executor
component is well-integrated into the existing test structure.- The changes to
Sql::new()
anddb.execute()
reflect the shift in the execution model.- Error handling could be improved in the executor spawning process.
These changes align well with the PR objective of moving away from the query queue for executing transactions. Great work on optimizing the core functionality!
569-569
: Ohayo, sensei! Sql initialization updated nicely!The addition of the
sender
parameter toSql::new()
aligns well with the newExecutor
implementation. This change facilitates communication between the SQL component and the Executor.Let's make sure the
Sql::new()
method signature has been updated accordingly:This will help ensure consistency across the codebase.
598-598
: Ohayo, sensei! Execution model update detected!The removal of
await
fromdb.execute()
aligns with the newExecutor
-based execution model. This change reflects the shift to a more efficient asynchronous processing approach.Let's verify the updated
execute()
method signature to ensure it matches the new execution model:This will help confirm that the method signature has been updated consistently across the codebase.
crates/torii/libp2p/src/tests.rs
Outdated
let (shutdown_tx, _) = broadcast::channel(1); | ||
let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx).await.unwrap(); | ||
tokio::spawn(async move { | ||
executor.run().await.unwrap(); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Ohayo, sensei! Executor setup looks solid!
The initialization and spawning of the Executor
are well-implemented. The use of a broadcast channel for shutdown is a nice touch for clean termination.
One small suggestion to improve robustness:
Consider adding error handling for the spawned task:
tokio::spawn(async move {
- executor.run().await.unwrap();
+ if let Err(e) = executor.run().await {
+ eprintln!("Executor error: {}", e);
+ }
});
This will prevent silent failures if the executor encounters an error.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let (shutdown_tx, _) = broadcast::channel(1); | |
let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx).await.unwrap(); | |
tokio::spawn(async move { | |
executor.run().await.unwrap(); | |
}); | |
let (shutdown_tx, _) = broadcast::channel(1); | |
let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx).await.unwrap(); | |
tokio::spawn(async move { | |
if let Err(e) = executor.run().await { | |
eprintln!("Executor error: {}", e); | |
} | |
}); |
crates/torii/core/src/sql.rs
Outdated
pub fn set_head(&mut self, head: u64) -> Result<()> { | ||
let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); | ||
let id = Argument::FieldElement(self.world_address); | ||
self.query_queue.enqueue( | ||
"UPDATE contracts SET head = ? WHERE id = ?", | ||
self.executor.send(QueryMessage::other( | ||
"UPDATE contracts SET head = ? WHERE id = ?".to_string(), | ||
vec![head, id], | ||
QueryType::Other, | ||
); | ||
))?; | ||
|
||
Ok(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is good suggestion, we should apply this. (hmm github doesn't show it properly, but i meant the coderabbit suggestion for this part)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
- bin/torii/src/main.rs (4 hunks)
🔇 Additional comments (3)
bin/torii/src/main.rs (3)
30-35
: Ohayo, sensei! New imports look good.The added imports for
NamedTempFile
,Sender
, andExecutor
are necessary for the new functionality introduced in the main function. They align well with the changes made in the rest of the file.
69-69
: Ohayo, sensei! Good update to theArgs
struct.The change of the default value for the
database
argument from":memory:"
to an empty string is a smart move. It allows for more flexible handling of the database path in the main function, particularly when creating a temporary file if no specific path is provided.
199-199
: Ohayo, sensei! Good update to theSql::new()
call.The addition of the
sender
parameter to theSql::new()
method call is consistent with the newExecutor
implementation. You've correctly passed thesender
from the newly createdExecutor
, which should allow for proper communication between the SQL module and the executor.
let tempfile = NamedTempFile::new()?; | ||
let database_path = | ||
if args.database.is_empty() { tempfile.path().to_str().unwrap() } else { &args.database }; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! Consider more robust error handling for temporary file path.
The new logic for handling the database path is a good improvement. However, using unwrap()
on tempfile.path().to_str()
could potentially panic if the path contains non-UTF-8 characters. Consider using a more robust error handling approach.
Here's a suggestion to improve error handling:
let database_path = if args.database.is_empty() {
tempfile.path().to_str().ok_or_else(|| anyhow::anyhow!("Failed to get temporary file path"))?
} else {
&args.database
};
This change will propagate the error up the call stack instead of panicking, allowing for more graceful error handling.
let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await?; | ||
tokio::spawn(async move { | ||
executor.run().await.unwrap(); | ||
}); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! Consider improving error handling in the executor task.
The creation and running of the Executor
in a separate tokio task is a good approach for concurrent execution. However, using unwrap()
in the spawned task could lead to a panic if an error occurs, which might cause the entire application to crash.
Consider handling potential errors more gracefully:
tokio::spawn(async move {
if let Err(e) = executor.run().await {
error!("Executor encountered an error: {:?}", e);
// Optionally, you could send a shutdown signal here
// let _ = shutdown_tx.send(());
}
});
This change will log the error instead of panicking, allowing the application to continue running even if the executor encounters an issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (13)
crates/torii/core/src/sql.rs (13)
40-64
: Ohayo, sensei! LGTM with a minor suggestionThe changes in the
new
method align well with the new asynchronous message-passing model. The addition of theexecutor
parameter and the use ofQueryMessage::other
are appropriate.However, we could improve the error handling slightly:
Consider updating the error handling for the
execute
call:- db.execute().await?; + db.execute().await.map_err(|e| anyhow!("Failed to execute initial query: {}", e))?;This change provides more context in case of an execution error.
Line range hint
104-137
: Ohayo, sensei! Good changes, with room for improvementThe modifications to
set_last_pending_block_world_tx
andset_last_pending_block_tx
are in line with the new asynchronous model. However, we can enhance the error handling:Consider updating the error handling in both methods:
- self.executor.send(QueryMessage::other( + self.executor.send(QueryMessage::other( "UPDATE contracts SET last_pending_block_world_tx = ? WHERE id = ?".to_string(), vec![last_pending_block_world_tx, id], - ))?; + )).map_err(|e| anyhow!("Failed to send last_pending_block_world_tx update: {}", e))?;Apply a similar change to
set_last_pending_block_tx
. This improvement provides more context in case of send errors.
Line range hint
173-188
: Ohayo, sensei! Good transition, but let's enhance error handlingThe change to use
QueryMessage::new
withQueryType::RegisterModel
is a good adaptation to the new asynchronous model. It allows for more specific handling of different query types.However, we can improve the error handling:
- self.executor.send(QueryMessage::new( + self.executor.send(QueryMessage::new( insert_models.to_string(), arguments, QueryType::RegisterModel, - ))?; + )).map_err(|e| anyhow!("Failed to send register model query: {}", e))?;This change provides more context in case of a send error, making debugging easier.
Line range hint
251-272
: Ohayo, sensei! Good adaptation, but let's refine error handlingThe changes in the
set_entity
method align well with the new asynchronous model:
- The use of
QueryMessage::new
withQueryType::SetEntity
allows for specific handling of entity setting operations.- The separate
QueryMessage::other
for entity_model insertion maintains flexibility for different query types.However, we can enhance the error handling:
- self.executor.send(QueryMessage::new( + self.executor.send(QueryMessage::new( insert_entities.to_string(), arguments, QueryType::SetEntity(entity.clone()), - ))?; + )).map_err(|e| anyhow!("Failed to send set entity query: {}", e))?; - self.executor.send(QueryMessage::other( + self.executor.send(QueryMessage::other( "INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \ model_id) DO NOTHING" .to_string(), vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())], - ))?; + )).map_err(|e| anyhow!("Failed to send entity_model insertion query: {}", e))?;These changes provide more context in case of send errors, facilitating easier debugging and error tracing.
Line range hint
304-329
: Ohayo, sensei! Good progress, but let's polish error handlingThe modifications to the
set_event_message
method are well-aligned with the new asynchronous model:
- The use of
QueryMessage::new
withQueryType::EventMessage
allows for specific handling of event message operations.- The separate
QueryMessage::other
for event_model insertion maintains flexibility for different query types.However, we can refine the error handling:
- self.executor.send(QueryMessage::new( + self.executor.send(QueryMessage::new( insert_entities.to_string(), vec![ Argument::String(entity_id.clone()), Argument::String(keys_str), Argument::String(event_id.to_string()), Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ], QueryType::EventMessage(entity.clone()), - ))?; + )).map_err(|e| anyhow!("Failed to send event message query: {}", e))?; - self.executor.send(QueryMessage::other( + self.executor.send(QueryMessage::other( "INSERT INTO event_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \ model_id) DO NOTHING" .to_string(), vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())], - ))?; + )).map_err(|e| anyhow!("Failed to send event_model insertion query: {}", e))?;These enhancements provide more context in case of send errors, which will aid in debugging and error tracing.
347-356
: Ohayo, sensei! Solid improvements, but let's enhance error handlingThe changes in the
delete_entity
method are well-implemented:
- The use of
QueryMessage::new
withQueryType::DeleteEntity
aligns with the new asynchronous model.- The introduction of
DeleteEntityQuery
provides a structured way to package deletion information, which is excellent for maintainability and clarity.However, we can further improve the error handling:
- self.executor.send(QueryMessage::new( + self.executor.send(QueryMessage::new( "DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?".to_string(), vec![Argument::String(entity_id.clone()), Argument::String(format!("{:#x}", model_id))], QueryType::DeleteEntity(DeleteEntityQuery { entity_id: entity_id.clone(), event_id: event_id.to_string(), block_timestamp: utc_dt_string_from_timestamp(block_timestamp), ty: entity.clone(), }), - ))?; + )).map_err(|e| anyhow!("Failed to send delete entity query: {}", e))?;This change provides more context in case of a send error, making debugging easier.
361-374
: Ohayo, sensei! Good adaptation, but let's refine error handlingThe changes in the
set_metadata
method align well with the new asynchronous model:
- The method now returns
Result<()>
, allowing for proper error propagation.- The use of
QueryMessage::other
is consistent with the new execution model.However, we can enhance the error handling:
- self.executor.send(QueryMessage::other( + self.executor.send(QueryMessage::other( "INSERT INTO metadata (id, uri, executed_at) VALUES (?, ?, ?) ON CONFLICT(id) DO \ UPDATE SET id=excluded.id, executed_at=excluded.executed_at, \ updated_at=CURRENT_TIMESTAMP" .to_string(), vec![resource, uri, executed_at], - ))?; + )).map_err(|e| anyhow!("Failed to send set_metadata query: {}", e))?;This change provides more context in case of a send error, facilitating easier debugging and error tracing.
403-403
: Ohayo, sensei! Good transition, but let's polish error handlingThe change to use
QueryMessage::other
in theupdate_metadata
method aligns well with the new asynchronous execution model.However, we can enhance the error handling:
- self.executor.send(QueryMessage::other(statement, arguments))?; + self.executor.send(QueryMessage::other(statement, arguments)) + .map_err(|e| anyhow!("Failed to send update_metadata query: {}", e))?;This improvement provides more context in case of a send error, making debugging easier.
Line range hint
463-481
: Ohayo, sensei! Good adaptation, but let's refine error handlingThe change to use
QueryMessage::other
in thestore_transaction
method is well-aligned with the new asynchronous execution model.However, we can enhance the error handling:
- self.executor.send(QueryMessage::other( + self.executor.send(QueryMessage::other( "INSERT OR IGNORE INTO transactions (id, transaction_hash, sender_address, calldata, \ max_fee, signature, nonce, transaction_type, executed_at) VALUES (?, ?, ?, ?, ?, ?, \ ?, ?, ?)" .to_string(), vec![ id, transaction_hash, sender_address, calldata, max_fee, signature, nonce, Argument::String(transaction_type.to_string()), Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ], - ))?; + )).map_err(|e| anyhow!("Failed to send store_transaction query: {}", e))?;This improvement provides more context in case of a send error, facilitating easier debugging and error tracing.
497-505
: Ohayo, sensei! Excellent transition, but let's enhance error handlingThe changes in the
store_event
method are well-implemented:
- The use of
QueryMessage::new
withQueryType::StoreEvent
aligns perfectly with the new asynchronous model.- The specific
QueryType::StoreEvent
allows for more granular handling of different query types, which is a great improvement.However, we can further improve the error handling:
- self.executor.send(QueryMessage::new( + self.executor.send(QueryMessage::new( "INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \ (?, ?, ?, ?, ?) RETURNING *" .to_string(), vec![id, keys, data, hash, executed_at], QueryType::StoreEvent, - ))?; + )).map_err(|e| anyhow!("Failed to send store_event query: {}", e))?;This change provides more context in case of a send error, making debugging easier.
599-691
: Ohayo, sensei! Good adaptation, but let's refine error handlingThe changes in the
build_set_entity_queries_recursive
method are well-implemented:
- The use of
executor: &mut UnboundedSender<QueryMessage>
instead of&mut Self
provides more flexibility and aligns with the new asynchronous model.- The transition to
QueryMessage::other
is consistent with the new execution model.However, we can enhance the error handling. Here's an example of how to improve it:
- executor.send(QueryMessage::other(statement, arguments))?; + executor.send(QueryMessage::other(statement, arguments)) + .map_err(|e| anyhow!("Failed to send entity query: {}", e))?;Apply this pattern throughout the method where
executor.send
is called. This change provides more context in case of send errors, facilitating easier debugging and error tracing.
Line range hint
824-897
: Ohayo, sensei! Good transition, but let's polish error handlingThe change to use
QueryMessage::other
in thebuild_delete_entity_queries_recursive
method aligns well with the new asynchronous execution model.However, we can enhance the error handling. Here's an example of how to improve it:
- self.executor.send(QueryMessage::other( + self.executor.send(QueryMessage::other( statement, vec![Argument::String(entity_id.to_string())], - ))?; + )).map_err(|e| anyhow!("Failed to send delete entity query: {}", e))?;Apply this pattern throughout the method where
self.executor.send
is called. This change provides more context in case of send errors, making debugging easier.
Line range hint
910-1137
: Ohayo, sensei! Solid adaptation, but let's enhance error handlingThe transition to use
QueryMessage::other
in thebuild_model_query
method is well-aligned with the new asynchronous execution model. This change maintains consistency throughout the codebase.However, we can improve the error handling. Here's an example of how to enhance it:
- self.executor.send(QueryMessage::other(statement.to_string(), arguments))?; + self.executor.send(QueryMessage::other(statement.to_string(), arguments)) + .map_err(|e| anyhow!("Failed to send model member query: {}", e))?;Apply this pattern throughout the method where
self.executor.send
is called. This change provides more context in case of send errors, facilitating easier debugging and error tracing.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
- crates/torii/core/src/sql.rs (37 hunks)
🔇 Additional comments (4)
crates/torii/core/src/sql.rs (4)
15-15
: Ohayo, sensei! LGTM: Transition to asynchronous message-passing modelThe changes in imports and the
Sql
struct definition align well with the transition from a queue-based execution model to an asynchronous message-passing model. This shift should improve the overall performance and scalability of the system.Also applies to: 18-18, 35-35
89-101
: Ohayo, sensei! Excellent improvements inset_head
The changes in the
set_head
method are well-implemented:
- The method now returns
Result<()>
, allowing for proper error propagation.- The use of
QueryMessage::other
aligns with the new asynchronous model.- The improved error handling for integer conversion addresses potential overflow issues.
These changes enhance both the reliability and consistency of the code.
518-518
: Ohayo, sensei! Subtle but important improvementThe change in the
build_register_queries_recursive
method to returnResult<()>
instead of()
is a good improvement. This change allows for better error propagation and is consistent with the error handling enhancements throughout the file. It's a small but significant step towards more robust error management.
1142-1146
: Ohayo, sensei! Excellent refactoring of theexecute
methodThe changes in the
execute
method are well-implemented and align perfectly with the new asynchronous message-passing model:
- Changing the method signature from
&mut self
to&self
allows for more flexible usage, as it no longer requires mutable access to the entireSql
struct.- The use of
QueryMessage::execute_recv()
to create a new channel for execution is a clever way to handle the execution process asynchronously.- Sending an execution message through the executor leverages the new message-passing model effectively.
These changes should improve the overall performance and concurrency of the system, allowing for better scalability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
- crates/torii/core/src/executor.rs (1 hunks)
🔇 Additional comments (4)
crates/torii/core/src/executor.rs (4)
23-30
: Enums are well-defined and enhance code clarity.Ohayo, sensei! The
Argument
enum effectively represents various SQL argument types, improving type safety and code readability.
33-38
: BrokerMessage enum cleanly encapsulates message types.The
BrokerMessage
enum provides a clear structure for different broker messages, facilitating better message handling.
138-168
: Implement transaction rollback on error inrun
method.Ohayo, sensei! When
handle_query_type
returns an error in therun
method, the transaction is not rolled back, which could lead to database inconsistencies. Consider rolling back the transaction in case of errors to maintain data integrity.
178-276
: Ensure transaction is properly managed inhandle_query_type
.In the
handle_query_type
method, errors during query execution may leave the transaction open. To maintain database integrity, consider implementing error handling to rollback the transaction when an error occurs.
pub struct QueryMessage { | ||
pub statement: String, | ||
pub arguments: Vec<Argument>, | ||
pub query_type: QueryType, | ||
tx: Option<oneshot::Sender<Result<()>>>, | ||
} | ||
|
||
impl QueryMessage { | ||
pub fn new(statement: String, arguments: Vec<Argument>, query_type: QueryType) -> Self { | ||
Self { statement, arguments, query_type, tx: None } | ||
} | ||
|
||
pub fn new_recv( | ||
statement: String, | ||
arguments: Vec<Argument>, | ||
query_type: QueryType, | ||
) -> (Self, oneshot::Receiver<Result<()>>) { | ||
let (tx, rx) = oneshot::channel(); | ||
(Self { statement, arguments, query_type, tx: Some(tx) }, rx) | ||
} | ||
|
||
pub fn other(statement: String, arguments: Vec<Argument>) -> Self { | ||
Self { statement, arguments, query_type: QueryType::Other, tx: None } | ||
} | ||
|
||
pub fn other_recv( | ||
statement: String, | ||
arguments: Vec<Argument>, | ||
) -> (Self, oneshot::Receiver<Result<()>>) { | ||
let (tx, rx) = oneshot::channel(); | ||
(Self { statement, arguments, query_type: QueryType::Other, tx: Some(tx) }, rx) | ||
} | ||
|
||
pub fn execute() -> Self { | ||
Self { | ||
statement: "".to_string(), | ||
arguments: vec![], | ||
query_type: QueryType::Execute, | ||
tx: None, | ||
} | ||
} | ||
|
||
pub fn execute_recv() -> (Self, oneshot::Receiver<Result<()>>) { | ||
let (tx, rx) = oneshot::channel(); | ||
( | ||
Self { | ||
statement: "".to_string(), | ||
arguments: vec![], | ||
query_type: QueryType::Execute, | ||
tx: Some(tx), | ||
}, | ||
rx, | ||
) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Simplify QueryMessage
constructors using builder pattern.
Ohayo, sensei! Consider using the builder pattern for QueryMessage
to reduce the number of constructor methods and enhance code maintainability.
async fn execute(&mut self) -> Result<()> { | ||
let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?); | ||
transaction.commit().await?; | ||
|
||
for message in self.publish_queue.drain(..) { | ||
send_broker_message(message); | ||
} | ||
|
||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle potential errors during transaction commit.
Ohayo, sensei! If transaction.commit().await?
fails in the execute
method, the replaced transaction may cause inconsistencies. Please add error handling to manage commit failures appropriately.
Apply this diff to handle errors during transaction commit:
async fn execute(&mut self) -> Result<()> {
let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?);
- transaction.commit().await?;
+ if let Err(e) = transaction.commit().await {
+ error!(target: LOG_TARGET, error = %e, "Failed to commit transaction.");
+ // Optionally, attempt to rollback or handle the error as needed
+ return Err(e);
+ }
for message in self.publish_queue.drain(..) {
send_broker_message(message);
}
Ok(())
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async fn execute(&mut self) -> Result<()> { | |
let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?); | |
transaction.commit().await?; | |
for message in self.publish_queue.drain(..) { | |
send_broker_message(message); | |
} | |
Ok(()) | |
} | |
async fn execute(&mut self) -> Result<()> { | |
let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?); | |
if let Err(e) = transaction.commit().await { | |
error!(target: LOG_TARGET, error = %e, "Failed to commit transaction."); | |
// Optionally, attempt to rollback or handle the error as needed | |
return Err(e); | |
} | |
for message in self.publish_queue.drain(..) { | |
send_broker_message(message); | |
} | |
Ok(()) | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job on extracting the executor @Larkooo.
Let's keep an eye on the unbounded channel behavior.
Summary by CodeRabbit
New Features
Executor
component for improved management of database operations.tempfile
crate across multiple components.serial_test
dependency for enhanced testing capabilities.Bug Fixes
Refactor
query_queue
module and adding theexecutor
module.