From 33fd66c68505f681054584ccf59856177c22b7a6 Mon Sep 17 00:00:00 2001 From: Larko <59736843+Larkooo@users.noreply.github.com> Date: Tue, 22 Oct 2024 12:55:14 -0400 Subject: [PATCH] refactor(torii-grpc): event subscription with multiple clauses (#2555) --- crates/torii/client/src/client/mod.rs | 4 +- crates/torii/grpc/proto/world.proto | 2 +- crates/torii/grpc/src/client.rs | 8 +- crates/torii/grpc/src/server/mod.rs | 9 +- .../grpc/src/server/subscriptions/entity.rs | 65 +-------- .../grpc/src/server/subscriptions/event.rs | 35 +---- .../src/server/subscriptions/event_message.rs | 65 +-------- .../grpc/src/server/subscriptions/mod.rs | 130 ++++++++++++++++++ 8 files changed, 152 insertions(+), 166 deletions(-) diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index cff1cabda7..9f47b613f2 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -13,7 +13,7 @@ use tokio::sync::RwLock as AsyncRwLock; use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming}; use torii_grpc::proto::world::{RetrieveEntitiesResponse, RetrieveEventsResponse}; use torii_grpc::types::schema::Entity; -use torii_grpc::types::{EntityKeysClause, Event, EventQuery, KeysClause, Query}; +use torii_grpc::types::{EntityKeysClause, Event, EventQuery, Query}; use torii_relay::client::EventLoop; use torii_relay::types::Message; @@ -159,7 +159,7 @@ impl Client { /// A direct stream to grpc subscribe starknet events pub async fn on_starknet_event( &self, - keys: Option, + keys: Vec, ) -> Result { let mut grpc_client = self.inner.write().await; let stream = grpc_client.subscribe_events(keys).await?; diff --git a/crates/torii/grpc/proto/world.proto b/crates/torii/grpc/proto/world.proto index dc0fa26deb..fa3e6ef385 100644 --- a/crates/torii/grpc/proto/world.proto +++ b/crates/torii/grpc/proto/world.proto @@ -116,7 +116,7 @@ message RetrieveEventsResponse { } message SubscribeEventsRequest { - types.KeysClause keys = 1; + repeated types.EntityKeysClause keys = 1; } message SubscribeEventsResponse { diff --git a/crates/torii/grpc/src/client.rs b/crates/torii/grpc/src/client.rs index cfaff960de..0ca8c463e5 100644 --- a/crates/torii/grpc/src/client.rs +++ b/crates/torii/grpc/src/client.rs @@ -16,9 +16,7 @@ use crate::proto::world::{ UpdateEntitiesSubscriptionRequest, WorldMetadataRequest, }; use crate::types::schema::{Entity, SchemaError}; -use crate::types::{ - EntityKeysClause, Event, EventQuery, IndexerUpdate, KeysClause, ModelKeysClause, Query, -}; +use crate::types::{EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -211,9 +209,9 @@ impl WorldClient { /// Subscribe to the events of a World. pub async fn subscribe_events( &mut self, - keys: Option, + keys: Vec, ) -> Result { - let keys = keys.map(|c| c.into()); + let keys = keys.into_iter().map(|c| c.into()).collect(); let stream = self .inner diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index 8ec0acaf5f..b1fb797836 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -857,9 +857,11 @@ impl DojoWorld { async fn subscribe_events( &self, - clause: proto::types::KeysClause, + clause: Vec, ) -> Result>, Error> { - self.event_manager.add_subscriber(clause.into()).await + self.event_manager + .add_subscriber(clause.into_iter().map(|keys| keys.into()).collect()) + .await } } @@ -1260,8 +1262,7 @@ impl proto::world::world_server::World for DojoWorld { &self, request: Request, ) -> ServiceResult { - let keys = request.into_inner().keys.unwrap_or_default(); - + let keys = request.into_inner().keys; let rx = self.subscribe_events(keys).await.map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeEventsStream)) diff --git a/crates/torii/grpc/src/server/subscriptions/entity.rs b/crates/torii/grpc/src/server/subscriptions/entity.rs index 8d3272edcf..ec09301def 100644 --- a/crates/torii/grpc/src/server/subscriptions/entity.rs +++ b/crates/torii/grpc/src/server/subscriptions/entity.rs @@ -19,9 +19,10 @@ use torii_core::sql::FELT_DELIMITER; use torii_core::types::OptimisticEntity; use tracing::{error, trace}; +use super::match_entity_keys; use crate::proto; use crate::proto::world::SubscribeEntityResponse; -use crate::types::{EntityKeysClause, PatternMatching}; +use crate::types::EntityKeysClause; pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::entity"; @@ -128,67 +129,7 @@ impl Service { // If we have a clause of keys, then check that the key pattern of the entity // matches the key pattern of the subscriber. - if !sub.clauses.is_empty() - && !sub.clauses.iter().any(|clause| match clause { - EntityKeysClause::HashedKeys(hashed_keys) => { - hashed_keys.is_empty() || hashed_keys.contains(&hashed) - } - EntityKeysClause::Keys(clause) => { - // if we have a model clause, then we need to check that the entity - // has an updated model and that the model name matches the clause - if let Some(updated_model) = &entity.updated_model { - let name = updated_model.name(); - let (namespace, name) = name.split_once('-').unwrap(); - - if !clause.models.is_empty() - && !clause.models.iter().any(|clause_model| { - let (clause_namespace, clause_model) = - clause_model.split_once('-').unwrap(); - // if both namespace and model are empty, we should match all. - // if namespace is specified and model is empty or * we should - // match all models in the - // namespace if namespace - // and model are specified, we should match the - // specific model - (clause_namespace.is_empty() - || clause_namespace == namespace - || clause_namespace == "*") - && (clause_model.is_empty() - || clause_model == name - || clause_model == "*") - }) - { - return false; - } - } - - // if the key pattern doesnt match our subscribers key pattern, skip - // ["", "0x0"] would match with keys ["0x...", "0x0", ...] - if clause.pattern_matching == PatternMatching::FixedLen - && keys.len() != clause.keys.len() - { - return false; - } - - return keys.iter().enumerate().all(|(idx, key)| { - // this is going to be None if our key pattern overflows the subscriber - // key pattern in this case we should skip - let sub_key = clause.keys.get(idx); - - match sub_key { - // the key in the subscriber must match the key of the entity - // athis index - Some(Some(sub_key)) => key == sub_key, - // otherwise, if we have no key we should automatically match. - // or.. we overflowed the subscriber key pattern - // but we're in VariableLen pattern matching - // so we should match all next keys - _ => true, - } - }); - } - }) - { + if !match_entity_keys(hashed, &keys, &entity.updated_model, &sub.clauses) { continue; } diff --git a/crates/torii/grpc/src/server/subscriptions/event.rs b/crates/torii/grpc/src/server/subscriptions/event.rs index c1deb332cc..3d19c1fd1a 100644 --- a/crates/torii/grpc/src/server/subscriptions/event.rs +++ b/crates/torii/grpc/src/server/subscriptions/event.rs @@ -19,16 +19,17 @@ use torii_core::sql::FELT_DELIMITER; use torii_core::types::Event; use tracing::{error, trace}; +use super::match_keys; use crate::proto; use crate::proto::world::SubscribeEventsResponse; -use crate::types::{KeysClause, PatternMatching}; +use crate::types::EntityKeysClause; pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event"; #[derive(Debug)] pub struct EventSubscriber { /// Event keys that the subscriber is interested in - keys: KeysClause, + keys: Vec, /// The channel to send the response back to the subscriber. sender: Sender>, } @@ -41,7 +42,7 @@ pub struct EventManager { impl EventManager { pub async fn add_subscriber( &self, - keys: KeysClause, + keys: Vec, ) -> Result>, Error> { let id = rand::thread_rng().gen::(); let (sender, receiver) = channel(1); @@ -108,33 +109,7 @@ impl Service { .map_err(ParseError::from)?; for (idx, sub) in subs.subscribers.read().await.iter() { - // if the key pattern doesnt match our subscribers key pattern, skip - // ["", "0x0"] would match with keys ["0x...", "0x0", ...] - if sub.keys.pattern_matching == PatternMatching::FixedLen - && keys.len() != sub.keys.keys.len() - { - continue; - } - - if !keys.iter().enumerate().all(|(idx, key)| { - // this is going to be None if our key pattern overflows the subscriber key pattern - // in this case we might want to list all events with the same - // key selector so we can match them all - let sub_key = sub.keys.keys.get(idx); - - // if we have a key in the subscriber, it must match the key in the event - // unless its empty, which is a wildcard - match sub_key { - // the key in the subscriber must match the key of the entity - // athis index - Some(Some(sub_key)) => key == sub_key, - // otherwise, if we have no key we should automatically match. - // or.. we overflowed the subscriber key pattern - // but we're in VariableLen pattern matching - // so we should match all next keys - _ => true, - } - }) { + if !match_keys(&keys, &sub.keys) { continue; } diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index 93de013cd3..c0aa33edfe 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -18,9 +18,10 @@ use torii_core::types::OptimisticEventMessage; use tracing::{error, trace}; use super::entity::EntitiesSubscriber; +use super::match_entity_keys; use crate::proto; use crate::proto::world::SubscribeEntityResponse; -use crate::types::{EntityKeysClause, PatternMatching}; +use crate::types::EntityKeysClause; pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event_message"; @@ -120,67 +121,7 @@ impl Service { // If we have a clause of keys, then check that the key pattern of the entity // matches the key pattern of the subscriber. - if !sub.clauses.is_empty() - && !sub.clauses.iter().any(|clause| match clause { - EntityKeysClause::HashedKeys(hashed_keys) => { - hashed_keys.is_empty() || hashed_keys.contains(&hashed) - } - EntityKeysClause::Keys(clause) => { - // if we have a model clause, then we need to check that the entity - // has an updated model and that the model name matches the clause - if let Some(updated_model) = &entity.updated_model { - let name = updated_model.name(); - let (namespace, name) = name.split_once('-').unwrap(); - - if !clause.models.is_empty() - && !clause.models.iter().any(|clause_model| { - let (clause_namespace, clause_model) = - clause_model.split_once('-').unwrap(); - // if both namespace and model are empty, we should match all. - // if namespace is specified and model is empty or * we should - // match all models in the - // namespace if namespace - // and model are specified, we should match the - // specific model - (clause_namespace.is_empty() - || clause_namespace == namespace - || clause_namespace == "*") - && (clause_model.is_empty() - || clause_model == name - || clause_model == "*") - }) - { - return false; - } - } - - // if the key pattern doesnt match our subscribers key pattern, skip - // ["", "0x0"] would match with keys ["0x...", "0x0", ...] - if clause.pattern_matching == PatternMatching::FixedLen - && keys.len() != clause.keys.len() - { - return false; - } - - return keys.iter().enumerate().all(|(idx, key)| { - // this is going to be None if our key pattern overflows the subscriber - // key pattern in this case we should skip - let sub_key = clause.keys.get(idx); - - match sub_key { - // the key in the subscriber must match the key of the entity - // athis index - Some(Some(sub_key)) => key == sub_key, - // otherwise, if we have no key we should automatically match. - // or.. we overflowed the subscriber key pattern - // but we're in VariableLen pattern matching - // so we should match all next keys - _ => true, - } - }); - } - }) - { + if !match_entity_keys(hashed, &keys, &entity.updated_model, &sub.clauses) { continue; } diff --git a/crates/torii/grpc/src/server/subscriptions/mod.rs b/crates/torii/grpc/src/server/subscriptions/mod.rs index 3a44537427..b58810d611 100644 --- a/crates/torii/grpc/src/server/subscriptions/mod.rs +++ b/crates/torii/grpc/src/server/subscriptions/mod.rs @@ -1,6 +1,136 @@ +use dojo_types::schema::Ty; +use starknet_crypto::{poseidon_hash_many, Felt}; + +use crate::types::{EntityKeysClause, PatternMatching}; + pub mod entity; pub mod error; pub mod event; pub mod event_message; pub mod indexer; pub mod model_diff; + +pub(crate) fn match_entity_keys( + id: Felt, + keys: &[Felt], + updated_model: &Option, + clauses: &[EntityKeysClause], +) -> bool { + // Check if the subscriber is interested in this entity + // If we have a clause of hashed keys, then check that the id of the entity + // is in the list of hashed keys. + + // If we have a clause of keys, then check that the key pattern of the entity + // matches the key pattern of the subscriber. + if !clauses.is_empty() + && !clauses.iter().any(|clause| match clause { + EntityKeysClause::HashedKeys(hashed_keys) => { + hashed_keys.is_empty() || hashed_keys.contains(&id) + } + EntityKeysClause::Keys(clause) => { + // if we have a model clause, then we need to check that the entity + // has an updated model and that the model name matches the clause + if let Some(updated_model) = &updated_model { + let name = updated_model.name(); + let (namespace, name) = name.split_once('-').unwrap(); + + if !clause.models.is_empty() + && !clause.models.iter().any(|clause_model| { + let (clause_namespace, clause_model) = + clause_model.split_once('-').unwrap(); + // if both namespace and model are empty, we should match all. + // if namespace is specified and model is empty or * we should + // match all models in the + // namespace if namespace + // and model are specified, we should match the + // specific model + (clause_namespace.is_empty() + || clause_namespace == namespace + || clause_namespace == "*") + && (clause_model.is_empty() + || clause_model == name + || clause_model == "*") + }) + { + return false; + } + } + + // if the key pattern doesnt match our subscribers key pattern, skip + // ["", "0x0"] would match with keys ["0x...", "0x0", ...] + if clause.pattern_matching == PatternMatching::FixedLen + && keys.len() != clause.keys.len() + { + return false; + } + + return keys.iter().enumerate().all(|(idx, key)| { + // this is going to be None if our key pattern overflows the subscriber + // key pattern in this case we should skip + let sub_key = clause.keys.get(idx); + + match sub_key { + // the key in the subscriber must match the key of the entity + // athis index + Some(Some(sub_key)) => key == sub_key, + // otherwise, if we have no key we should automatically match. + // or.. we overflowed the subscriber key pattern + // but we're in VariableLen pattern matching + // so we should match all next keys + _ => true, + } + }); + } + }) + { + return false; + } + + true +} + +pub(crate) fn match_keys(keys: &[Felt], clauses: &[EntityKeysClause]) -> bool { + // Check if the subscriber is interested in this entity + // If we have a clause of hashed keys, then check that the id of the entity + // is in the list of hashed keys. + + // If we have a clause of keys, then check that the key pattern of the entity + // matches the key pattern of the subscriber. + if !clauses.is_empty() + && !clauses.iter().any(|clause| match clause { + EntityKeysClause::HashedKeys(hashed_keys) => { + hashed_keys.is_empty() || hashed_keys.contains(&poseidon_hash_many(keys)) + } + EntityKeysClause::Keys(clause) => { + // if the key pattern doesnt match our subscribers key pattern, skip + // ["", "0x0"] would match with keys ["0x...", "0x0", ...] + if clause.pattern_matching == PatternMatching::FixedLen + && keys.len() != clause.keys.len() + { + return false; + } + + return keys.iter().enumerate().all(|(idx, key)| { + // this is going to be None if our key pattern overflows the subscriber + // key pattern in this case we should skip + let sub_key = clause.keys.get(idx); + + match sub_key { + // the key in the subscriber must match the key of the entity + // athis index + Some(Some(sub_key)) => key == sub_key, + // otherwise, if we have no key we should automatically match. + // or.. we overflowed the subscriber key pattern + // but we're in VariableLen pattern matching + // so we should match all next keys + _ => true, + } + }); + } + }) + { + return false; + } + + true +}