From e5bd050a6ae92ce73d840fc7d5811a2254b58abc Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 23 Sep 2024 12:56:55 -0400 Subject: [PATCH 01/15] feat: add tps metric to contracts table --- crates/torii/core/src/sql.rs | 40 +++++++++++++++++-- .../torii/migrations/20240923155431_tps.sql | 4 ++ 2 files changed, 40 insertions(+), 4 deletions(-) create mode 100644 crates/torii/migrations/20240923155431_tps.sql diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 249a3c4fef..34f94a3fba 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -110,14 +110,42 @@ impl Sql { )) } - pub fn set_head(&mut self, head: u64) { + pub async fn set_head( + &mut self, + head: u64, + last_block_timestamp: u64, + txns_count: u64, + ) -> Result<()> { let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); let id = Argument::FieldElement(self.world_address); + + let mut conn = self.pool.acquire().await?; + let previous_block_timestamp: u64 = + sqlx::query_scalar::<_, i64>("SELECT last_block_timestamp FROM contracts WHERE id = ?") + .bind(format!("{:#x}", self.world_address)) + .fetch_optional(&mut *conn) + .await? + .unwrap_or(0) + .try_into() + .expect("doesn't fit in u64"); + + let tps: u64 = txns_count / (last_block_timestamp - previous_block_timestamp); + let tps = Argument::Int(tps.try_into().expect("doesn't fit in u64")); + + let last_block_timestamp = + Argument::Int(last_block_timestamp.try_into().expect("doesn't fit in u64")); self.query_queue.enqueue( - "UPDATE contracts SET head = ? WHERE id = ?", - vec![head, id], + "UPDATE contracts SET head = ?, tps = ?, last_block_timestamp = ? WHERE id = ?", + vec![ + head, + tps, + last_block_timestamp, + id, + ], QueryType::Other, ); + + Ok(()) } pub fn set_last_pending_block_world_tx(&mut self, last_pending_block_world_tx: Option) { @@ -716,7 +744,11 @@ impl Sql { Ty::Enum(e) => { if e.options.iter().all( |o| { - if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false } + if let Ty::Tuple(t) = &o.ty { + t.is_empty() + } else { + false + } }, ) { return; diff --git a/crates/torii/migrations/20240923155431_tps.sql b/crates/torii/migrations/20240923155431_tps.sql new file mode 100644 index 0000000000..3fbea8ede1 --- /dev/null +++ b/crates/torii/migrations/20240923155431_tps.sql @@ -0,0 +1,4 @@ +-- Add tps column +ALTER TABLE contracts ADD COLUMN tps INTEGER; +-- Add last block timestamp column +ALTER TABLE contracts ADD COLUMN last_block_timestamp DATETIME; \ No newline at end of file From 07ecb4013c655186fb45da8764c528223fbfecb5 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 23 Sep 2024 13:24:34 -0400 Subject: [PATCH 02/15] set head --- crates/torii/core/src/engine.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index f24180ad44..4d917dafaa 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -151,7 +151,7 @@ impl Engine

{ // use the start block provided by user if head is 0 let (head, _, _) = self.db.head().await?; if head == 0 { - self.db.set_head(self.config.start_block); + self.db.set_head(self.config.start_block, 0, 0).await?; } else if self.config.start_block != 0 { warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead."); } @@ -383,6 +383,7 @@ impl Engine

{ let timestamp = data.pending_block.timestamp; + let mut world_txns_count = 0; for t in data.pending_block.transactions { let transaction_hash = t.transaction.transaction_hash(); if let Some(tx) = last_pending_block_tx_cursor { @@ -403,7 +404,7 @@ impl Engine

{ // provider. So we can fail silently and try // again in the next iteration. warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt."); - self.db.set_head(data.block_number - 1); + self.db.set_head(data.block_number - 1, timestamp, world_txns_count).await?; if let Some(tx) = last_pending_block_tx { self.db.set_last_pending_block_tx(Some(tx)); } @@ -421,6 +422,7 @@ impl Engine

{ } } Ok(true) => { + world_txns_count += 1; last_pending_block_world_tx = Some(*transaction_hash); last_pending_block_tx = Some(*transaction_hash); info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending world transaction."); @@ -437,7 +439,7 @@ impl Engine

{ // Set the head to the last processed pending transaction // Head block number should still be latest block number - self.db.set_head(data.block_number - 1); + self.db.set_head(data.block_number - 1, timestamp, world_txns_count).await?; if let Some(tx) = last_pending_block_tx { self.db.set_last_pending_block_tx(Some(tx)); @@ -455,6 +457,7 @@ impl Engine

{ pub async fn process_range(&mut self, data: FetchRangeResult) -> Result<()> { // Process all transactions let mut last_block = 0; + let transactions_count = data.transactions.len(); for ((block_number, transaction_hash), events) in data.transactions { debug!("Processing transaction hash: {:#x}", transaction_hash); // Process transaction @@ -491,7 +494,7 @@ impl Engine

{ // Process parallelized events self.process_tasks().await?; - self.db.set_head(data.latest_block_number); + self.db.set_head(data.latest_block_number, data.blocks[&data.latest_block_number], transactions_count as u64).await?; self.db.set_last_pending_block_world_tx(None); self.db.set_last_pending_block_tx(None); From 6a6fec9db5f3f5ec24c1d3779f44b770daee700c Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 24 Sep 2024 11:28:14 -0400 Subject: [PATCH 03/15] refactor: tps set head --- crates/torii/core/src/engine.rs | 3 ++- crates/torii/core/src/sql.rs | 14 +++++++------- crates/torii/migrations/20240923155431_tps.sql | 5 ++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 4d917dafaa..80063b878c 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -494,7 +494,8 @@ impl Engine

{ // Process parallelized events self.process_tasks().await?; - self.db.set_head(data.latest_block_number, data.blocks[&data.latest_block_number], transactions_count as u64).await?; + let last_block_timestamp = self.get_block_timestamp(data.latest_block_number).await?; + self.db.set_head(data.latest_block_number, last_block_timestamp, transactions_count as u64).await?; self.db.set_last_pending_block_world_tx(None); self.db.set_last_pending_block_tx(None); diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 34f94a3fba..8252902a63 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -129,19 +129,19 @@ impl Sql { .try_into() .expect("doesn't fit in u64"); - let tps: u64 = txns_count / (last_block_timestamp - previous_block_timestamp); + let tps: u64 = if last_block_timestamp - previous_block_timestamp != 0 { + txns_count / (last_block_timestamp - previous_block_timestamp) + } else { + 0 + }; let tps = Argument::Int(tps.try_into().expect("doesn't fit in u64")); let last_block_timestamp = Argument::Int(last_block_timestamp.try_into().expect("doesn't fit in u64")); + self.query_queue.enqueue( "UPDATE contracts SET head = ?, tps = ?, last_block_timestamp = ? WHERE id = ?", - vec![ - head, - tps, - last_block_timestamp, - id, - ], + vec![head, tps, last_block_timestamp, id], QueryType::Other, ); diff --git a/crates/torii/migrations/20240923155431_tps.sql b/crates/torii/migrations/20240923155431_tps.sql index 3fbea8ede1..11516e475b 100644 --- a/crates/torii/migrations/20240923155431_tps.sql +++ b/crates/torii/migrations/20240923155431_tps.sql @@ -1,4 +1,3 @@ --- Add tps column +-- Add tps related columns ALTER TABLE contracts ADD COLUMN tps INTEGER; --- Add last block timestamp column -ALTER TABLE contracts ADD COLUMN last_block_timestamp DATETIME; \ No newline at end of file +ALTER TABLE contracts ADD COLUMN last_block_timestamp INTEGER; \ No newline at end of file From e35d72a8ac4842429bf3841ef451fc33c2ab67f6 Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 4 Oct 2024 10:43:25 -0400 Subject: [PATCH 04/15] wip --- crates/torii/core/src/sql.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 66f5ba4e89..59730c402f 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -92,7 +92,7 @@ impl Sql { last_block_timestamp: u64, txns_count: u64, ) -> Result<()> { - let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); + let head = Argument::Int(head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in u64", head))?); let id = Argument::FieldElement(self.world_address); let mut conn = self.pool.acquire().await?; @@ -110,16 +110,15 @@ impl Sql { } else { 0 }; - let tps = Argument::Int(tps.try_into().expect("doesn't fit in u64")); + let tps = Argument::Int(tps.try_into().map_err(|_| anyhow!("Tps value {} doesn't fit in u64", tps))?); let last_block_timestamp = - Argument::Int(last_block_timestamp.try_into().expect("doesn't fit in u64")); + Argument::Int(last_block_timestamp.try_into().map_err(|_| anyhow!("Last block timestamp value {} doesn't fit in u64", last_block_timestamp))?); - self.query_queue.enqueue( - "UPDATE contracts SET head = ?, tps = ?, last_block_timestamp = ? WHERE id = ?", + self.executor.send(QueryMessage::other( + "UPDATE contracts SET head = ?, tps = ?, last_block_timestamp = ? WHERE id = ?".to_string(), vec![head, tps, last_block_timestamp, id], - QueryType::Other, - ); + ))?; Ok(()) } From 244c277adc472adc8b266ba5bddf97fab60b0085 Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 4 Oct 2024 12:40:38 -0400 Subject: [PATCH 05/15] tps --- crates/torii/core/src/engine.rs | 15 +++++++++------ crates/torii/core/src/sql.rs | 24 ++++++++++++++++-------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index fed0bb3d95..cd3995055a 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -158,7 +158,7 @@ impl Engine

{ // use the start block provided by user if head is 0 let (head, _, _) = self.db.head().await?; if head == 0 { - self.db.set_head(self.config.start_block, 0, 0).await?; + self.db.set_head(self.config.start_block).await?; } else if self.config.start_block != 0 { warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead."); } @@ -410,7 +410,8 @@ impl Engine

{ // provider. So we can fail silently and try // again in the next iteration. warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt."); - self.db.set_head(data.block_number - 1, timestamp, world_txns_count).await?; + self.db.set_head(data.block_number - 1).await?; + self.db.set_tps(world_txns_count, timestamp).await?; if let Some(tx) = last_pending_block_tx { self.db.set_last_pending_block_tx(Some(tx))?; } @@ -448,7 +449,8 @@ impl Engine

{ // Set the head to the last processed pending transaction // Head block number should still be latest block number - self.db.set_head(data.block_number - 1, timestamp, world_txns_count).await?; + self.db.set_head(data.block_number - 1).await?; + self.db.set_tps(world_txns_count, timestamp).await?; if let Some(tx) = last_pending_block_tx { self.db.set_last_pending_block_tx(Some(tx))?; @@ -502,9 +504,10 @@ impl Engine

{ self.process_tasks().await?; let last_block_timestamp = self.get_block_timestamp(data.latest_block_number).await?; - self.db.set_head(data.latest_block_number, last_block_timestamp, transactions_count as u64).await?; - self.db.set_last_pending_block_world_tx(None); - self.db.set_last_pending_block_tx(None); + self.db.set_head(data.latest_block_number).await?; + self.db.set_tps(transactions_count as u64, last_block_timestamp).await?; + self.db.set_last_pending_block_world_tx(None)?; + self.db.set_last_pending_block_tx(None)?; Ok(EngineHead { block_number: data.latest_block_number, diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 59730c402f..8001333599 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -89,12 +89,21 @@ impl Sql { pub async fn set_head( &mut self, head: u64, - last_block_timestamp: u64, - txns_count: u64, ) -> Result<()> { let head = Argument::Int(head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in u64", head))?); let id = Argument::FieldElement(self.world_address); + self.executor.send(QueryMessage::other( + "UPDATE contracts SET head = ? WHERE id = ?".to_string(), + vec![head, id], + ))?; + + Ok(()) + } + + pub async fn set_tps(&mut self, txns_count: u64, last_block_timestamp: u64) -> Result<()> { + let id = Argument::FieldElement(self.world_address); + let mut conn = self.pool.acquire().await?; let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>("SELECT last_block_timestamp FROM contracts WHERE id = ?") @@ -108,16 +117,15 @@ impl Sql { let tps: u64 = if last_block_timestamp - previous_block_timestamp != 0 { txns_count / (last_block_timestamp - previous_block_timestamp) } else { - 0 + txns_count }; - let tps = Argument::Int(tps.try_into().map_err(|_| anyhow!("Tps value {} doesn't fit in u64", tps))?); - let last_block_timestamp = - Argument::Int(last_block_timestamp.try_into().map_err(|_| anyhow!("Last block timestamp value {} doesn't fit in u64", last_block_timestamp))?); + let tps = Argument::Int(tps.try_into().map_err(|_| anyhow!("Tps value {} doesn't fit in u64", tps))?); + let last_block_timestamp = Argument::Int(last_block_timestamp.try_into().map_err(|_| anyhow!("Last block timestamp value {} doesn't fit in u64", last_block_timestamp))?); self.executor.send(QueryMessage::other( - "UPDATE contracts SET head = ?, tps = ?, last_block_timestamp = ? WHERE id = ?".to_string(), - vec![head, tps, last_block_timestamp, id], + "UPDATE contracts SET tps = ?, last_block_timestamp = ? WHERE id = ?".to_string(), + vec![tps, last_block_timestamp, id], ))?; Ok(()) From d938bab3277b1c52e0b41d381955b0ffb9b290ba Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 4 Oct 2024 12:40:54 -0400 Subject: [PATCH 06/15] fmt --- crates/torii/core/src/sql.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 8001333599..cd4d02b0dd 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -86,11 +86,10 @@ impl Sql { )) } - pub async fn set_head( - &mut self, - head: u64, - ) -> Result<()> { - let head = Argument::Int(head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in u64", head))?); + pub async fn set_head(&mut self, head: u64) -> Result<()> { + let head = Argument::Int( + head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in u64", head))?, + ); let id = Argument::FieldElement(self.world_address); self.executor.send(QueryMessage::other( @@ -120,8 +119,13 @@ impl Sql { txns_count }; - let tps = Argument::Int(tps.try_into().map_err(|_| anyhow!("Tps value {} doesn't fit in u64", tps))?); - let last_block_timestamp = Argument::Int(last_block_timestamp.try_into().map_err(|_| anyhow!("Last block timestamp value {} doesn't fit in u64", last_block_timestamp))?); + let tps = Argument::Int( + tps.try_into().map_err(|_| anyhow!("Tps value {} doesn't fit in u64", tps))?, + ); + let last_block_timestamp = + Argument::Int(last_block_timestamp.try_into().map_err(|_| { + anyhow!("Last block timestamp value {} doesn't fit in u64", last_block_timestamp) + })?); self.executor.send(QueryMessage::other( "UPDATE contracts SET tps = ?, last_block_timestamp = ? WHERE id = ?".to_string(), @@ -740,11 +744,7 @@ impl Sql { Ty::Enum(e) => { if e.options.iter().all( |o| { - if let Ty::Tuple(t) = &o.ty { - t.is_empty() - } else { - false - } + if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false } }, ) { return Ok(()); From de3f985e38f9a710230c6bf45b3e046b4e549aad Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 4 Oct 2024 15:14:33 -0400 Subject: [PATCH 07/15] indexer updates subscription --- crates/torii/core/src/engine.rs | 27 +++- crates/torii/core/src/executor.rs | 41 +++++- crates/torii/core/src/sql.rs | 63 +++------ crates/torii/core/src/types.rs | 9 ++ crates/torii/grpc/proto/world.proto | 21 ++- crates/torii/grpc/src/client.rs | 41 +++++- crates/torii/grpc/src/server/mod.rs | 44 +++++- .../grpc/src/server/subscriptions/indexer.rs | 127 ++++++++++++++++++ .../grpc/src/server/subscriptions/mod.rs | 1 + crates/torii/grpc/src/types/mod.rs | 14 ++ .../20240923155431_last_block_timestamp.sql | 2 + .../torii/migrations/20240923155431_tps.sql | 3 - 12 files changed, 324 insertions(+), 69 deletions(-) create mode 100644 crates/torii/grpc/src/server/subscriptions/indexer.rs create mode 100644 crates/torii/migrations/20240923155431_last_block_timestamp.sql delete mode 100644 crates/torii/migrations/20240923155431_tps.sql diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index cd3995055a..d35451564e 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -158,7 +158,7 @@ impl Engine

{ // use the start block provided by user if head is 0 let (head, _, _) = self.db.head().await?; if head == 0 { - self.db.set_head(self.config.start_block).await?; + self.db.set_head(self.config.start_block, 0, 0, self.world.address).await?; } else if self.config.start_block != 0 { warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead."); } @@ -410,8 +410,14 @@ impl Engine

{ // provider. So we can fail silently and try // again in the next iteration. warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt."); - self.db.set_head(data.block_number - 1).await?; - self.db.set_tps(world_txns_count, timestamp).await?; + self.db + .set_head( + data.block_number - 1, + timestamp, + world_txns_count, + self.world.address, + ) + .await?; if let Some(tx) = last_pending_block_tx { self.db.set_last_pending_block_tx(Some(tx))?; } @@ -449,8 +455,9 @@ impl Engine

{ // Set the head to the last processed pending transaction // Head block number should still be latest block number - self.db.set_head(data.block_number - 1).await?; - self.db.set_tps(world_txns_count, timestamp).await?; + self.db + .set_head(data.block_number - 1, timestamp, world_txns_count, self.world.address) + .await?; if let Some(tx) = last_pending_block_tx { self.db.set_last_pending_block_tx(Some(tx))?; @@ -504,8 +511,14 @@ impl Engine

{ self.process_tasks().await?; let last_block_timestamp = self.get_block_timestamp(data.latest_block_number).await?; - self.db.set_head(data.latest_block_number).await?; - self.db.set_tps(transactions_count as u64, last_block_timestamp).await?; + self.db + .set_head( + data.latest_block_number, + last_block_timestamp, + transactions_count as u64, + self.world.address, + ) + .await?; self.db.set_last_pending_block_world_tx(None)?; self.db.set_last_pending_block_tx(None)?; diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor.rs index 503759e43f..0060767824 100644 --- a/crates/torii/core/src/executor.rs +++ b/crates/torii/core/src/executor.rs @@ -14,8 +14,7 @@ use tracing::{debug, error}; use crate::simple_broker::SimpleBroker; use crate::types::{ - Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, - Model as ModelRegistered, + Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, IndexerUpdate, Model as ModelRegistered }; pub(crate) const LOG_TARGET: &str = "torii_core::executor"; @@ -31,6 +30,7 @@ pub enum Argument { #[derive(Debug, Clone)] pub enum BrokerMessage { + SetHead(IndexerUpdate), ModelRegistered(ModelRegistered), EntityUpdated(EntityUpdated), EventMessageUpdated(EventMessageUpdated), @@ -45,8 +45,17 @@ pub struct DeleteEntityQuery { pub ty: Ty, } +#[derive(Debug, Clone)] +pub struct SetHeadQuery { + pub head: u64, + pub last_block_timestamp: u64, + pub txns_count: u64, + pub contract_address: Felt, +} + #[derive(Debug, Clone)] pub enum QueryType { + SetHead(SetHeadQuery), SetEntity(Ty), DeleteEntity(DeleteEntityQuery), EventMessage(Ty), @@ -178,6 +187,33 @@ impl<'c> Executor<'c> { let tx = &mut self.transaction; match query_type { + QueryType::SetHead(set_head) => { + let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>( + "SELECT last_block_timestamp FROM contracts WHERE id = ?", + ) + .bind(format!("{:#x}", set_head.contract_address)) + .fetch_one(&mut **tx) + .await? + .try_into() + .map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?; + + let tps: u64 = if set_head.last_block_timestamp - previous_block_timestamp != 0 { + set_head.txns_count / (set_head.last_block_timestamp - previous_block_timestamp) + } else { + set_head.txns_count + }; + + query.execute(&mut **tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + + self.publish_queue.push(BrokerMessage::SetHead(IndexerUpdate { + head: set_head.head, + tps, + last_block_timestamp: set_head.last_block_timestamp, + contract_address: set_head.contract_address, + })); + } QueryType::SetEntity(entity) => { let row = query.fetch_one(&mut **tx).await.with_context(|| { format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) @@ -289,6 +325,7 @@ impl<'c> Executor<'c> { fn send_broker_message(message: BrokerMessage) { match message { + BrokerMessage::SetHead(update) => SimpleBroker::publish(update), BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model), BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity), BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index cd4d02b0dd..257f906a05 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -15,7 +15,7 @@ use starknet_crypto::poseidon_hash_many; use tokio::sync::mpsc::UnboundedSender; use crate::cache::{Model, ModelCache}; -use crate::executor::{Argument, DeleteEntityQuery, QueryMessage, QueryType}; +use crate::executor::{Argument, DeleteEntityQuery, QueryMessage, QueryType, SetHeadQuery}; use crate::utils::utc_dt_string_from_timestamp; type IsEventMessage = bool; @@ -86,50 +86,31 @@ impl Sql { )) } - pub async fn set_head(&mut self, head: u64) -> Result<()> { - let head = Argument::Int( - head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in u64", head))?, - ); - let id = Argument::FieldElement(self.world_address); - - self.executor.send(QueryMessage::other( - "UPDATE contracts SET head = ? WHERE id = ?".to_string(), - vec![head, id], - ))?; - - Ok(()) - } - - pub async fn set_tps(&mut self, txns_count: u64, last_block_timestamp: u64) -> Result<()> { - let id = Argument::FieldElement(self.world_address); - - let mut conn = self.pool.acquire().await?; - let previous_block_timestamp: u64 = - sqlx::query_scalar::<_, i64>("SELECT last_block_timestamp FROM contracts WHERE id = ?") - .bind(format!("{:#x}", self.world_address)) - .fetch_optional(&mut *conn) - .await? - .unwrap_or(0) - .try_into() - .expect("doesn't fit in u64"); - - let tps: u64 = if last_block_timestamp - previous_block_timestamp != 0 { - txns_count / (last_block_timestamp - previous_block_timestamp) - } else { - txns_count - }; - - let tps = Argument::Int( - tps.try_into().map_err(|_| anyhow!("Tps value {} doesn't fit in u64", tps))?, + pub async fn set_head( + &mut self, + head: u64, + last_block_timestamp: u64, + world_txns_count: u64, + contract_address: Felt, + ) -> Result<()> { + let head_arg = Argument::Int( + head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in i64", head))?, ); - let last_block_timestamp = + let last_block_timestamp_arg = Argument::Int(last_block_timestamp.try_into().map_err(|_| { - anyhow!("Last block timestamp value {} doesn't fit in u64", last_block_timestamp) + anyhow!("Last block timestamp value {} doesn't fit in i64", last_block_timestamp) })?); + let id = Argument::FieldElement(self.world_address); - self.executor.send(QueryMessage::other( - "UPDATE contracts SET tps = ?, last_block_timestamp = ? WHERE id = ?".to_string(), - vec![tps, last_block_timestamp, id], + self.executor.send(QueryMessage::new( + "UPDATE contracts SET head = ?, last_block_timestamp = ? WHERE id = ?".to_string(), + vec![head_arg, last_block_timestamp_arg, id], + QueryType::SetHead(SetHeadQuery { + head, + last_block_timestamp, + txns_count: world_txns_count, + contract_address, + }), ))?; Ok(()) diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index de75fca94a..8eb13372dd 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -84,3 +84,12 @@ pub struct Event { pub executed_at: DateTime, pub created_at: DateTime, } + + +#[derive(Debug, Clone)] +pub struct IndexerUpdate { + pub head: u64, + pub tps: u64, + pub last_block_timestamp: u64, + pub contract_address: Felt, +} diff --git a/crates/torii/grpc/proto/world.proto b/crates/torii/grpc/proto/world.proto index 8e8010fef1..1fc9173ec0 100644 --- a/crates/torii/grpc/proto/world.proto +++ b/crates/torii/grpc/proto/world.proto @@ -7,8 +7,11 @@ import "google/protobuf/empty.proto"; // The World service provides information about the world. service World { + // Subscribes to updates about the indexer. Like the head block number, tps, etc. + rpc SubscribeIndexer (SubscribeIndexerRequest) returns (stream SubscribeIndexerResponse); + // Retrieves metadata about the World including all the registered components and systems. - rpc WorldMetadata (MetadataRequest) returns (MetadataResponse); + rpc WorldMetadata (WorldMetadataRequest) returns (WorldMetadataResponse); // Subscribes to models updates. rpc SubscribeModels (SubscribeModelsRequest) returns (stream SubscribeModelsResponse); @@ -38,14 +41,26 @@ service World { rpc SubscribeEvents (SubscribeEventsRequest) returns (stream SubscribeEventsResponse); } +// A request to subscribe to indexer updates. +message SubscribeIndexerRequest { + bytes contract_address = 1; +} + +// A response containing indexer updates. +message SubscribeIndexerResponse { + uint64 head = 1; + uint64 tps = 2; + uint64 last_block_timestamp = 3; + bytes contract_address = 4; +} // A request to retrieve metadata for a specific world ID. -message MetadataRequest { +message WorldMetadataRequest { } // The metadata response contains addresses and class hashes for the world. -message MetadataResponse { +message WorldMetadataResponse { types.WorldMetadata metadata = 1; } diff --git a/crates/torii/grpc/src/client.rs b/crates/torii/grpc/src/client.rs index 38630f321a..50981e102a 100644 --- a/crates/torii/grpc/src/client.rs +++ b/crates/torii/grpc/src/client.rs @@ -8,13 +8,10 @@ use starknet::core::types::{Felt, FromStrError, StateDiff, StateUpdate}; use tonic::transport::Endpoint; use crate::proto::world::{ - world_client, MetadataRequest, RetrieveEntitiesRequest, RetrieveEntitiesResponse, - RetrieveEventsRequest, RetrieveEventsResponse, SubscribeEntitiesRequest, - SubscribeEntityResponse, SubscribeEventsRequest, SubscribeEventsResponse, - SubscribeModelsRequest, SubscribeModelsResponse, UpdateEntitiesSubscriptionRequest, + world_client, RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventsRequest, RetrieveEventsResponse, SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse, UpdateEntitiesSubscriptionRequest, WorldMetadataRequest }; use crate::types::schema::{Entity, SchemaError}; -use crate::types::{EntityKeysClause, Event, EventQuery, KeysClause, ModelKeysClause, Query}; +use crate::types::{EntityKeysClause, Event, EventQuery, IndexerUpdate, KeysClause, ModelKeysClause, Query}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -68,7 +65,7 @@ impl WorldClient { /// Retrieve the metadata of the World. pub async fn metadata(&mut self) -> Result { self.inner - .world_metadata(MetadataRequest {}) + .world_metadata(WorldMetadataRequest {}) .await .map_err(Error::Grpc) .and_then(|res| { @@ -107,6 +104,18 @@ impl WorldClient { self.inner.retrieve_events(request).await.map_err(Error::Grpc).map(|res| res.into_inner()) } + /// Subscribe to indexer updates. + pub async fn subscribe_indexer( + &mut self, + contract_address: Felt, + ) -> Result { + let request = SubscribeIndexerRequest { contract_address: contract_address.to_bytes_be().to_vec() }; + let stream = self.inner.subscribe_indexer(request).await.map_err(Error::Grpc).map(|res| res.into_inner())?; + Ok(IndexerUpdateStreaming(stream.map_ok(Box::new(|res| { + res.into() + })))) + } + /// Subscribe to entities updates of a World. pub async fn subscribe_entities( &mut self, @@ -282,6 +291,26 @@ impl Stream for EventUpdateStreaming { } } +type IndexerMappedStream = MapOk< + tonic::Streaming, + Box IndexerUpdate + Send>, +>; + +#[derive(Debug)] +pub struct IndexerUpdateStreaming(IndexerMappedStream); + +impl Stream for IndexerUpdateStreaming { + type Item = ::Item; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.0.poll_next_unpin(cx) + } +} + + + fn empty_state_update() -> StateUpdate { StateUpdate { block_hash: Felt::ZERO, diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index 4dd245d03a..193a0fa947 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -17,8 +17,8 @@ use dojo_types::schema::Ty; use dojo_world::contracts::naming::compute_selector_from_names; use futures::Stream; use proto::world::{ - MetadataRequest, MetadataResponse, RetrieveEntitiesRequest, RetrieveEntitiesResponse, - RetrieveEventsRequest, RetrieveEventsResponse, SubscribeModelsRequest, SubscribeModelsResponse, + RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventsRequest, + RetrieveEventsResponse, SubscribeModelsRequest, SubscribeModelsResponse, UpdateEntitiesSubscriptionRequest, }; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; @@ -29,6 +29,7 @@ use starknet::core::types::Felt; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; use subscriptions::event::EventManager; +use subscriptions::indexer::IndexerManager; use tokio::net::TcpListener; use tokio::sync::mpsc::Receiver; use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; @@ -47,6 +48,7 @@ use crate::proto::types::LogicalOperator; use crate::proto::world::world_server::WorldServer; use crate::proto::world::{ SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventsResponse, + SubscribeIndexerRequest, SubscribeIndexerResponse, WorldMetadataRequest, WorldMetadataResponse, }; use crate::proto::{self}; use crate::types::schema::SchemaError; @@ -84,6 +86,7 @@ pub struct DojoWorld { event_message_manager: Arc, event_manager: Arc, state_diff_manager: Arc, + indexer_manager: Arc, } impl DojoWorld { @@ -98,6 +101,7 @@ impl DojoWorld { let event_message_manager = Arc::new(EventMessageManager::default()); let event_manager = Arc::new(EventManager::default()); let state_diff_manager = Arc::new(StateDiffManager::default()); + let indexer_manager = Arc::new(IndexerManager::default()); tokio::task::spawn(subscriptions::model_diff::Service::new_with_block_rcv( block_rx, @@ -114,6 +118,8 @@ impl DojoWorld { tokio::task::spawn(subscriptions::event::Service::new(Arc::clone(&event_manager))); + tokio::task::spawn(subscriptions::indexer::Service::new(Arc::clone(&indexer_manager))); + Self { pool, world_address, @@ -122,12 +128,13 @@ impl DojoWorld { event_message_manager, event_manager, state_diff_manager, + indexer_manager, } } } impl DojoWorld { - pub async fn metadata(&self) -> Result { + pub async fn world(&self) -> Result { let world_address = sqlx::query_scalar(&format!( "SELECT contract_address FROM contracts WHERE id = '{:#x}'", self.world_address @@ -684,6 +691,14 @@ impl DojoWorld { }) } + async fn subscribe_indexer( + &self, + contract_address: Felt, + ) -> Result>, Error> + { + self.indexer_manager.add_subscriber(contract_address).await + } + async fn subscribe_models( &self, models_keys: Vec, @@ -1009,6 +1024,8 @@ type SubscribeEntitiesResponseStream = Pin> + Send>>; type SubscribeEventsResponseStream = Pin> + Send>>; +type SubscribeIndexerResponseStream = + Pin> + Send>>; #[tonic::async_trait] impl proto::world::world_server::World for DojoWorld { @@ -1016,17 +1033,30 @@ impl proto::world::world_server::World for DojoWorld { type SubscribeEntitiesStream = SubscribeEntitiesResponseStream; type SubscribeEventMessagesStream = SubscribeEntitiesResponseStream; type SubscribeEventsStream = SubscribeEventsResponseStream; + type SubscribeIndexerStream = SubscribeIndexerResponseStream; async fn world_metadata( &self, - _request: Request, - ) -> Result, Status> { - let metadata = Some(self.metadata().await.map_err(|e| match e { + _request: Request, + ) -> Result, Status> { + let metadata = Some(self.world().await.map_err(|e| match e { Error::Sql(sqlx::Error::RowNotFound) => Status::not_found("World not found"), e => Status::internal(e.to_string()), })?); - Ok(Response::new(MetadataResponse { metadata })) + Ok(Response::new(WorldMetadataResponse { metadata })) + } + + async fn subscribe_indexer( + &self, + request: Request, + ) -> ServiceResult { + let SubscribeIndexerRequest { contract_address } = request.into_inner(); + let rx = self + .subscribe_indexer(Felt::from_bytes_be_slice(contract_address.as_slice())) + .await + .map_err(|e| Status::internal(e.to_string()))?; + Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeIndexerStream)) } async fn subscribe_models( diff --git a/crates/torii/grpc/src/server/subscriptions/indexer.rs b/crates/torii/grpc/src/server/subscriptions/indexer.rs new file mode 100644 index 0000000000..c1008d428e --- /dev/null +++ b/crates/torii/grpc/src/server/subscriptions/indexer.rs @@ -0,0 +1,127 @@ +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use futures::{Stream, StreamExt}; +use rand::Rng; +use starknet::core::types::Felt; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::RwLock; +use torii_core::error::Error; +use torii_core::simple_broker::SimpleBroker; +use torii_core::types::IndexerUpdate; +use tracing::{error, trace}; + +use crate::proto; +use crate::proto::world::SubscribeIndexerResponse; + +pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event"; + +#[derive(Debug)] +pub struct IndexerSubscriber { + /// Event keys that the subscriber is interested in + contract_address: Felt, + /// The channel to send the response back to the subscriber. + sender: Sender>, +} + +#[derive(Debug, Default)] +pub struct IndexerManager { + subscribers: RwLock>, +} + +impl IndexerManager { + pub async fn add_subscriber( + &self, + contract_address: Felt, + ) -> Result>, Error> + { + let id = rand::thread_rng().gen::(); + let (sender, receiver) = channel(1); + + // NOTE: unlock issue with firefox/safari + // initially send empty stream message to return from + // initial subscribe call + let _ = sender + .send(Ok(SubscribeIndexerResponse { + head: 0, + tps: 0, + last_block_timestamp: 0, + contract_address: contract_address.to_bytes_be().to_vec(), + })) + .await; + + self.subscribers.write().await.insert(id, IndexerSubscriber { contract_address, sender }); + + Ok(receiver) + } + + pub(super) async fn remove_subscriber(&self, id: usize) { + self.subscribers.write().await.remove(&id); + } +} + +#[must_use = "Service does nothing unless polled"] +#[allow(missing_debug_implementations)] +pub struct Service { + subs_manager: Arc, + simple_broker: Pin + Send>>, +} + +impl Service { + pub fn new(subs_manager: Arc) -> Self { + Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } + } + + async fn publish_updates( + subs: Arc, + update: &IndexerUpdate, + ) -> Result<(), Error> { + let mut closed_stream = Vec::new(); + + for (idx, sub) in subs.subscribers.read().await.iter() { + if sub.contract_address != update.contract_address { + continue; + } + + let resp = SubscribeIndexerResponse { + head: update.head, + tps: update.tps, + last_block_timestamp: update.last_block_timestamp, + contract_address: update.contract_address.to_bytes_be().to_vec(), + }; + + if sub.sender.send(Ok(resp)).await.is_err() { + closed_stream.push(*idx); + } + } + + for id in closed_stream { + trace!(target = LOG_TARGET, id = %id, "Closing events stream."); + subs.remove_subscriber(id).await + } + + Ok(()) + } +} + +impl Future for Service { + type Output = (); + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { + let pin = self.get_mut(); + + while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) { + let subs = Arc::clone(&pin.subs_manager); + tokio::spawn(async move { + if let Err(e) = Service::publish_updates(subs, &event).await { + error!(target = LOG_TARGET, error = %e, "Publishing indexer update."); + } + }); + } + + Poll::Pending + } +} diff --git a/crates/torii/grpc/src/server/subscriptions/mod.rs b/crates/torii/grpc/src/server/subscriptions/mod.rs index f591862bd9..3a44537427 100644 --- a/crates/torii/grpc/src/server/subscriptions/mod.rs +++ b/crates/torii/grpc/src/server/subscriptions/mod.rs @@ -2,4 +2,5 @@ pub mod entity; pub mod error; pub mod event; pub mod event_message; +pub mod indexer; pub mod model_diff; diff --git a/crates/torii/grpc/src/types/mod.rs b/crates/torii/grpc/src/types/mod.rs index fad16b3739..3d30fa69fa 100644 --- a/crates/torii/grpc/src/types/mod.rs +++ b/crates/torii/grpc/src/types/mod.rs @@ -16,6 +16,20 @@ use crate::proto::{self}; pub mod schema; +#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)] +pub struct IndexerUpdate { + pub head: u64, + pub tps: u64, + pub last_block_timestamp: u64, + pub contract_address: Felt, +} + +impl From for IndexerUpdate { + fn from(value: proto::world::SubscribeIndexerResponse) -> Self { + Self { head: value.head, tps: value.tps, last_block_timestamp: value.last_block_timestamp, contract_address: Felt::from_bytes_be_slice(&value.contract_address) } + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)] pub struct Query { pub clause: Option, diff --git a/crates/torii/migrations/20240923155431_last_block_timestamp.sql b/crates/torii/migrations/20240923155431_last_block_timestamp.sql new file mode 100644 index 0000000000..626ddd4288 --- /dev/null +++ b/crates/torii/migrations/20240923155431_last_block_timestamp.sql @@ -0,0 +1,2 @@ +-- Add last_block_timestamp column for TPS calculation +ALTER TABLE contracts ADD COLUMN last_block_timestamp INTEGER; \ No newline at end of file diff --git a/crates/torii/migrations/20240923155431_tps.sql b/crates/torii/migrations/20240923155431_tps.sql deleted file mode 100644 index 11516e475b..0000000000 --- a/crates/torii/migrations/20240923155431_tps.sql +++ /dev/null @@ -1,3 +0,0 @@ --- Add tps related columns -ALTER TABLE contracts ADD COLUMN tps INTEGER; -ALTER TABLE contracts ADD COLUMN last_block_timestamp INTEGER; \ No newline at end of file From b12107de926fc3032689426650c0c2db7034748c Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 4 Oct 2024 15:14:48 -0400 Subject: [PATCH 08/15] fmt --- crates/torii/core/src/executor.rs | 3 ++- crates/torii/core/src/types.rs | 1 - crates/torii/grpc/src/client.rs | 26 +++++++++++++++++--------- crates/torii/grpc/src/types/mod.rs | 7 ++++++- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor.rs index 0060767824..38d137022d 100644 --- a/crates/torii/core/src/executor.rs +++ b/crates/torii/core/src/executor.rs @@ -14,7 +14,8 @@ use tracing::{debug, error}; use crate::simple_broker::SimpleBroker; use crate::types::{ - Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, IndexerUpdate, Model as ModelRegistered + Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, + IndexerUpdate, Model as ModelRegistered, }; pub(crate) const LOG_TARGET: &str = "torii_core::executor"; diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index 8eb13372dd..9e20b881c2 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -85,7 +85,6 @@ pub struct Event { pub created_at: DateTime, } - #[derive(Debug, Clone)] pub struct IndexerUpdate { pub head: u64, diff --git a/crates/torii/grpc/src/client.rs b/crates/torii/grpc/src/client.rs index 50981e102a..d94eb2fa7a 100644 --- a/crates/torii/grpc/src/client.rs +++ b/crates/torii/grpc/src/client.rs @@ -8,10 +8,16 @@ use starknet::core::types::{Felt, FromStrError, StateDiff, StateUpdate}; use tonic::transport::Endpoint; use crate::proto::world::{ - world_client, RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventsRequest, RetrieveEventsResponse, SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse, UpdateEntitiesSubscriptionRequest, WorldMetadataRequest + world_client, RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventsRequest, + RetrieveEventsResponse, SubscribeEntitiesRequest, SubscribeEntityResponse, + SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest, + SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse, + UpdateEntitiesSubscriptionRequest, WorldMetadataRequest, }; use crate::types::schema::{Entity, SchemaError}; -use crate::types::{EntityKeysClause, Event, EventQuery, IndexerUpdate, KeysClause, ModelKeysClause, Query}; +use crate::types::{ + EntityKeysClause, Event, EventQuery, IndexerUpdate, KeysClause, ModelKeysClause, Query, +}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -109,11 +115,15 @@ impl WorldClient { &mut self, contract_address: Felt, ) -> Result { - let request = SubscribeIndexerRequest { contract_address: contract_address.to_bytes_be().to_vec() }; - let stream = self.inner.subscribe_indexer(request).await.map_err(Error::Grpc).map(|res| res.into_inner())?; - Ok(IndexerUpdateStreaming(stream.map_ok(Box::new(|res| { - res.into() - })))) + let request = + SubscribeIndexerRequest { contract_address: contract_address.to_bytes_be().to_vec() }; + let stream = self + .inner + .subscribe_indexer(request) + .await + .map_err(Error::Grpc) + .map(|res| res.into_inner())?; + Ok(IndexerUpdateStreaming(stream.map_ok(Box::new(|res| res.into())))) } /// Subscribe to entities updates of a World. @@ -309,8 +319,6 @@ impl Stream for IndexerUpdateStreaming { } } - - fn empty_state_update() -> StateUpdate { StateUpdate { block_hash: Felt::ZERO, diff --git a/crates/torii/grpc/src/types/mod.rs b/crates/torii/grpc/src/types/mod.rs index 3d30fa69fa..b9bf63d1e3 100644 --- a/crates/torii/grpc/src/types/mod.rs +++ b/crates/torii/grpc/src/types/mod.rs @@ -26,7 +26,12 @@ pub struct IndexerUpdate { impl From for IndexerUpdate { fn from(value: proto::world::SubscribeIndexerResponse) -> Self { - Self { head: value.head, tps: value.tps, last_block_timestamp: value.last_block_timestamp, contract_address: Felt::from_bytes_be_slice(&value.contract_address) } + Self { + head: value.head, + tps: value.tps, + last_block_timestamp: value.last_block_timestamp, + contract_address: Felt::from_bytes_be_slice(&value.contract_address), + } } } From 9eb9b2a1471c6ddbb4c60864fe6f0779350667ea Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 4 Oct 2024 15:23:22 -0400 Subject: [PATCH 09/15] naming refactor --- crates/torii/core/src/executor.rs | 14 ++++++++++---- crates/torii/core/src/types.rs | 5 +++-- .../torii/grpc/src/server/subscriptions/indexer.rs | 8 ++++---- .../20240923155431_last_block_timestamp.sql | 2 -- crates/torii/migrations/20240923155431_tps.sql | 3 +++ 5 files changed, 20 insertions(+), 12 deletions(-) delete mode 100644 crates/torii/migrations/20240923155431_last_block_timestamp.sql create mode 100644 crates/torii/migrations/20240923155431_tps.sql diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor.rs index 38d137022d..4626daf147 100644 --- a/crates/torii/core/src/executor.rs +++ b/crates/torii/core/src/executor.rs @@ -14,8 +14,8 @@ use tracing::{debug, error}; use crate::simple_broker::SimpleBroker; use crate::types::{ - Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, - IndexerUpdate, Model as ModelRegistered, + Contract as ContractUpdated, Entity as EntityUpdated, Event as EventEmitted, + EventMessage as EventMessageUpdated, Model as ModelRegistered, }; pub(crate) const LOG_TARGET: &str = "torii_core::executor"; @@ -31,7 +31,7 @@ pub enum Argument { #[derive(Debug, Clone)] pub enum BrokerMessage { - SetHead(IndexerUpdate), + SetHead(ContractUpdated), ModelRegistered(ModelRegistered), EntityUpdated(EntityUpdated), EventMessageUpdated(EventMessageUpdated), @@ -208,7 +208,13 @@ impl<'c> Executor<'c> { format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) })?; - self.publish_queue.push(BrokerMessage::SetHead(IndexerUpdate { + sqlx::query("UPDATE contracts SET tps = ? WHERE id = ?") + .bind(tps as i64) + .bind(format!("{:#x}", set_head.contract_address)) + .execute(&mut **tx) + .await?; + + self.publish_queue.push(BrokerMessage::SetHead(ContractUpdated { head: set_head.head, tps, last_block_timestamp: set_head.last_block_timestamp, diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index 9e20b881c2..29dedc8659 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -85,8 +85,9 @@ pub struct Event { pub created_at: DateTime, } -#[derive(Debug, Clone)] -pub struct IndexerUpdate { +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Contract { pub head: u64, pub tps: u64, pub last_block_timestamp: u64, diff --git a/crates/torii/grpc/src/server/subscriptions/indexer.rs b/crates/torii/grpc/src/server/subscriptions/indexer.rs index c1008d428e..718f1d7eef 100644 --- a/crates/torii/grpc/src/server/subscriptions/indexer.rs +++ b/crates/torii/grpc/src/server/subscriptions/indexer.rs @@ -11,7 +11,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; use torii_core::error::Error; use torii_core::simple_broker::SimpleBroker; -use torii_core::types::IndexerUpdate; +use torii_core::types::Contract as ContractUpdated; use tracing::{error, trace}; use crate::proto; @@ -67,17 +67,17 @@ impl IndexerManager { #[allow(missing_debug_implementations)] pub struct Service { subs_manager: Arc, - simple_broker: Pin + Send>>, + simple_broker: Pin + Send>>, } impl Service { pub fn new(subs_manager: Arc) -> Self { - Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } + Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } } async fn publish_updates( subs: Arc, - update: &IndexerUpdate, + update: &ContractUpdated, ) -> Result<(), Error> { let mut closed_stream = Vec::new(); diff --git a/crates/torii/migrations/20240923155431_last_block_timestamp.sql b/crates/torii/migrations/20240923155431_last_block_timestamp.sql deleted file mode 100644 index 626ddd4288..0000000000 --- a/crates/torii/migrations/20240923155431_last_block_timestamp.sql +++ /dev/null @@ -1,2 +0,0 @@ --- Add last_block_timestamp column for TPS calculation -ALTER TABLE contracts ADD COLUMN last_block_timestamp INTEGER; \ No newline at end of file diff --git a/crates/torii/migrations/20240923155431_tps.sql b/crates/torii/migrations/20240923155431_tps.sql new file mode 100644 index 0000000000..231b4c4edc --- /dev/null +++ b/crates/torii/migrations/20240923155431_tps.sql @@ -0,0 +1,3 @@ +-- Add last_block_timestamp column for TPS calculation +ALTER TABLE contracts ADD COLUMN last_block_timestamp INTEGER; +ALTER TABLE contracts ADD COLUMN tps INTEGER; \ No newline at end of file From 1d6ae93528682edf3734f78964219975fafd8f12 Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 4 Oct 2024 15:41:56 -0400 Subject: [PATCH 10/15] wild card felt zero --- crates/torii/grpc/src/server/subscriptions/indexer.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/torii/grpc/src/server/subscriptions/indexer.rs b/crates/torii/grpc/src/server/subscriptions/indexer.rs index 718f1d7eef..31025babc4 100644 --- a/crates/torii/grpc/src/server/subscriptions/indexer.rs +++ b/crates/torii/grpc/src/server/subscriptions/indexer.rs @@ -82,7 +82,8 @@ impl Service { let mut closed_stream = Vec::new(); for (idx, sub) in subs.subscribers.read().await.iter() { - if sub.contract_address != update.contract_address { + if sub.contract_address != Felt::ZERO && sub.contract_address != update.contract_address + { continue; } From caf38b20236657902b5805fbf591ed0788d6af52 Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 4 Oct 2024 15:42:34 -0400 Subject: [PATCH 11/15] cleamn --- crates/torii/grpc/src/server/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index 193a0fa947..402e15f378 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -1053,7 +1053,7 @@ impl proto::world::world_server::World for DojoWorld { ) -> ServiceResult { let SubscribeIndexerRequest { contract_address } = request.into_inner(); let rx = self - .subscribe_indexer(Felt::from_bytes_be_slice(contract_address.as_slice())) + .subscribe_indexer(Felt::from_bytes_be_slice(&contract_address)) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeIndexerStream)) From 938eabf3490558f595cadd93e58df375d63ada36 Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 4 Oct 2024 16:26:26 -0400 Subject: [PATCH 12/15] fetch contract firsdt --- crates/torii/core/src/executor.rs | 12 +++------ crates/torii/core/src/types.rs | 8 +++--- crates/torii/grpc/proto/world.proto | 6 ++--- crates/torii/grpc/src/server/mod.rs | 2 +- .../grpc/src/server/subscriptions/indexer.rs | 26 ++++++++++++++----- crates/torii/grpc/src/types/mod.rs | 6 ++--- 6 files changed, 34 insertions(+), 26 deletions(-) diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor.rs index 4626daf147..e0a99700f8 100644 --- a/crates/torii/core/src/executor.rs +++ b/crates/torii/core/src/executor.rs @@ -208,18 +208,14 @@ impl<'c> Executor<'c> { format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) })?; - sqlx::query("UPDATE contracts SET tps = ? WHERE id = ?") + let row = sqlx::query("UPDATE contracts SET tps = ? WHERE id = ? RETURNING *") .bind(tps as i64) .bind(format!("{:#x}", set_head.contract_address)) - .execute(&mut **tx) + .fetch_one(&mut **tx) .await?; - self.publish_queue.push(BrokerMessage::SetHead(ContractUpdated { - head: set_head.head, - tps, - last_block_timestamp: set_head.last_block_timestamp, - contract_address: set_head.contract_address, - })); + let contract = ContractUpdated::from_row(&row)?; + self.publish_queue.push(BrokerMessage::SetHead(contract)); } QueryType::SetEntity(entity) => { let row = query.fetch_one(&mut **tx).await.with_context(|| { diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index 29dedc8659..fdd845ea0d 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -88,8 +88,8 @@ pub struct Event { #[derive(FromRow, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Contract { - pub head: u64, - pub tps: u64, - pub last_block_timestamp: u64, - pub contract_address: Felt, + pub head: i64, + pub tps: i64, + pub last_block_timestamp: i64, + pub contract_address: String, } diff --git a/crates/torii/grpc/proto/world.proto b/crates/torii/grpc/proto/world.proto index 1fc9173ec0..49b8ad0742 100644 --- a/crates/torii/grpc/proto/world.proto +++ b/crates/torii/grpc/proto/world.proto @@ -48,9 +48,9 @@ message SubscribeIndexerRequest { // A response containing indexer updates. message SubscribeIndexerResponse { - uint64 head = 1; - uint64 tps = 2; - uint64 last_block_timestamp = 3; + int64 head = 1; + int64 tps = 2; + int64 last_block_timestamp = 3; bytes contract_address = 4; } diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index 402e15f378..6a496cc368 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -696,7 +696,7 @@ impl DojoWorld { contract_address: Felt, ) -> Result>, Error> { - self.indexer_manager.add_subscriber(contract_address).await + self.indexer_manager.add_subscriber(&self.pool, contract_address).await } async fn subscribe_models( diff --git a/crates/torii/grpc/src/server/subscriptions/indexer.rs b/crates/torii/grpc/src/server/subscriptions/indexer.rs index 31025babc4..063b7655a2 100644 --- a/crates/torii/grpc/src/server/subscriptions/indexer.rs +++ b/crates/torii/grpc/src/server/subscriptions/indexer.rs @@ -1,15 +1,17 @@ use std::collections::HashMap; use std::future::Future; use std::pin::Pin; +use std::str::FromStr; use std::sync::Arc; use std::task::{Context, Poll}; use futures::{Stream, StreamExt}; use rand::Rng; +use sqlx::{FromRow, Pool, Sqlite}; use starknet::core::types::Felt; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; -use torii_core::error::Error; +use torii_core::error::{Error, ParseError}; use torii_core::simple_broker::SimpleBroker; use torii_core::types::Contract as ContractUpdated; use tracing::{error, trace}; @@ -35,6 +37,7 @@ pub struct IndexerManager { impl IndexerManager { pub async fn add_subscriber( &self, + pool: &Pool, contract_address: Felt, ) -> Result>, Error> { @@ -44,11 +47,19 @@ impl IndexerManager { // NOTE: unlock issue with firefox/safari // initially send empty stream message to return from // initial subscribe call + let contract = sqlx::query( + "SELECT head, tps, last_block_timestamp, contract_address FROM contracts WHERE id = ?", + ) + .bind(format!("{:#x}", contract_address)) + .fetch_one(pool) + .await?; + let contract = ContractUpdated::from_row(&contract)?; + let _ = sender .send(Ok(SubscribeIndexerResponse { - head: 0, - tps: 0, - last_block_timestamp: 0, + head: contract.head, + tps: contract.tps, + last_block_timestamp: contract.last_block_timestamp, contract_address: contract_address.to_bytes_be().to_vec(), })) .await; @@ -80,10 +91,11 @@ impl Service { update: &ContractUpdated, ) -> Result<(), Error> { let mut closed_stream = Vec::new(); + let contract_address = + Felt::from_str(&update.contract_address).map_err(ParseError::FromStr)?; for (idx, sub) in subs.subscribers.read().await.iter() { - if sub.contract_address != Felt::ZERO && sub.contract_address != update.contract_address - { + if sub.contract_address != Felt::ZERO && sub.contract_address != contract_address { continue; } @@ -91,7 +103,7 @@ impl Service { head: update.head, tps: update.tps, last_block_timestamp: update.last_block_timestamp, - contract_address: update.contract_address.to_bytes_be().to_vec(), + contract_address: contract_address.to_bytes_be().to_vec(), }; if sub.sender.send(Ok(resp)).await.is_err() { diff --git a/crates/torii/grpc/src/types/mod.rs b/crates/torii/grpc/src/types/mod.rs index b9bf63d1e3..b47400d954 100644 --- a/crates/torii/grpc/src/types/mod.rs +++ b/crates/torii/grpc/src/types/mod.rs @@ -18,9 +18,9 @@ pub mod schema; #[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)] pub struct IndexerUpdate { - pub head: u64, - pub tps: u64, - pub last_block_timestamp: u64, + pub head: i64, + pub tps: i64, + pub last_block_timestamp: i64, pub contract_address: Felt, } From 9f57837138a60a424854c95fe2652988984afcba Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 4 Oct 2024 16:35:39 -0400 Subject: [PATCH 13/15] handle m,uiltiple contracts --- crates/torii/core/src/types.rs | 2 +- .../grpc/src/server/subscriptions/indexer.rs | 43 +++++++++++-------- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index fdd845ea0d..e87a1205e7 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -85,7 +85,7 @@ pub struct Event { pub created_at: DateTime, } -#[derive(FromRow, Deserialize, Debug, Clone)] +#[derive(FromRow, Deserialize, Debug, Clone, Default)] #[serde(rename_all = "camelCase")] pub struct Contract { pub head: i64, diff --git a/crates/torii/grpc/src/server/subscriptions/indexer.rs b/crates/torii/grpc/src/server/subscriptions/indexer.rs index 063b7655a2..c740aa994d 100644 --- a/crates/torii/grpc/src/server/subscriptions/indexer.rs +++ b/crates/torii/grpc/src/server/subscriptions/indexer.rs @@ -7,7 +7,7 @@ use std::task::{Context, Poll}; use futures::{Stream, StreamExt}; use rand::Rng; -use sqlx::{FromRow, Pool, Sqlite}; +use sqlx::{Pool, Sqlite}; use starknet::core::types::Felt; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; @@ -47,23 +47,30 @@ impl IndexerManager { // NOTE: unlock issue with firefox/safari // initially send empty stream message to return from // initial subscribe call - let contract = sqlx::query( - "SELECT head, tps, last_block_timestamp, contract_address FROM contracts WHERE id = ?", - ) - .bind(format!("{:#x}", contract_address)) - .fetch_one(pool) - .await?; - let contract = ContractUpdated::from_row(&contract)?; - - let _ = sender - .send(Ok(SubscribeIndexerResponse { - head: contract.head, - tps: contract.tps, - last_block_timestamp: contract.last_block_timestamp, - contract_address: contract_address.to_bytes_be().to_vec(), - })) - .await; - + let mut statement = + "SELECT head, tps, last_block_timestamp, contract_address FROM contracts".to_string(); + + let contracts: Vec = if contract_address != Felt::ZERO { + statement += " WHERE id = ?"; + + sqlx::query_as(&statement) + .bind(format!("{:#x}", contract_address)) + .fetch_all(pool) + .await? + } else { + sqlx::query_as(&statement).fetch_all(pool).await? + }; + + for contract in contracts { + let _ = sender + .send(Ok(SubscribeIndexerResponse { + head: contract.head, + tps: contract.tps, + last_block_timestamp: contract.last_block_timestamp, + contract_address: contract_address.to_bytes_be().to_vec(), + })) + .await; + } self.subscribers.write().await.insert(id, IndexerSubscriber { contract_address, sender }); Ok(receiver) From 4be7b4d7e3fcadcb3e86fa7f7234c88761fec23b Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 8 Oct 2024 09:49:58 -0400 Subject: [PATCH 14/15] clean --- crates/torii/grpc/src/server/subscriptions/indexer.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/torii/grpc/src/server/subscriptions/indexer.rs b/crates/torii/grpc/src/server/subscriptions/indexer.rs index c740aa994d..27315b6766 100644 --- a/crates/torii/grpc/src/server/subscriptions/indexer.rs +++ b/crates/torii/grpc/src/server/subscriptions/indexer.rs @@ -19,11 +19,11 @@ use tracing::{error, trace}; use crate::proto; use crate::proto::world::SubscribeIndexerResponse; -pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event"; +pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::indexer"; #[derive(Debug)] pub struct IndexerSubscriber { - /// Event keys that the subscriber is interested in + /// Contract address that the subscriber is interested in contract_address: Felt, /// The channel to send the response back to the subscriber. sender: Sender>, @@ -44,9 +44,6 @@ impl IndexerManager { let id = rand::thread_rng().gen::(); let (sender, receiver) = channel(1); - // NOTE: unlock issue with firefox/safari - // initially send empty stream message to return from - // initial subscribe call let mut statement = "SELECT head, tps, last_block_timestamp, contract_address FROM contracts".to_string(); @@ -119,7 +116,7 @@ impl Service { } for id in closed_stream { - trace!(target = LOG_TARGET, id = %id, "Closing events stream."); + trace!(target = LOG_TARGET, id = %id, "Closing indexer updates stream."); subs.remove_subscriber(id).await } From 10c561de43afd55e8f8d474549edffc4a10177db Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 8 Oct 2024 09:59:40 -0400 Subject: [PATCH 15/15] torii client --- crates/torii/client/src/client/mod.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index 42ca0e4c07..cff1cabda7 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -10,7 +10,7 @@ use starknet::core::types::Felt; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; use tokio::sync::RwLock as AsyncRwLock; -use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming}; +use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming}; use torii_grpc::proto::world::{RetrieveEntitiesResponse, RetrieveEventsResponse}; use torii_grpc::types::schema::Entity; use torii_grpc::types::{EntityKeysClause, Event, EventQuery, KeysClause, Query}; @@ -156,6 +156,7 @@ impl Client { Ok(()) } + /// A direct stream to grpc subscribe starknet events pub async fn on_starknet_event( &self, keys: Option, @@ -164,4 +165,17 @@ impl Client { let stream = grpc_client.subscribe_events(keys).await?; Ok(stream) } + + /// Subscribe to indexer updates for a specific contract address. + /// If no contract address is provided, it will subscribe to updates for world contract. + pub async fn on_indexer_updated( + &self, + contract_address: Option, + ) -> Result { + let mut grpc_client = self.inner.write().await; + let stream = grpc_client + .subscribe_indexer(contract_address.unwrap_or(self.world_reader.address)) + .await?; + Ok(stream) + } }