diff --git a/Cargo.lock b/Cargo.lock index 0ecbb8e4..ed610990 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -976,9 +976,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "futures" -version = "0.3.29" +version = "0.3.30" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", @@ -991,9 +991,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.29" +version = "0.3.30" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -1001,15 +1001,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" -version = "0.3.29" +version = "0.3.30" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" dependencies = [ "futures-core", "futures-task", @@ -1029,15 +1029,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.29" +version = "0.3.30" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" [[package]] name = "futures-macro" -version = "0.3.29" +version = "0.3.30" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", @@ -1046,15 +1046,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-timer" @@ -1068,9 +1068,9 @@ dependencies = [ [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-channel", "futures-core", @@ -1172,6 +1172,7 @@ dependencies = [ "console-subscriber", "ecies", "eyre", + "futures", "futures-util", "hex", "home", @@ -1214,6 +1215,7 @@ version = "0.1.0" dependencies = [ 
"anyhow", "async-stream", + "blake3", "prost 0.11.9", "tokio", "tokio-stream", diff --git a/crates/cli/src/lib.rs b/crates/cli/src/lib.rs index 798f36dd..4bea57ff 100644 --- a/crates/cli/src/lib.rs +++ b/crates/cli/src/lib.rs @@ -1,5 +1,5 @@ use crate::file::FileData; -use gevulot_node::types::transaction::ProgramMetadata; +use gevulot_node::types::transaction::{Created, ProgramMetadata}; use gevulot_node::types::Hash; use gevulot_node::{ rpc_client::RpcClient, @@ -203,7 +203,7 @@ pub async fn run_deploy_command( )) } -async fn send_transaction(client: &RpcClient, tx: &Transaction) -> Result { +async fn send_transaction(client: &RpcClient, tx: &Transaction) -> Result { client .send_transaction(tx) .await diff --git a/crates/node/.sqlx/query-151f420a22124e7c5f7d6a868f06c71e6f6805787aae1dea47cb248d07689498.json b/crates/node/.sqlx/query-151f420a22124e7c5f7d6a868f06c71e6f6805787aae1dea47cb248d07689498.json deleted file mode 100644 index 758221cf..00000000 --- a/crates/node/.sqlx/query-151f420a22124e7c5f7d6a868f06c71e6f6805787aae1dea47cb248d07689498.json +++ /dev/null @@ -1,34 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO assets ( tx ) VALUES ( $1 ) RETURNING *", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "tx", - "type_info": "Varchar" - }, - { - "ordinal": 1, - "name": "created", - "type_info": "Timestamp" - }, - { - "ordinal": 2, - "name": "completed", - "type_info": "Timestamp" - } - ], - "parameters": { - "Left": [ - "Varchar" - ] - }, - "nullable": [ - false, - false, - true - ] - }, - "hash": "151f420a22124e7c5f7d6a868f06c71e6f6805787aae1dea47cb248d07689498" -} diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 3dd0d9c3..c03a4e2e 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -17,6 +17,7 @@ clap = { version = "4", features = ["derive", "env", "string"] } console-subscriber = "0.2" ecies = {version = "0.2", default-features = false, features = ["pure"]} eyre = "0.6.8" +futures = "0.3.30" futures-util = { version = "0.3", features = [ "io" ] } hex = "0.4" home = "0.5" diff --git a/crates/node/migrations/20240215174342_add_txfile_and_checksum.sql b/crates/node/migrations/20240215174342_add_txfile_and_checksum.sql new file mode 100644 index 00000000..e84b5881 --- /dev/null +++ b/crates/node/migrations/20240215174342_add_txfile_and_checksum.sql @@ -0,0 +1,16 @@ +-- Add checksum to task files +-- Add a new table to store Tx files. 
+ +ALTER TABLE file ADD checksum VARCHAR(64) NOT NULL; + +DROP TABLE IF EXISTS txfile; + +CREATE TABLE txfile ( + tx_id VARCHAR(64) NOT NULL, + name VARCHAR(256) NOT NULL, + url VARCHAR(2048) NOT NULL, + checksum VARCHAR(64) NOT NULL, + CONSTRAINT fk_tx + FOREIGN KEY (tx_id) + REFERENCES transaction (hash) ON DELETE CASCADE +); diff --git a/crates/node/migrations/20240221184329_remove_asset_table.sql b/crates/node/migrations/20240221184329_remove_asset_table.sql new file mode 100644 index 00000000..dcf53aba --- /dev/null +++ b/crates/node/migrations/20240221184329_remove_asset_table.sql @@ -0,0 +1,3 @@ +-- Remove asset table + +DROP TABLE IF EXISTS assets; diff --git a/crates/node/src/asset_manager/mod.rs b/crates/node/src/asset_manager/mod.rs deleted file mode 100644 index ec97ec14..00000000 --- a/crates/node/src/asset_manager/mod.rs +++ /dev/null @@ -1,195 +0,0 @@ -use crate::{ - cli::Config, - storage::Database, - types::{ - transaction::{self, Transaction}, - Hash, Program, - }, -}; -use eyre::{eyre, Result}; -use gevulot_node::types::{ - self, - transaction::{Payload, ProgramData}, -}; -use std::collections::HashMap; -use std::net::SocketAddr; -use std::{path::PathBuf, sync::Arc, time::Duration}; -use thiserror::Error; -use tokio::time::sleep; - -#[allow(clippy::enum_variant_names)] -#[derive(Error, Debug)] -enum AssetManagerError { - #[error("program image download")] - ProgramImageDownload, - - #[error("incompatible transaction payload")] - IncompatibleTxPayload(Hash), -} - -/// AssetManager is reponsible for coordinating asset management for a new -/// transaction. New deployment of prover & verifier requires downloading of -/// VM images for those programs. Similarly, Run transaction has associated -/// input data which must be downloaded, but also built into workspace volume -/// for execution. -pub struct AssetManager { - config: Arc, - database: Arc, - http_client: reqwest::Client, - http_peer_list: Arc>>>, -} - -impl AssetManager { - pub fn new( - config: Arc, - database: Arc, - http_peer_list: Arc>>>, - ) -> Self { - AssetManager { - config, - database, - http_client: reqwest::Client::new(), - http_peer_list, - } - } - - pub async fn run(&self) -> Result<()> { - // Main processing loop. - loop { - for tx_hash in self.database.get_incomplete_assets().await? { - if let Some(tx) = self.database.find_transaction(&tx_hash).await? { - if let Err(err) = self.process_transaction(&tx).await { - tracing::error!( - "failed to process transaction (hash: {}) assets: {}", - tx.hash, - err - ); - continue; - } - - self.database.mark_asset_complete(&tx_hash).await?; - } else { - tracing::warn!("asset entry for missing transaction; hash: {}", &tx_hash); - } - } - - // TODO: Define specific period for Asset processing refresh and - // compute remaining sleep time from that. If asset processing - // takes longer than anticipated, then there's no sleep. Otherwise - // next iteration starts from same periodic cycle as normally. - sleep(Duration::from_millis(500)).await; - } - } - - /// handle_transaction admits transaction into `AssetManager` for further - /// processing. - pub async fn handle_transaction(&self, tx: &Transaction) -> Result<()> { - self.database.add_asset(&tx.hash).await - } - - async fn process_transaction(&self, tx: &Transaction) -> Result<()> { - match tx.payload { - transaction::Payload::Deploy { .. } => self.process_deployment(tx).await, - transaction::Payload::Run { .. 
} => self.process_run(tx).await, - // Other transaction types don't have external assets that would - // need processing. - _ => Ok(()), - } - } - - async fn process_deployment(&self, tx: &Transaction) -> Result<()> { - let (prover, verifier) = match tx.payload.clone() { - Payload::Deploy { - name: _, - prover, - verifier, - } => (Program::from(prover), Program::from(verifier)), - _ => return Err(AssetManagerError::IncompatibleTxPayload(tx.hash).into()), - }; - - self.process_program(&prover).await?; - self.process_program(&verifier).await?; - - Ok(()) - } - - async fn process_program(&self, program: &Program) -> Result<()> { - self.download_image(program).await - } - - async fn process_run(&self, tx: &Transaction) -> Result<()> { - let workflow = match tx.payload.clone() { - Payload::Run { workflow } => workflow, - _ => return Err(AssetManagerError::IncompatibleTxPayload(tx.hash).into()), - }; - - // TODO: Ideally the following would happen concurrently for each file... - for step in workflow.steps { - for input in step.inputs { - match input { - ProgramData::Input { - file_name, - file_url, - checksum, - } => { - let f = types::File { - tx: tx.hash, - name: file_name, - url: file_url, - }; - crate::networking::download_manager::download_file( - &f.url, - &self.config.data_directory, - f.get_file_relative_path() - .to_str() - .ok_or(eyre!("Download bad file path: {:?}", f.name))?, - self.get_peer_list().await, - &self.http_client, - checksum.into(), - ) - .await?; - } - ProgramData::Output { .. } => { - /* ProgramData::Output asinput means it comes from another - program execution -> skip this branch. */ - } - } - } - } - - Ok(()) - } - - /// download downloads file from the given `url` and saves it to file in `file_path`. - async fn download_image(&self, program: &Program) -> Result<()> { - let file_path = PathBuf::new() - .join("images") - .join(program.hash.to_string()) - .join(&program.image_file_name); - tracing::info!( - "asset download url:{} file_path:{file_path:?} file_checksum:{}", - program.image_file_url, - program.image_file_checksum - ); - crate::networking::download_manager::download_file( - &program.image_file_url, - &self.config.data_directory, - file_path - .to_str() - .ok_or(eyre!("Download bad file path: {:?}", file_path))?, - self.get_peer_list().await, - &self.http_client, - (&*program.image_file_checksum).into(), - ) - .await - } - - async fn get_peer_list(&self) -> Vec<(SocketAddr, Option)> { - self.http_peer_list - .read() - .await - .iter() - .map(|(a, p)| (*a, *p)) - .collect() - } -} diff --git a/crates/node/src/cli.rs b/crates/node/src/cli.rs index 94ef18dc..67c3b2d4 100644 --- a/crates/node/src/cli.rs +++ b/crates/node/src/cli.rs @@ -156,6 +156,13 @@ pub enum PeerCommand { #[derive(Debug, Args)] pub struct P2PBeaconConfig { + #[arg( + long, + long_help = "Directory where the node should store its data", + env = "GEVULOT_DATA_DIRECTORY", + default_value_os_t = PathBuf::from("/var/lib/gevulot"), + )] + pub data_directory: PathBuf, #[arg( long, long_help = "P2P listen address", @@ -178,6 +185,14 @@ pub struct P2PBeaconConfig { default_value = "Pack my box with five dozen liquor jugs." )] pub p2p_psk_passphrase: String, + + #[arg( + long, + long_help = "HTTP port for downloading transaction data between nodes. 
Uses same interface as P2P listen address.", + env = "GEVULOT_HTTP_PORT", + default_value = "9995" + )] + pub http_download_port: u16, } #[derive(Debug, Subcommand)] diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 04b5dad3..0ce530f4 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -1,16 +1,6 @@ #![allow(dead_code)] #![allow(unused_variables)] -use std::{ - io::{ErrorKind, Write}, - net::ToSocketAddrs, - path::PathBuf, - sync::{Arc, Mutex}, - thread::sleep, - time::Duration, -}; - -use asset_manager::AssetManager; use async_trait::async_trait; use clap::Parser; use cli::{ @@ -22,13 +12,24 @@ use libsecp256k1::{PublicKey, SecretKey}; use pea2pea::Pea2Pea; use rand::{rngs::StdRng, SeedableRng}; use sqlx::postgres::PgPoolOptions; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::{ + io::{ErrorKind, Write}, + net::ToSocketAddrs, + path::PathBuf, + sync::{Arc, Mutex}, + thread::sleep, + time::Duration, +}; +use tokio::sync::mpsc; use tokio::sync::{Mutex as TMutex, RwLock}; +use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::transport::Server; use tracing_subscriber::{filter::LevelFilter, fmt::format::FmtSpan, EnvFilter}; -use types::{Hash, Transaction}; +use types::{transaction::Validated, Hash, Transaction}; use workflow::WorkflowEngine; -mod asset_manager; mod cli; mod mempool; mod nanos; @@ -36,6 +37,7 @@ mod networking; mod rpc_server; mod scheduler; mod storage; +mod txvalidation; mod vmm; mod workflow; @@ -146,15 +148,19 @@ fn generate_key(opts: KeyOptions) -> Result<()> { #[async_trait] impl mempool::Storage for storage::Database { - async fn get(&self, hash: &Hash) -> Result> { + async fn get(&self, hash: &Hash) -> Result>> { self.find_transaction(hash).await } - async fn set(&self, tx: &Transaction) -> Result<()> { + async fn set(&self, tx: &Transaction) -> Result<()> { + let tx_hash = tx.hash; self.add_transaction(tx).await } - async fn fill_deque(&self, deque: &mut std::collections::VecDeque) -> Result<()> { + async fn fill_deque( + &self, + deque: &mut std::collections::VecDeque>, + ) -> Result<()> { for t in self.get_unexecuted_transactions().await? { deque.push_back(t); } @@ -165,52 +171,32 @@ impl mempool::Storage for storage::Database { #[async_trait] impl workflow::TransactionStore for storage::Database { - async fn find_transaction(&self, tx_hash: &Hash) -> Result> { + async fn find_transaction(&self, tx_hash: &Hash) -> Result>> { self.find_transaction(tx_hash).await } -} - -struct P2PTxHandler { - mempool: Arc>, - database: Arc, -} - -impl P2PTxHandler { - pub fn new(mempool: Arc>, database: Arc) -> Self { - Self { mempool, database } + async fn mark_tx_executed(&self, tx_hash: &Hash) -> Result<()> { + self.mark_tx_executed(tx_hash).await } } -#[async_trait::async_trait] -impl networking::p2p::TxHandler for P2PTxHandler { - async fn recv_tx(&self, tx: Transaction) -> Result<()> { - // The transaction was received from P2P network so we can consider it - // propagated at this point. - let tx_hash = tx.hash; - let mut tx = tx; - tx.propagated = true; - - // Submit the tx to mempool. - self.mempool.write().await.add(tx).await?; +async fn run(config: Arc) -> Result<()> { + let database = Arc::new(Database::new(&config.db_url).await?); - //TODO copy paste of the asset manager handle_transaction method. - //added because when a tx arrive from the p2p asset are not added. - //should be done in a better way. 
- self.database.add_asset(&tx_hash).await - } -} + let http_peer_list: Arc>>> = + Default::default(); -#[async_trait::async_trait] -impl mempool::AclWhitelist for Database { - async fn contains(&self, key: &PublicKey) -> Result { - let key = entity::PublicKey(*key); - self.acl_whitelist_has(&key).await - } -} + let mempool = Arc::new(RwLock::new(Mempool::new(database.clone()).await?)); -async fn run(config: Arc) -> Result<()> { - let database = Arc::new(Database::new(&config.db_url).await?); - let file_storage = Arc::new(storage::File::new(&config.data_directory)); + // Start Tx process event loop. + let (txevent_loop_jh, tx_sender, p2p_stream) = txvalidation::spawn_event_loop( + config.data_directory.clone(), + config.p2p_listen_addr, + config.http_download_port, + http_peer_list.clone(), + database.clone(), + mempool.clone(), + ) + .await?; // Launch the ACL whitelist syncing early in the startup. let acl_whitelist_syncer = @@ -233,23 +219,13 @@ async fn run(config: Arc) -> Result<()> { &config.p2p_psk_passphrase, Some(config.http_download_port), config.p2p_advertised_listen_addr, + http_peer_list, + txvalidation::TxEventSender::::build(tx_sender.clone()), + p2p_stream, ) .await, ); - let mempool = Arc::new(RwLock::new( - Mempool::new(database.clone(), database.clone(), Some(p2p.clone())).await?, - )); - - p2p.register_tx_handler(Arc::new(P2PTxHandler::new( - mempool.clone(), - database.clone(), - ))) - .await; - - //start http download manager - let download_jh = networking::download_manager::serve_files(&config).await?; - // TODO(tuommaki): read total available resources from config / acquire system stats. let num_gpus = if config.gpu_devices.is_some() { 1 } else { 0 }; let resource_manager = Arc::new(Mutex::new(scheduler::ResourceManager::new( @@ -269,21 +245,14 @@ async fn run(config: Arc) -> Result<()> { resource_manager.clone(), ); - let asset_mgr = Arc::new(AssetManager::new( - config.clone(), - database.clone(), - p2p.as_ref().peer_http_port_list.clone(), - )); - let node_key = read_node_key(&config.node_key_file)?; - // Launch AssetManager's background processing. - tokio::spawn({ - let asset_mgr = asset_mgr.clone(); - async move { asset_mgr.run().await } - }); - - let workflow_engine = Arc::new(WorkflowEngine::new(database.clone(), file_storage.clone())); + let workflow_engine = Arc::new(WorkflowEngine::new(database.clone())); + let download_url_prefix = format!( + "http://{}:{}", + config.p2p_listen_addr.ip(), + config.http_download_port + ); let scheduler = Arc::new(scheduler::Scheduler::new( mempool.clone(), @@ -291,10 +260,13 @@ async fn run(config: Arc) -> Result<()> { program_manager, workflow_engine, node_key, + config.data_directory.clone(), + download_url_prefix, + txvalidation::TxEventSender::::build(tx_sender.clone()), )); let vm_server = - vmm::vm_server::VMServer::new(scheduler.clone(), provider, file_storage.clone()); + vmm::vm_server::VMServer::new(scheduler.clone(), provider, config.data_directory.clone()); // Start gRPC VSOCK server. tokio::spawn(async move { @@ -332,14 +304,12 @@ async fn run(config: Arc) -> Result<()> { let rpc_server = rpc_server::RpcServer::run( config.clone(), database.clone(), - mempool.clone(), - asset_mgr.clone(), - database.clone(), // AclWhitelist impl. 
+ txvalidation::TxEventSender::::build(tx_sender), ) .await?; - if let Err(err) = download_jh.await { - tracing::info!("download_manager error:{err}"); + if let Err(err) = txevent_loop_jh.await { + tracing::info!("Tx event loop error: {err}"); } Ok(()) } @@ -349,6 +319,18 @@ async fn run(config: Arc) -> Result<()> { /// others, while it doesn't participate in the Gevulot's operational side /// in any other way - i.e. this won't handle transactions in any way. async fn p2p_beacon(config: P2PBeaconConfig) -> Result<()> { + let http_peer_list: Arc>>> = + Default::default(); + + // Build a stub channel for the P2P interface's `Transaction` management and drain it. + // This points at a domain conflict: the P2P network (peer domain) should be startable + // without any Tx management (node domain). + let (tx, mut rcv_tx_event_rx) = mpsc::unbounded_channel(); + tokio::spawn(async move { while rcv_tx_event_rx.recv().await.is_some() {} }); + + let (_, p2p_recv) = mpsc::unbounded_channel::>(); + let p2p_stream = UnboundedReceiverStream::new(p2p_recv); + let p2p = Arc::new( networking::P2P::new( "gevulot-network", @@ -356,6 +338,9 @@ async fn p2p_beacon(config: P2PBeaconConfig) -> Result<()> { &config.p2p_psk_passphrase, None, config.p2p_advertised_listen_addr, + http_peer_list, + txvalidation::TxEventSender::::build(tx), + p2p_stream, ) .await, ); diff --git a/crates/node/src/mempool/mod.rs b/crates/node/src/mempool/mod.rs index 75b52af1..42be3745 100644 --- a/crates/node/src/mempool/mod.rs +++ b/crates/node/src/mempool/mod.rs @@ -1,26 +1,15 @@ +use crate::types::{transaction::Validated, Hash, Transaction}; use async_trait::async_trait; use eyre::Result; -use libsecp256k1::PublicKey; use std::collections::VecDeque; use std::sync::Arc; use thiserror::Error; -use tokio::sync::RwLock; - -use crate::{ - networking, - types::{Hash, Transaction}, -}; #[async_trait] pub trait Storage: Send + Sync { - async fn get(&self, hash: &Hash) -> Result>; - async fn set(&self, tx: &Transaction) -> Result<()>; - async fn fill_deque(&self, deque: &mut VecDeque) -> Result<()>; -} - -#[async_trait] -pub trait AclWhitelist: Send + Sync { - async fn contains(&self, key: &PublicKey) -> Result; + async fn get(&self, hash: &Hash) -> Result>>; + async fn set(&self, tx: &Transaction) -> Result<()>; + async fn fill_deque(&self, deque: &mut VecDeque>) -> Result<()>; } #[allow(clippy::enum_variant_names)] @@ -33,62 +22,28 @@ pub enum MempoolError { #[derive(Clone)] pub struct Mempool { storage: Arc, - acl_whitelist: Arc, - // TODO: This should be refactored to PubSub channel abstraction later on. - tx_chan: Option>, - deque: VecDeque, + deque: VecDeque>, } impl Mempool { - pub async fn new( - storage: Arc, - acl_whitelist: Arc, - tx_chan: Option>, - ) -> Result { + pub async fn new(storage: Arc) -> Result { let mut deque = VecDeque::new(); storage.fill_deque(&mut deque).await?; - Ok(Self { - storage, - acl_whitelist, - tx_chan, - deque, - }) + Ok(Self { storage, deque }) } - pub fn next(&mut self) -> Option { + pub fn next(&mut self) -> Option> { // TODO(tuommaki): Should storage reflect the POP in state? self.deque.pop_front() } - pub fn peek(&self) -> Option<&Transaction> { + pub fn peek(&self) -> Option<&Transaction> { self.deque.front() } - pub async fn add(&mut self, tx: Transaction) -> Result<()> { - // First validate transaction. - tx.validate()?; - - // Secondly verify that author is whitelisted. - if !self.acl_whitelist.contains(&tx.author).await?
{ - return Err(MempoolError::PermissionDenied.into()); - } - - let mut tx = tx; + pub async fn add(&mut self, tx: Transaction) -> Result<()> { self.storage.set(&tx).await?; - - // Broadcast new transaction to P2P network if it's configured. - if !tx.propagated { - if let Some(ref tx_chan) = self.tx_chan { - if tx_chan.send_tx(&tx).await.is_ok() { - tx.propagated = true; - self.storage.set(&tx).await?; - } else { - // TODO: Implement retry? - } - } - } - self.deque.push_back(tx); Ok(()) } @@ -97,12 +52,3 @@ impl Mempool { self.deque.len() } } - -pub struct P2PTxHandler(Arc>); - -#[async_trait::async_trait] -impl networking::p2p::TxHandler for P2PTxHandler { - async fn recv_tx(&self, tx: Transaction) -> Result<()> { - self.0.write().await.add(tx).await - } -} diff --git a/crates/node/src/networking/mod.rs b/crates/node/src/networking/mod.rs index 086ad2fc..44a34528 100644 --- a/crates/node/src/networking/mod.rs +++ b/crates/node/src/networking/mod.rs @@ -1,4 +1,3 @@ -pub mod download_manager; pub mod p2p; use eyre::{eyre, Result}; diff --git a/crates/node/src/networking/p2p/mod.rs b/crates/node/src/networking/p2p/mod.rs index ed8c2dcf..f0fe97f1 100644 --- a/crates/node/src/networking/p2p/mod.rs +++ b/crates/node/src/networking/p2p/mod.rs @@ -1,4 +1,4 @@ mod noise; mod pea2pea; mod protocol; -pub use pea2pea::{TxChannel, TxHandler, P2P}; +pub use pea2pea::P2P; diff --git a/crates/node/src/networking/p2p/pea2pea.rs b/crates/node/src/networking/p2p/pea2pea.rs index d0abf437..f34d557c 100644 --- a/crates/node/src/networking/p2p/pea2pea.rs +++ b/crates/node/src/networking/p2p/pea2pea.rs @@ -1,3 +1,6 @@ +use crate::txvalidation::P2pSender; +use crate::txvalidation::TxEventSender; +use futures_util::Stream; use std::{ collections::{BTreeSet, HashMap}, io, @@ -7,11 +10,15 @@ use std::{ }; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; +use tokio::pin; +use tokio_stream::StreamExt; use super::{noise, protocol}; use bytes::{Bytes, BytesMut}; -use eyre::Result; -use gevulot_node::types::Transaction; +use gevulot_node::types::{ + transaction::{Created, Validated}, + Transaction, +}; use parking_lot::RwLock; use pea2pea::{ protocols::{Handshake, OnDisconnect, Reading, Writing}, }; use sha3::{Digest, Sha3_256}; -#[async_trait::async_trait] -pub trait TxHandler: Send + Sync { - async fn recv_tx(&self, tx: Transaction) -> Result<()>; -} - -#[async_trait::async_trait] -pub trait TxChannel: Send + Sync { - async fn send_tx(&self, tx: &Transaction) -> Result<()>; -} - -struct BlackholeTxHandler; -#[async_trait::async_trait] -impl TxHandler for BlackholeTxHandler { - async fn recv_tx(&self, tx: Transaction) -> Result<()> { - tracing::debug!("submitting received tx to black hole"); - Ok(()) - } -} - // NOTE: This P2P implementation is originally from `pea2pea` Noise handshake example. #[derive(Clone)] pub struct P2P { node: Node, noise_states: Arc>>, - tx_handler: Arc>>, // Peer connection map: <(P2P TCP connection's peer address) , (peer's advertised address in peer_list)>. // This mapping is needed for proper cleanup on OnDisconnect. peer_addr_mapping: Arc>>, peer_list: Arc>>, - + // Contains connected peers that are used for asset file download. pub peer_http_port_list: Arc>>>, http_port: Option, nat_listen_addr: Option, psk: Vec, + + // Send Tx to the process loop.
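+ // Received transactions are handed to this sender, which feeds the txvalidation event loop.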
+ tx_sender: TxEventSender, } impl Pea2Pea for P2P { @@ -64,12 +54,16 @@ } } impl P2P { + #[allow(clippy::too_many_arguments)] pub async fn new( name: &str, listen_addr: SocketAddr, psk_passphrase: &str, http_port: Option, nat_listen_addr: Option, + peer_http_port_list: Arc>>>, + tx_sender: TxEventSender, + propagate_tx_stream: impl Stream> + std::marker::Send + 'static, ) -> Self { let config = Config { name: Some(name.into()), @@ -88,13 +82,13 @@ impl P2P { let instance = Self { node, noise_states: Default::default(), - tx_handler: Arc::new(tokio::sync::RwLock::new(Arc::new(BlackholeTxHandler {}))), psk: psk.to_vec(), peer_list: Default::default(), peer_addr_mapping: Default::default(), - peer_http_port_list: Default::default(), + peer_http_port_list, http_port, nat_listen_addr, + tx_sender, }; // Enable node functionalities. @@ -103,22 +97,39 @@ impl P2P { instance.enable_writing().await; instance.enable_disconnect().await; - instance - } + // Start the Tx propagation stream loop. + tokio::spawn({ + let p2p = instance.clone(); + async move { + pin!(propagate_tx_stream); + while let Some(tx) = propagate_tx_stream.next().await { + let tx_hash = tx.hash; + let msg = protocol::Message::V0(protocol::MessageV0::Transaction(tx)); + let bs = match bincode::serialize(&msg) { + Ok(bs) => bs, + Err(err) => { + tracing::error!( + "Tx:{tx_hash} not sent because serialization failed: {err}", + ); + continue; + } + }; + let bs = Bytes::from(bs); + tracing::debug!("broadcasting transaction {}", tx_hash); + if let Err(err) = p2p.broadcast(bs) { + tracing::error!("Tx:{tx_hash} not sent because: {err}"); + } + } + } + }); - pub async fn register_tx_handler(&self, tx_handler: Arc) { - let mut old_handler = self.tx_handler.write().await; - *old_handler = tx_handler; - tracing::debug!("new tx handler registered"); + instance } - async fn recv_tx(&self, tx: Transaction) { + async fn forward_tx(&self, tx: Transaction) { tracing::debug!("forwarding received tx to tx_sender"); - let tx_handler = self.tx_handler.read().await; - if let Err(err) = tx_handler.recv_tx(tx).await { - tracing::error!("failed to handle incoming transaction: {}", err); - } else { - tracing::debug!("submitted received tx to tx_handler"); + if let Err(err) = self.tx_sender.send_tx(tx) { + tracing::error!("P2P error while forwarding received Tx: {err}"); } } @@ -274,8 +285,6 @@ impl Handshake for P2P { handshake_msg.peers.insert(handshake_msg.my_p2p_listen_addr); } - tracing::debug!("node information exchanged."); - tracing::debug!("tcp connection peer address: {}", remote_peer); tracing::debug!( "peer advertised address: {}", @@ -325,7 +334,6 @@ impl Handshake for P2P { local_diff.remove(&local_p2p_addr); local_diff.remove(remote_peer_p2p_addr); - tracing::debug!("found {} new nodes", local_diff.len()); let node = self.node(); for addr in local_diff { tracing::debug!("connect to {}", &addr); @@ -362,11 +370,22 @@ impl Reading for P2P { Ok(protocol::Message::V0(msg)) => match msg { protocol::MessageV0::Transaction(tx) => { tracing::debug!( - "received transaction {} author:{}", + "received transaction {}:{} author:{}", tx.hash, + tx.payload, hex::encode(tx.author.serialize()) ); - self.recv_tx(tx).await; + let tx: Transaction = Transaction { + author: tx.author, + hash: tx.hash, + payload: tx.payload, + nonce: tx.nonce, + signature: tx.signature, + propagated: tx.propagated, + executed: tx.executed, + state: Created, + }; + self.forward_tx(tx).await; } protocol::MessageV0::DiagnosticsRequest(kind) => { tracing::debug!("received
diagnostics request"); @@ -382,17 +401,6 @@ impl Reading for P2P { } } -#[async_trait::async_trait] -impl TxChannel for P2P { - async fn send_tx(&self, tx: &Transaction) -> Result<()> { - let msg = protocol::Message::V0(protocol::MessageV0::Transaction(tx.clone())); - let bs = Bytes::from(bincode::serialize(&msg)?); - - tracing::debug!("broadcasting transaction {}", tx.hash); - self.broadcast(bs)?; - Ok(()) - } -} impl Writing for P2P { type Message = Bytes; type Codec = noise::Codec; @@ -419,28 +427,64 @@ impl OnDisconnect for P2P { #[cfg(test)] mod tests { use super::*; + use crate::txvalidation; + use crate::txvalidation::CallbackSender; + use crate::txvalidation::EventProcessError; use eyre::Result; - use gevulot_node::types::{transaction::Payload, Hash, Transaction}; + use gevulot_node::types::transaction::Received; + use gevulot_node::types::{transaction::Payload, Transaction}; use libsecp256k1::SecretKey; - use rand::{rngs::StdRng, RngCore, SeedableRng}; - use tokio::sync::mpsc::{self, Sender}; + use rand::{rngs::StdRng, SeedableRng}; + use tokio::sync::mpsc::UnboundedReceiver; + use tokio::sync::mpsc::UnboundedSender; + use tokio::sync::mpsc::{self}; + use tokio::sync::oneshot; + use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; - struct Sink(Arc>); - impl Sink { - fn new(tx: Arc>) -> Self { - Self(tx) - } + async fn create_peer( + name: &str, + ) -> ( + P2P, + UnboundedSender>, + UnboundedReceiver<( + Transaction, + Option>>, + )>, + ) { + let http_peer_list1: Arc>>> = + Default::default(); + let (tx_sender, p2p_recv1) = mpsc::unbounded_channel::>(); + let p2p_stream1 = UnboundedReceiverStream::new(p2p_recv1); + let (sendtx1, txreceiver1) = + mpsc::unbounded_channel::<(Transaction, Option)>(); + let txsender1 = txvalidation::TxEventSender::::build(sendtx1); + let peer = P2P::new( + name, + "127.0.0.1:0".parse().unwrap(), + "secret passphrase", + None, + None, + http_peer_list1, + txsender1, + p2p_stream1, + ) + .await; + (peer, tx_sender, txreceiver1) } - #[async_trait::async_trait] - impl TxHandler for Sink { - async fn recv_tx(&self, tx: Transaction) -> Result<()> { - tracing::debug!("sink received new transaction"); - self.0.send(tx).await.expect("sink send"); - tracing::debug!("sink submitted tx to channel"); - Ok(()) + // TODO: Change to `impl From` form when module declaration between main and lib is solved. 
+ fn into_receive(tx: Transaction) -> Transaction { + Transaction { + author: tx.author, + hash: tx.hash, + payload: tx.payload, + nonce: tx.nonce, + signature: tx.signature, + propagated: tx.executed, + executed: tx.executed, + state: Received::P2P, } } @@ -448,51 +492,15 @@ mod tests { async fn test_peer_list_inter_connection() { //start_logger(LevelFilter::ERROR); - let (tx1, mut rx1) = mpsc::channel(1); - let (tx2, mut rx2) = mpsc::channel(1); - let (tx3, mut rx3) = mpsc::channel(1); - let (sink1, sink2, sink3) = ( - Arc::new(Sink::new(Arc::new(tx1))), - Arc::new(Sink::new(Arc::new(tx2))), - Arc::new(Sink::new(Arc::new(tx3))), - ); - let (peer1, peer2, peer3) = ( - P2P::new( - "peer1", - "127.0.0.1:0".parse().unwrap(), - "secret passphrase", - None, - None, - ) - .await, - P2P::new( - "peer2", - "127.0.0.1:0".parse().unwrap(), - "secret passphrase", - Some(9995), - None, - ) - .await, - P2P::new( - "peer3", - "127.0.0.1:0".parse().unwrap(), - "secret passphrase", - Some(9995), - None, - ) - .await, - ); + let (peer1, tx_sender1, mut tx_receiver1) = create_peer("peer1").await; + let (peer2, tx_sender2, mut tx_receiver2) = create_peer("peer2").await; + let (peer3, tx_sender3, mut tx_receiver3) = create_peer("peer3").await; tracing::debug!("start listening"); let bind_add = peer1.node().start_listening().await.expect("peer1 listen"); let bind_add = peer2.node().start_listening().await.expect("peer2 listen"); let bind_add = peer3.node().start_listening().await.expect("peer3 listen"); - tracing::debug!("register tx handlers"); - peer1.register_tx_handler(sink1.clone()).await; - peer2.register_tx_handler(sink2.clone()).await; - peer3.register_tx_handler(sink3.clone()).await; - tracing::debug!("connect peer2 to peer1"); peer2 .node() @@ -518,60 +526,36 @@ mod tests { tracing::debug!("send tx from peer2 to peer1 and peer3"); let tx = new_tx(); - peer2.send_tx(&tx).await.unwrap(); + tx_sender2.send(tx.clone()).unwrap(); tracing::debug!("recv tx on peer1 from peer2"); - let recv_tx = rx1.recv().await.expect("sink recv"); - assert_eq!(tx, recv_tx); + let recv_tx = tx_receiver1.recv().await.expect("peer1 recv"); + + assert_eq!(into_receive(tx.clone()), recv_tx.0); tracing::debug!("recv tx on peer3 from peer2"); - let recv_tx = rx3.recv().await.expect("sink recv"); - assert_eq!(tx, recv_tx); + let recv_tx = tx_receiver3.recv().await.expect("peer3 recv"); + assert_eq!(into_receive(tx), recv_tx.0); let tx = new_tx(); tracing::debug!("send tx from peer3 to peer1 and peer2"); - peer3.send_tx(&tx).await.unwrap(); + tx_sender3.send(tx.clone()).unwrap(); tracing::debug!("recv tx on peer1 from peer3"); - let recv_tx = rx1.recv().await.expect("sink recv"); - assert_eq!(tx, recv_tx); + let recv_tx = tx_receiver1.recv().await.expect("peer1 recv"); + assert_eq!(into_receive(tx.clone()), recv_tx.0); tracing::debug!("recv tx on peer2 from peer3"); - let recv_tx = rx2.recv().await.expect("sink recv"); - assert_eq!(tx, recv_tx); + let recv_tx = tx_receiver2.recv().await.expect("peer2 recv"); + assert_eq!(into_receive(tx), recv_tx.0); } #[tokio::test] async fn test_two_peers_disconnect() { //start_logger(LevelFilter::ERROR); - let (tx1, mut rx1) = mpsc::channel(1); - let (tx2, mut rx2) = mpsc::channel(1); - let (sink1, sink2) = ( - Arc::new(Sink::new(Arc::new(tx1))), - Arc::new(Sink::new(Arc::new(tx2))), - ); - - let peer1 = P2P::new( - "peer1", - "127.0.0.1:0".parse().unwrap(), - "secret passphrase", - None, - None, - ) - .await; + let (peer1, tx_sender1, mut tx_receiver1) = create_peer("peer1").await; 
peer1.node().start_listening().await.expect("peer1 listen"); - peer1.register_tx_handler(sink1.clone()).await; { - let peer2 = P2P::new( - "peer2", - "127.0.0.1:0".parse().unwrap(), - "secret passphrase", - Some(8776), - None, - ) - .await; + let (peer2, tx_sender2, mut tx_receiver2) = create_peer("peer2").await; peer2.node().start_listening().await.expect("peer2 listen"); - peer2.register_tx_handler(sink2.clone()).await; - - tracing::debug!("Nodes init Done"); peer1 .node() @@ -584,17 +568,17 @@ mod tests { tracing::debug!("Nodes Connected"); tracing::debug!("send tx from peer1 to peer2"); let tx = new_tx(); - peer1.send_tx(&tx).await.unwrap(); + tx_sender1.send(tx.clone()).unwrap(); tracing::debug!("recv tx on peer2 from peer1"); - let recv_tx = rx2.recv().await.expect("sink recv"); - assert_eq!(tx, recv_tx); + let recv_tx = tx_receiver2.recv().await.expect("peer2 recv"); + assert_eq!(into_receive(tx), recv_tx.0); let tx = new_tx(); tracing::debug!("send tx from peer2 to peer1"); - peer2.send_tx(&tx).await.unwrap(); + tx_sender2.send(tx.clone()).unwrap(); tracing::debug!("recv tx on peer1 from peer2"); - let recv_tx = rx1.recv().await.expect("sink recv"); - assert_eq!(tx, recv_tx); + let recv_tx = tx_receiver1.recv().await.expect("peer1 recv"); + assert_eq!(into_receive(tx), recv_tx.0); let peers = peer2.node().connected_addrs(); for addr in peers { @@ -607,8 +591,7 @@ mod tests { // Simulate the silent node disconnection by dropping the node. tracing::debug!("send tx from peer1 to disconnected peer2"); let tx = new_tx(); - peer1.send_tx(&tx).await.unwrap(); - + tx_sender1.send(tx).unwrap(); tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; assert_eq!(peer1.peer_list.read().await.len(), 1); @@ -621,39 +604,13 @@ mod tests { async fn test_two_peers() { //start_logger(LevelFilter::ERROR); - let (tx1, mut rx1) = mpsc::channel(1); - let (tx2, mut rx2) = mpsc::channel(1); - let (sink1, sink2) = ( - Arc::new(Sink::new(Arc::new(tx1))), - Arc::new(Sink::new(Arc::new(tx2))), - ); - let (peer1, peer2) = ( - P2P::new( - "peer1", - "127.0.0.1:0".parse().unwrap(), - "secret passphrase", - None, - None, - ) - .await, - P2P::new( - "peer2", - "127.0.0.1:0".parse().unwrap(), - "secret passphrase", - None, - None, - ) - .await, - ); + let (peer1, tx_sender1, mut tx_receiver1) = create_peer("peer1").await; + let (peer2, tx_sender2, mut tx_receiver2) = create_peer("peer2").await; tracing::debug!("start listening"); peer1.node().start_listening().await.expect("peer1 listen"); peer2.node().start_listening().await.expect("peer2 listen"); - tracing::debug!("register tx handlers"); - peer1.register_tx_handler(sink1.clone()).await; - peer2.register_tx_handler(sink2.clone()).await; - tracing::debug!("connect peer2 to peer1"); peer2 .node() @@ -663,31 +620,34 @@ mod tests { tracing::debug!("send tx from peer1 to peer2"); let tx = new_tx(); - peer1.send_tx(&tx).await.unwrap(); + tx_sender1.send(tx.clone()).unwrap(); tracing::debug!("recv tx on peer2 from peer1"); - let recv_tx = rx2.recv().await.expect("sink recv"); - assert_eq!(tx, recv_tx); + let recv_tx = tx_receiver2.recv().await.expect("peer2 recv"); + assert_eq!(into_receive(tx), recv_tx.0); let tx = new_tx(); tracing::debug!("send tx from peer2 to peer1"); - peer2.send_tx(&tx).await.unwrap(); + tx_sender2.send(tx.clone()).unwrap(); tracing::debug!("recv tx on peer1 from peer2"); - let recv_tx = rx1.recv().await.expect("sink recv"); - assert_eq!(tx, recv_tx); + let recv_tx = tx_receiver1.recv().await.expect("peer1 recv"); + 
assert_eq!(into_receive(tx), recv_tx.0); } - fn new_tx() -> Transaction { + fn new_tx() -> Transaction { let rng = &mut StdRng::from_entropy(); - let mut tx = Transaction { - hash: Hash::random(rng), - payload: Payload::Empty, - nonce: rng.next_u64(), - ..Default::default() - }; - let key = SecretKey::random(rng); - tx.sign(&key); - tx + let tx = Transaction::::new(Payload::Empty, &SecretKey::random(rng)); + + Transaction { + author: tx.author, + hash: tx.hash, + payload: tx.payload, + nonce: tx.nonce, + signature: tx.signature, + propagated: tx.executed, + executed: tx.executed, + state: Validated, + } } fn start_logger(default_level: LevelFilter) { diff --git a/crates/node/src/networking/p2p/protocol.rs b/crates/node/src/networking/p2p/protocol.rs index 1675501b..3afc1df2 100644 --- a/crates/node/src/networking/p2p/protocol.rs +++ b/crates/node/src/networking/p2p/protocol.rs @@ -23,7 +23,7 @@ pub(crate) enum Message { #[allow(clippy::large_enum_variant)] #[derive(Clone, Debug, Deserialize, Serialize)] pub(crate) enum MessageV0 { - Transaction(types::Transaction), + Transaction(types::Transaction), DiagnosticsRequest(DiagnosticsRequestKind), DiagnosticsResponse(DiagnosticsResponseV0), } diff --git a/crates/node/src/rpc_client/mod.rs b/crates/node/src/rpc_client/mod.rs index eaffb24b..25c257c9 100644 --- a/crates/node/src/rpc_client/mod.rs +++ b/crates/node/src/rpc_client/mod.rs @@ -5,7 +5,11 @@ use jsonrpsee::{ http_client::{HttpClient, HttpClientBuilder}, }; -use crate::types::{rpc::RpcResponse, transaction::TransactionTree, Hash, Transaction}; +use crate::types::{ + rpc::RpcResponse, + transaction::{Created, TransactionTree, Validated}, + Hash, Transaction, +}; pub struct RpcClient { client: HttpClient, @@ -22,13 +26,13 @@ impl RpcClient { pub async fn get_transaction( &self, tx_hash: &Hash, - ) -> Result, Box> { + ) -> Result>, Box> { let mut params = ArrayParams::new(); params.insert(tx_hash).expect("rpc params"); let resp = self .client - .request::, ArrayParams>("getTransaction", params) + .request::>, ArrayParams>("getTransaction", params) .await .expect("rpc request"); @@ -38,7 +42,7 @@ impl RpcClient { } } - pub async fn send_transaction(&self, tx: &Transaction) -> Result<(), Box> { + pub async fn send_transaction(&self, tx: &Transaction) -> Result<(), Box> { let mut params = ArrayParams::new(); params.insert(tx).expect("rpc params"); diff --git a/crates/node/src/rpc_server/mod.rs b/crates/node/src/rpc_server/mod.rs index 439c80fb..982ea94f 100644 --- a/crates/node/src/rpc_server/mod.rs +++ b/crates/node/src/rpc_server/mod.rs @@ -1,5 +1,16 @@ -use std::{net::SocketAddr, rc::Rc, sync::Arc}; +use crate::txvalidation::RpcSender; +use crate::txvalidation::TxEventSender; +use std::rc::Rc; +use std::{net::SocketAddr, sync::Arc}; +use crate::{ + cli::Config, + storage::Database, + types::{ + transaction::{Created, Validated}, + Transaction, + }, +}; use eyre::Result; use gevulot_node::types::{ rpc::{RpcError, RpcResponse}, @@ -9,21 +20,10 @@ use jsonrpsee::{ server::{RpcModule, Server, ServerHandle}, types::Params, }; -use tokio::sync::RwLock; - -use crate::{ - asset_manager::AssetManager, - cli::Config, - mempool::{AclWhitelist, Mempool}, - storage::Database, - types::Transaction, -}; struct Context { database: Arc, - mempool: Arc>, - asset_manager: Arc, - acl_whitelist: Arc, + tx_sender: TxEventSender, } impl std::fmt::Debug for Context { @@ -41,16 +41,12 @@ impl RpcServer { pub async fn run( cfg: Arc, database: Arc, - mempool: Arc>, - asset_manager: Arc, - acl_whitelist: Arc, + 
tx_sender: TxEventSender, ) -> Result { let server = Server::builder().build(cfg.json_rpc_listen_addr).await?; let mut module = RpcModule::new(Context { database, - mempool, - asset_manager, - acl_whitelist, + tx_sender, }); module.register_async_method("sendTransaction", send_transaction)?; @@ -80,7 +76,7 @@ async fn send_transaction(params: Params<'static>, ctx: Arc) -> RpcResp tracing::info!("JSON-RPC: send_transaction()"); // Real logic - let tx: Transaction = match params.one() { + let tx: Transaction = match params.one() { Ok(tx) => tx, Err(e) => { tracing::error!("failed to parse transaction: {}", e); @@ -88,46 +84,7 @@ async fn send_transaction(params: Params<'static>, ctx: Arc) -> RpcResp } }; - // Secondly verify that author is whitelisted. - let whitelisted = match ctx.acl_whitelist.contains(&tx.author).await { - Ok(result) => result, - Err(err) => { - tracing::error!("error when authorizing transaction: {}", err); - - // Technically following is not 100% correct, but when there is - // problem with authorizing a transaction, it cannot be passed - // through. The real nature of the problem must also not be exposed - // as it can reveal unexpected security issues. - return RpcResponse::Err(RpcError::Unauthorized); - } - }; - - if !whitelisted { - return RpcResponse::Err(RpcError::Unauthorized); - } - - // Persist transaction in DB first. - if let Err(err) = ctx.database.add_transaction(&tx).await { - return RpcResponse::Err(RpcError::InvalidRequest( - "failed to persist transaction".to_string(), - )); - } - - // Then add it to asset manager in order to download all necessary files - // involved. - if let Err(err) = ctx.asset_manager.handle_transaction(&tx).await { - tracing::error!( - "failed to enqueue transaction for asset processing: {}", - err - ); - return RpcResponse::Err(RpcError::InvalidRequest( - "failed to enqueue transaction for asset processing".to_string(), - )); - } - - // Finally add it to mempool to propagate it to P2P network and wait - // to be scheduled for execution. 
- if let Err(err) = ctx.mempool.write().await.add(tx.clone()).await { + if let Err(err) = ctx.tx_sender.send_tx(tx).await { tracing::error!("failed to persist transaction: {}", err); return RpcResponse::Err(RpcError::InvalidRequest( "failed to persist transaction".to_string(), @@ -138,7 +95,10 @@ async fn send_transaction(params: Params<'static>, ctx: Arc) -> RpcResp } #[tracing::instrument(level = "info")] -async fn get_transaction(params: Params<'static>, ctx: Arc) -> RpcResponse { +async fn get_transaction( + params: Params<'static>, + ctx: Arc, +) -> RpcResponse> { let tx_hash: Hash = match params.one() { Ok(tx_hash) => tx_hash, Err(e) => { @@ -233,33 +193,28 @@ fn build_tx_tree(hash: &Hash, txs: Vec<(Hash, Option)>) -> Rc Result { - Ok(true) - } - } - #[ignore] #[tokio::test] async fn test_send_transaction() { start_logger(LevelFilter::INFO); - let rpc_server = new_rpc_server().await; + let (rpc_server, mut tx_receiver) = new_rpc_server().await; let url = format!("http://{}", rpc_server.addr()); let rpc_client = HttpClientBuilder::default() @@ -278,6 +233,21 @@ mod tests { .await .expect("rpc request"); + let recv_tx = tx_receiver.recv().await.expect("recv tx"); + + let tx = Transaction { + author: tx.author, + hash: tx.hash, + payload: tx.payload, + nonce: tx.nonce, + signature: tx.signature, + propagated: tx.executed, + executed: tx.executed, + state: Received::RPC, + }; + + assert_eq!(tx, recv_tx.0); + dbg!(resp); } @@ -448,7 +418,13 @@ mod tests { .init(); } - async fn new_rpc_server() -> RpcServer { + async fn new_rpc_server() -> ( + RpcServer, + UnboundedReceiver<( + Transaction, + Option>>, + )>, + ) { let cfg = Arc::new(Config { acl_whitelist_url: "http://127.0.0.1:0/does.not.exist".to_string(), data_directory: temp_dir(), @@ -468,27 +444,17 @@ mod tests { http_download_port: 0, }); - let acl_whitelist = Arc::new(AlwaysGrantAclWhitelist {}); let db = Arc::new(Database::new(&cfg.db_url).await.unwrap()); - let mempool = Arc::new(RwLock::new( - Mempool::new(db.clone(), acl_whitelist.clone(), None) - .await - .unwrap(), - )); - let asset_manager = Arc::new(AssetManager::new( - cfg.clone(), - db.clone(), - Arc::new(RwLock::new(std::collections::HashMap::new())), - )); - RpcServer::run( - cfg.clone(), - db.clone(), - mempool, - asset_manager, - acl_whitelist, + let (sendtx, txreceiver) = + mpsc::unbounded_channel::<(Transaction, Option)>(); + let txsender = txvalidation::TxEventSender::::build(sendtx); + + ( + RpcServer::run(cfg, db, txsender) + .await + .expect("rpc_server.run"), + txreceiver, ) - .await - .expect("rpc_server.run") } } diff --git a/crates/node/src/scheduler/mod.rs b/crates/node/src/scheduler/mod.rs index 7304ed65..7df78d88 100644 --- a/crates/node/src/scheduler/mod.rs +++ b/crates/node/src/scheduler/mod.rs @@ -2,36 +2,37 @@ mod program_manager; mod resource_manager; use crate::storage::Database; +use crate::txvalidation::TxEventSender; +use crate::txvalidation::TxResultSender; +use crate::types::file::{move_vmfile, File, ProofVerif, Vm}; use crate::types::TaskState; use crate::vmm::vm_server::grpc; use crate::vmm::{vm_server::TaskManager, VMId}; use crate::workflow::{WorkflowEngine, WorkflowError}; +use crate::{ + mempool::Mempool, + types::{Hash, Task}, +}; use async_trait::async_trait; +use eyre::Result; use gevulot_node::types::transaction::Payload; use gevulot_node::types::{TaskKind, Transaction}; use libsecp256k1::SecretKey; pub use program_manager::ProgramManager; use rand::RngCore; pub use resource_manager::ResourceManager; - +use std::path::PathBuf; use 
std::time::Instant; use std::{ collections::{HashMap, VecDeque}, sync::Arc, time::Duration, }; - -use eyre::Result; use tokio::{ sync::{Mutex, RwLock}, time::sleep, }; -use crate::{ - mempool::Mempool, - types::{Hash, Task}, -}; - use self::program_manager::{ProgramError, ProgramHandle}; use self::resource_manager::ResourceError; @@ -63,15 +64,22 @@ pub struct Scheduler { running_vms: Arc>>, #[allow(clippy::type_complexity)] task_queue: Arc>>>, + data_directory: PathBuf, + http_download_host: String, + tx_sender: TxEventSender, } impl Scheduler { + #[allow(clippy::too_many_arguments)] pub fn new( mempool: Arc>, database: Arc, program_manager: ProgramManager, workflow_engine: Arc, node_key: SecretKey, + data_directory: PathBuf, + http_download_host: String, + tx_sender: TxEventSender, ) -> Self { Self { mempool, @@ -84,6 +92,9 @@ impl Scheduler { running_tasks: Arc::new(Mutex::new(vec![])), running_vms: Arc::new(Mutex::new(vec![])), task_queue: Arc::new(Mutex::new(HashMap::new())), + data_directory, + http_download_host, + tx_sender, } } @@ -180,49 +191,33 @@ impl Scheduler { let mut mempool = self.mempool.write().await; // Check if next tx is ready for processing? - let tx = match mempool.peek() { - Some(tx) => { - if let Payload::Run { .. } = tx.payload { - if self.database.has_assets_loaded(&tx.hash).await.unwrap() { - mempool.next().unwrap() - } else { - // Assets are still downloading. - // TODO: This can stall the whole processing pipeline!! - // XXX: ....^.........^........^......^.......^........ - tracing::info!("assets for tx {} still loading", tx.hash); - return None; - } - } else { - tracing::debug!("scheduling new task from tx {}", tx.hash); - mempool.next().unwrap() - } - } - None => return None, - }; - match self.workflow_engine.next_task(&tx).await { - Ok(res) => res, - Err(e) if e.is::() => { - let err = e.downcast_ref::(); - match err { - Some(WorkflowError::IncompatibleTransaction(_)) => { - tracing::debug!("{}", e); - None - } - _ => { - tracing::error!( - "workflow error, failed to compute next task for tx:{}: {}", - tx.hash, - e - ); - None + match mempool.next() { + Some(tx) => match self.workflow_engine.next_task(&tx).await { + Ok(res) => res, + Err(e) if e.is::() => { + let err = e.downcast_ref::(); + match err { + Some(WorkflowError::IncompatibleTransaction(_)) => { + tracing::debug!("{}", e); + None + } + _ => { + tracing::error!( + "workflow error, failed to compute next task for tx:{}: {}", + tx.hash, + e + ); + None + } } } - } - Err(e) => { - tracing::error!("failed to compute next task for tx:{}: {}", tx.hash, e); - None - } + Err(e) => { + tracing::error!("failed to compute next task for tx:{}: {}", tx.hash, e); + None + } + }, + None => None, } } @@ -358,8 +353,40 @@ impl TaskManager for Scheduler { vm_id ); - if let Some(task_queue) = self.task_queue.lock().await.get(&program) { - if let Some((task, scheduled)) = task_queue.front() { + // Ensure that the VM requesting a task is not already executing one! + let mut running_tasks = self.running_tasks.lock().await; + if let Some(idx) = running_tasks.iter().position(|e| e.vm_id.eq(vm_id.clone())) { + // A VM that is already running a task has requested a new one. + // Mark the existing task as executed and stop the VM. + let running_task = running_tasks.swap_remove(idx); + tracing::info!( + "task {} has been running {}sec but found to be orphaned.
marking it as executed.", + running_task.task.id, + running_task.task_started.elapsed().as_secs() + ); + + tracing::debug!("terminating VM {} running program {}", vm_id, program); + + let mut running_vms = self.running_vms.lock().await; + let idx = running_vms.iter().position(|e| e.vm_id().eq(vm_id.clone())); + if idx.is_some() { + let program_handle = running_vms.remove(idx.unwrap()); + if let Err(err) = self + .program_manager + .lock() + .await + .stop_program(program_handle) + .await + { + tracing::error!("failed to stop program {}: {}", program, err); + } + } + + return None; + } + + if let Some(task_queue) = self.task_queue.lock().await.get_mut(&program) { + if let Some((task, scheduled)) = task_queue.pop_front() { tracing::debug!( "task {} found for program {} running in vm_id {}", task.id, @@ -372,10 +399,10 @@ impl TaskManager for Scheduler { scheduled.elapsed().as_millis() ); - self.running_tasks.lock().await.push(RunningTask { + running_tasks.push(RunningTask { task: task.clone(), vm_id: vm_id.clone(), - task_scheduled: *scheduled, + task_scheduled: scheduled, task_started: Instant::now(), }); @@ -397,7 +424,7 @@ impl TaskManager for Scheduler { vm_id: Arc, result: grpc::task_result_request::Result, ) -> bool { - dbg!(&result); + tracing::debug!("submit_result result:{result:#?}"); let grpc::task_result_request::Result::Task(result) = result else { todo!("task failed; handle it correctly") @@ -416,13 +443,71 @@ impl TaskManager for Scheduler { running_task.task_started.elapsed().as_secs() ); - if let Err(err) = self.database.mark_tx_executed(&running_task.task.tx).await { - tracing::error!( - "failed to update transaction.executed => true - tx.hash: {} error:{err}", - &running_task.task.tx - ); - } - + // Handle tx execution's result files so that they are available as an input for next task if needed. + let executed_files: Vec<(File, File)> = result + .files + .into_iter() + .map(|file| { + let vm_file = File::::new( + file.path.to_string(), + file.checksum[..].into(), + running_task.task.tx, + ); + let dest = File::::new( + file.path, + self.http_download_host.clone(), + file.checksum[..].into(), + ); + (vm_file, dest) + }) + .collect(); + + //TODO ? -Verify that all expected files has been generated. + // let executed_files: Vec<(VmFile, TxFile)> = + // if let Ok(Some(tx)) = self.database.find_transaction(&running_task.task.tx).await { + // if let Payload::Run { workflow } = tx.payload { + // workflow + // .steps + // .iter() + // .flat_map(|step| &step.inputs) + // .filter_map(|input| { + // let ProgramData::Output { + // source_program, + // file_name, + // } = input + // else { + // return None; + // }; + // let vm_file = VmFile { + // task_id: running_task.task.tx, + // name: file_name.to_string(), + // }; + + // let uuid = Uuid::new_v4(); + // //format file_name to keep the current filename and the uuid. + // //put uuid at the end so that the uuid is use to save the file. + // let new_file_name = format!("{}/{}", file_name, uuid); + // let dest = TxFile { + // //Real url will be calculated during download. 
+ // url: self.http_download_host.clone(), + // name: new_file_name, + // checksum: Hash::default(), + // }; + // Some((vm_file, dest)) + // }) + // .collect() + // } else { + // vec![] + // } + // } else { + // vec![] + // }; + + let new_tx_files: Vec> = executed_files + .iter() + .map(|(_, file)| file) + .cloned() + .collect(); let nonce = rand::thread_rng().next_u64(); let tx = match running_task.task.kind { TaskKind::Proof => Transaction::new( Payload::Proof { parent: running_task.task.tx, prover: program, proof: result.data, + files: new_tx_files, }, &self.node_key, ), @@ -438,6 +524,7 @@ impl TaskManager for Scheduler { parent: running_task.task.tx, verifier: program, verification: result.data, + files: new_tx_files, }, &self.node_key, ), @@ -451,13 +538,29 @@ impl TaskManager for Scheduler { ); } }; + tracing::info!("Submit result Tx created: {}", tx.hash.to_string()); + + // Move Tx files from the execution Tx path to the new Tx path. + for (source_file, dest_file) in executed_files { + tracing::trace!( "Move file {} checksum:{}", dest_file.name, dest_file.checksum ); + if let Err(err) = + move_vmfile(&source_file, &dest_file, &self.data_directory, tx.hash).await + { + tracing::error!( "failed to move execution file from: {source_file:?} to: {dest_file:?} error: {err}", ); + } + } - let mut mempool = self.mempool.write().await; - if let Err(err) = mempool.add(tx.clone()).await { - tracing::error!("failed to add transaction to mempool: {}", err); + // Send the result Tx to the validation process. + if let Err(err) = self.tx_sender.send_tx(tx).await { + tracing::error!("failed to send Tx result to validation process: {}", err); } else { - dbg!(tx); - tracing::info!("successfully added new tx to mempool from task result"); + tracing::info!("successfully sent new tx to tx validation from task result"); } tracing::debug!("terminating VM {} running program {}", vm_id, program); diff --git a/crates/node/src/scheduler/program_manager.rs b/crates/node/src/scheduler/program_manager.rs index 96702937..2820d063 100644 --- a/crates/node/src/scheduler/program_manager.rs +++ b/crates/node/src/scheduler/program_manager.rs @@ -60,6 +60,8 @@ impl ProgramManager { id: Hash, limits: Option, ) -> Result { + tracing::trace!("start_program task:{}", id.to_string()); + let program = match self.storage.find_program(&id).await? { Some(program) => program, None => return Err(ProgramError::ProgramNotFound(id.to_string()).into()), diff --git a/crates/node/src/storage/database/entity/transaction.rs b/crates/node/src/storage/database/entity/transaction.rs index eeb4b6b5..4fca6010 100644 --- a/crates/node/src/storage/database/entity/transaction.rs +++ b/crates/node/src/storage/database/entity/transaction.rs @@ -29,8 +29,8 @@ pub struct Transaction { pub executed: bool, } -impl From<&types::Transaction> for Transaction { - fn from(value: &types::Transaction) -> Self { +impl From<&types::Transaction> for Transaction { + fn from(value: &types::Transaction) -> Self { let kind = match value.payload { transaction::Payload::Empty => Kind::Empty, transaction::Payload::Transfer { ..
} => Kind::Transfer, @@ -56,8 +56,8 @@ impl From<&types::Transaction> for Transaction { } } -impl From for types::Transaction { - fn from(value: Transaction) -> types::Transaction { +impl From for types::Transaction { + fn from(value: Transaction) -> types::Transaction { types::Transaction { author: value.author.into(), hash: value.hash, @@ -68,6 +68,7 @@ impl From for types::Transaction { signature: value.signature, propagated: value.propagated, executed: value.executed, + state: transaction::Validated, } } } diff --git a/crates/node/src/storage/database/postgres.rs b/crates/node/src/storage/database/postgres.rs index 9461cdb4..a0485351 100644 --- a/crates/node/src/storage/database/postgres.rs +++ b/crates/node/src/storage/database/postgres.rs @@ -1,16 +1,32 @@ -use std::time::Duration; - +use super::entity::{self}; +use crate::txvalidation::acl::AclWhiteListError; +use crate::txvalidation::acl::AclWhitelist; +use crate::types::file::DbFile; +use crate::types::{ + self, + transaction::{ProgramData, Validated}, + Hash, Program, Task, +}; use eyre::Result; use gevulot_node::types::program::ResourceRequest; +use libsecp256k1::PublicKey; use sqlx::{self, postgres::PgPoolOptions, FromRow, Row}; +use std::time::Duration; use uuid::Uuid; -use super::entity::{self}; -use crate::types::{self, transaction::ProgramData, File, Hash, Program, Task}; - const MAX_DB_CONNS: u32 = 64; const DB_CONNECT_TIMEOUT: Duration = Duration::from_secs(30); +#[async_trait::async_trait] +impl AclWhitelist for Database { + async fn contains(&self, key: &PublicKey) -> Result { + let key = entity::PublicKey(*key); + self.acl_whitelist_has(&key).await.map_err(|err| { + AclWhiteListError::InternalError(format!("Fail to query access list: {err}",)) + }) + } +} + #[derive(Clone)] pub struct Database { pool: sqlx::PgPool, @@ -137,11 +153,12 @@ impl Database { { let mut query_builder = - sqlx::QueryBuilder::new("INSERT INTO file ( task_id, name, url )"); + sqlx::QueryBuilder::new("INSERT INTO file ( task_id, name, url, checksum )"); query_builder.push_values(&t.files, |mut b, new_file| { b.push_bind(t.id) .push_bind(&new_file.name) - .push_bind(&new_file.url); + .push_bind(&new_file.url) + .push_bind(new_file.checksum); }); let query = query_builder.build(); @@ -166,10 +183,11 @@ impl Database { // Fetch accompanied Files for the Task. 
match task { Some(mut task) => { - let mut files = sqlx::query_as::<_, File>("SELECT * FROM file WHERE task_id = $1") - .bind(id) - .fetch_all(&mut *tx) - .await?; + let mut files = + sqlx::query_as::<_, DbFile>("SELECT * FROM file WHERE task_id = $1") + .bind(id) + .fetch_all(&mut *tx) + .await?; task.files.append(&mut files); Ok(Some(task)) } @@ -186,7 +204,7 @@ impl Database { .await?; for task in &mut tasks { - let mut files = sqlx::query_as::<_, File>("SELECT * FROM file WHERE task_id = $1") + let mut files = sqlx::query_as::<_, DbFile>("SELECT * FROM file WHERE task_id = $1") .bind(task.id) .fetch_all(&mut *tx) .await?; @@ -206,50 +224,14 @@ impl Database { Ok(()) } - pub async fn add_asset(&self, tx_hash: &Hash) -> Result<()> { - sqlx::query!( - "INSERT INTO assets ( tx ) VALUES ( $1 ) RETURNING *", - tx_hash.to_string(), - ) - .fetch_one(&self.pool) - .await?; - Ok(()) - } - - pub async fn has_assets_loaded(&self, tx_hash: &Hash) -> Result { - let res: Option = - sqlx::query("SELECT 1 FROM assets WHERE completed IS NOT NULL AND tx = $1") - .bind(tx_hash) - .map(|row: sqlx::postgres::PgRow| row.get(0)) - .fetch_optional(&self.pool) - .await?; - - Ok(res.is_some()) - } - - pub async fn get_incomplete_assets(&self) -> Result> { - let assets = - sqlx::query("SELECT tx FROM assets WHERE completed IS NULL ORDER BY created ASC") - .map(|row: sqlx::postgres::PgRow| row.get(0)) - .fetch_all(&self.pool) - .await?; - - Ok(assets) - } - - pub async fn mark_asset_complete(&self, tx_hash: &Hash) -> Result<()> { - sqlx::query("UPDATE assets SET completed = NOW() WHERE tx = $1") - .bind(&tx_hash.to_string()) - .execute(&self.pool) - .await?; - Ok(()) - } - // NOTE: There are plenty of opportunities for optimizations in following // transaction related operations. They are implemented naively on purpose // for now to maintain initial flexibility in development. Later on, these // queries here are easy low hanging fruits for optimizations. - pub async fn find_transaction(&self, tx_hash: &Hash) -> Result> { + pub async fn find_transaction( + &self, + tx_hash: &Hash, + ) -> Result>> { let mut db_tx = self.pool.begin().await?; let entity = @@ -348,6 +330,13 @@ impl Database { } } entity::transaction::Kind::Proof => { + //get payload files + let files = + sqlx::query_as::<_, DbFile>("SELECT * FROM txfile WHERE tx_id = $1") + .bind(tx_hash) + .fetch_all(&mut *db_tx) + .await?; + sqlx::query("SELECT parent, prover, proof FROM proof WHERE tx = $1") .bind(tx_hash) .map( @@ -355,6 +344,7 @@ impl Database { parent: row.get(0), prover: row.get(1), proof: row.get(2), + files: files.clone().into_iter().map(|file| file.into()).collect(), }, ) .fetch_one(&mut *db_tx) @@ -373,6 +363,12 @@ impl Database { .await? 
} entity::transaction::Kind::Verification => { + //get payload files + let files = + sqlx::query_as::<_, DbFile>("SELECT * FROM txfile WHERE tx_id = $1") + .bind(tx_hash) + .fetch_all(&mut *db_tx) + .await?; sqlx::query( "SELECT parent, verifier, verification FROM verification WHERE tx = $1", ) @@ -382,6 +378,7 @@ impl Database { parent: row.get(0), verifier: row.get(1), verification: row.get(2), + files: files.clone().into_iter().map(|file| file.into()).collect(), }, ) .fetch_one(&mut *db_tx) @@ -390,7 +387,7 @@ impl Database { _ => types::transaction::Payload::Empty, }; - let mut tx: types::transaction::Transaction = entity.into(); + let mut tx: types::transaction::Transaction = entity.into(); tx.payload = payload; Ok(Some(tx)) } else { @@ -398,7 +395,7 @@ impl Database { } } - pub async fn get_transactions(&self) -> Result> { + pub async fn get_transactions(&self) -> Result>> { let mut db_tx = self.pool.begin().await?; let refs: Vec = sqlx::query("SELECT hash FROM transaction") .map(|row: sqlx::postgres::PgRow| row.get(0)) @@ -416,7 +413,7 @@ impl Database { Ok(txs) } - pub async fn get_unexecuted_transactions(&self) -> Result> { + pub async fn get_unexecuted_transactions(&self) -> Result>> { let mut db_tx = self.pool.begin().await?; let refs: Vec = sqlx::query("SELECT hash FROM transaction WHERE executed IS false") .map(|row: sqlx::postgres::PgRow| row.get(0)) @@ -450,7 +447,7 @@ impl Database { Ok(refs) } - pub async fn add_transaction(&self, tx: &types::Transaction) -> Result<()> { + pub async fn add_transaction(&self, tx: &types::Transaction) -> Result<()> { let entity = entity::Transaction::from(tx); let mut db_tx = self.pool.begin().await?; @@ -539,6 +536,7 @@ impl Database { parent, prover, proof, + files, } => { sqlx::query( "INSERT INTO proof ( tx, parent, prover, proof ) VALUES ( $1, $2, $3, $4 ) ON CONFLICT (tx) DO NOTHING", @@ -549,6 +547,25 @@ impl Database { .bind(proof) .execute(&mut *db_tx) .await?; + + //save payload files + if !files.is_empty() { + let mut query_builder = sqlx::QueryBuilder::new( + "INSERT INTO txfile ( tx_id, name, url, checksum )", + ); + query_builder.push_values(files, |mut b, new_file| { + b.push_bind(tx.hash) + .push_bind(&new_file.name) + .push_bind(&new_file.url) + .push_bind(new_file.checksum); + }); + + let query = query_builder.build(); + if let Err(err) = query.execute(&mut *db_tx).await { + db_tx.rollback().await?; + return Err(err.into()); + } + } } types::transaction::Payload::ProofKey { parent, key } => { @@ -564,7 +581,9 @@ impl Database { parent, verifier, verification, + files, } => { + tracing::trace!("Postgres add_transaction tx:{}", tx.hash.to_string()); sqlx::query( "INSERT INTO verification ( tx, parent, verifier, verification ) VALUES ( $1, $2, $3, $4 ) ON CONFLICT (tx) DO NOTHING", ) @@ -574,6 +593,25 @@ impl Database { .bind(verification) .execute(&mut *db_tx) .await?; + + //save payload files + if !files.is_empty() { + let mut query_builder = sqlx::QueryBuilder::new( + "INSERT INTO txfile ( tx_id, name, url, checksum )", + ); + query_builder.push_values(files, |mut b, new_file| { + b.push_bind(tx.hash) + .push_bind(&new_file.name) + .push_bind(&new_file.url) + .push_bind(new_file.checksum); + }); + + let query = query_builder.build(); + if let Err(err) = query.execute(&mut *db_tx).await { + db_tx.rollback().await?; + return Err(err.into()); + } + } } _ => { /* ignore for now */ } } @@ -643,6 +681,7 @@ impl Database { #[cfg(test)] mod tests { + use gevulot_node::types::transaction::Created; use libsecp256k1::{PublicKey, 
SecretKey}; use rand::{rngs::StdRng, Rng, SeedableRng}; @@ -653,6 +692,20 @@ mod tests { use super::*; + //TODO change by impl From when module declaration between main and lib are solved. + fn into_validated(tx: Transaction) -> Transaction { + Transaction { + author: tx.author, + hash: tx.hash, + payload: tx.payload, + nonce: tx.nonce, + signature: tx.signature, + propagated: tx.executed, + executed: tx.executed, + state: Validated, + } + } + #[ignore] #[tokio::test] async fn test_add_and_find_deploy_transaction() { @@ -696,6 +749,7 @@ mod tests { signature: Signature::default(), propagated: false, executed: false, + state: Validated, }; database @@ -757,6 +811,7 @@ mod tests { parent: run_tx.hash, prover: prover.hash, proof: vec![1], + files: vec![], }, &key, ); @@ -765,6 +820,7 @@ mod tests { parent: run_tx.hash, prover: prover.hash, proof: vec![2], + files: vec![], }, &key, ); @@ -773,6 +829,7 @@ mod tests { parent: run_tx.hash, prover: prover.hash, proof: vec![3], + files: vec![], }, &key, ); @@ -781,6 +838,7 @@ mod tests { parent: run_tx.hash, prover: prover.hash, proof: vec![4], + files: vec![], }, &key, ); @@ -790,6 +848,7 @@ mod tests { parent: proof1_tx.hash, verifier: verifier.hash, verification: vec![1], + files: vec![], }, &key, ); @@ -798,6 +857,7 @@ mod tests { parent: proof2_tx.hash, verifier: verifier.hash, verification: vec![2], + files: vec![], }, &key, ); @@ -806,6 +866,7 @@ mod tests { parent: proof3_tx.hash, verifier: verifier.hash, verification: vec![3], + files: vec![], }, &key, ); @@ -814,6 +875,7 @@ mod tests { parent: proof4_tx.hash, verifier: verifier.hash, verification: vec![4], + files: vec![], }, &key, ); @@ -845,7 +907,10 @@ mod tests { .expect("add verifier"); for tx in &txs { - database.add_transaction(tx).await.expect("add transaction"); + database + .add_transaction(&into_validated(tx.clone())) + .await + .expect("add transaction"); } // Pick random transaction from set. diff --git a/crates/node/src/storage/file.rs b/crates/node/src/storage/file.rs deleted file mode 100644 index 095cc228..00000000 --- a/crates/node/src/storage/file.rs +++ /dev/null @@ -1,104 +0,0 @@ -use eyre::Result; -use std::path::{Path, PathBuf}; -use tokio::io::AsyncWriteExt; - -pub struct File { - data_dir: PathBuf, -} - -impl File { - pub fn new(data_dir: &Path) -> Self { - File { - data_dir: PathBuf::new().join(data_dir), - } - } - - pub fn data_dir(&self) -> PathBuf { - self.data_dir.clone() - } - - pub async fn get_task_file( - &self, - task_id: &str, - path: &str, - ) -> Result> { - let mut path = Path::new(path); - if path.is_absolute() { - path = path.strip_prefix("/")?; - } - let path = PathBuf::new().join(&self.data_dir).join(task_id).join(path); - let fd = tokio::fs::File::open(path).await?; - Ok(tokio::io::BufReader::new(fd)) - } - - pub async fn move_task_file( - &self, - task_id_src: &str, - task_id_dst: &str, - path: &str, - ) -> Result<()> { - let mut path = Path::new(path); - if path.is_absolute() { - path = path.strip_prefix("/")?; - } - - let src_file_path = PathBuf::new() - .join(&self.data_dir) - .join(task_id_src) - .join(path); - let dst_file_path = PathBuf::new() - .join(&self.data_dir) - .join(task_id_dst) - .join(path); - - tracing::info!( - "moving file from {:#?} to {:#?}", - src_file_path, - dst_file_path - ); - - // Ensure any necessary subdirectories exists. 
- if let Some(parent) = dst_file_path.parent() { - tokio::fs::create_dir_all(parent) - .await - .expect("task file mkdir"); - } - - tokio::fs::rename(src_file_path, dst_file_path) - .await - .map_err(|e| e.into()) - } - - pub async fn save_task_file(&self, task_id: &str, path: &str, data: Vec) -> Result<()> { - let mut path = Path::new(path); - if path.is_absolute() { - path = path.strip_prefix("/")?; - } - - let file_path = PathBuf::new().join(&self.data_dir).join(task_id).join(path); - - tracing::debug!( - "saving task {} file {:#?} to {:#?}", - task_id, - path, - file_path - ); - - // Ensure any necessary subdirectories exists. - if let Some(parent) = file_path.parent() { - tokio::fs::create_dir_all(parent) - .await - .expect("task file mkdir"); - } - - let fd = tokio::fs::File::create(&file_path).await?; - let mut fd = tokio::io::BufWriter::new(fd); - - fd.write_all(data.as_slice()).await?; - fd.flush().await?; - - tracing::debug!("file {:#?} successfully written", file_path); - - Ok(()) - } -} diff --git a/crates/node/src/storage/mod.rs b/crates/node/src/storage/mod.rs index af53df79..ff48cd81 100644 --- a/crates/node/src/storage/mod.rs +++ b/crates/node/src/storage/mod.rs @@ -1,5 +1,2 @@ pub(crate) mod database; -mod file; - pub use database::postgres::Database; -pub use file::File; diff --git a/crates/node/src/txvalidation/acl.rs b/crates/node/src/txvalidation/acl.rs new file mode 100644 index 00000000..36587f2d --- /dev/null +++ b/crates/node/src/txvalidation/acl.rs @@ -0,0 +1,24 @@ +use async_trait::async_trait; +use eyre::Result; +use libsecp256k1::PublicKey; +use thiserror::Error; + +#[allow(clippy::enum_variant_names)] +#[derive(Error, Debug)] +pub enum AclWhiteListError { + #[error("An error occurs during ACLlist validation: {0}")] + InternalError(String), +} + +#[async_trait] +pub trait AclWhitelist: Send + Sync { + async fn contains(&self, key: &PublicKey) -> Result; +} + +pub struct AlwaysGrantAclWhitelist; +#[async_trait::async_trait] +impl AclWhitelist for AlwaysGrantAclWhitelist { + async fn contains(&self, _key: &PublicKey) -> Result { + Ok(true) + } +} diff --git a/crates/node/src/networking/download_manager.rs b/crates/node/src/txvalidation/download_manager.rs similarity index 61% rename from crates/node/src/networking/download_manager.rs rename to crates/node/src/txvalidation/download_manager.rs index 387e6410..cb86e50e 100644 --- a/crates/node/src/networking/download_manager.rs +++ b/crates/node/src/txvalidation/download_manager.rs @@ -1,4 +1,4 @@ -use crate::cli::Config; +use crate::types::file::{Download, File}; use eyre::eyre; use eyre::Result; use futures_util::TryStreamExt; @@ -11,26 +11,40 @@ use hyper::{Request, Response, StatusCode}; use hyper_util::rt::TokioIo; use std::net::SocketAddr; use std::path::Path; -use tokio::fs::File; +use std::path::PathBuf; +use std::sync::Arc; use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; use tokio::task::JoinHandle; use tokio_util::io::ReaderStream; -/// download_file downloads file from the given `url` and saves it to file in `local_directory_path` + / + `file`. -pub async fn download_file( - url: &str, +/// Downloads file from the given `url` and saves it to file in `local_directory_path` + / + `file path`. 
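The `AclWhitelist` trait added in txvalidation/acl.rs above is the node's authorization extension point: the database-backed impl earlier in this patch checks the ACL table, while `AlwaysGrantAclWhitelist` disables the check entirely. As a rough, hypothetical illustration (the `StaticWhitelist` type and its key encoding are not part of this patch), a fixed in-memory whitelist could implement it like this:

    use async_trait::async_trait;
    use eyre::Result;
    use libsecp256k1::PublicKey;
    use std::collections::HashSet;

    // Hypothetical fixed whitelist, e.g. for tests or a single-operator node.
    pub struct StaticWhitelist {
        allowed: HashSet<Vec<u8>>, // serialized public keys
    }

    #[async_trait]
    impl AclWhitelist for StaticWhitelist {
        async fn contains(&self, key: &PublicKey) -> Result<bool> {
            // `serialize()` yields the uncompressed 65-byte key.
            Ok(self.allowed.contains(key.serialize().as_slice()))
        }
    }
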
+pub async fn download_asset_file( local_directory_path: &Path, - file: &str, - http_peer_list: Vec<(SocketAddr, Option)>, + http_peer_list: &[(SocketAddr, Option)], http_client: &reqwest::Client, - file_hash: gevulot_node::types::Hash, + asset_file: File, ) -> Result<()> { - tracing::trace!("download_file url:{url} local_directory_path:{local_directory_path:?} file:{file} file_hash:{file_hash} http_peer_list:{http_peer_list:?}"); - let url = reqwest::Url::parse(url)?; - let mut resp = match http_client.get(url.clone()).send().await { - Ok(resp) => resp, - Err(err) => { + let local_relative_file_path = asset_file.get_save_path(); + tracing::info!("download_file:{asset_file:?} local_directory_path:{local_directory_path:?} local_relative_file_path:{local_relative_file_path:?} http_peer_list:{http_peer_list:?}"); + + // Detect if the file already exist. If yes don't download. + if asset_file.exist(&local_relative_file_path).await { + tracing::trace!( + "download_asset_file: File already exist, skip download: {:#?}", + asset_file.get_save_path() + ); + return Ok(()); + } + + let mut resp = match tokio::time::timeout( + tokio::time::Duration::from_secs(5), + http_client.get(asset_file.url).send(), + ) + .await + { + Ok(Ok(resp)) if resp.status() == reqwest::StatusCode::OK => resp, + Ok(_) => { let peer_urls: Vec = http_peer_list .iter() .filter_map(|(peer, port)| { @@ -39,7 +53,7 @@ pub async fn download_file( let mut url = reqwest::Url::parse("http://localhost").unwrap(); //unwrap always succeed url.set_ip_host(peer.ip()).unwrap(); //unwrap always succeed url.set_port(Some(port)).unwrap(); //unwrap always succeed - url.set_path(file); //unwrap always succeed + url.set_path(local_relative_file_path.to_str().unwrap()); //unwrap Path always ok url }) }) @@ -47,19 +61,28 @@ pub async fn download_file( let mut resp = None; for url in peer_urls { if let Ok(val) = http_client.get(url.clone()).send().await { - tracing::trace!("download_file from peer url:{url}"); - resp = Some(val); - break; + if let reqwest::StatusCode::OK = val.status() { + tracing::trace!("download_file from peer url:{url}"); + resp = Some(val); + break; + } } } resp.ok_or(eyre!( - "Download no host found to download the file: {file:?}" + "Download no host found to download the file: {}", + asset_file.name ))? } + Err(err) => { + return Err(eyre!( + "Download file: {:?}, request send timeout.", + asset_file.name + )); + } }; if resp.status() == reqwest::StatusCode::OK { - let file_path = local_directory_path.join(file); + let file_path = local_directory_path.join(&local_relative_file_path); // Ensure any necessary subdirectories exists. if let Some(parent) = file_path.parent() { if let Ok(false) = tokio::fs::try_exists(parent).await { @@ -78,23 +101,44 @@ pub async fn download_file( //create the Hasher to verify the Hash let mut hasher = blake3::Hasher::new(); - while let Some(chunk) = resp.chunk().await? 
{ - hasher.update(&chunk); - fd.write_all(&chunk).await?; + loop { + match tokio::time::timeout(tokio::time::Duration::from_secs(5), resp.chunk()).await { + Ok(Ok(Some(chunk))) => { + hasher.update(&chunk); + fd.write_all(&chunk).await?; + } + Ok(Ok(None)) => break, + Ok(Err(err)) => { + return Err(eyre!( + "Download file: {:?}, http error:{err}", + asset_file.name + )); + } + Err(_) => { + return Err(eyre!( + "Download file: {:?}, connection timeout", + asset_file.name + )); + } + } } fd.flush().await?; - let checksum: gevulot_node::types::Hash = (&hasher.finalize()).into(); - if checksum != file_hash { - Err(eyre!("Download file: {:?}, bad checksum", file)) + let checksum: crate::types::Hash = (&hasher.finalize()).into(); + if checksum != asset_file.checksum { + Err(eyre!( + "download_file:{:?} bad checksum, computed:{checksum} asset_file.checksum:{}.", + asset_file.name, + asset_file.checksum + )) } else { //rename to original name Ok(std::fs::rename(tmp_file_path, file_path)?) } } else { Err(eyre!( - "failed to download file from {}: response status: {}", - url, + "failed to download file: {:?} response status: {}", + asset_file.name, resp.status() )) } @@ -102,13 +146,16 @@ //start the local server and serve the specified file path. //Return the server task join handle. -pub async fn serve_files(config: &Config) -> Result<JoinHandle<()>> { - let mut bind_addr = config.p2p_listen_addr; - bind_addr.set_port(config.http_download_port); +pub async fn serve_files( + mut bind_addr: SocketAddr, + http_download_port: u16, + data_directory: Arc<PathBuf>, +) -> Result<JoinHandle<()>> { + bind_addr.set_port(http_download_port); let listener = TcpListener::bind(bind_addr).await?; let jh = tokio::spawn({ - let data_directory = config.data_directory.clone(); + let data_directory = data_directory.clone(); async move { tracing::info!( "listening for http at {}", @@ -155,7 +202,7 @@ async fn server_process_file( let mut file_path = data_directory.join(file_digest); - let file = match File::open(&file_path).await { + let file = match tokio::fs::File::open(&file_path).await { Ok(file) => file, Err(_) => { //try to see if the file is currently being updated. diff --git a/crates/node/src/txvalidation/event.rs b/crates/node/src/txvalidation/event.rs new file mode 100644 index 00000000..a0cdcac3 --- /dev/null +++ b/crates/node/src/txvalidation/event.rs @@ -0,0 +1,200 @@ +use crate::txvalidation::acl::AclWhitelist; +use crate::txvalidation::download_manager; +use crate::txvalidation::EventProcessError; +use crate::types::{ + transaction::{Received, Validated}, + Transaction, +}; +use crate::Mempool; +use futures::future::join_all; +use futures_util::TryFutureExt; +use std::fmt::Debug; +use std::net::SocketAddr; +use std::path::Path; +use tokio::sync::mpsc::UnboundedSender; + +// Event type markers. +#[derive(Debug, Clone)] +pub struct ReceivedTx; + +#[derive(Debug, Clone)] +pub struct DownloadTx; + +#[derive(Debug, Clone)] +pub struct NewTx; + +#[derive(Debug, Clone)] +pub struct PropagateTx; + +//Event processing depends on the marker type.
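The four marker structs above turn transaction validation into a small compile-time state machine: each `process_event` stage consumes one event type and can only yield the next, so a transaction cannot reach the mempool without first passing validation and asset download. A hedged sketch of how the stages defined below chain together (simplified: empty peer list, propagation and callbacks elided, this module's types assumed in scope):

    async fn run_pipeline(
        tx: Transaction<Received>,
        acl: &impl AclWhitelist,
        download_dir: &Path,
        mempool: &mut Mempool,
    ) -> Result<(), EventProcessError> {
        // ReceivedTx -> DownloadTx: signature check + ACL whitelist check.
        let event: TxEvent<ReceivedTx> = tx.into();
        let download: TxEvent<DownloadTx> = event.process_event(acl).await?;
        // DownloadTx -> (NewTx, Option<PropagateTx>): fetch the tx's asset files.
        let (new_tx, _propagate) = download.process_event(download_dir, vec![]).await?;
        // NewTx: persist the now-validated tx into the mempool.
        new_tx.process_event(mempool).await
    }
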
+#[derive(Debug, Clone)] +pub struct TxEvent { + pub tx: Transaction, + pub tx_type: T, +} + +impl From> for TxEvent { + fn from(tx: Transaction) -> Self { + TxEvent { + tx, + tx_type: ReceivedTx, + } + } +} + +impl From> for TxEvent { + fn from(event: TxEvent) -> Self { + TxEvent { + tx: event.tx, + tx_type: DownloadTx, + } + } +} + +impl From> for TxEvent { + fn from(event: TxEvent) -> Self { + TxEvent { + tx: event.tx, + tx_type: NewTx, + } + } +} + +impl From> for Option> { + fn from(event: TxEvent) -> Self { + match event.tx.state { + Received::P2P => None, + Received::RPC => Some(TxEvent { + tx: event.tx, + tx_type: PropagateTx, + }), + Received::TXRESULT => Some(TxEvent { + tx: event.tx, + tx_type: PropagateTx, + }), + } + } +} + +//Processing of event that arrive: SourceTxType. +impl TxEvent { + pub async fn process_event( + self, + acl_whitelist: &impl AclWhitelist, + ) -> Result, EventProcessError> { + match self.validate_tx(acl_whitelist).await { + Ok(()) => Ok(self.into()), + Err(err) => Err(EventProcessError::ValidateError(format!( + "Tx validation fail:{err}" + ))), + } + } + + //Tx validation process. + async fn validate_tx( + &self, + acl_whitelist: &impl AclWhitelist, + ) -> Result<(), EventProcessError> { + self.tx.validate().map_err(|err| { + EventProcessError::ValidateError(format!("Error during transaction validation:{err}",)) + })?; + + // Secondly verify that author is whitelisted. + if !acl_whitelist.contains(&self.tx.author).await? { + return Err(EventProcessError::ValidateError( + "Tx permission denied signer not authorized".to_string(), + )); + } + + Ok(()) + } +} + +//Download Tx processing +impl TxEvent { + pub async fn process_event( + self, + local_directory_path: &Path, + http_peer_list: Vec<(SocketAddr, Option)>, + ) -> Result<(TxEvent, Option>), EventProcessError> { + let http_client = reqwest::Client::new(); + let asset_file_list = self.tx.get_asset_list().map_err(|err| { + EventProcessError::DownloadAssetError(format!( + "Asset file param conversion error:{err}" + )) + })?; + + let futures: Vec<_> = asset_file_list + .into_iter() + .map(|asset_file| { + download_manager::download_asset_file( + local_directory_path, + &http_peer_list, + &http_client, + asset_file, + ) + }) + .collect(); + join_all(futures) + .await + .into_iter() + .collect::, _>>() + .map_err(|err| { + EventProcessError::DownloadAssetError(format!("Execution error:{err}")) + })?; + let newtx: TxEvent = self.clone().into(); + let propagate: Option> = self.into(); + Ok((newtx, propagate)) + } +} + +//Propagate Tx processing +impl TxEvent { + pub async fn process_event( + self, + p2p_sender: &UnboundedSender>, + ) -> Result<(), EventProcessError> { + let tx = Transaction { + author: self.tx.author, + hash: self.tx.hash, + payload: self.tx.payload, + nonce: self.tx.nonce, + signature: self.tx.signature, + //TODO should be updated after the p2p send with a notification + propagated: true, + executed: self.tx.executed, + state: Validated, + }; + tracing::info!( + "Tx validation propagate tx:{} payload:{}", + tx.hash.to_string(), + tx.payload + ); + p2p_sender.send(tx).map_err(|err| Box::new(err).into()) + } +} + +impl TxEvent { + pub async fn process_event(self, mempool: &mut Mempool) -> Result<(), EventProcessError> { + let tx = Transaction { + author: self.tx.author, + hash: self.tx.hash, + payload: self.tx.payload, + nonce: self.tx.nonce, + signature: self.tx.signature, + //TODO should be updated after the p2p send with a notification + propagated: true, + executed: self.tx.executed, + 
state: Validated, + }; + tracing::info!( + "Tx validation save tx:{} payload:{}", + tx.hash.to_string(), + tx.payload + ); + mempool + .add(tx) + .map_err(|err| EventProcessError::SaveTxError(format!("{err}"))) + .await + } +} diff --git a/crates/node/src/txvalidation/mod.rs b/crates/node/src/txvalidation/mod.rs new file mode 100644 index 00000000..c3bb117c --- /dev/null +++ b/crates/node/src/txvalidation/mod.rs @@ -0,0 +1,205 @@ +use crate::txvalidation::event::{ReceivedTx, TxEvent}; +use crate::types::{ + transaction::{Created, Received, Validated}, + Transaction, +}; +use crate::Mempool; +use futures_util::Stream; +use futures_util::TryFutureExt; +use std::collections::HashMap; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::sync::Arc; +use thiserror::Error; +use tokio::sync::mpsc; +use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::oneshot; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; +use tokio_stream::wrappers::UnboundedReceiverStream; + +pub mod acl; +mod download_manager; +mod event; + +#[allow(clippy::enum_variant_names)] +#[derive(Error, Debug)] +pub enum EventProcessError { + #[error("Fail to rcv Tx from the channel: {0}")] + RcvChannelError(#[from] tokio::sync::oneshot::error::RecvError), + #[error("Fail to send the Tx on the channel: {0}")] + SendChannelError( + #[from] + tokio::sync::mpsc::error::SendError<(Transaction, Option)>, + ), + #[error("Fail to send the Tx on the channel: {0}")] + PropagateTxError(#[from] Box>>), + #[error("validation fail: {0}")] + ValidateError(String), + #[error("Tx asset fail to download because {0}")] + DownloadAssetError(String), + #[error("Save Tx error: {0}")] + SaveTxError(String), + #[error("AclWhite list authenticate error: {0}")] + AclWhiteListAuthError(#[from] acl::AclWhiteListError), +} + +pub type CallbackSender = oneshot::Sender>; + +//Sending Tx interface. +//Some marker type to define the sender source. +pub struct RpcSender; +#[derive(Clone)] +pub struct P2pSender; +pub struct TxResultSender; + +// `TxEventSender` holds the received transaction of a specific state together with an optional callback interface. +#[derive(Debug, Clone)] +pub struct TxEventSender { + sender: UnboundedSender<(Transaction, Option)>, + _marker: PhantomData, +} + +//Manage send from the p2p source +impl TxEventSender { + pub fn build(sender: UnboundedSender<(Transaction, Option)>) -> Self { + TxEventSender { + sender, + _marker: PhantomData, + } + } + + pub fn send_tx(&self, tx: Transaction) -> Result<(), EventProcessError> { + self.sender + .send((tx.into_received(Received::P2P), None)) + .map_err(|err| err.into()) + } +} + +//Manage send from the RPC source +impl TxEventSender { + pub fn build(sender: UnboundedSender<(Transaction, Option)>) -> Self { + TxEventSender { + sender, + _marker: PhantomData, + } + } + + pub async fn send_tx(&self, tx: Transaction) -> Result<(), EventProcessError> { + let (sender, rx) = oneshot::channel(); + self.sender + .send((tx.into_received(Received::RPC), Some(sender))) + .map_err(EventProcessError::from)?; + rx.await? 
+ } +} + +// Manage sends from the Tx result execution source. +impl TxEventSender<TxResultSender> { + pub fn build(sender: UnboundedSender<(Transaction<Received>, Option<CallbackSender>)>) -> Self { + TxEventSender { + sender, + _marker: PhantomData, + } + } + + pub async fn send_tx(&self, tx: Transaction<Created>) -> Result<(), EventProcessError> { + let (sender, rx) = oneshot::channel(); + self.sender + .send((tx.into_received(Received::TXRESULT), Some(sender))) + .map_err(EventProcessError::from)?; + rx.await? + } +} + +// Main event processing loop. +pub async fn spawn_event_loop( + local_directory_path: PathBuf, + bind_addr: SocketAddr, + http_download_port: u16, + http_peer_list: Arc<RwLock<HashMap<SocketAddr, Option<u16>>>>, + acl_whitelist: Arc<impl AclWhitelist + 'static>, + // New Txs are added to the mempool directly. + // As with the p2p path, a stream could be used to decouple the two processes. + mempool: Arc<RwLock<Mempool>>, +) -> eyre::Result<( + JoinHandle<()>, + // Channel used to send RcvTx events to the processing loop. + UnboundedSender<(Transaction<Received>, Option<CallbackSender>)>, + // Output stream used to propagate Txs. + impl Stream<Item = Transaction<Validated>>, +)> { + let local_directory_path = Arc::new(local_directory_path); + // Start the http download manager. + let download_jh = + download_manager::serve_files(bind_addr, http_download_port, local_directory_path.clone()) + .await?; + + let (tx, mut rcv_tx_event_rx) = + mpsc::unbounded_channel::<(Transaction<Received>, Option<CallbackSender>)>(); + + let (p2p_sender, p2p_recv) = mpsc::unbounded_channel::<Transaction<Validated>>(); + let p2p_stream = UnboundedReceiverStream::new(p2p_recv); + let jh = tokio::spawn({ + let local_directory_path = local_directory_path.clone(); + + async move { + while let Some((tx, callback)) = rcv_tx_event_rx.recv().await { + // Create a new event from the Tx. + let event: TxEvent<ReceivedTx> = tx.into(); + + // Process the RcvTx(EventTx) event. + let http_peer_list = convert_peer_list_to_vec(&http_peer_list).await; + + tracing::trace!("txvalidation receive event:{}", event.tx.hash.to_string()); + + // Process the received event. + tokio::spawn({ + let p2p_sender = p2p_sender.clone(); + let local_directory_path = local_directory_path.clone(); + let acl_whitelist = acl_whitelist.clone(); + let mempool = mempool.clone(); + async move { + let res = event + .process_event(acl_whitelist.as_ref()) + .and_then(|download_event| { + download_event.process_event(&local_directory_path, http_peer_list) + }) + .and_then(|(new_tx, propagate_tx)| async move { + if let Some(propagate_tx) = propagate_tx { + propagate_tx.process_event(&p2p_sender).await?; + } + new_tx.process_event(&mut *(mempool.write().await)).await?; + + Ok(()) + }) + .await; + // Log the error if any is returned. + if let Err(ref err) = res { + tracing::error!("An error occurred during Tx validation: {err}",); + } + // Send the execution result back if needed. + // Ignore the result, because if the RPC connection is closed the send can fail.
+ let _ = callback.send(res); + } + } + }); + } + } + }); + Ok((jh, tx, p2p_stream)) +} + +async fn convert_peer_list_to_vec( + http_peer_list: &tokio::sync::RwLock>>, +) -> Vec<(SocketAddr, Option)> { + http_peer_list + .read() + .await + .iter() + .map(|(a, p)| (*a, *p)) + .collect() +} diff --git a/crates/node/src/types/file.rs b/crates/node/src/types/file.rs new file mode 100644 index 00000000..684db250 --- /dev/null +++ b/crates/node/src/types/file.rs @@ -0,0 +1,253 @@ +use crate::types::transaction; +use crate::types::Hash; +use eyre::Result; +use serde::Deserialize; +use serde::Serialize; +use std::path::Path; +use std::path::PathBuf; + +pub async fn open_task_file( + data_dir: &PathBuf, + task_id: &str, + path: &str, +) -> Result> { + let mut path = Path::new(path); + if path.is_absolute() { + path = path.strip_prefix("/")?; + } + let path = PathBuf::new().join(data_dir).join(task_id).join(path); + let fd = tokio::fs::File::open(path).await?; + Ok(tokio::io::BufReader::new(fd)) +} + +//describe file data that is stored in the database. +// to manipulate file on disk use the equivalent type state definition File +#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, sqlx::FromRow)] +pub struct DbFile { + pub name: String, + pub url: String, + pub checksum: Hash, +} + +impl DbFile { + pub fn try_from_prg_data( + value: &transaction::ProgramData, + ) -> Result, &'static str> { + let file = match value { + transaction::ProgramData::Input { + file_name, + file_url, + checksum, + } => Some(DbFile { + name: file_name.clone(), + url: file_url.clone(), + checksum: checksum.clone().into(), + }), + transaction::ProgramData::Output { + source_program: _, + file_name: _, + } => { + //Output file are not use by Tx management. + None + } + }; + + Ok(file) + } +} + +// Type state definition of a file to manage all the different case of file manipulation. +// Download: Use to download the file data and move them to the right place. Constructed using the other type. +// ProofVerif: A file attached to a proof or verify Tx: Only downloaded on distant host. Nothing done on the host when the Tx has been executed. +// Vm(Hash): File generated by the VM execution. Move from Vm to ProofVerif place. +// Image: File attached to a Deploy Tx. Identify an image that are stored in the image directory. +#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] +pub struct File { + pub name: String, + pub url: String, + pub checksum: Hash, + pub extention: E, +} + +impl File { + pub fn build(name: String, url: String, checksum: Hash, extention: E) -> Self { + File { + name, + url, + checksum, + extention, + } + } +} + +//File type +#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] +pub struct Download((String, bool)); +// Asset file use download and install the file data locally. +// verify_exist: define if the exist verification do a real file system verification. +// Only file path using checksum should do file exist verification. 
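Read together, the `Vm`, `ProofVerif` and `Download` states encode a file's life cycle: produced inside a VM, re-homed under the result transaction and served over http, then fetched and checksum-verified by peers. A hedged sketch using the constructors defined below (paths, host and the `Hash::default()` checksums are purely illustrative):

    fn file_lifecycle(task_tx: Hash, result_tx: Hash) {
        // 1. The VM reports an output file for the executed task.
        let vm_file = File::<Vm>::new("out/proof.bin".to_string(), Hash::default(), task_tx);
        // 2. The scheduler re-homes it under the result tx, to be served over http.
        let tx_file = File::<ProofVerif>::new(
            "out/proof.bin".to_string(),
            "http://node.example:9995".to_string(),
            Hash::default(),
        );
        // 3. A peer receiving the Proof/Verification tx turns it into a download request.
        let _download: File<Download> = tx_file.into_download_file(result_tx);
        let _ = vm_file;
    }
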
+impl File { + pub fn new( + name: String, + url: String, + checksum: Hash, + tx_hash: Hash, + verify_exist: bool, + ) -> Self { + File::build( + name, + url, + checksum, + Download((tx_hash.to_string(), verify_exist)), + ) + } + + // Save Relative File path for downloaded files is / + pub fn get_save_path(&self) -> PathBuf { + let file_name = Path::new(&self.name) + .file_name() + .unwrap_or(std::ffi::OsStr::new("")); + let mut path = PathBuf::from(&self.extention.0 .0); + path.push(file_name); + path + } + + pub async fn exist(&self, root_path: &Path) -> bool { + if self.extention.0 .1 { + let file_path = root_path.join(self.get_save_path()); + tokio::fs::try_exists(file_path).await.unwrap_or(false) + } else { + false + } + } +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] +pub struct ProofVerif; +impl File { + pub fn new(path: String, http_download_host: String, checksum: Hash) -> Self { + File::build(path, http_download_host, checksum, ProofVerif) + } + + pub fn into_download_file(self, tx_hash: Hash) -> File { + let relative_path = self.get_relatif_path(tx_hash).to_str().unwrap().to_string(); + let url = format!("{}/{}", self.url, relative_path); + File::::new(relative_path, url, self.checksum, tx_hash, true) + } + + // Save Relative File path for ProofVerif files is // + pub fn get_relatif_path(&self, tx_hash: Hash) -> PathBuf { + let file_name = Path::new(&self.name).file_name().unwrap_or_default(); + let mut path = PathBuf::from(tx_hash.to_string()); + path.push(self.checksum.to_string()); + path.push(file_name); + path + } + + pub fn vec_to_bytes(vec: &[File]) -> Result, bincode::Error> { + bincode::serialize(vec) + } + + pub async fn exist(&self, root_path: &Path, tx_hash: Hash) -> bool { + let file_path = root_path.join(self.get_relatif_path(tx_hash)); + tokio::fs::try_exists(file_path).await.unwrap_or(false) + } +} + +impl From for File { + fn from(file: DbFile) -> Self { + File::::new(file.name, file.url, file.checksum) + } +} + +// Describe a file generated by the VM. +#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] +pub struct Vm(Hash); +impl File { + pub fn new(path: String, checksum: Hash, task_tx: Hash) -> Self { + File::build(path, String::new(), checksum, Vm(task_tx)) + } + + pub fn get_relatif_path(&self) -> PathBuf { + let mut file_path = Path::new(&self.name); + if file_path.is_absolute() { + file_path = file_path.strip_prefix("/").unwrap(); // Unwrap tested in `is_absolute()`. + } + + let mut path = PathBuf::from(self.extention.0.to_string()); + path.push(file_path); + path + } + + pub async fn remove_file(&self, base_path: &Path) -> std::io::Result<()> { + let src_file_path = base_path.join(self.get_relatif_path()); + tokio::fs::remove_file(src_file_path).await + } +} + +pub async fn move_vmfile( + source: &File, + dest: &File, + base_path: &Path, + proofverif_tx_hash: Hash, +) -> Result<()> { + // If the dest file already exist don't copy it. + // Remove it from the VM. + if dest.exist(base_path, proofverif_tx_hash).await { + tracing::debug!( + "move_vmfile: dest file already exist:{:#?}. 
Remove VM file:{:#?}", + dest.get_relatif_path(proofverif_tx_hash), + source.get_relatif_path() + ); + source.remove_file(base_path).await.map_err(|e| e.into()) + } else { + let src_file_path = base_path.to_path_buf().join(source.get_relatif_path()); + let dst_file_path = base_path + .to_path_buf() + .join(dest.get_relatif_path(proofverif_tx_hash)); + + tracing::debug!( + "move_vmfile: moving file from {:#?} to {:#?}", + src_file_path, + dst_file_path + ); + + // Ensure any necessary subdirectories exists. + if let Some(parent) = dst_file_path.parent() { + tokio::fs::create_dir_all(parent) + .await + .expect("task file mkdir"); + } + + tokio::fs::rename(src_file_path, dst_file_path) + .await + .map_err(|e| e.into()) + } +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] +pub struct Image(Hash); +impl File { + pub fn try_from_prg_meta_data(value: &transaction::ProgramMetadata) -> Self { + File::build( + value.name.clone(), + value.image_file_url.clone(), + value.image_file_checksum.clone().into(), + Image(value.hash), + ) + } +} + +impl From> for File { + fn from(file: File) -> Self { + //image file has the image directory happened at the beginning. + let mut extention = PathBuf::from("images"); + extention.push(file.extention.0.to_string()); + File::build( + file.name, + file.url, + file.checksum, + Download((extention.to_str().unwrap().to_string(), false)), + ) + } +} diff --git a/crates/node/src/types/hash.rs b/crates/node/src/types/hash.rs index 92c24795..277f2b99 100644 --- a/crates/node/src/types/hash.rs +++ b/crates/node/src/types/hash.rs @@ -4,7 +4,7 @@ use serde::{de, Deserialize, Serialize}; use sqlx::{self, Decode, Encode, Postgres, Type}; use std::fmt; -const HASH_SIZE: usize = 32; +pub const HASH_SIZE: usize = 32; #[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq, Serialize, Deserialize)] pub struct Hash([u8; HASH_SIZE]); diff --git a/crates/node/src/types/mod.rs b/crates/node/src/types/mod.rs index ba477a5b..80b704b1 100644 --- a/crates/node/src/types/mod.rs +++ b/crates/node/src/types/mod.rs @@ -1,5 +1,6 @@ mod account; mod deployment; +pub mod file; mod hash; mod key_capsule; pub mod program; @@ -15,5 +16,5 @@ pub use key_capsule::KeyCapsule; pub use program::Program; pub use signature::Signature; #[allow(unused_imports)] -pub use task::{File, Task, TaskId, TaskKind, TaskResult, TaskState}; +pub use task::{Task, TaskId, TaskKind, TaskResult, TaskState}; pub use transaction::{Transaction, TransactionTree}; diff --git a/crates/node/src/types/task.rs b/crates/node/src/types/task.rs index 8503e7f5..d1a96ec8 100644 --- a/crates/node/src/types/task.rs +++ b/crates/node/src/types/task.rs @@ -1,6 +1,5 @@ +use crate::types::file::DbFile; use serde::{Deserialize, Serialize}; -use std::path::Path; -use std::path::PathBuf; use uuid::Uuid; use super::hash::{deserialize_hash_from_json, Hash}; @@ -28,21 +27,6 @@ pub enum TaskKind { Nop, } -#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, sqlx::FromRow)] -pub struct File { - #[serde(skip_serializing, skip_deserializing)] - pub tx: Hash, - pub name: String, - pub url: String, -} - -impl File { - pub fn get_file_relative_path(&self) -> PathBuf { - let file_name = Path::new(&self.name).file_name().unwrap(); - PathBuf::new().join(self.tx.to_string()).join(file_name) - } -} - #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, sqlx::FromRow)] pub struct Task { pub id: TaskId, @@ -53,7 +37,7 @@ pub struct Task { pub program_id: Hash, pub args: Vec, #[sqlx(skip)] - pub files: Vec, + 
pub files: Vec<DbFile>, #[serde(skip_deserializing)] pub serial: i32, #[serde(skip_deserializing)] diff --git a/crates/node/src/types/transaction.rs b/crates/node/src/types/transaction.rs index 1f1c43b4..7dff9e18 100644 --- a/crates/node/src/types/transaction.rs +++ b/crates/node/src/types/transaction.rs @@ -1,12 +1,13 @@ -use std::{collections::HashSet, rc::Rc}; - +use super::file::{Download, File, Image, ProofVerif}; use super::signature::Signature; use super::{hash::Hash, program::ResourceRequest}; +use crate::types::transaction; use eyre::Result; use libsecp256k1::{sign, verify, Message, PublicKey, SecretKey}; use num_bigint::BigInt; use serde::{Deserialize, Serialize}; use sha3::{Digest, Sha3_256}; +use std::{collections::HashSet, rc::Rc}; use thiserror::Error; #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] @@ -156,6 +157,7 @@ pub enum Payload { parent: Hash, prover: Hash, proof: Vec<u8>, + files: Vec<File<ProofVerif>>, }, ProofKey { parent: Hash, @@ -165,14 +167,33 @@ pub enum Payload { parent: Hash, verifier: Hash, verification: Vec<u8>, + files: Vec<File<ProofVerif>>, }, Cancel { parent: Hash, }, } +impl std::fmt::Display for Payload { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let payload = match self { + Payload::Empty => "Empty", + Payload::Transfer { .. } => "Transfer", + Payload::Stake { .. } => "Stake", + Payload::Unstake { .. } => "Unstake", + Payload::Deploy { .. } => "Deploy", + Payload::Run { .. } => "Run", + Payload::Proof { .. } => "Proof", + Payload::ProofKey { .. } => "ProofKey", + Payload::Verification { .. } => "Verification", + Payload::Cancel { .. } => "Cancel", + }; + write!(f, "({})", payload) + } +} + impl Payload { - fn serialize_into(&self, buf: &mut Vec<u8>) { + pub fn serialize_into(&self, buf: &mut Vec<u8>) { match self { Payload::Empty => {} Payload::Transfer { to, value } => { @@ -201,10 +222,13 @@ impl Payload { parent, prover, proof, + files, } => { buf.append(&mut parent.to_vec()); buf.append(&mut prover.to_vec()); buf.append(proof.clone().as_mut()); + buf.append(&mut File::<ProofVerif>::vec_to_bytes(files).unwrap()); } Payload::ProofKey { parent, key } => { buf.append(&mut parent.to_vec()); @@ -214,10 +238,12 @@ impl Payload { parent, verifier, verification, + files, } => { buf.append(&mut parent.to_vec()); buf.append(&mut verifier.to_vec()); buf.append(verification.clone().as_mut()); + buf.append(&mut File::<ProofVerif>::vec_to_bytes(files).unwrap()); } Payload::Cancel { parent } => { buf.append(&mut parent.to_vec()); @@ -231,10 +257,42 @@ pub enum TransactionError { #[error("validation: {0}")] Validation(String), + #[error("General error: {0}")] + General(String), +} + +// Transaction definition. +// Type states are used to define the different states of a Tx. +// +// Txs are defined in 3 domains: Validation, Execution, and Storage. +// Currently the same definition is used for all three, but distinct types should be defined (TODO). +// Only the validation type states are defined: +// Created: identifies a Tx that has just been created. +// Received: identifies a Tx that has been received, and records its source. +// Validated: identifies a Tx that has passed the whole validation process. +// Validation assumes the Tx has been propagated. Currently there's no notification during the propagation (TODO).
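A hedged sketch of the state flow this comment describes, using the `Created`, `Received` and `Validated` markers declared next (the signing key is illustrative):

    fn tx_state_flow(key: &SecretKey) {
        // A node builds and signs a tx: Transaction<Created>.
        let created: Transaction<Created> = Transaction::new(Payload::Empty, key);
        // Receiving it tags its source: Transaction<Received>.
        let received: Transaction<Received> = created.into_received(Received::RPC);
        // Only a received tx exposes `validate()`; the txvalidation module turns a
        // tx that passes it into a Transaction<Validated>.
        assert!(received.validate().is_ok());
    }
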
+ +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub struct Created; + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub enum Received { + P2P, + RPC, + TXRESULT, +} + +impl Received { + fn is_from_tx_exec_result(&self) -> bool { + matches!(self, Received::TXRESULT) + } } #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] -pub struct Transaction { +pub struct Validated; + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub struct Transaction { pub author: PublicKey, pub hash: Hash, pub payload: Payload, @@ -244,9 +302,37 @@ pub struct Transaction { pub propagated: bool, #[serde(skip_serializing, skip_deserializing)] pub executed: bool, + pub state: T, +} + +impl Transaction { + pub fn compute_hash(&self) -> Hash { + let mut hasher = Sha3_256::new(); + let mut buf = vec![]; + hasher.update(self.author.serialize()); + self.payload.serialize_into(&mut buf); + hasher.update(buf); + hasher.update(self.nonce.to_be_bytes()); + (&hasher.finalize()[0..32]).into() + } +} + +impl Default for Transaction { + fn default() -> Self { + Self { + author: PublicKey::from_secret_key(&SecretKey::default()), + hash: Hash::default(), + payload: Payload::default(), + nonce: 0, + signature: Signature::default(), + propagated: false, + executed: false, + state: Created, + } + } } -impl Default for Transaction { +impl Default for Transaction { fn default() -> Self { Self { author: PublicKey::from_secret_key(&SecretKey::default()), @@ -256,11 +342,12 @@ impl Default for Transaction { signature: Signature::default(), propagated: false, executed: false, + state: Validated, } } } -impl Transaction { +impl Transaction { pub fn new(payload: Payload, signing_key: &SecretKey) -> Self { let author = PublicKey::from_secret_key(signing_key); @@ -272,10 +359,17 @@ impl Transaction { signature: Signature::default(), propagated: false, executed: false, + state: Created, }; tx.sign(signing_key); + tracing::debug!( + "Transaction::new tx:{} payload:{}", + tx.hash.to_string(), + tx.payload + ); + tx } @@ -287,23 +381,28 @@ impl Transaction { self.signature = sig.into(); } + pub fn into_received(self, state: Received) -> Transaction { + Transaction { + author: self.author, + hash: self.hash, + payload: self.payload, + nonce: self.nonce, + signature: self.signature, + propagated: self.propagated, + executed: self.executed, + state, + } + } +} + +impl Transaction { pub fn verify(&self) -> bool { let hash = self.compute_hash(); let msg: Message = hash.into(); verify(&msg, &self.signature.into(), &self.author) } - pub fn compute_hash(&self) -> Hash { - let mut hasher = Sha3_256::new(); - let mut buf = vec![]; - hasher.update(self.author.serialize()); - self.payload.serialize_into(&mut buf); - hasher.update(buf); - hasher.update(self.nonce.to_be_bytes()); - (&hasher.finalize()[0..32]).into() - } - - pub fn validate(&self) -> Result<()> { + pub fn validate(&self) -> Result<(), TransactionError> { if let Payload::Run { ref workflow } = self.payload { let mut programs = HashSet::new(); for step in &workflow.steps { @@ -311,8 +410,7 @@ impl Transaction { return Err(TransactionError::Validation(format!( "multiple programs in workflow: {}", &step.program - )) - .into()); + ))); } } } @@ -320,12 +418,68 @@ impl Transaction { if !self.verify() { return Err(TransactionError::Validation(String::from( "signature verification failed", - )) - .into()); + ))); } Ok(()) } + + pub fn get_asset_list(&self) -> Result>> { + match &self.payload { + transaction::Payload::Deploy { + prover, verifier, .. 
+ } => Ok(vec![ + File::::try_from_prg_meta_data(prover).into(), + File::::try_from_prg_meta_data(verifier).into(), + ]), + Payload::Run { workflow } => { + workflow + .steps + .iter() + .flat_map(|step| &step.inputs) + .filter_map(|input| { + match input { + ProgramData::Input { + file_name, + file_url, + checksum, + } => Some((file_name, file_url, checksum)), + ProgramData::Output { .. } => { + /* ProgramData::Output as input means it comes from another + program execution -> skip this branch. */ + None + } + } + }) + .map(|(file_name, file_url, checksum)| { + //verify the url is valide. + reqwest::Url::parse(file_url)?; + Ok(File::::new( + file_name.to_string(), + file_url.clone(), + checksum.to_string().into(), + self.hash, + false, + )) + }) + .collect() + } + Payload::Proof { files, .. } | Payload::Verification { files, .. } => { + //generated file during execution has already been moved. No Download. + if self.state.is_from_tx_exec_result() { + Ok(vec![]) + } else { + files + .iter() + .map(|file| Ok(file.clone().into_download_file(self.hash))) + .collect() + } + } + // Other transaction types don't have external assets that would + // need processing. + _ => Ok(vec![]), + } + } } #[cfg(test)] @@ -340,6 +494,7 @@ mod tests { let sk = SecretKey::random(&mut StdRng::from_entropy()); let tx = Transaction::new(Payload::Empty, &sk); + let tx = tx.into_received(Received::P2P); assert!(tx.verify()); } @@ -353,6 +508,7 @@ mod tests { tx.nonce += 1; // Verify must return false. + let tx = tx.into_received(Received::TXRESULT); assert!(!tx.verify()); } @@ -367,31 +523,22 @@ mod tests { }; let sk = SecretKey::random(&mut StdRng::from_entropy()); - let tx = Transaction { + let tx = Transaction:: { author: PublicKey::from_secret_key(&sk), - hash: Hash::default(), payload: Payload::Run { workflow }, - nonce: 0, signature: Signature::default(), - propagated: false, - executed: false, + ..Default::default() }; + let tx = tx.into_received(Received::RPC); assert!(tx.validate().is_err()); } #[test] fn test_tx_validations_verifies_signature() { - let tx = Transaction { - author: PublicKey::from_secret_key(&SecretKey::default()), - hash: Hash::default(), - payload: Payload::Empty, - nonce: 0, - signature: Signature::default(), - propagated: false, - executed: false, - }; + let tx = Transaction::::default(); + let tx = tx.into_received(Received::RPC); assert!(tx.validate().is_err()); } } diff --git a/crates/node/src/vmm/qemu.rs b/crates/node/src/vmm/qemu.rs index 459ecbf7..f84c26b1 100644 --- a/crates/node/src/vmm/qemu.rs +++ b/crates/node/src/vmm/qemu.rs @@ -141,6 +141,7 @@ impl Provider for Qemu { // TODO: // - Builder to construct QEMU flags // - Handle GPUs + // - Verify that the file exists before booting the VM. Otherwise the node panics because the QEMU won't start. let img_file = Path::new(&self.config.data_directory) .join(IMAGES_DIR) @@ -265,7 +266,11 @@ impl Provider for Qemu { cmd.stderr(Stdio::from(stderr)); } - tracing::info!("starting QEMU. args:\n{:#?}\n", cmd.get_args()); + tracing::info!( + "Program:{} starting QEMU. 
args:\n{:#?}\n", + program.hash.to_string(), + cmd.get_args(), + ); qemu_vm_handle.child = Some(cmd.spawn().expect("failed to start VM")); @@ -285,7 +290,7 @@ impl Provider for Qemu { match Qmp::new(format!("localhost:{qmp_port}")).await { Ok(c) => client = Some(c), - Err(_) => { + Err(err) => { retry_count += 1; sleep(Duration::from_millis(10)).await; } diff --git a/crates/node/src/vmm/vm_server.rs b/crates/node/src/vmm/vm_server.rs index 9e543b84..77839f5b 100644 --- a/crates/node/src/vmm/vm_server.rs +++ b/crates/node/src/vmm/vm_server.rs @@ -1,21 +1,19 @@ +use crate::types::file; +use crate::types::Hash; +use crate::vmm::vm_server::grpc::file_data; +use async_trait::async_trait; +use eyre::Result; +use grpc::vm_service_server::VmService; +use grpc::{GetFileRequest, Task, TaskRequest, TaskResultResponse}; use std::fmt::Debug; use std::path::{Path, PathBuf}; use std::sync::Arc; - -use async_trait::async_trait; -use eyre::Result; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; use tokio::sync::{mpsc, Mutex}; use tokio_stream::wrappers::ReceiverStream; use tonic::{Code, Extensions, Request, Response, Status, Streaming}; -use grpc::vm_service_server::VmService; -use grpc::{GetFileRequest, Task, TaskRequest, TaskResultResponse}; - -use crate::storage; -use crate::types::Hash; -use crate::vmm::vm_server::grpc::file_data; - use self::grpc::{ FileChunk, FileData, FileMetadata, GenericResponse, TaskResponse, TaskResultRequest, }; @@ -55,7 +53,7 @@ pub trait ProgramRegistry: Send { pub struct VMServer { task_source: Arc, program_registry: Arc>, - file_storage: Arc, + file_data_dir: PathBuf, } impl Debug for VMServer { @@ -68,12 +66,12 @@ impl VMServer { pub fn new( task_source: Arc, program_registry: Arc>, - file_storage: Arc, + file_data_dir: PathBuf, ) -> Self { VMServer { task_source, program_registry, - file_storage, + file_data_dir, } } @@ -108,23 +106,21 @@ impl VmService for VMServer { }; let reply = match self.task_source.get_task(program, vm_id).await { - Some(task) => { - tracing::info!("task has {} files", task.files.len()); - grpc::TaskResponse { - result: Some(grpc::task_response::Result::Task(grpc::Task { - id: task.id.to_string(), - name: task.name.to_string(), - args: task.args, - files: task.files, - })), - } - } + Some(task) => grpc::TaskResponse { + result: Some(grpc::task_response::Result::Task(grpc::Task { + id: task.id.to_string(), + name: task.name.to_string(), + args: task.args, + files: task.files, + })), + }, None => grpc::TaskResponse { result: Some(grpc::task_response::Result::Error( grpc::TaskError::Unavailable.into(), )), }, }; + tracing::trace!("VMServer get_task reply: {:?}", reply); Ok(Response::new(reply)) } @@ -138,14 +134,11 @@ impl VmService for VMServer { let req = request.into_inner(); - let mut file = match self - .file_storage - .get_task_file(&req.task_id, &req.path) - .await - { - Ok(file) => file, - Err(err) => return Err(Status::new(Code::NotFound, "couldn't get task file")), - }; + let mut file = + match file::open_task_file(&self.file_data_dir, &req.task_id, &req.path).await { + Ok(file) => file, + Err(err) => return Err(Status::new(Code::NotFound, "couldn't get task file")), + }; let (tx, rx) = mpsc::channel(4); tokio::spawn({ @@ -223,7 +216,7 @@ impl VmService for VMServer { } let file_path = PathBuf::new() - .join(self.file_storage.data_dir()) + .join(&self.file_data_dir) .join(task_id) .join(path); @@ -310,6 +303,11 @@ impl VmService for VMServer { } }; + tracing::trace!( + "VMServer 
submit_result program:{}, vm_id:{vm_id}", + program.to_string() + ); + let result = request.into_inner().result; if let Some(result) = result { diff --git a/crates/node/src/workflow/mod.rs b/crates/node/src/workflow/mod.rs index 967649d2..877faaf2 100644 --- a/crates/node/src/workflow/mod.rs +++ b/crates/node/src/workflow/mod.rs @@ -1,16 +1,14 @@ -use std::sync::Arc; - +use crate::types::file::DbFile; use async_trait::async_trait; use eyre::Result; use gevulot_node::types::{ - transaction::{Payload, ProgramData, Workflow, WorkflowStep}, - File, Hash, Task, TaskKind, Transaction, + transaction::{Payload, Validated, Workflow, WorkflowStep}, + Hash, Task, TaskKind, Transaction, }; +use std::sync::Arc; use thiserror::Error; use uuid::Uuid; -use crate::storage; - #[allow(clippy::enum_variant_names)] #[derive(Error, Debug, PartialEq)] pub enum WorkflowError { @@ -25,28 +23,28 @@ pub enum WorkflowError { #[error("transaction not found: {0}")] TransactionNotFound(Hash), + + #[error("Program file definition error: {0}")] + FileDefinitionError(String), } #[async_trait] pub trait TransactionStore: Sync + Send { - async fn find_transaction(&self, tx_hash: &Hash) -> Result>; + async fn find_transaction(&self, tx_hash: &Hash) -> Result>>; + async fn mark_tx_executed(&self, tx_hash: &Hash) -> Result<()>; } pub struct WorkflowEngine { tx_store: Arc, - file_storage: Arc, } impl WorkflowEngine { - pub fn new(tx_store: Arc, file_storage: Arc) -> Self { - WorkflowEngine { - tx_store, - file_storage, - } + pub fn new(tx_store: Arc) -> Self { + WorkflowEngine { tx_store } } - pub async fn next_task(&self, cur_tx: &Transaction) -> Result> { - let workflow = self.workflow_for_transaction(&cur_tx.hash).await?; + pub async fn next_task(&self, cur_tx: &Transaction) -> Result> { + let opt_workflow = self.workflow_for_transaction(&cur_tx.hash).await?; match &cur_tx.payload { Payload::Run { workflow } => { @@ -69,12 +67,20 @@ impl WorkflowEngine { parent, prover, proof, + .. } => { tracing::debug!("creating next task from Proof tx {}", &cur_tx.hash); + let Some(workflow) = opt_workflow else { + return Err(WorkflowError::WorkflowTransactionMissing(format!( + "Proof tx with no workflow {}", + cur_tx.hash.clone(), + )) + .into()); + }; match workflow.steps.iter().position(|s| s.program == *prover) { Some(proof_step_idx) => { - if proof_step_idx <= workflow.steps.len() { + if workflow.steps.len() <= proof_step_idx { Err(WorkflowError::WorkflowStepMissing(format!( "verifier for proof tx {}", cur_tx.hash.clone(), @@ -119,8 +125,16 @@ impl WorkflowEngine { parent, prover, proof, + .. } = proof_tx.payload { + let Some(workflow) = opt_workflow else { + return Err(WorkflowError::WorkflowTransactionMissing(format!( + "Proof tx with no workflow {}", + cur_tx.hash.clone(), + )) + .into()); + }; match workflow.steps.iter().position(|s| s.program == prover) { Some(proof_step_idx) => { if workflow.steps.len() <= proof_step_idx { @@ -150,6 +164,22 @@ impl WorkflowEngine { Err(WorkflowError::IncompatibleTransaction(proof_tx.hash.to_string()).into()) } } + Payload::Verification { .. } => { + // Execute the verify tx by setting to executed. + // Ideally it's not the right place to execute a Tx + // but as the execution is nothing, it's more convenient. + tracing::debug!("Mark as executed Payload::Verification tx {}", &cur_tx.hash); + self.tx_store.mark_tx_executed(&cur_tx.hash).await?; + Ok(None) + } + Payload::Deploy { .. } => { + // Execute the Deploy tx by setting to executed. 
+                // This isn't the ideal place to execute a tx, but since the only execution
+                // work is a file move that has already happened, it is the most convenient one.
+                tracing::debug!("marking Payload::Deploy tx {} as executed", &cur_tx.hash);
+                self.tx_store.mark_tx_executed(&cur_tx.hash).await?;
+                Ok(None)
+            }
             _ => Err(WorkflowError::IncompatibleTransaction(
                 "unsupported payload type".to_string(),
             )
@@ -204,9 +234,9 @@ impl WorkflowEngine {
                 cur_tx = parent;
                 continue;
             }
-            _ => {
+            payload => {
                 tracing::debug!(
-                    "failed to find workflow for transaction {}: incompatible transaction",
+                    "find parent: failed to find workflow for transaction {}: incompatible transaction: {payload:?}",
                     cur_tx
                 );
                 return Err(WorkflowError::IncompatibleTransaction(cur_tx.to_string()).into());
@@ -215,7 +245,7 @@
         }
     }
 
-    async fn workflow_for_transaction(&self, tx_hash: &Hash) -> Result<Workflow> {
+    async fn workflow_for_transaction(&self, tx_hash: &Hash) -> Result<Option<Workflow>> {
         let mut tx_hash = *tx_hash;
 
         tracing::debug!("finding workflow for transaction {}", tx_hash);
@@ -232,9 +262,10 @@
             match tx.unwrap().payload {
                 Payload::Run { workflow } => {
                     tracing::debug!("workflow found for transaction {}", tx_hash);
-                    return Ok(workflow);
+                    return Ok(Some(workflow));
                 }
                 Payload::Proof { parent, .. } => {
+                    // If we returned the parent tx here, it would get re-executed and generate
+                    // a "duplicate key value violates unique constraint" error in the db.
                     tracing::debug!("finding workflow from parent {} of {}", &parent, tx_hash);
                     tx_hash = parent;
                     continue;
                 }
@@ -245,13 +276,19 @@
                     continue;
                 }
-                Payload::Verification { parent, .. } => {
-                    tracing::debug!("finding workflow from parent {} of {}", &parent, tx_hash);
-                    tx_hash = parent;
-                    continue;
+                Payload::Verification { .. } => {
+                    // A Verification tx has no workflow of its own. XXX: if we followed the
+                    // parent tx here, it would get re-executed and generate a "duplicate key
+                    // value violates unique constraint" error in the db.
+                    return Ok(None);
                 }
-                _ => {
+                Payload::Deploy { .. } => return Ok(None),
+                payload => {
                     tracing::debug!(
-                        "failed to find workflow for transaction {}: incompatible transaction",
+                        "failed to find workflow for transaction {}: incompatible transaction: {payload:?}",
                         &tx_hash
                     );
                     return Err(WorkflowError::IncompatibleTransaction(tx_hash.to_string()).into());
@@ -267,47 +304,16 @@
         kind: TaskKind,
     ) -> Result<Task> {
         let id = Uuid::new_v4();
-        let mut file_transfers = vec![];
 
         let files = step
             .inputs
             .iter()
-            .map(|e| match e {
-                ProgramData::Input {
-                    file_name,
-                    file_url,
-                    ..
-                } => File {
-                    tx,
-                    name: file_name.clone(),
-                    url: file_url.clone(),
-                },
-                ProgramData::Output {
-                    source_program,
-                    file_name,
-                } => {
-                    // Make record of file that needs transfer from source tx to current tx's files.
-                    file_transfers.push((*source_program, file_name.clone()));
-
-                    File {
-                        tx,
-                        name: file_name.clone(),
-                        url: "".to_string(),
-                    }
-                }
+            .filter_map(|e| {
+                DbFile::try_from_prg_data(e)
+                    .map_err(|err| WorkflowError::FileDefinitionError(err.to_string()).into())
+                    .transpose()
             })
-            .collect();
-
-        // Process file transfers from source programs.
-        for (source_program, file_name) in file_transfers {
-            let source_tx = self
-                .find_parent_tx_for_program(&tx, &source_program)
-                .await
-                .expect("output file dependency missing");
-
-            self.file_storage
-                .move_task_file(&source_tx.to_string(), &tx.to_string(), &file_name)
-                .await?;
-        }
+            .collect::<Result<Vec<_>>>()?;
 
         Ok(Task {
             id,
@@ -324,7 +330,8 @@
 
 #[cfg(test)]
 mod tests {
-    use std::{collections::HashMap, env::temp_dir};
+    use gevulot_node::types::transaction::Created;
+    use std::collections::HashMap;
 
     use gevulot_node::types::{
         transaction::{Payload, ProgramData, Workflow, WorkflowStep},
@@ -336,11 +343,11 @@ mod tests {
     use super::*;
 
     pub struct TxStore {
-        pub txs: HashMap<Hash, Transaction>,
+        pub txs: HashMap<Hash, Transaction<Validated>>,
    }
 
     impl TxStore {
-        pub fn new(txs: &[Transaction]) -> Self {
+        pub fn new(txs: &[Transaction<Validated>]) -> Self {
             let mut store = TxStore {
                 txs: HashMap::with_capacity(txs.len()),
             };
@@ -353,17 +360,18 @@ mod tests {
 
     #[async_trait]
     impl TransactionStore for TxStore {
-        async fn find_transaction(&self, tx_hash: &Hash) -> Result<Option<Transaction>> {
+        async fn find_transaction(&self, tx_hash: &Hash) -> Result<Option<Transaction<Validated>>> {
             Ok(self.txs.get(tx_hash).cloned())
         }
+
+        async fn mark_tx_executed(&self, _tx_hash: &Hash) -> Result<()> {
+            // Do nothing: the txs map can't be modified through &self.
+            Ok(())
+        }
     }
 
     #[tokio::test]
     async fn test_next_task_for_empty_workflow_steps() {
-        let wfe = WorkflowEngine::new(
-            Arc::new(TxStore::new(&[])),
-            Arc::new(storage::File::new(&temp_dir())),
-        );
+        let wfe = WorkflowEngine::new(Arc::new(TxStore::new(&[])));
         let tx = transaction_for_workflow_steps(vec![]);
         if let Payload::Run { workflow } = &tx.payload {
             let res = wfe.next_task(&tx).await;
@@ -392,10 +400,7 @@ mod tests {
         };
 
         let tx = transaction_for_workflow_steps(vec![proving.clone(), verifying]);
-        let wfe = WorkflowEngine::new(
-            Arc::new(TxStore::new(&[tx.clone()])),
-            Arc::new(storage::File::new(&temp_dir())),
-        );
+        let wfe = WorkflowEngine::new(Arc::new(TxStore::new(&[tx.clone()])));
 
         if let Payload::Run { workflow } = &tx.payload {
             let task = wfe.next_task(&tx).await.expect("next_task").unwrap();
@@ -433,57 +438,69 @@ mod tests {
         let proofkey_tx = transaction_for_proofkey(&proof_tx.hash);
         let verification_tx = transaction_for_verification(&proof_tx.hash, &verifier_hash);
         let tx_store = TxStore::new(&[root_tx, proof_tx, proofkey_tx.clone(), verification_tx]);
-        let wfe = WorkflowEngine::new(
-            Arc::new(tx_store),
-            Arc::new(storage::File::new(&temp_dir())),
-        );
+        let wfe = WorkflowEngine::new(Arc::new(tx_store));
 
         let task = wfe.next_task(&proofkey_tx).await;
         assert!(task.is_ok());
     }
 
-    fn transaction_for_workflow_steps(steps: Vec<WorkflowStep>) -> Transaction {
+    fn into_validated(tx: Transaction<Created>) -> Transaction<Validated> {
+        Transaction {
+            author: tx.author,
+            hash: tx.hash,
+            payload: tx.payload,
+            nonce: tx.nonce,
+            signature: tx.signature,
+            propagated: tx.propagated,
+            executed: tx.executed,
+            state: Validated,
+        }
+    }
+
+    fn transaction_for_workflow_steps(steps: Vec<WorkflowStep>) -> Transaction<Validated> {
         let key = SecretKey::random(&mut StdRng::from_entropy());
-        Transaction::new(
+        into_validated(Transaction::new(
             Payload::Run {
                 workflow: Workflow { steps },
             },
             &key,
-        )
+        ))
     }
 
-    fn transaction_for_proof(parent: &Hash, program: &Hash) -> Transaction {
+    fn transaction_for_proof(parent: &Hash, program: &Hash) -> Transaction<Validated> {
         let key = SecretKey::random(&mut StdRng::from_entropy());
-        Transaction::new(
+        into_validated(Transaction::new(
             Payload::Proof {
                 parent: *parent,
                 prover: *program,
                 proof: "proof.".into(),
+                files: vec![],
             },
             &key,
-        )
+        ))
     }
 
-    fn transaction_for_proofkey(parent: &Hash) -> Transaction {
+    fn transaction_for_proofkey(parent: &Hash) -> Transaction<Validated> {
         let key = SecretKey::random(&mut StdRng::from_entropy());
-        Transaction::new(
+        into_validated(Transaction::new(
             Payload::ProofKey {
                 parent: *parent,
                 key: "key.".into(),
             },
             &key,
-        )
+        ))
     }
 
-    fn transaction_for_verification(parent: &Hash, program: &Hash) -> Transaction {
+    fn transaction_for_verification(parent: &Hash, program: &Hash) -> Transaction<Validated> {
         let key = SecretKey::random(&mut StdRng::from_entropy());
-        Transaction::new(
+        into_validated(Transaction::new(
             Payload::Verification {
                 parent: *parent,
                 verifier: *program,
                 verification: b"verification.".to_vec(),
+                files: vec![],
             },
             &key,
-        )
+        ))
     }
 }
diff --git a/crates/shim/Cargo.toml b/crates/shim/Cargo.toml
index f1cb7b83..72388086 100644
--- a/crates/shim/Cargo.toml
+++ b/crates/shim/Cargo.toml
@@ -7,6 +7,7 @@ license = "MIT OR Apache-2.0"
 [dependencies]
 anyhow = "1"
 async-stream = "0.3.5"
+blake3 = "1.5"
 prost = "0.11"
 tokio = { version = "1.0", features = ["fs", "macros", "rt-multi-thread"] }
 tokio-stream = "0.1"
diff --git a/crates/shim/proto/vm_service.proto b/crates/shim/proto/vm_service.proto
index e575eb63..d9497e4b 100644
--- a/crates/shim/proto/vm_service.proto
+++ b/crates/shim/proto/vm_service.proto
@@ -41,6 +41,11 @@ message GetFileRequest {
   string path = 2;
 }
 
+message File {
+  string path = 1;
+  bytes checksum = 2;
+}
+
 message FileMetadata {
   string task_id = 1;
   string path = 2;
@@ -65,7 +70,7 @@ message FileData {
 message TaskResult {
   string id = 1;
   bytes data = 2;
-  repeated string files = 3;
+  repeated File files = 3;
 }
 
 message TaskResultRequest {
diff --git a/crates/shim/src/lib.rs b/crates/shim/src/lib.rs
index a7c9ae4e..a2986292 100644
--- a/crates/shim/src/lib.rs
+++ b/crates/shim/src/lib.rs
@@ -1,11 +1,11 @@
+use grpc::vm_service_client::VmServiceClient;
+use grpc::{FileChunk, FileData, FileMetadata};
 use std::fs::File;
 use std::io::{BufRead, BufReader};
 use std::path::PathBuf;
+use std::sync::Arc;
 use std::time::Instant;
 use std::{path::Path, thread::sleep, time::Duration};
-
-use grpc::vm_service_client::VmServiceClient;
-use grpc::{FileChunk, FileData, FileMetadata};
 use tokio::io::AsyncReadExt;
 use tokio::runtime::Runtime;
 use tokio::{io::AsyncWriteExt, sync::Mutex};
@@ -154,9 +154,12 @@ impl GRPCClient {
         Ok(Some(task))
     }
 
-    fn submit_file(&mut self, task_id: TaskId, file_path: String) -> Result<()> {
+    fn submit_file(&mut self, task_id: TaskId, file_path: String) -> Result<[u8; 32]> {
+        let hasher = Arc::new(Mutex::new(blake3::Hasher::new()));
         self.rt.block_on(async {
+            let stream_hasher = Arc::clone(&hasher);
             let outbound = async_stream::stream! {
                let fd = match tokio::fs::File::open(&file_path).await {
                    Ok(fd) => fd,
                    Err(err) => {
@@ -175,11 +178,12 @@ impl GRPCClient {
            yield metadata;
 
            let mut buf: [u8; DATA_STREAM_CHUNK_SIZE] = [0; DATA_STREAM_CHUNK_SIZE];
-
            loop {
                match file.read(&mut buf).await {
                    Ok(0) => return,
                    Ok(n) => {
+                        stream_hasher.lock().await.update(&buf[..n]);
                        yield FileData{ result: Some(grpc::file_data::Result::Chunk(FileChunk{ data: buf[..n].to_vec() }))};
                    },
                    Err(err) => {
@@ -194,20 +198,30 @@ impl GRPCClient {
                println!("failed to submit file: {}", err);
            }
        });
+        let hasher = Arc::into_inner(hasher).unwrap().into_inner();
+        let hash = hasher.finalize().into();
 
-        Ok(())
+        Ok(hash)
     }
 
     fn submit_result(&mut self, result: &TaskResult) -> Result<bool> {
-        for file in &result.files {
-            self.submit_file(result.id.clone(), file.clone())?;
-        }
+        let files = result
+            .files
+            .iter()
+            .map(|file| {
+                self.submit_file(result.id.clone(), file.clone())
+                    .map(|checksum| crate::grpc::File {
+                        path: file.to_string(),
+                        checksum: checksum.to_vec(),
+                    })
+            })
+            .collect::<Result<Vec<_>>>()?;
 
         let task_result_req = grpc::TaskResultRequest {
             result: Some(grpc::task_result_request::Result::Task(grpc::TaskResult {
                 id: result.id.clone(),
                 data: result.data.clone(),
-                files: result.files.clone(),
+                files,
             })),
         };
diff --git a/crates/tests/e2e-tests/src/main.rs b/crates/tests/e2e-tests/src/main.rs
index 22b77de3..eaca0a12 100644
--- a/crates/tests/e2e-tests/src/main.rs
+++ b/crates/tests/e2e-tests/src/main.rs
@@ -1,3 +1,4 @@
+use gevulot_node::types::transaction::Created;
 use std::{
     net::SocketAddr,
     path::{Path, PathBuf},
@@ -99,7 +100,19 @@ async fn deploy_programs(
         .expect("get_transaction");
 
     assert!(read_tx.is_some());
-    assert_eq!(tx, read_tx.unwrap());
+    let read_tx = read_tx.unwrap();
+    // The node returns the stored transaction in its post-validation state; rebuild it
+    // in the initial Created state so it compares equal to the transaction we submitted.
+    let read_tx = Transaction {
+        author: read_tx.author,
+        hash: read_tx.hash,
+        payload: read_tx.payload,
+        nonce: read_tx.nonce,
+        signature: read_tx.signature,
+        propagated: false,
+        executed: false,
+        state: Created,
+    };
+
+    assert_eq!(tx, read_tx);
 
     Ok((tx.hash, prover.hash, verifier.hash))
 }
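Note on the shim change above: `submit_file` now feeds every outbound chunk through a shared `blake3::Hasher`, so the checksum covers exactly the bytes that were streamed, with no second pass over the file. Below is a minimal, self-contained sketch of that pattern; it uses synchronous std I/O instead of the shim's tokio + gRPC stream, and `CHUNK_SIZE` and the input path are illustrative stand-ins, not values from this PR.

```rust
use std::fs::File;
use std::io::Read;

// Stand-in for the shim's DATA_STREAM_CHUNK_SIZE.
const CHUNK_SIZE: usize = 4096;

/// Hash a file with blake3 while reading it chunk by chunk, mirroring how
/// `submit_file` updates the hasher on each chunk it yields to the stream.
fn checksum_while_streaming(path: &str) -> std::io::Result<[u8; 32]> {
    let mut file = File::open(path)?;
    let mut hasher = blake3::Hasher::new();
    let mut buf = [0u8; CHUNK_SIZE];

    loop {
        match file.read(&mut buf)? {
            0 => break, // EOF
            n => {
                // This is the point where the shim yields the chunk to gRPC;
                // the hasher sees exactly the bytes that go over the wire.
                hasher.update(&buf[..n]);
            }
        }
    }

    Ok(hasher.finalize().into())
}

fn main() -> std::io::Result<()> {
    // Hypothetical input path, for illustration only.
    let checksum = checksum_while_streaming("task-output.bin")?;
    println!("blake3: {}", blake3::Hash::from(checksum));
    Ok(())
}
```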
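The receiving side of the new `File { path, checksum }` proto message is not part of this diff; assuming the node recomputes the hash over the stored bytes, the integrity check could look like the hypothetical helper below (`verify_checksum` is not code from this PR).

```rust
/// Recompute the blake3 hash of `data` and compare it against the checksum
/// carried in the `File` message. Plain equality is fine here: this guards
/// against accidental corruption, not against a malicious peer.
fn verify_checksum(data: &[u8], expected: &[u8]) -> bool {
    let actual: [u8; 32] = blake3::hash(data).into();
    actual.as_slice() == expected
}
```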