Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tps metric to contracts table #2468

Merged
merged 16 commits into from
Oct 8, 2024
28 changes: 24 additions & 4 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@
// use the start block provided by user if head is 0
let (head, _, _) = self.db.head().await?;
if head == 0 {
self.db.set_head(self.config.start_block)?;
self.db.set_head(self.config.start_block, 0, 0, self.world.address).await?;

Check warning on line 161 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L161

Added line #L161 was not covered by tests
} else if self.config.start_block != 0 {
warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead.");
}
Expand Down Expand Up @@ -389,6 +389,7 @@

let timestamp = data.pending_block.timestamp;

let mut world_txns_count = 0;

Check warning on line 392 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L392

Added line #L392 was not covered by tests
for t in data.pending_block.transactions {
let transaction_hash = t.transaction.transaction_hash();
if let Some(tx) = last_pending_block_tx_cursor {
Expand All @@ -409,7 +410,14 @@
// provider. So we can fail silently and try
// again in the next iteration.
warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt.");
self.db.set_head(data.block_number - 1)?;
self.db
.set_head(
data.block_number - 1,
timestamp,
world_txns_count,
self.world.address,
)
.await?;

Check warning on line 420 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L413-L420

Added lines #L413 - L420 were not covered by tests
if let Some(tx) = last_pending_block_tx {
self.db.set_last_pending_block_tx(Some(tx))?;
}
Expand All @@ -430,6 +438,7 @@
}
}
Ok(true) => {
world_txns_count += 1;

Check warning on line 441 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L441

Added line #L441 was not covered by tests
last_pending_block_world_tx = Some(*transaction_hash);
last_pending_block_tx = Some(*transaction_hash);
info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending world transaction.");
Expand All @@ -446,7 +455,9 @@

// Set the head to the last processed pending transaction
// Head block number should still be latest block number
self.db.set_head(data.block_number - 1)?;
self.db
.set_head(data.block_number - 1, timestamp, world_txns_count, self.world.address)
.await?;

Check warning on line 460 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L458-L460

Added lines #L458 - L460 were not covered by tests

if let Some(tx) = last_pending_block_tx {
self.db.set_last_pending_block_tx(Some(tx))?;
Expand All @@ -466,6 +477,7 @@
pub async fn process_range(&mut self, data: FetchRangeResult) -> Result<EngineHead> {
// Process all transactions
let mut last_block = 0;
let transactions_count = data.transactions.len();
for ((block_number, transaction_hash), events) in data.transactions {
debug!("Processing transaction hash: {:#x}", transaction_hash);
// Process transaction
Expand Down Expand Up @@ -498,7 +510,15 @@
// Process parallelized events
self.process_tasks().await?;

self.db.set_head(data.latest_block_number)?;
let last_block_timestamp = self.get_block_timestamp(data.latest_block_number).await?;
self.db
.set_head(
data.latest_block_number,
last_block_timestamp,
transactions_count as u64,
self.world.address,
)
.await?;

Check warning on line 521 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L521

Added line #L521 was not covered by tests
self.db.set_last_pending_block_world_tx(None)?;
self.db.set_last_pending_block_tx(None)?;

Expand Down
44 changes: 42 additions & 2 deletions crates/torii/core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

use crate::simple_broker::SimpleBroker;
use crate::types::{
Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated,
Model as ModelRegistered,
Contract as ContractUpdated, Entity as EntityUpdated, Event as EventEmitted,
EventMessage as EventMessageUpdated, Model as ModelRegistered,
};

pub(crate) const LOG_TARGET: &str = "torii_core::executor";
Expand All @@ -31,6 +31,7 @@

#[derive(Debug, Clone)]
pub enum BrokerMessage {
SetHead(ContractUpdated),
ModelRegistered(ModelRegistered),
EntityUpdated(EntityUpdated),
EventMessageUpdated(EventMessageUpdated),
Expand All @@ -45,8 +46,17 @@
pub ty: Ty,
}

#[derive(Debug, Clone)]
pub struct SetHeadQuery {
pub head: u64,
pub last_block_timestamp: u64,
pub txns_count: u64,
pub contract_address: Felt,
}

#[derive(Debug, Clone)]
pub enum QueryType {
SetHead(SetHeadQuery),
SetEntity(Ty),
DeleteEntity(DeleteEntityQuery),
EventMessage(Ty),
Expand Down Expand Up @@ -178,6 +188,35 @@
let tx = &mut self.transaction;

match query_type {
QueryType::SetHead(set_head) => {
let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>(
"SELECT last_block_timestamp FROM contracts WHERE id = ?",
)
.bind(format!("{:#x}", set_head.contract_address))
.fetch_one(&mut **tx)
.await?
.try_into()
.map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?;
Comment on lines +191 to +199
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo! Watch out for that sneaky i64 to u64 conversion, sensei!

The conversion from i64 to u64 for previous_block_timestamp could fail if the value is negative. Consider using a safe conversion method or handling potential errors. Here's a suggestion:

let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>(
    "SELECT last_block_timestamp FROM contracts WHERE id = ?"
)
.bind(format!("{:#x}", set_head.contract_address))
.fetch_one(&mut **tx)
.await?
.try_into()
.map_err(|_| anyhow::anyhow!("Last block timestamp is negative or doesn't fit in u64"))?;

This approach will provide a more informative error message if the conversion fails.


let tps: u64 = if set_head.last_block_timestamp - previous_block_timestamp != 0 {
set_head.txns_count / (set_head.last_block_timestamp - previous_block_timestamp)
} else {
set_head.txns_count

Check warning on line 204 in crates/torii/core/src/executor.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor.rs#L204

Added line #L204 was not covered by tests
};
Comment on lines +191 to +205
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo! Potential underflow and division by zero in TPS calculation

Sensei, in the calculation of tps, subtracting previous_block_timestamp from set_head.last_block_timestamp without checking for underflow may lead to incorrect results or panic in debug mode. If set_head.last_block_timestamp is less than previous_block_timestamp, the subtraction will underflow. Consider using checked_sub to safely handle this scenario.

Apply this diff to prevent underflow:

+let time_diff = set_head.last_block_timestamp.checked_sub(previous_block_timestamp).unwrap_or(0);
+let tps: u64 = if time_diff != 0 {
+    set_head.txns_count / time_diff
+} else {
+    set_head.txns_count
+};
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
QueryType::SetHead(set_head) => {
let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>(
"SELECT last_block_timestamp FROM contracts WHERE id = ?",
)
.bind(format!("{:#x}", set_head.contract_address))
.fetch_one(&mut **tx)
.await?
.try_into()
.map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?;
let tps: u64 = if set_head.last_block_timestamp - previous_block_timestamp != 0 {
set_head.txns_count / (set_head.last_block_timestamp - previous_block_timestamp)
} else {
set_head.txns_count
};
QueryType::SetHead(set_head) => {
let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>(
"SELECT last_block_timestamp FROM contracts WHERE id = ?",
)
.bind(format!("{:#x}", set_head.contract_address))
.fetch_one(&mut **tx)
.await?
.try_into()
.map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?;
let time_diff = set_head.last_block_timestamp.checked_sub(previous_block_timestamp).unwrap_or(0);
let tps: u64 = if time_diff != 0 {
set_head.txns_count / time_diff
} else {
set_head.txns_count
};

Comment on lines +201 to +205
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo! Let's make that TPS calculation more robust, sensei!

The current TPS calculation might not handle edge cases well. Consider using a more precise calculation method that avoids potential issues with integer division. Here's a suggestion:

let time_diff = set_head.last_block_timestamp.saturating_sub(previous_block_timestamp);
let tps = if time_diff > 0 {
    (set_head.txns_count as f64 / time_diff as f64).round() as u64
} else {
    0 // or another appropriate default value
};

This approach uses floating-point division for more precise results and handles the case where time_diff is zero or when set_head.last_block_timestamp is less than previous_block_timestamp.


query.execute(&mut **tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)

Check warning on line 208 in crates/torii/core/src/executor.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor.rs#L208

Added line #L208 was not covered by tests
})?;

let row = sqlx::query("UPDATE contracts SET tps = ? WHERE id = ? RETURNING *")
.bind(tps as i64)
.bind(format!("{:#x}", set_head.contract_address))
.fetch_one(&mut **tx)
.await?;

let contract = ContractUpdated::from_row(&row)?;
self.publish_queue.push(BrokerMessage::SetHead(contract));
}
QueryType::SetEntity(entity) => {
let row = query.fetch_one(&mut **tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
Expand Down Expand Up @@ -289,6 +328,7 @@

fn send_broker_message(message: BrokerMessage) {
match message {
BrokerMessage::SetHead(update) => SimpleBroker::publish(update),
BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model),
BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity),
BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event),
Expand Down
33 changes: 24 additions & 9 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use tokio::sync::mpsc::UnboundedSender;

use crate::cache::{Model, ModelCache};
use crate::executor::{Argument, DeleteEntityQuery, QueryMessage, QueryType};
use crate::executor::{Argument, DeleteEntityQuery, QueryMessage, QueryType, SetHeadQuery};
use crate::utils::utc_dt_string_from_timestamp;

type IsEventMessage = bool;
Expand Down Expand Up @@ -86,17 +86,32 @@
))
}

pub fn set_head(&mut self, head: u64) -> Result<()> {
let head = Argument::Int(
pub async fn set_head(
&mut self,
head: u64,
last_block_timestamp: u64,
world_txns_count: u64,
contract_address: Felt,
) -> Result<()> {
let head_arg = Argument::Int(
head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in i64", head))?,
);
let last_block_timestamp_arg =
Argument::Int(last_block_timestamp.try_into().map_err(|_| {
anyhow!("Last block timestamp value {} doesn't fit in i64", last_block_timestamp)

Check warning on line 101 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L101

Added line #L101 was not covered by tests
})?);
let id = Argument::FieldElement(self.world_address);
self.executor
.send(QueryMessage::other(
"UPDATE contracts SET head = ? WHERE id = ?".to_string(),
vec![head, id],
))
.map_err(|e| anyhow!("Failed to send set_head message: {}", e))?;

self.executor.send(QueryMessage::new(
"UPDATE contracts SET head = ?, last_block_timestamp = ? WHERE id = ?".to_string(),
vec![head_arg, last_block_timestamp_arg, id],
Comment on lines +106 to +107
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Should world_txns_count and contract_address be included in the SQL update?

Currently, the SQL statement only updates head and last_block_timestamp in the contracts table. If world_txns_count and contract_address need to be persisted in the database, consider including them in the update query.

QueryType::SetHead(SetHeadQuery {
head,
last_block_timestamp,
txns_count: world_txns_count,
contract_address,
}),
))?;

Ok(())
}
Expand Down
9 changes: 9 additions & 0 deletions crates/torii/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,12 @@
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
}

#[derive(FromRow, Deserialize, Debug, Clone, Default)]

Check warning on line 88 in crates/torii/core/src/types.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/types.rs#L88

Added line #L88 was not covered by tests
#[serde(rename_all = "camelCase")]
pub struct Contract {
pub head: i64,
pub tps: i64,
pub last_block_timestamp: i64,
pub contract_address: String,
}
21 changes: 18 additions & 3 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import "google/protobuf/empty.proto";

// The World service provides information about the world.
service World {
// Subscribes to updates about the indexer. Like the head block number, tps, etc.
rpc SubscribeIndexer (SubscribeIndexerRequest) returns (stream SubscribeIndexerResponse);

// Retrieves metadata about the World including all the registered components and systems.
rpc WorldMetadata (MetadataRequest) returns (MetadataResponse);
rpc WorldMetadata (WorldMetadataRequest) returns (WorldMetadataResponse);

// Subscribes to models updates.
rpc SubscribeModels (SubscribeModelsRequest) returns (stream SubscribeModelsResponse);
Expand Down Expand Up @@ -38,14 +41,26 @@ service World {
rpc SubscribeEvents (SubscribeEventsRequest) returns (stream SubscribeEventsResponse);
}

// A request to subscribe to indexer updates.
message SubscribeIndexerRequest {
bytes contract_address = 1;
}

// A response containing indexer updates.
message SubscribeIndexerResponse {
int64 head = 1;
int64 tps = 2;
int64 last_block_timestamp = 3;
bytes contract_address = 4;
}

// A request to retrieve metadata for a specific world ID.
message MetadataRequest {
message WorldMetadataRequest {

}

// The metadata response contains addresses and class hashes for the world.
message MetadataResponse {
message WorldMetadataResponse {
types.WorldMetadata metadata = 1;
}

Expand Down
49 changes: 43 additions & 6 deletions crates/torii/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
use tonic::transport::Endpoint;

use crate::proto::world::{
world_client, MetadataRequest, RetrieveEntitiesRequest, RetrieveEntitiesResponse,
RetrieveEventsRequest, RetrieveEventsResponse, SubscribeEntitiesRequest,
SubscribeEntityResponse, SubscribeEventsRequest, SubscribeEventsResponse,
SubscribeModelsRequest, SubscribeModelsResponse, UpdateEntitiesSubscriptionRequest,
world_client, RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventsRequest,
RetrieveEventsResponse, SubscribeEntitiesRequest, SubscribeEntityResponse,
SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest,
SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse,
UpdateEntitiesSubscriptionRequest, WorldMetadataRequest,
};
use crate::types::schema::{Entity, SchemaError};
use crate::types::{EntityKeysClause, Event, EventQuery, KeysClause, ModelKeysClause, Query};
use crate::types::{
EntityKeysClause, Event, EventQuery, IndexerUpdate, KeysClause, ModelKeysClause, Query,
};

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -68,7 +71,7 @@
/// Retrieve the metadata of the World.
pub async fn metadata(&mut self) -> Result<dojo_types::WorldMetadata, Error> {
self.inner
.world_metadata(MetadataRequest {})
.world_metadata(WorldMetadataRequest {})

Check warning on line 74 in crates/torii/grpc/src/client.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/client.rs#L74

Added line #L74 was not covered by tests
.await
.map_err(Error::Grpc)
.and_then(|res| {
Expand Down Expand Up @@ -107,6 +110,22 @@
self.inner.retrieve_events(request).await.map_err(Error::Grpc).map(|res| res.into_inner())
}

/// Subscribe to indexer updates.
pub async fn subscribe_indexer(
&mut self,
contract_address: Felt,
) -> Result<IndexerUpdateStreaming, Error> {
let request =
SubscribeIndexerRequest { contract_address: contract_address.to_bytes_be().to_vec() };
let stream = self
.inner
.subscribe_indexer(request)
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;
Ok(IndexerUpdateStreaming(stream.map_ok(Box::new(|res| res.into()))))
}

Check warning on line 127 in crates/torii/grpc/src/client.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/client.rs#L114-L127

Added lines #L114 - L127 were not covered by tests

/// Subscribe to entities updates of a World.
pub async fn subscribe_entities(
&mut self,
Expand Down Expand Up @@ -282,6 +301,24 @@
}
}

type IndexerMappedStream = MapOk<
tonic::Streaming<SubscribeIndexerResponse>,
Box<dyn Fn(SubscribeIndexerResponse) -> IndexerUpdate + Send>,
>;

#[derive(Debug)]
pub struct IndexerUpdateStreaming(IndexerMappedStream);

impl Stream for IndexerUpdateStreaming {
type Item = <IndexerMappedStream as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}

Check warning on line 319 in crates/torii/grpc/src/client.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/client.rs#L314-L319

Added lines #L314 - L319 were not covered by tests
}

fn empty_state_update() -> StateUpdate {
StateUpdate {
block_hash: Felt::ZERO,
Expand Down
Loading
Loading