Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii): sql proxy endpoint for querying #2706

Merged
merged 5 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 18 additions & 6 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,14 @@
options = options.journal_mode(SqliteJournalMode::Wal);
options = options.synchronous(SqliteSynchronous::Normal);

let pool = SqlitePoolOptions::new().min_connections(1).connect_with(options).await?;
let pool = SqlitePoolOptions::new().min_connections(1).connect_with(options.clone()).await?;

let readonly_options = options.read_only(true);
let readonly_pool = SqlitePoolOptions::new()
.min_connections(1)
.max_connections(100)
.connect_with(readonly_options)
.await?;

Check warning on line 107 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L100-L107

Added lines #L100 - L107 were not covered by tests

// Set the number of threads based on CPU count
let cpu_count = std::thread::available_parallelism().unwrap().get();
Expand All @@ -120,7 +127,7 @@
.await?;
let executor_handle = tokio::spawn(async move { executor.run().await });

let model_cache = Arc::new(ModelCache::new(pool.clone()));
let model_cache = Arc::new(ModelCache::new(readonly_pool.clone()));

Check warning on line 130 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L130

Added line #L130 was not covered by tests
let db = Sql::new(pool.clone(), sender.clone(), &args.indexing.contracts, model_cache.clone())
.await?;

Expand Down Expand Up @@ -166,7 +173,7 @@
let shutdown_rx = shutdown_tx.subscribe();
let (grpc_addr, grpc_server) = torii_grpc::server::new(
shutdown_rx,
&pool,
&readonly_pool,

Check warning on line 176 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L176

Added line #L176 was not covered by tests
block_rx,
world_address,
Arc::clone(&provider),
Expand All @@ -181,8 +188,12 @@
tokio::fs::create_dir_all(&artifacts_path).await?;
let absolute_path = artifacts_path.canonicalize_utf8()?;

let (artifacts_addr, artifacts_server) =
torii_server::artifacts::new(shutdown_tx.subscribe(), &absolute_path, pool.clone()).await?;
let (artifacts_addr, artifacts_server) = torii_server::artifacts::new(
shutdown_tx.subscribe(),
&absolute_path,
readonly_pool.clone(),
)
.await?;

Check warning on line 196 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L191-L196

Added lines #L191 - L196 were not covered by tests

let mut libp2p_relay_server = torii_relay::server::Relay::new(
db,
Expand All @@ -203,11 +214,12 @@
Some(grpc_addr),
None,
Some(artifacts_addr),
Arc::new(readonly_pool.clone()),

Check warning on line 217 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L217

Added line #L217 was not covered by tests
));

let graphql_server = spawn_rebuilding_graphql_server(
shutdown_tx.clone(),
pool.into(),
readonly_pool.into(),

Check warning on line 222 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L222

Added line #L222 was not covered by tests
args.external_url,
proxy_server.clone(),
);
Expand Down
1 change: 1 addition & 0 deletions crates/torii/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ tower-http.workspace = true
tower.workspace = true
tracing.workspace = true
warp.workspace = true
form_urlencoded = "1.2.1"
97 changes: 93 additions & 4 deletions crates/torii/server/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
use std::sync::Arc;
use std::time::Duration;

use base64::engine::general_purpose::STANDARD;
use base64::Engine;
use http::header::CONTENT_TYPE;
use http::{HeaderName, Method};
use hyper::client::connect::dns::GaiResolver;
Expand All @@ -12,11 +14,14 @@
use hyper::{Body, Client, Request, Response, Server, StatusCode};
use hyper_reverse_proxy::ReverseProxy;
use serde_json::json;
use sqlx::{Column, Row, SqlitePool, TypeInfo};
use tokio::sync::RwLock;
use tower::ServiceBuilder;
use tower_http::cors::{AllowOrigin, CorsLayer};
use tracing::error;

pub(crate) const LOG_TARGET: &str = "torii::server::proxy";

const DEFAULT_ALLOW_HEADERS: [&str; 13] = [
"accept",
"origin",
Expand Down Expand Up @@ -60,6 +65,7 @@
grpc_addr: Option<SocketAddr>,
artifacts_addr: Option<SocketAddr>,
graphql_addr: Arc<RwLock<Option<SocketAddr>>>,
pool: Arc<SqlitePool>,
}

impl Proxy {
Expand All @@ -69,13 +75,15 @@
grpc_addr: Option<SocketAddr>,
graphql_addr: Option<SocketAddr>,
artifacts_addr: Option<SocketAddr>,
pool: Arc<SqlitePool>,

Check warning on line 78 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L78

Added line #L78 was not covered by tests
) -> Self {
Self {
addr,
allowed_origins,
grpc_addr,
graphql_addr: Arc::new(RwLock::new(graphql_addr)),
artifacts_addr,
pool,

Check warning on line 86 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L86

Added line #L86 was not covered by tests
}
}

Expand All @@ -93,6 +101,7 @@
let grpc_addr = self.grpc_addr;
let graphql_addr = self.graphql_addr.clone();
let artifacts_addr = self.artifacts_addr;
let pool = self.pool.clone();

Check warning on line 104 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L104

Added line #L104 was not covered by tests

let make_svc = make_service_fn(move |conn: &AddrStream| {
let remote_addr = conn.remote_addr().ip();
Expand Down Expand Up @@ -129,12 +138,14 @@
),
});

let pool_clone = pool.clone();

Check warning on line 141 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L141

Added line #L141 was not covered by tests
let graphql_addr_clone = graphql_addr.clone();
let service = ServiceBuilder::new().option_layer(cors).service_fn(move |req| {
let pool = pool_clone.clone();

Check warning on line 144 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L144

Added line #L144 was not covered by tests
let graphql_addr = graphql_addr_clone.clone();
async move {
let graphql_addr = graphql_addr.read().await;
handle(remote_addr, grpc_addr, artifacts_addr, *graphql_addr, req).await
handle(remote_addr, grpc_addr, artifacts_addr, *graphql_addr, pool, req).await

Check warning on line 148 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L148

Added line #L148 was not covered by tests
}
});

Expand All @@ -156,6 +167,7 @@
grpc_addr: Option<SocketAddr>,
artifacts_addr: Option<SocketAddr>,
graphql_addr: Option<SocketAddr>,
pool: Arc<SqlitePool>,

Check warning on line 170 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L170

Added line #L170 was not covered by tests
req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
if req.uri().path().starts_with("/static") {
Expand All @@ -165,7 +177,7 @@
return match GRAPHQL_PROXY_CLIENT.call(client_ip, &artifacts_addr, req).await {
Ok(response) => Ok(response),
Err(_error) => {
error!("{:?}", _error);
error!(target: LOG_TARGET, "Artifacts proxy error: {:?}", _error);

Check warning on line 180 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L180

Added line #L180 was not covered by tests
Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
Expand All @@ -186,7 +198,7 @@
return match GRAPHQL_PROXY_CLIENT.call(client_ip, &graphql_addr, req).await {
Ok(response) => Ok(response),
Err(_error) => {
error!("{:?}", _error);
error!(target: LOG_TARGET, "GraphQL proxy error: {:?}", _error);

Check warning on line 201 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L201

Added line #L201 was not covered by tests
Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
Expand All @@ -208,7 +220,7 @@
return match GRPC_PROXY_CLIENT.call(client_ip, &grpc_addr, req).await {
Ok(response) => Ok(response),
Err(_error) => {
error!("{:?}", _error);
error!(target: LOG_TARGET, "GRPC proxy error: {:?}", _error);

Check warning on line 223 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L223

Added line #L223 was not covered by tests
Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
Expand All @@ -224,6 +236,83 @@
}
}

if req.uri().path().starts_with("/sql") {
let query = if req.method() == Method::GET {

Check warning on line 240 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L239-L240

Added lines #L239 - L240 were not covered by tests
// Get the query from URL parameters
let params = req.uri().query().unwrap_or_default();
form_urlencoded::parse(params.as_bytes())
.find(|(key, _)| key == "q")
.map(|(_, value)| value.to_string())
.unwrap_or_default()
} else if req.method() == Method::POST {

Check warning on line 247 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L242-L247

Added lines #L242 - L247 were not covered by tests
// Get the query from request body
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap_or_default();
String::from_utf8(body_bytes.to_vec()).unwrap_or_default()

Check warning on line 250 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L249-L250

Added lines #L249 - L250 were not covered by tests
Comment on lines +249 to +250
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo, sensei! Avoid silently ignoring errors when reading request body

Using unwrap_or_default() when reading the request body may mask errors that should be handled appropriately. Consider properly handling errors when converting the request body to bytes and when converting bytes to a UTF-8 string.

Apply this diff to handle errors explicitly:

- let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap_or_default();
- String::from_utf8(body_bytes.to_vec()).unwrap_or_default()
+ let body_bytes = match hyper::body::to_bytes(req.into_body()).await {
+     Ok(bytes) => bytes,
+     Err(e) => {
+         error!(target: LOG_TARGET, "Failed to read request body: {:?}", e);
+         return Ok(Response::builder()
+             .status(StatusCode::BAD_REQUEST)
+             .body(Body::from("Failed to read request body"))
+             .unwrap());
+     }
+ };
+ let query = match String::from_utf8(body_bytes.to_vec()) {
+     Ok(str) => str,
+     Err(e) => {
+         error!(target: LOG_TARGET, "Invalid UTF-8 sequence: {:?}", e);
+         return Ok(Response::builder()
+             .status(StatusCode::BAD_REQUEST)
+             .body(Body::from("Invalid UTF-8 sequence in request body"))
+             .unwrap());
+     }
+ };

Committable suggestion skipped: line range outside the PR's diff.

} else {
return Ok(Response::builder()
.status(StatusCode::METHOD_NOT_ALLOWED)
.body(Body::from("Only GET and POST methods are allowed"))
.unwrap());

Check warning on line 255 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L252-L255

Added lines #L252 - L255 were not covered by tests
};

// Execute the query
return match sqlx::query(&query).fetch_all(&*pool).await {
Ok(rows) => {
let result: Vec<_> = rows
.iter()
.map(|row| {
let mut obj = serde_json::Map::new();
for (i, column) in row.columns().iter().enumerate() {
let value: serde_json::Value = match column.type_info().name() {
"TEXT" => row
.get::<Option<String>, _>(i)
.map_or(serde_json::Value::Null, serde_json::Value::String),

Check warning on line 269 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L259-L269

Added lines #L259 - L269 were not covered by tests
// for operators like count(*) the type info is NULL
// so we default to a number
"INTEGER" | "NULL" => row
.get::<Option<i64>, _>(i)
.map_or(serde_json::Value::Null, |n| {
serde_json::Value::Number(n.into())
}),
"REAL" => row.get::<Option<f64>, _>(i).map_or(
serde_json::Value::Null,
|f| {
serde_json::Number::from_f64(f).map_or(
serde_json::Value::Null,
serde_json::Value::Number,
)
},
),
"BLOB" => row
.get::<Option<Vec<u8>>, _>(i)
.map_or(serde_json::Value::Null, |bytes| {
serde_json::Value::String(STANDARD.encode(bytes))
}),
_ => row
.get::<Option<String>, _>(i)
.map_or(serde_json::Value::Null, serde_json::Value::String),

Check warning on line 293 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L272-L293

Added lines #L272 - L293 were not covered by tests
};
obj.insert(column.name().to_string(), value);

Check warning on line 295 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L295

Added line #L295 was not covered by tests
}
serde_json::Value::Object(obj)
})
.collect();

let json = serde_json::to_string(&result).unwrap();

Ok(Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, "application/json")
.body(Body::from(json))
.unwrap())

Check warning on line 307 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L297-L307

Added lines #L297 - L307 were not covered by tests
}
Err(e) => Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from(format!("Query error: {:?}", e)))
.unwrap()),

Check warning on line 312 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L309-L312

Added lines #L309 - L312 were not covered by tests
Comment on lines +310 to +312
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid exposing detailed error messages to clients

Including detailed error information in HTTP responses can reveal sensitive internal details and pose a security risk. It's better to log the error internally and return a generic message to the client.

Apply this diff to return a generic error message:

-                Err(e) => Ok(Response::builder()
-                    .status(StatusCode::BAD_REQUEST)
-                    .body(Body::from(format!("Query error: {:?}", e)))
-                    .unwrap()),
+                Err(e) => {
+                    error!(target: LOG_TARGET, "Query execution error: {:?}", e);
+                    Ok(Response::builder()
+                        .status(StatusCode::BAD_REQUEST)
+                        .body(Body::from("Invalid query"))
+                        .unwrap())
+                },

Committable suggestion skipped: line range outside the PR's diff.

};
Comment on lines +259 to +313
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Implement safeguards against resource exhaustion

Ohayo, sensei! Executing arbitrary SQL queries without limitations can lead to performance issues, such as high memory usage or slow response times if large result sets are returned. Consider implementing safeguards like limiting the number of rows returned or restricting the types of queries that can be executed.

For example, you could limit the number of rows by appending LIMIT to the query if not already present. Alternatively, define an allowlist of permitted queries or require queries to match certain patterns.

}
Comment on lines +239 to +314
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Consider restricting or securing the /sql endpoint

Ohayo, sensei! Exposing an endpoint that allows execution of arbitrary SQL queries can be a significant security risk, even with a read-only database connection. It could lead to data leakage or other unintended consequences.

Consider the following options:

  • Restrict the queries: Allow only specific queries or enforce a whitelist of allowed statements.
  • Add authentication: Require authentication to access the endpoint to prevent unauthorized use.
  • Use parameterized queries: Although not directly applicable here, ensure any dynamic inputs are safely handled.

Would you like assistance in refactoring this endpoint to enhance security?


Check warning on line 315 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L314-L315

Added lines #L314 - L315 were not covered by tests
let json = json!({
"service": "torii",
"success": true
Expand Down
Loading