Skip to content

Commit

Permalink
serilization to json
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Nov 20, 2024
1 parent 77ece4c commit 84d8f93
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 41 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 18 additions & 7 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,14 @@ async fn main() -> anyhow::Result<()> {
options = options.journal_mode(SqliteJournalMode::Wal);
options = options.synchronous(SqliteSynchronous::Normal);

let pool = SqlitePoolOptions::new().min_connections(1).connect_with(options).await?;
let pool = SqlitePoolOptions::new().min_connections(1).connect_with(options.clone()).await?;

let readonly_options = options.read_only(true);
let readonly_pool = SqlitePoolOptions::new()
.min_connections(1)
.max_connections(100)
.connect_with(readonly_options)
.await?;

// Set the number of threads based on CPU count
let cpu_count = std::thread::available_parallelism().unwrap().get();
Expand All @@ -120,7 +127,7 @@ async fn main() -> anyhow::Result<()> {
.await?;
let executor_handle = tokio::spawn(async move { executor.run().await });

let model_cache = Arc::new(ModelCache::new(pool.clone()));
let model_cache = Arc::new(ModelCache::new(readonly_pool.clone()));
let db = Sql::new(pool.clone(), sender.clone(), &args.indexing.contracts, model_cache.clone())
.await?;

Expand Down Expand Up @@ -166,7 +173,7 @@ async fn main() -> anyhow::Result<()> {
let shutdown_rx = shutdown_tx.subscribe();
let (grpc_addr, grpc_server) = torii_grpc::server::new(
shutdown_rx,
&pool,
&readonly_pool,
block_rx,
world_address,
Arc::clone(&provider),
Expand All @@ -181,8 +188,12 @@ async fn main() -> anyhow::Result<()> {
tokio::fs::create_dir_all(&artifacts_path).await?;
let absolute_path = artifacts_path.canonicalize_utf8()?;

let (artifacts_addr, artifacts_server) =
torii_server::artifacts::new(shutdown_tx.subscribe(), &absolute_path, pool.clone()).await?;
let (artifacts_addr, artifacts_server) = torii_server::artifacts::new(
shutdown_tx.subscribe(),
&absolute_path,
readonly_pool.clone(),
)
.await?;

let mut libp2p_relay_server = torii_relay::server::Relay::new(
db,
Expand All @@ -203,12 +214,12 @@ async fn main() -> anyhow::Result<()> {
Some(grpc_addr),
None,
Some(artifacts_addr),
Arc::new(pool.clone()),
Arc::new(readonly_pool.clone()),
));

let graphql_server = spawn_rebuilding_graphql_server(
shutdown_tx.clone(),
pool.into(),
readonly_pool.into(),
args.external_url,
proxy_server.clone(),
);
Expand Down
1 change: 1 addition & 0 deletions crates/torii/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ tower-http.workspace = true
tower.workspace = true
tracing.workspace = true
warp.workspace = true
form_urlencoded = "1.2.1"
102 changes: 68 additions & 34 deletions crates/torii/server/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;

use base64::Engine;
use http::header::CONTENT_TYPE;
use http::{HeaderName, Method};
use hyper::client::connect::dns::GaiResolver;
Expand All @@ -16,7 +17,8 @@ use tokio::sync::RwLock;
use tower::ServiceBuilder;
use tower_http::cors::{AllowOrigin, CorsLayer};
use tracing::error;
use sqlx::SqlitePool;
use sqlx::{SqlitePool, Row, Column, TypeInfo};
use base64::engine::general_purpose::STANDARD;

const DEFAULT_ALLOW_HEADERS: [&str; 13] = [
"accept",
Expand Down Expand Up @@ -134,11 +136,13 @@ impl Proxy {
),
});

let pool_clone = pool.clone();
let graphql_addr_clone = graphql_addr.clone();
let service = ServiceBuilder::new().option_layer(cors).service_fn(move |req| {
let pool = pool.clone();
let pool = pool_clone.clone();
let graphql_addr = graphql_addr_clone.clone();
async move {
let graphql_addr = graphql_addr_clone.clone();
let graphql_addr = graphql_addr.read().await;
handle(remote_addr, grpc_addr, artifacts_addr, *graphql_addr, pool, req).await
}
});
Expand Down Expand Up @@ -231,42 +235,72 @@ async fn handle(
}

if req.uri().path().starts_with("/sql") {
if req.method() != Method::POST {
let query = if req.method() == Method::GET {
// Get the query from URL parameters
let params = req.uri().query().unwrap_or_default();
form_urlencoded::parse(params.as_bytes())
.find(|(key, _)| key == "q")
.map(|(_, value)| value.to_string())
.unwrap_or_default()
} else if req.method() == Method::POST {
// Get the query from request body
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap_or_default();
String::from_utf8(body_bytes.to_vec()).unwrap_or_default()
} else {
return Ok(Response::builder()
.status(StatusCode::METHOD_NOT_ALLOWED)
.body(Body::from("Only POST method is allowed"))
.body(Body::from("Only GET and POST methods are allowed"))
.unwrap());
}

let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap_or_default();
let query = String::from_utf8(body_bytes.to_vec()).unwrap_or_default();
};

if !query.trim().to_uppercase().starts_with("SELECT") {
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("Only SELECT queries are allowed"))
.unwrap());
}
// Execute the query in a read-only transaction
return match sqlx::query(&query)
.fetch_all(&*pool)
.await {
Ok(rows) => {
let result: Vec<_> = rows.iter()
.map(|row| {
let mut obj = serde_json::Map::new();
for (i, column) in row.columns().iter().enumerate() {
let value: serde_json::Value = match column.type_info().name() {
"TEXT" => row.get::<Option<String>, _>(i)
.map_or(serde_json::Value::Null, serde_json::Value::String),
"INTEGER" => row.get::<Option<i64>, _>(i)
.map_or(serde_json::Value::Null, |n| serde_json::Value::Number(n.into())),
"REAL" => row.get::<Option<f64>, _>(i)
.map_or(serde_json::Value::Null, |f|
serde_json::Number::from_f64(f)
.map_or(serde_json::Value::Null, serde_json::Value::Number)
),
"BLOB" => row.get::<Option<Vec<u8>>, _>(i)
.map_or(serde_json::Value::Null, |bytes|
serde_json::Value::String(STANDARD.encode(bytes))
),
_ => row.get::<Option<String>, _>(i)
.map_or(serde_json::Value::Null, serde_json::Value::String),
};
obj.insert(column.name().to_string(), value);
}
serde_json::Value::Object(obj)
})
.collect();

return match sqlx::query(&query).fetch_all(&*pool).await {
Ok(rows) => {
let json = serde_json::to_string(
&rows.iter()
.map(|row| row.columns().iter().map(|col| col.name()).collect::<Vec<_>>())
.collect::<Vec<_>>()
).unwrap();

Ok(Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, "application/json")
.body(Body::from(json))
.unwrap())
}
Err(e) => Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from(format!("Query error: {}", e)))
.unwrap()),
};
let json = serde_json::to_string(&result).unwrap();

Ok(Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, "application/json")
.body(Body::from(json))
.unwrap())
}
Err(e) => {
error!("SQL query error: {:?}", e);
Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("Query error"))
.unwrap())
}
};
}

let json = json!({
Expand Down

0 comments on commit 84d8f93

Please sign in to comment.