Skip to content

Commit

Permalink
Playing with tokio/futures/async
Browse files Browse the repository at this point in the history
  • Loading branch information
Encephala committed Jun 10, 2024
1 parent 0154ba1 commit 05472df
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 115 deletions.
11 changes: 5 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
name = "rusty-db"
version = "0.1.0"
edition = "2021"
license = "MIT"
description = "A non-production-ready-and-will-never-be database written for my own learning purposes"

[workspace.package]
license = "MIT"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "*", features = ["full"]}
futures = "*"

sql-parse.workspace = true
dbms.workspace = true
Expand All @@ -17,9 +20,5 @@ dbms.workspace = true
sql-parse = { path = "./sql-parse", version = "0.1.0" }
dbms = { path = "./dbms", version = "0.1.0" }


[workspace]
members = [
"sql-parse",
"dbms"
]
members = ["cli"]
14 changes: 14 additions & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "cli"
version = "0.1.0"
edition = "2021"
license.workspace = true
description = "CLI client for the database"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "*", features = ["full"]}

sql-parse.workspace = true
dbms.workspace = true
114 changes: 114 additions & 0 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#![allow(clippy::needless_return)]

use std::io::Write;
use std::path::PathBuf;

use sql_parse::{Lexer, parse_statement, Statement, CreateType};
use dbms::{Execute, Database, DatabaseName, ExecutionResult, PersistenceManager, FileSystem, SerialisationManager, Serialiser};

async fn repl() {
let stdin = std::io::stdin();
let mut stdout = std::io::stdout();

let mut database: Option<Database> = None;

let persistence_manager: Box<_> = FileSystem::new(
SerialisationManager::new(Serialiser::V2),
PathBuf::from("/tmp/rusty-db"),
).into();

loop {
print!(">> ");

stdout.flush().unwrap();

let mut input = String::new();

stdin.read_line(&mut input).unwrap();

if input == "\\q\n" {
break;
} else if input.is_empty() {
println!();
break;
}

// TODO: Standardise handling these special commands
if input.starts_with("\\c ") {
let database_name = input.strip_prefix("\\c ").unwrap().strip_suffix('\n').unwrap();

database = match persistence_manager.load_database(DatabaseName(database_name.into())).await {
Ok(db) => {
println!("Connected to database {}", db.name.0);

Some(db)
},
Err(error) => {
println!("Got execution error: {error:?}");

None
},
};

continue;
}

if input.starts_with("\\l ") {
let tokens = Lexer::lex(input.strip_prefix("\\l ").unwrap());

println!("Lexed: {tokens:?}");

continue;
}

let statement = parse_statement(&input);

if input.starts_with("\\p ") {
let statement = parse_statement(input.strip_prefix("\\p ").unwrap());

println!("Parsed: {statement:?}");

continue;
}

if let Some(statement) = statement {
let is_create_database = matches!(statement, Statement::Create { what: CreateType::Database, .. });
let is_drop_database = matches!(statement, Statement::Drop { what: CreateType::Database, .. });

let result = statement.execute(database.as_mut(), persistence_manager.as_ref()).await;

match result {
Ok(result) => {
match result {
ExecutionResult::None => (),
an_actual_result => println!("Executed:\n{an_actual_result:?}"),
}
},
Err(error) => {
println!("Got execution error: {error:?}");

// Don't persist storage if statement failed
continue;
}
}

if is_create_database || is_drop_database {
continue;
}

// TODO: doing this properly, should only write changed things
// Also I can probably do better than the `is_drop_database` above
match persistence_manager.save_database(database.as_ref().unwrap()).await {
Ok(_) => (),
Err(error) => println!("Failed saving to disk: {error:?}"),
}
} else {
println!("Failed to parse: {input}");
}
}
}

#[tokio::main]
async fn main() {
repl().await;
}
2 changes: 1 addition & 1 deletion dbms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "dbms"
version = "0.1.0"
edition = "2021"
license = "MIT"
license.workspace = true
description = "Database management system"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod types;
mod evaluate;
mod utils;
mod persistence;
mod server;

use types::{ColumnName, ColumnValue, TableName};
use sql_parse::{ColumnType, Expression, InfixOperator};
Expand All @@ -15,6 +16,7 @@ pub use database::Database;
pub use types::DatabaseName;
pub use evaluate::{Execute, ExecutionResult};
pub use persistence::{PersistenceManager, FileSystem, SerialisationManager, Serialiser};
pub use server::handle_connection;



Expand Down Expand Up @@ -49,6 +51,9 @@ pub enum SqlError {
NotABoolean(u8),

IncompatibleVersion(u8),

CouldNotWriteToConnection(std::io::Error),
CouldNotReadFromConnection(std::io::Error),
}

pub type Result<T> = std::result::Result<T, SqlError>;
22 changes: 22 additions & 0 deletions dbms/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::{io::{Read, Write}, net::TcpStream};

use crate::{Result, SqlError};
// use sql_parse::parse_statement;

pub async fn handle_connection(mut stream: TcpStream) -> Result<()> {
write_welcome(&mut stream)?;

let buf = &mut vec![];
stream.read_to_end(buf)
.map_err(SqlError::CouldNotReadFromConnection)?;

// Handle message
println!("Got message {}", std::str::from_utf8(buf).unwrap());

return Ok(());
}

fn write_welcome(stream: &mut TcpStream) -> Result<()> {
return stream.write_all(&[0x48, 0x45, 0x4C, 0x4C, 0x4F])
.map_err(SqlError::CouldNotWriteToConnection);
}
2 changes: 1 addition & 1 deletion sql-parse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "sql-parse"
version = "0.1.0"
edition = "2021"
license = "MIT"
license.workspace = true
description = "SQL parser"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
138 changes: 31 additions & 107 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,114 +1,38 @@
#![allow(clippy::needless_return)]
use std::net::TcpListener;

use std::io::Write;
use std::path::PathBuf;
use tokio::{spawn, task::JoinHandle};
use futures::future::join_all;

use sql_parse::{Lexer, parse_statement, Statement, CreateType};
use dbms::{Execute, Database, DatabaseName, ExecutionResult, PersistenceManager, FileSystem, SerialisationManager, Serialiser};
use dbms::handle_connection;

async fn repl() {
let stdin = std::io::stdin();
let mut stdout = std::io::stdout();

let mut database: Option<Database> = None;

let persistence_manager: Box<_> = FileSystem::new(
SerialisationManager::new(Serialiser::V2),
PathBuf::from("/tmp/rusty-db"),
).into();

loop {
print!(">> ");

stdout.flush().unwrap();

let mut input = String::new();

stdin.read_line(&mut input).unwrap();

if input == "\\q\n" {
break;
} else if input.is_empty() {
println!();
break;
}

// TODO: Standardise handling these special commands
if input.starts_with("\\l ") {
let tokens = Lexer::lex(input.strip_prefix("\\l ").unwrap());

println!("Lexed: {tokens:?}");

continue;
}

let statement = parse_statement(&input);

if input.starts_with("\\p ") {
let statement = parse_statement(input.strip_prefix("\\p ").unwrap());

println!("Parsed: {statement:?}");

continue;
}

if input.starts_with("\\c ") {
let database_name = input.strip_prefix("\\c ").unwrap().strip_suffix('\n').unwrap();

database = match persistence_manager.load_database(DatabaseName(database_name.into())).await {
Ok(db) => {
println!("Connected to database {}", db.name.0);

Some(db)
},
Err(error) => {
println!("Got execution error: {error:?}");

None
},
};

continue;
}

if let Some(statement) = statement {
let is_create_database = matches!(statement, Statement::Create { what: CreateType::Database, .. });
let is_drop_database = matches!(statement, Statement::Drop { what: CreateType::Database, .. });

let result = statement.execute(database.as_mut(), persistence_manager.as_ref()).await;

match result {
Ok(result) => {
match result {
ExecutionResult::None => (),
an_actual_result => println!("Executed:\n{an_actual_result:?}"),
}
},
Err(error) => {
println!("Got execution error: {error:?}");

// Don't persist storage if statement failed
continue;
}
}

if is_create_database || is_drop_database {
continue;
}

// TODO: doing this properly, should only write changed things
// Also I can probably do better than the `is_drop_database` above
match persistence_manager.save_database(database.as_ref().unwrap()).await {
Ok(_) => (),
Err(error) => println!("Failed saving to disk: {error:?}"),
}
} else {
println!("Failed to parse: {input}");
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("localhost:42069").unwrap();
println!("Listening on localhost:42069 (of course)");

let mut join_handles = vec![];

for stream in listener.incoming() {
join_handles.retain(|handle: &JoinHandle<_>| {
!handle.is_finished()
});

match stream {
Ok(stream) => {
println!("New connection established from {}", stream.peer_addr().unwrap());
println!("Now have {} connections", join_handles.len() + 1);

join_handles.push(spawn(async move {
handle_connection(stream).await
}));
},
Err(error) => panic!("{error}"),
}
}
}

#[tokio::main]
async fn main() {
repl().await;
join_all(join_handles).await
.into_iter()
.collect::<Result<Result<Vec<_>, _>, _>>().unwrap().unwrap();

println!("Main thread exiting");
}

0 comments on commit 05472df

Please sign in to comment.