Skip to content

Commit

Permalink
use vec
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Oct 2, 2024
1 parent 61f0a4b commit 63cca75
Showing 1 changed file with 8 additions and 9 deletions.
17 changes: 8 additions & 9 deletions crates/torii/core/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::VecDeque;
use std::mem;

use anyhow::{Context, Result};
Expand Down Expand Up @@ -61,7 +60,7 @@ pub enum QueryType {
pub struct Executor<'c> {
pool: Pool<Sqlite>,
transaction: Transaction<'c, Sqlite>,
publish_queue: VecDeque<BrokerMessage>,
publish_queue: Vec<BrokerMessage>,
rx: UnboundedReceiver<QueryMessage>,
shutdown_rx: Receiver<()>,
}
Expand Down Expand Up @@ -130,7 +129,7 @@ impl<'c> Executor<'c> {
) -> Result<(Self, UnboundedSender<QueryMessage>)> {
let (tx, rx) = unbounded_channel();
let transaction = pool.begin().await?;
let publish_queue = VecDeque::new();
let publish_queue = Vec::new();
let shutdown_rx = shutdown_tx.subscribe();

Ok((Executor { pool, transaction, publish_queue, rx, shutdown_rx }, tx))
Expand Down Expand Up @@ -187,7 +186,7 @@ impl<'c> Executor<'c> {
entity_updated.updated_model = Some(entity);
entity_updated.deleted = false;
let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.publish_queue.push_back(broker_message);
self.publish_queue.push(broker_message);
}
QueryType::DeleteEntity(entity) => {
let delete_model = query.execute(&mut **tx).await.with_context(|| {
Expand Down Expand Up @@ -227,14 +226,14 @@ impl<'c> Executor<'c> {
}

let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.publish_queue.push_back(broker_message);
self.publish_queue.push(broker_message);
}
QueryType::RegisterModel => {
let row = query.fetch_one(&mut **tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)

Check warning on line 233 in crates/torii/core/src/executor.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor.rs#L233

Added line #L233 was not covered by tests
})?;
let model_registered = ModelRegistered::from_row(&row)?;
self.publish_queue.push_back(BrokerMessage::ModelRegistered(model_registered));
self.publish_queue.push(BrokerMessage::ModelRegistered(model_registered));
}
QueryType::EventMessage(entity) => {
let row = query.fetch_one(&mut **tx).await.with_context(|| {
Expand All @@ -243,14 +242,14 @@ impl<'c> Executor<'c> {
let mut event_message = EventMessageUpdated::from_row(&row)?;
event_message.updated_model = Some(entity);
let broker_message = BrokerMessage::EventMessageUpdated(event_message);
self.publish_queue.push_back(broker_message);
self.publish_queue.push(broker_message);
}
QueryType::StoreEvent => {
let row = query.fetch_one(&mut **tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)

Check warning on line 249 in crates/torii/core/src/executor.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor.rs#L249

Added line #L249 was not covered by tests
})?;
let event = EventEmitted::from_row(&row)?;
self.publish_queue.push_back(BrokerMessage::EventEmitted(event));
self.publish_queue.push(BrokerMessage::EventEmitted(event));
}
QueryType::Execute => {
debug!(target: LOG_TARGET, "Executing query.");
Expand Down Expand Up @@ -280,7 +279,7 @@ impl<'c> Executor<'c> {
let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?);
transaction.commit().await?;

while let Some(message) = self.publish_queue.pop_front() {
for message in self.publish_queue.drain(..) {
send_broker_message(message);
}

Expand Down

0 comments on commit 63cca75

Please sign in to comment.