diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index 42ca0e4c07..cff1cabda7 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -10,7 +10,7 @@ use starknet::core::types::Felt; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; use tokio::sync::RwLock as AsyncRwLock; -use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming}; +use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming}; use torii_grpc::proto::world::{RetrieveEntitiesResponse, RetrieveEventsResponse}; use torii_grpc::types::schema::Entity; use torii_grpc::types::{EntityKeysClause, Event, EventQuery, KeysClause, Query}; @@ -156,6 +156,7 @@ impl Client { Ok(()) } + /// A direct stream to grpc subscribe starknet events pub async fn on_starknet_event( &self, keys: Option, @@ -164,4 +165,17 @@ impl Client { let stream = grpc_client.subscribe_events(keys).await?; Ok(stream) } + + /// Subscribe to indexer updates for a specific contract address. + /// If no contract address is provided, it will subscribe to updates for world contract. + pub async fn on_indexer_updated( + &self, + contract_address: Option, + ) -> Result { + let mut grpc_client = self.inner.write().await; + let stream = grpc_client + .subscribe_indexer(contract_address.unwrap_or(self.world_reader.address)) + .await?; + Ok(stream) + } } diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 010f7d7385..d35451564e 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -158,7 +158,7 @@ impl Engine

{ // use the start block provided by user if head is 0 let (head, _, _) = self.db.head().await?; if head == 0 { - self.db.set_head(self.config.start_block)?; + self.db.set_head(self.config.start_block, 0, 0, self.world.address).await?; } else if self.config.start_block != 0 { warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead."); } @@ -389,6 +389,7 @@ impl Engine

{ let timestamp = data.pending_block.timestamp; + let mut world_txns_count = 0; for t in data.pending_block.transactions { let transaction_hash = t.transaction.transaction_hash(); if let Some(tx) = last_pending_block_tx_cursor { @@ -409,7 +410,14 @@ impl Engine

{ // provider. So we can fail silently and try // again in the next iteration. warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt."); - self.db.set_head(data.block_number - 1)?; + self.db + .set_head( + data.block_number - 1, + timestamp, + world_txns_count, + self.world.address, + ) + .await?; if let Some(tx) = last_pending_block_tx { self.db.set_last_pending_block_tx(Some(tx))?; } @@ -430,6 +438,7 @@ impl Engine

{ } } Ok(true) => { + world_txns_count += 1; last_pending_block_world_tx = Some(*transaction_hash); last_pending_block_tx = Some(*transaction_hash); info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending world transaction."); @@ -446,7 +455,9 @@ impl Engine

{ // Set the head to the last processed pending transaction // Head block number should still be latest block number - self.db.set_head(data.block_number - 1)?; + self.db + .set_head(data.block_number - 1, timestamp, world_txns_count, self.world.address) + .await?; if let Some(tx) = last_pending_block_tx { self.db.set_last_pending_block_tx(Some(tx))?; @@ -466,6 +477,7 @@ impl Engine

{ pub async fn process_range(&mut self, data: FetchRangeResult) -> Result { // Process all transactions let mut last_block = 0; + let transactions_count = data.transactions.len(); for ((block_number, transaction_hash), events) in data.transactions { debug!("Processing transaction hash: {:#x}", transaction_hash); // Process transaction @@ -498,7 +510,15 @@ impl Engine

{ // Process parallelized events self.process_tasks().await?; - self.db.set_head(data.latest_block_number)?; + let last_block_timestamp = self.get_block_timestamp(data.latest_block_number).await?; + self.db + .set_head( + data.latest_block_number, + last_block_timestamp, + transactions_count as u64, + self.world.address, + ) + .await?; self.db.set_last_pending_block_world_tx(None)?; self.db.set_last_pending_block_tx(None)?; diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor.rs index 503759e43f..e0a99700f8 100644 --- a/crates/torii/core/src/executor.rs +++ b/crates/torii/core/src/executor.rs @@ -14,8 +14,8 @@ use tracing::{debug, error}; use crate::simple_broker::SimpleBroker; use crate::types::{ - Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, - Model as ModelRegistered, + Contract as ContractUpdated, Entity as EntityUpdated, Event as EventEmitted, + EventMessage as EventMessageUpdated, Model as ModelRegistered, }; pub(crate) const LOG_TARGET: &str = "torii_core::executor"; @@ -31,6 +31,7 @@ pub enum Argument { #[derive(Debug, Clone)] pub enum BrokerMessage { + SetHead(ContractUpdated), ModelRegistered(ModelRegistered), EntityUpdated(EntityUpdated), EventMessageUpdated(EventMessageUpdated), @@ -45,8 +46,17 @@ pub struct DeleteEntityQuery { pub ty: Ty, } +#[derive(Debug, Clone)] +pub struct SetHeadQuery { + pub head: u64, + pub last_block_timestamp: u64, + pub txns_count: u64, + pub contract_address: Felt, +} + #[derive(Debug, Clone)] pub enum QueryType { + SetHead(SetHeadQuery), SetEntity(Ty), DeleteEntity(DeleteEntityQuery), EventMessage(Ty), @@ -178,6 +188,35 @@ impl<'c> Executor<'c> { let tx = &mut self.transaction; match query_type { + QueryType::SetHead(set_head) => { + let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>( + "SELECT last_block_timestamp FROM contracts WHERE id = ?", + ) + .bind(format!("{:#x}", set_head.contract_address)) + .fetch_one(&mut **tx) + .await? + .try_into() + .map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?; + + let tps: u64 = if set_head.last_block_timestamp - previous_block_timestamp != 0 { + set_head.txns_count / (set_head.last_block_timestamp - previous_block_timestamp) + } else { + set_head.txns_count + }; + + query.execute(&mut **tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + + let row = sqlx::query("UPDATE contracts SET tps = ? WHERE id = ? RETURNING *") + .bind(tps as i64) + .bind(format!("{:#x}", set_head.contract_address)) + .fetch_one(&mut **tx) + .await?; + + let contract = ContractUpdated::from_row(&row)?; + self.publish_queue.push(BrokerMessage::SetHead(contract)); + } QueryType::SetEntity(entity) => { let row = query.fetch_one(&mut **tx).await.with_context(|| { format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) @@ -289,6 +328,7 @@ impl<'c> Executor<'c> { fn send_broker_message(message: BrokerMessage) { match message { + BrokerMessage::SetHead(update) => SimpleBroker::publish(update), BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model), BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity), BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index c42e447080..257f906a05 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -15,7 +15,7 @@ use starknet_crypto::poseidon_hash_many; use tokio::sync::mpsc::UnboundedSender; use crate::cache::{Model, ModelCache}; -use crate::executor::{Argument, DeleteEntityQuery, QueryMessage, QueryType}; +use crate::executor::{Argument, DeleteEntityQuery, QueryMessage, QueryType, SetHeadQuery}; use crate::utils::utc_dt_string_from_timestamp; type IsEventMessage = bool; @@ -86,17 +86,32 @@ impl Sql { )) } - pub fn set_head(&mut self, head: u64) -> Result<()> { - let head = Argument::Int( + pub async fn set_head( + &mut self, + head: u64, + last_block_timestamp: u64, + world_txns_count: u64, + contract_address: Felt, + ) -> Result<()> { + let head_arg = Argument::Int( head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in i64", head))?, ); + let last_block_timestamp_arg = + Argument::Int(last_block_timestamp.try_into().map_err(|_| { + anyhow!("Last block timestamp value {} doesn't fit in i64", last_block_timestamp) + })?); let id = Argument::FieldElement(self.world_address); - self.executor - .send(QueryMessage::other( - "UPDATE contracts SET head = ? WHERE id = ?".to_string(), - vec![head, id], - )) - .map_err(|e| anyhow!("Failed to send set_head message: {}", e))?; + + self.executor.send(QueryMessage::new( + "UPDATE contracts SET head = ?, last_block_timestamp = ? WHERE id = ?".to_string(), + vec![head_arg, last_block_timestamp_arg, id], + QueryType::SetHead(SetHeadQuery { + head, + last_block_timestamp, + txns_count: world_txns_count, + contract_address, + }), + ))?; Ok(()) } diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index de75fca94a..e87a1205e7 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -84,3 +84,12 @@ pub struct Event { pub executed_at: DateTime, pub created_at: DateTime, } + +#[derive(FromRow, Deserialize, Debug, Clone, Default)] +#[serde(rename_all = "camelCase")] +pub struct Contract { + pub head: i64, + pub tps: i64, + pub last_block_timestamp: i64, + pub contract_address: String, +} diff --git a/crates/torii/grpc/proto/world.proto b/crates/torii/grpc/proto/world.proto index 8e8010fef1..49b8ad0742 100644 --- a/crates/torii/grpc/proto/world.proto +++ b/crates/torii/grpc/proto/world.proto @@ -7,8 +7,11 @@ import "google/protobuf/empty.proto"; // The World service provides information about the world. service World { + // Subscribes to updates about the indexer. Like the head block number, tps, etc. + rpc SubscribeIndexer (SubscribeIndexerRequest) returns (stream SubscribeIndexerResponse); + // Retrieves metadata about the World including all the registered components and systems. - rpc WorldMetadata (MetadataRequest) returns (MetadataResponse); + rpc WorldMetadata (WorldMetadataRequest) returns (WorldMetadataResponse); // Subscribes to models updates. rpc SubscribeModels (SubscribeModelsRequest) returns (stream SubscribeModelsResponse); @@ -38,14 +41,26 @@ service World { rpc SubscribeEvents (SubscribeEventsRequest) returns (stream SubscribeEventsResponse); } +// A request to subscribe to indexer updates. +message SubscribeIndexerRequest { + bytes contract_address = 1; +} + +// A response containing indexer updates. +message SubscribeIndexerResponse { + int64 head = 1; + int64 tps = 2; + int64 last_block_timestamp = 3; + bytes contract_address = 4; +} // A request to retrieve metadata for a specific world ID. -message MetadataRequest { +message WorldMetadataRequest { } // The metadata response contains addresses and class hashes for the world. -message MetadataResponse { +message WorldMetadataResponse { types.WorldMetadata metadata = 1; } diff --git a/crates/torii/grpc/src/client.rs b/crates/torii/grpc/src/client.rs index 38630f321a..d94eb2fa7a 100644 --- a/crates/torii/grpc/src/client.rs +++ b/crates/torii/grpc/src/client.rs @@ -8,13 +8,16 @@ use starknet::core::types::{Felt, FromStrError, StateDiff, StateUpdate}; use tonic::transport::Endpoint; use crate::proto::world::{ - world_client, MetadataRequest, RetrieveEntitiesRequest, RetrieveEntitiesResponse, - RetrieveEventsRequest, RetrieveEventsResponse, SubscribeEntitiesRequest, - SubscribeEntityResponse, SubscribeEventsRequest, SubscribeEventsResponse, - SubscribeModelsRequest, SubscribeModelsResponse, UpdateEntitiesSubscriptionRequest, + world_client, RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventsRequest, + RetrieveEventsResponse, SubscribeEntitiesRequest, SubscribeEntityResponse, + SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest, + SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse, + UpdateEntitiesSubscriptionRequest, WorldMetadataRequest, }; use crate::types::schema::{Entity, SchemaError}; -use crate::types::{EntityKeysClause, Event, EventQuery, KeysClause, ModelKeysClause, Query}; +use crate::types::{ + EntityKeysClause, Event, EventQuery, IndexerUpdate, KeysClause, ModelKeysClause, Query, +}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -68,7 +71,7 @@ impl WorldClient { /// Retrieve the metadata of the World. pub async fn metadata(&mut self) -> Result { self.inner - .world_metadata(MetadataRequest {}) + .world_metadata(WorldMetadataRequest {}) .await .map_err(Error::Grpc) .and_then(|res| { @@ -107,6 +110,22 @@ impl WorldClient { self.inner.retrieve_events(request).await.map_err(Error::Grpc).map(|res| res.into_inner()) } + /// Subscribe to indexer updates. + pub async fn subscribe_indexer( + &mut self, + contract_address: Felt, + ) -> Result { + let request = + SubscribeIndexerRequest { contract_address: contract_address.to_bytes_be().to_vec() }; + let stream = self + .inner + .subscribe_indexer(request) + .await + .map_err(Error::Grpc) + .map(|res| res.into_inner())?; + Ok(IndexerUpdateStreaming(stream.map_ok(Box::new(|res| res.into())))) + } + /// Subscribe to entities updates of a World. pub async fn subscribe_entities( &mut self, @@ -282,6 +301,24 @@ impl Stream for EventUpdateStreaming { } } +type IndexerMappedStream = MapOk< + tonic::Streaming, + Box IndexerUpdate + Send>, +>; + +#[derive(Debug)] +pub struct IndexerUpdateStreaming(IndexerMappedStream); + +impl Stream for IndexerUpdateStreaming { + type Item = ::Item; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.0.poll_next_unpin(cx) + } +} + fn empty_state_update() -> StateUpdate { StateUpdate { block_hash: Felt::ZERO, diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index 4dd245d03a..6a496cc368 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -17,8 +17,8 @@ use dojo_types::schema::Ty; use dojo_world::contracts::naming::compute_selector_from_names; use futures::Stream; use proto::world::{ - MetadataRequest, MetadataResponse, RetrieveEntitiesRequest, RetrieveEntitiesResponse, - RetrieveEventsRequest, RetrieveEventsResponse, SubscribeModelsRequest, SubscribeModelsResponse, + RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventsRequest, + RetrieveEventsResponse, SubscribeModelsRequest, SubscribeModelsResponse, UpdateEntitiesSubscriptionRequest, }; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; @@ -29,6 +29,7 @@ use starknet::core::types::Felt; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; use subscriptions::event::EventManager; +use subscriptions::indexer::IndexerManager; use tokio::net::TcpListener; use tokio::sync::mpsc::Receiver; use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; @@ -47,6 +48,7 @@ use crate::proto::types::LogicalOperator; use crate::proto::world::world_server::WorldServer; use crate::proto::world::{ SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventsResponse, + SubscribeIndexerRequest, SubscribeIndexerResponse, WorldMetadataRequest, WorldMetadataResponse, }; use crate::proto::{self}; use crate::types::schema::SchemaError; @@ -84,6 +86,7 @@ pub struct DojoWorld { event_message_manager: Arc, event_manager: Arc, state_diff_manager: Arc, + indexer_manager: Arc, } impl DojoWorld { @@ -98,6 +101,7 @@ impl DojoWorld { let event_message_manager = Arc::new(EventMessageManager::default()); let event_manager = Arc::new(EventManager::default()); let state_diff_manager = Arc::new(StateDiffManager::default()); + let indexer_manager = Arc::new(IndexerManager::default()); tokio::task::spawn(subscriptions::model_diff::Service::new_with_block_rcv( block_rx, @@ -114,6 +118,8 @@ impl DojoWorld { tokio::task::spawn(subscriptions::event::Service::new(Arc::clone(&event_manager))); + tokio::task::spawn(subscriptions::indexer::Service::new(Arc::clone(&indexer_manager))); + Self { pool, world_address, @@ -122,12 +128,13 @@ impl DojoWorld { event_message_manager, event_manager, state_diff_manager, + indexer_manager, } } } impl DojoWorld { - pub async fn metadata(&self) -> Result { + pub async fn world(&self) -> Result { let world_address = sqlx::query_scalar(&format!( "SELECT contract_address FROM contracts WHERE id = '{:#x}'", self.world_address @@ -684,6 +691,14 @@ impl DojoWorld { }) } + async fn subscribe_indexer( + &self, + contract_address: Felt, + ) -> Result>, Error> + { + self.indexer_manager.add_subscriber(&self.pool, contract_address).await + } + async fn subscribe_models( &self, models_keys: Vec, @@ -1009,6 +1024,8 @@ type SubscribeEntitiesResponseStream = Pin> + Send>>; type SubscribeEventsResponseStream = Pin> + Send>>; +type SubscribeIndexerResponseStream = + Pin> + Send>>; #[tonic::async_trait] impl proto::world::world_server::World for DojoWorld { @@ -1016,17 +1033,30 @@ impl proto::world::world_server::World for DojoWorld { type SubscribeEntitiesStream = SubscribeEntitiesResponseStream; type SubscribeEventMessagesStream = SubscribeEntitiesResponseStream; type SubscribeEventsStream = SubscribeEventsResponseStream; + type SubscribeIndexerStream = SubscribeIndexerResponseStream; async fn world_metadata( &self, - _request: Request, - ) -> Result, Status> { - let metadata = Some(self.metadata().await.map_err(|e| match e { + _request: Request, + ) -> Result, Status> { + let metadata = Some(self.world().await.map_err(|e| match e { Error::Sql(sqlx::Error::RowNotFound) => Status::not_found("World not found"), e => Status::internal(e.to_string()), })?); - Ok(Response::new(MetadataResponse { metadata })) + Ok(Response::new(WorldMetadataResponse { metadata })) + } + + async fn subscribe_indexer( + &self, + request: Request, + ) -> ServiceResult { + let SubscribeIndexerRequest { contract_address } = request.into_inner(); + let rx = self + .subscribe_indexer(Felt::from_bytes_be_slice(&contract_address)) + .await + .map_err(|e| Status::internal(e.to_string()))?; + Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeIndexerStream)) } async fn subscribe_models( diff --git a/crates/torii/grpc/src/server/subscriptions/indexer.rs b/crates/torii/grpc/src/server/subscriptions/indexer.rs new file mode 100644 index 0000000000..27315b6766 --- /dev/null +++ b/crates/torii/grpc/src/server/subscriptions/indexer.rs @@ -0,0 +1,144 @@ +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::str::FromStr; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use futures::{Stream, StreamExt}; +use rand::Rng; +use sqlx::{Pool, Sqlite}; +use starknet::core::types::Felt; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::RwLock; +use torii_core::error::{Error, ParseError}; +use torii_core::simple_broker::SimpleBroker; +use torii_core::types::Contract as ContractUpdated; +use tracing::{error, trace}; + +use crate::proto; +use crate::proto::world::SubscribeIndexerResponse; + +pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::indexer"; + +#[derive(Debug)] +pub struct IndexerSubscriber { + /// Contract address that the subscriber is interested in + contract_address: Felt, + /// The channel to send the response back to the subscriber. + sender: Sender>, +} + +#[derive(Debug, Default)] +pub struct IndexerManager { + subscribers: RwLock>, +} + +impl IndexerManager { + pub async fn add_subscriber( + &self, + pool: &Pool, + contract_address: Felt, + ) -> Result>, Error> + { + let id = rand::thread_rng().gen::(); + let (sender, receiver) = channel(1); + + let mut statement = + "SELECT head, tps, last_block_timestamp, contract_address FROM contracts".to_string(); + + let contracts: Vec = if contract_address != Felt::ZERO { + statement += " WHERE id = ?"; + + sqlx::query_as(&statement) + .bind(format!("{:#x}", contract_address)) + .fetch_all(pool) + .await? + } else { + sqlx::query_as(&statement).fetch_all(pool).await? + }; + + for contract in contracts { + let _ = sender + .send(Ok(SubscribeIndexerResponse { + head: contract.head, + tps: contract.tps, + last_block_timestamp: contract.last_block_timestamp, + contract_address: contract_address.to_bytes_be().to_vec(), + })) + .await; + } + self.subscribers.write().await.insert(id, IndexerSubscriber { contract_address, sender }); + + Ok(receiver) + } + + pub(super) async fn remove_subscriber(&self, id: usize) { + self.subscribers.write().await.remove(&id); + } +} + +#[must_use = "Service does nothing unless polled"] +#[allow(missing_debug_implementations)] +pub struct Service { + subs_manager: Arc, + simple_broker: Pin + Send>>, +} + +impl Service { + pub fn new(subs_manager: Arc) -> Self { + Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } + } + + async fn publish_updates( + subs: Arc, + update: &ContractUpdated, + ) -> Result<(), Error> { + let mut closed_stream = Vec::new(); + let contract_address = + Felt::from_str(&update.contract_address).map_err(ParseError::FromStr)?; + + for (idx, sub) in subs.subscribers.read().await.iter() { + if sub.contract_address != Felt::ZERO && sub.contract_address != contract_address { + continue; + } + + let resp = SubscribeIndexerResponse { + head: update.head, + tps: update.tps, + last_block_timestamp: update.last_block_timestamp, + contract_address: contract_address.to_bytes_be().to_vec(), + }; + + if sub.sender.send(Ok(resp)).await.is_err() { + closed_stream.push(*idx); + } + } + + for id in closed_stream { + trace!(target = LOG_TARGET, id = %id, "Closing indexer updates stream."); + subs.remove_subscriber(id).await + } + + Ok(()) + } +} + +impl Future for Service { + type Output = (); + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { + let pin = self.get_mut(); + + while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) { + let subs = Arc::clone(&pin.subs_manager); + tokio::spawn(async move { + if let Err(e) = Service::publish_updates(subs, &event).await { + error!(target = LOG_TARGET, error = %e, "Publishing indexer update."); + } + }); + } + + Poll::Pending + } +} diff --git a/crates/torii/grpc/src/server/subscriptions/mod.rs b/crates/torii/grpc/src/server/subscriptions/mod.rs index f591862bd9..3a44537427 100644 --- a/crates/torii/grpc/src/server/subscriptions/mod.rs +++ b/crates/torii/grpc/src/server/subscriptions/mod.rs @@ -2,4 +2,5 @@ pub mod entity; pub mod error; pub mod event; pub mod event_message; +pub mod indexer; pub mod model_diff; diff --git a/crates/torii/grpc/src/types/mod.rs b/crates/torii/grpc/src/types/mod.rs index fad16b3739..b47400d954 100644 --- a/crates/torii/grpc/src/types/mod.rs +++ b/crates/torii/grpc/src/types/mod.rs @@ -16,6 +16,25 @@ use crate::proto::{self}; pub mod schema; +#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)] +pub struct IndexerUpdate { + pub head: i64, + pub tps: i64, + pub last_block_timestamp: i64, + pub contract_address: Felt, +} + +impl From for IndexerUpdate { + fn from(value: proto::world::SubscribeIndexerResponse) -> Self { + Self { + head: value.head, + tps: value.tps, + last_block_timestamp: value.last_block_timestamp, + contract_address: Felt::from_bytes_be_slice(&value.contract_address), + } + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)] pub struct Query { pub clause: Option, diff --git a/crates/torii/migrations/20240923155431_tps.sql b/crates/torii/migrations/20240923155431_tps.sql new file mode 100644 index 0000000000..231b4c4edc --- /dev/null +++ b/crates/torii/migrations/20240923155431_tps.sql @@ -0,0 +1,3 @@ +-- Add last_block_timestamp column for TPS calculation +ALTER TABLE contracts ADD COLUMN last_block_timestamp INTEGER; +ALTER TABLE contracts ADD COLUMN tps INTEGER; \ No newline at end of file