From 0af4ed15e204324a2765593e2d87719f9e7a4d6f Mon Sep 17 00:00:00 2001 From: dominicburkart Date: Sun, 16 Jun 2024 20:21:07 +0200 Subject: [PATCH] basic pest grammar --- marigold-grammar/Cargo.toml | 8 +- marigold-grammar/build.rs | 5 - marigold-grammar/src/ast.lalrpop | 908 ----------------------------- marigold-grammar/src/ast.rs | 139 +++++ marigold-grammar/src/lib.rs | 323 +++++++++- marigold-grammar/src/marigold.pest | 31 + 6 files changed, 476 insertions(+), 938 deletions(-) delete mode 100644 marigold-grammar/build.rs delete mode 100644 marigold-grammar/src/ast.lalrpop create mode 100644 marigold-grammar/src/ast.rs create mode 100644 marigold-grammar/src/marigold.pest diff --git a/marigold-grammar/Cargo.toml b/marigold-grammar/Cargo.toml index 2fcaca6..3c29cae 100644 --- a/marigold-grammar/Cargo.toml +++ b/marigold-grammar/Cargo.toml @@ -3,7 +3,6 @@ name = "marigold-grammar" version = "0.1.16" edition = "2021" authors = ["Dominic "] -build = "build.rs" description = "Grammar for the marigold language." license = "Apache-2.0 OR MIT" repository = "/~https://github.com/DominicBurkart/marigold" @@ -11,11 +10,9 @@ repository = "/~https://github.com/DominicBurkart/marigold" [features] io = [] -[build-dependencies] -lalrpop = "0.19.8" - [dependencies] -lalrpop-util = {version="0.19.8", features=["lexer"]} +pest = "2.7" +pest_derive = "2.7" regex = "1" num-traits = "0.2" itertools = "0.10.2" @@ -24,3 +21,4 @@ rusymbols = "0.1.2" num-bigint = "0.4" arrayvec = "0.7" once_cell = "1.15.0" +thiserror = "1.0.61" diff --git a/marigold-grammar/build.rs b/marigold-grammar/build.rs deleted file mode 100644 index 23c7d3f..0000000 --- a/marigold-grammar/build.rs +++ /dev/null @@ -1,5 +0,0 @@ -extern crate lalrpop; - -fn main() { - lalrpop::process_root().unwrap(); -} diff --git a/marigold-grammar/src/ast.lalrpop b/marigold-grammar/src/ast.lalrpop deleted file mode 100644 index c871466..0000000 --- a/marigold-grammar/src/ast.lalrpop +++ /dev/null @@ -1,908 +0,0 @@ -use crate::nodes; -use crate::type_aggregation; -use std::str::FromStr; -use regex::Regex; - -grammar; - -pub Program: String = { - Expr* => { - let mut output = "async { - use ::marigold::marigold_impl::*; - ".to_string(); - - // Before we can start streaming, we need to declare the helpers: - // the enums and structs, functions, and then the stream variable declarations. 
- let enums_and_structs = <> - .iter() - .filter_map(|expr| match expr { - crate::nodes::TypedExpression::UnnamedReturningStream(_) => None, - crate::nodes::TypedExpression::UnnamedNonReturningStream(_) => None, - crate::nodes::TypedExpression::NamedReturningStream(_) => None, - crate::nodes::TypedExpression::NamedNonReturningStream(_) => None, - crate::nodes::TypedExpression::StructDeclaration(s) => Some(s.code()), - crate::nodes::TypedExpression::EnumDeclaration(e) => Some(e.code()), - crate::nodes::TypedExpression::StreamVariable(v) => None, - crate::nodes::TypedExpression::StreamVariableFromPriorStreamVariable(v) => None, - crate::nodes::TypedExpression::FnDeclaration(f) => None, - }) - .map(|s| format!("{s}\n\n")) - .collect::>() - .join(""); - - output.push_str(&enums_and_structs); - - let functions = <> - .iter() - .filter_map(|expr| match expr { - crate::nodes::TypedExpression::UnnamedReturningStream(_) => None, - crate::nodes::TypedExpression::UnnamedNonReturningStream(_) => None, - crate::nodes::TypedExpression::NamedReturningStream(_) => None, - crate::nodes::TypedExpression::NamedNonReturningStream(_) => None, - crate::nodes::TypedExpression::StructDeclaration(_) => None, - crate::nodes::TypedExpression::EnumDeclaration(_) => None, - crate::nodes::TypedExpression::StreamVariable(_) => None, - crate::nodes::TypedExpression::StreamVariableFromPriorStreamVariable(_) => None, - crate::nodes::TypedExpression::FnDeclaration(f) => Some(f.code()), - }) - .map(|s| format!("{s}\n\n")) - .collect::>() - .join(""); - - output.push_str(&functions); - - let stream_variable_declarations = <> - .iter() - .filter_map(|expr| match expr { - crate::nodes::TypedExpression::UnnamedReturningStream(_) => None, - crate::nodes::TypedExpression::UnnamedNonReturningStream(_) => None, - crate::nodes::TypedExpression::NamedReturningStream(_) => None, - crate::nodes::TypedExpression::NamedNonReturningStream(_) => None, - crate::nodes::TypedExpression::StructDeclaration(s) => None, - crate::nodes::TypedExpression::EnumDeclaration(e) => None, - crate::nodes::TypedExpression::StreamVariable(v) => Some(v.declaration_code()), - crate::nodes::TypedExpression::StreamVariableFromPriorStreamVariable(v) => Some(v.declaration_code()), - crate::nodes::TypedExpression::FnDeclaration(f) => None - }) - .map(|s| format!("{s}\n\n")) - .collect::>() - .join(""); - - output.push_str(&stream_variable_declarations); - - let returning_stream_vec = <> - .iter() - .filter_map( - |expr| match expr { - crate::nodes::TypedExpression::UnnamedReturningStream(s) => Some(s.code()), - crate::nodes::TypedExpression::UnnamedNonReturningStream(_) => None, - crate::nodes::TypedExpression::NamedReturningStream(s) => Some(s.code()), - crate::nodes::TypedExpression::NamedNonReturningStream(_) => None, - crate::nodes::TypedExpression::StructDeclaration(_) => None, - crate::nodes::TypedExpression::EnumDeclaration(_) => None, - crate::nodes::TypedExpression::StreamVariable(_) => None, - crate::nodes::TypedExpression::StreamVariableFromPriorStreamVariable(_) => None, - crate::nodes::TypedExpression::FnDeclaration(f) => None, - }) - .collect::>(); - - let n_returning_streams = returning_stream_vec.len(); - - output.push_str( - returning_stream_vec - .iter() - .zip(0..n_returning_streams) - .map(|(stream_def, i)| format!("let returning_stream_{i} = Box::pin({stream_def});\n")) - .collect::>() - .join("") - .as_str() - ); - - let non_returning_streams = <> - .iter() - .filter_map( - |expr| match expr { - 
crate::nodes::TypedExpression::UnnamedReturningStream(_) => None, - crate::nodes::TypedExpression::UnnamedNonReturningStream(s) => Some(s.code()), - crate::nodes::TypedExpression::NamedReturningStream(_) => None, - crate::nodes::TypedExpression::NamedNonReturningStream(s) => Some(s.code()), - crate::nodes::TypedExpression::StructDeclaration(_) => None, - crate::nodes::TypedExpression::EnumDeclaration(_) => None, - crate::nodes::TypedExpression::StreamVariable(_) => None, - crate::nodes::TypedExpression::StreamVariableFromPriorStreamVariable(_) => None, - crate::nodes::TypedExpression::FnDeclaration(f) => None, - }) - .collect::>(); - - output.push_str( - non_returning_streams - .iter() - .zip(0..non_returning_streams.len()) - .map(|(stream_def, i)| format!("let non_returning_stream_{i} = Box::pin({stream_def});\n")) - .collect::>() - .join("") - .as_str() - ); - - let stream_variable_runners = <> - .iter() - .filter_map( - |expr| match expr { - crate::nodes::TypedExpression::UnnamedReturningStream(_) => None, - crate::nodes::TypedExpression::UnnamedNonReturningStream(_) => None, - crate::nodes::TypedExpression::NamedReturningStream(_) => None, - crate::nodes::TypedExpression::NamedNonReturningStream(_) => None, - crate::nodes::TypedExpression::StructDeclaration(_) => None, - crate::nodes::TypedExpression::EnumDeclaration(_) => None, - crate::nodes::TypedExpression::StreamVariable(v) => Some(v.runner_code()), - crate::nodes::TypedExpression::StreamVariableFromPriorStreamVariable(v) => Some(v.runner_code()), - crate::nodes::TypedExpression::FnDeclaration(f) => None, - }) - .collect::>(); - - output.push_str( - stream_variable_runners - .iter() - .zip(0..stream_variable_runners.len()) - .map(|(stream_def, i)| format!("let stream_variable_runners_{i} = Box::pin({stream_def});\n")) - .collect::>() - .join("") - .as_str() - ); - - let mut streams_string = "vec![\n".to_string(); - - streams_string.push_str( - (0..n_returning_streams) - .map(|i| format!("returning_stream_{i},\n")) - .collect::>() - .join("") - .as_str() - ); - - streams_string.push_str( - (0..non_returning_streams.len()) - .map(|i| format!("non_returning_stream_{i},\n")) - .collect::>() - .join("") - .as_str() - ); - - streams_string.push_str( - (0..stream_variable_runners.len()) - .map(|i| format!("stream_variable_runners_{i},\n")) - .collect::>() - .join("") - .as_str() - ); - - streams_string.push_str("]\n"); - - if n_returning_streams > 0 { - output.push_str( - format!(" - /// silly function that uses generics to infer the output type (StreamItem) via generics, so that - /// we can provide the streams as an array of Pin>>. - #[inline(always)] - fn typed_stream_vec(v: Vec>>>) -> Vec>>> {{ - v - }} - ").as_str() - ); - output.push_str(format!("let streams_array = typed_stream_vec({streams_string});").as_str()); - } else { - output.push_str(format!("let streams_array: Vec>>> = {streams_string};").as_str()); - } - - output.push_str("let mut all_streams = ::marigold::marigold_impl::futures::stream::select_all(streams_array);"); - - if n_returning_streams == 0 { - output.push_str("all_streams.collect::>().await;\n"); - // ^ completes the stream; vec will always have a length of 0. - } else { - output.push_str("all_streams\n"); - } - output.push_str("}\n"); - - output - } -} - -Expr: nodes::TypedExpression = { - Stream, - StreamVariableDeclaration, - StructDeclaration, - EnumDeclaration, - FnDeclaration -} - -/// Nonsense nonterminal used to handle terminal ambiguity. Used for e.g. variable names. 
-FreeText: String = { - <\w\-]+"> => text.to_string() -} - -BracedText: String = { - => text.to_string() -} - -// nonsense struct used to handle terminal ambiguity. Allowed: variable name, -// or quoted string (string literal like: "hello"). -QuotedFreeText: String = { - => quoted_text.to_string(), - => variable_name.to_string() -} - -StructDeclaration: nodes::TypedExpression = { - "struct" => - nodes::TypedExpression::from( - crate::nodes::StructDeclarationNode { - name: struct_name, - fields: { - lazy_static! { - static ref WHITESPACE: Regex = - Regex::new(r#"[\s]+"#).unwrap(); - - static ref FIELD_DECLARATION: Regex = - Regex::new(r#"([\S]+)[\s]*:[\s]*(.*)"#).unwrap(); - } - - let cleaned = WHITESPACE - .replace_all(&field_declarations, " "); - - cleaned[1..cleaned.len() - 1] // remove surrounding braces - .split(",") - .filter_map(|t| FIELD_DECLARATION - .captures(t) - .map(|c| ( - c[1].to_string(), - crate::nodes::Type::from_str(&c[2]) - .expect("could not parse type in struct definition") - )) - ) - .collect::>() - } - } - ) -} - -StructFieldDeclaration: (String, String) = { - ":" => { - (field_name, field_value) - } -} - -EnumDeclaration: nodes::TypedExpression = { - "enum" => nodes::parse_enum(enum_name, enum_contents), -} - -EnumFieldDeclaration: (String, Option) = { - "=" => { - (field_name, Some(field_value)) - }, - => { - (field_name, None) - } -} - -FnParameter: (String, String) = { - ":" => - (parameter_name, - match amp { - Some(_) => format!("&{}", parameter_type), - None => parameter_type - } - ) -} - -FnSignature: nodes::FunctionSignature = { - "fn" "(" - - - ")" "->" => nodes::FunctionSignature { - name: name, - parameters: { - let mut cleaned_parameters = parameters - .iter() - .map(|(param, _comma_literal)| param.clone()) - .collect::>(); - match maybe_trailing_parameter { - Some(p) => cleaned_parameters.push(p), - None => (), - } - cleaned_parameters - }, - output_type: output_type.into_iter().map( - |typ| { - lazy_static! { - static ref STRING: Regex = Regex::new(r"string_([0-9_A-Za-z]+)").unwrap(); - } - if let Some(string_def) = STRING.captures(&typ) { - let size_str = string_def - .get(1) - .expect("Could not find size definition for string field"); - let size = u32::from_str(size_str.as_str()) - .expect("Could not parse string size in struct. Must be parsable as U32."); - return format!("::marigold::marigold_impl::arrayvec::ArrayString<{size}>"); - } - typ - } - ) - .collect::>() - .join(" ") - } -} - -FnDeclaration: nodes::TypedExpression = { - => nodes::TypedExpression::from( - nodes::FnDeclarationNode { - name: signature.name.clone(), - parameters: signature.parameters.clone(), - output_type: signature.output_type.clone(), - body: body.to_string() - } - ) -} - -Stream: nodes::TypedExpression = { - )*> "." => - nodes::TypedExpression::from( - nodes::UnnamedStreamNode{ - inp_and_funs: nodes::InputAndMaybeStreamFunctions { - inp, - funs, - }, - out: out - } - ), - )*> "." 
=> - nodes::TypedExpression::from( - nodes::NamedStreamNode { - stream_variable, - funs, - out - } - ) -} - -StreamVariableDeclaration: nodes::TypedExpression = { - "=" )*> => - nodes::TypedExpression::from( - nodes::StreamVariableNode { - variable_name: field_name, - inp: inp, - funs: funs - } - ), - "=" )*> => - nodes::TypedExpression::from( - nodes::StreamVariableFromPriorStreamVariableNode { - variable_name: field_name, - prior_stream_variable: stream_variable, - funs: funs - } - ) -} - -InputFunction: nodes::InputFunctionNode = { - "range(" "," ")" => nodes::InputFunctionNode { - variability: nodes::InputVariability::Constant, - input_count: nodes::InputCount::Known((n2.parse::().expect("could not parse input as integer") - n1.parse::().expect("could not parse input as integer")).to_biguint().unwrap()), - code: format!("::marigold::marigold_impl::futures::stream::iter({n1}..{n2})"), - }, - "read_file(" "," "csv" "," "struct" "=" ")" => nodes::InputFunctionNode { - variability: nodes::InputVariability::Variable, - input_count: nodes::InputCount::Unknown, - code: { - match path[1..path.len() - 1].rsplit('.').next() { - Some(postfix) => match postfix { - "gz" => format!(" - ::marigold::marigold_impl::csv_async::AsyncDeserializer::from_reader( - ::marigold::marigold_impl::async_compression::tokio::bufread::GzipDecoder::new( - ::marigold::marigold_impl::tokio::io::BufReader::new( - ::marigold::marigold_impl::tokio::fs::File::open({path}) - .await - .expect(\"Marigold could not open file\") - ) - ).compat() - ).into_deserialize::<{deserialization_struct}>() - "), - postfix => format!(" - ::marigold::marigold_impl::csv_async::AsyncDeserializer::from_reader( - ::marigold::marigold_impl::tokio::fs::File::open({path}) - .await - .expect(\"Marigold could not open file\") - .compat() - ).into_deserialize::<{deserialization_struct}>() - ") - }, - None => format!(" - ::marigold::marigold_impl::csv_async::AsyncDeserializer::from_reader( - ::marigold::marigold_impl::tokio::fs::File::open({path}) - .await - .expect(\"Marigold could not open file\") - .compat() - ).into_deserialize::<{deserialization_struct}>() - ") - } - } - }, - "read_file(" "," "csv" "," "struct" "=" "," "infer_compression" "=" "false" ")" => nodes::InputFunctionNode { - variability: nodes::InputVariability::Variable, - input_count: nodes::InputCount::Unknown, - code: format!(" - ::marigold::marigold_impl::csv_async::AsyncDeserializer::from_reader( - ::marigold::marigold_impl::tokio::fs::File::open({path}) - .await - .expect(\"Marigold could not open file\") - .compat() - ).into_deserialize::<{deserialization_struct}>() - ") - }, - "select_all" "(" ")" => { - let streams = { - let mut streams = selected_streams - .into_iter() - .map(|(stream, _string_literal)| stream) - .collect::>(); - if let Some(trailing_stream) = last_selected_stream { - streams.push(trailing_stream); - } - streams - }; - - - nodes::InputFunctionNode { - variability: type_aggregation::aggregate_input_variability(streams.iter().map(|s| s.inp.variability.clone())), - input_count: type_aggregation::aggregate_input_count(streams.iter().map(|s| s.inp.input_count.clone())), - code: { - let stream_code = streams - .iter() - .map(|stream| { - let code = stream.code(); - format!("::marigold::marigold_impl::run_stream::run_stream({code})") - }) - .collect::>() - .join(",\n"); - format!("::marigold::marigold_impl::futures::prelude::stream::select_all::select_all([{stream_code}])") - } - } - } -} - -InputAndMaybeStreamFunctions: nodes::InputAndMaybeStreamFunctions = { - )*> 
=> nodes::InputAndMaybeStreamFunctions { - inp: inp, - funs: funs - } -} - -StreamFunction: nodes::StreamFunctionNode = { - "permutations(" ")" => nodes::StreamFunctionNode { - code: format!("permutations({n}).await"), - }, - "permutations_with_replacement(" ")" => nodes::StreamFunctionNode { - code: format!("collect_and_apply(|values| async {{ - ::marigold::marigold_impl::gen_nested_iter_yield::nested_iter_yield!(values.iter(), {n}, .to_owned(), ::marigold::marigold_impl::) - }}) - .await - .await - "), - }, - "combinations(" ")" => nodes::StreamFunctionNode { - code: format!("combinations({n}).await"), - }, - "keep_first_n(" "," ")" => nodes::StreamFunctionNode { - code: format!("keep_first_n({n}, {value_fn}).await"), - }, - "filter(" ")" => { - #[cfg(not(any(feature = "tokio", feature = "async-std")))] - return nodes::StreamFunctionNode { - code: format!("filter(|v| ::marigold::marigold_impl::futures::future::ready({filter_fn}(v)))"), - // todo: filters have a bad type that doesn't allow them to compile if passed - // an actual async function, so wrap a sync function in a fake future until - // the filter types are updated. - }; - - #[cfg(any(feature = "tokio", feature = "async-std"))] - return nodes::StreamFunctionNode { - code: format!("map(|v| async move {{ - if {filter_fn}(&v) {{ - Some(v) - }} - None - }}) - .buffered( - std::cmp::max( - 2 * (::marigold::marigold_impl::num_cpus::get() - 1), - 2 - ) - ) - .filter_map(|v| v)"), - }; - }, - "filter_map(" ")" => { - #[cfg(not(any(feature = "tokio", feature = "async-std")))] - return nodes::StreamFunctionNode { - code: format!("filter_map(|v| ::marigold::marigold_impl::futures::future::ready({filter_map_fn}(v)))") - }; - - #[cfg(any(feature = "tokio", feature = "async-std"))] - return nodes::StreamFunctionNode { - code: format!(" - map(|v| ::marigold::marigold_impl::futures::future::ready({filter_map_fn}(v))) - .buffered( - std::cmp::max( - 2 * (::marigold::marigold_impl::num_cpus::get() - 1), - 2 - ) - ) - .filter_map(|v| v) - ") - }; - }, - "map(" ")" => { - #[cfg(any(feature = "tokio", feature = "async-std"))] - return nodes::StreamFunctionNode { - code: format!("map(|v| async move {{{mapping_fn}(v)}}).buffered(std::cmp::max(2 * (::marigold::marigold_impl::num_cpus::get() - 1), 2))"), - }; - - #[cfg(not(any(feature = "tokio", feature = "async-std")))] - return nodes::StreamFunctionNode { - code: format!("map({mapping_fn})"), - }; - }, - "fold(" "," ")" => { - - let number_or_constructor = match state.trim().parse::() { - Ok(_) => state, - Err(_) => format!("{state}()") - }; - - nodes::StreamFunctionNode { - code: format!("marifold({number_or_constructor}, |acc, x| futures::future::ready({fun}(acc, x))).await"), - } - }, - "ok()" => nodes::StreamFunctionNode { - code: format!("filter(|r| futures::future::ready(r.is_ok())) - .map(|r| r.unwrap())"), - }, - "ok_or_panic()" => nodes::StreamFunctionNode { - code: "map(|r| r.unwrap())".to_string(), - } -} - -OutputFunction: nodes::OutputFunctionNode = { - "return" => nodes::OutputFunctionNode { - stream_prefix: "".to_string(), - stream_postfix: "".to_string(), - returning: true - }, - "write_file(" "," "csv" ")" => { - if path.ends_with(".gz\"") { - return nodes::OutputFunctionNode { - stream_prefix: format!("{{ - if let Some(parent) = ::std::path::Path::new({path}).parent() {{ - ::marigold::marigold_impl::tokio::fs::create_dir_all(parent) - .await - .expect(\"could not create parent directory for output file\"); - }} - - static WRITER: ::marigold::marigold_impl::once_cell::sync::OnceCell< 
- ::marigold::marigold_impl::tokio::sync::Mutex< - ::marigold::marigold_impl::csv_async::AsyncSerializer< - ::marigold::marigold_impl::tokio_util::compat::Compat< - ::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder< - ::marigold::marigold_impl::writer::Writer - > - > - > - > - > = ::marigold::marigold_impl::once_cell::sync::OnceCell::new(); - - WRITER.set( - ::marigold::marigold_impl::tokio::sync::Mutex::new( - ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer( - ::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder::new( - ::marigold::marigold_impl::writer::Writer::file( - ::marigold::marigold_impl::tokio::fs::File::create({path}) - .await - .expect(\"Could not write to file\") - ) - ) - .compat_write() - ) - ) - ).expect(\"Could not put CSV writer into OnceCell\"); - - let mut stream_to_write = - - "), - stream_postfix: " - ; - stream_to_write.filter_map( - |v| async move { - WRITER - .get() - .expect(\"Could not get CSV writer from OnceCell\") - .lock() - .await - .serialize(v) - .await - .expect(\"could not write record to CSV\"); - None - } - ).chain( - // after the stream is complete, flush the writer. - ::marigold::marigold_impl::futures::stream::iter(0..1) - .filter_map(|_v| async { - let mut serializer_guard = WRITER - .get() // gets Mutex<...> - .expect(\"Could not get CSV writer from OnceCell\") - .lock() - .await; - let mut serializer = std::mem::replace( - &mut *serializer_guard, - ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer( - ::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder::new( - ::marigold::marigold_impl::writer::Writer::vector() - ) - .compat_write() - ) - ); - serializer - .into_inner() - .await - .expect(\"Could not get underlying writer from serializer\") - .get_mut() - .shutdown() - .await - .expect(\"Could not shut down underlying writer\"); - None - }) - ) - }".to_string(), - returning: false - }; - } else { - return nodes::OutputFunctionNode { - stream_prefix: format!("{{ - if let Some(parent) = ::std::path::Path::new({path}).parent() {{ - ::marigold::marigold_impl::tokio::fs::create_dir_all(parent) - .await - .expect(\"could not create parent directory for output file\"); - }} - - static WRITER: ::marigold::marigold_impl::once_cell::sync::OnceCell< - ::marigold::marigold_impl::tokio::sync::Mutex< - ::marigold::marigold_impl::csv_async::AsyncSerializer< - ::marigold::marigold_impl::tokio_util::compat::Compat< - ::marigold::marigold_impl::writer::Writer - > - > - > - > = ::marigold::marigold_impl::once_cell::sync::OnceCell::new(); - - WRITER.set( - ::marigold::marigold_impl::tokio::sync::Mutex::new( - ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer( - ::marigold::marigold_impl::writer::Writer::file( - ::marigold::marigold_impl::tokio::fs::File::create({path}) - .await - .expect(\"Could not write to file\") - ) - .compat_write() - ) - ) - ).expect(\"Could not put CSV writer into OnceCell\"); - - let mut stream_to_write = - - "), - stream_postfix: " - ; - stream_to_write.filter_map( - |v| async move { - WRITER - .get() - .expect(\"Could not get CSV writer from OnceCell\") - .lock() - .await - .serialize(v) - .await - .expect(\"could not write record to CSV\"); - None - } - ).chain( - // after the stream is complete, flush the writer. 
- ::marigold::marigold_impl::futures::stream::iter(0..1) - .filter_map(|_v| async { - let mut serializer_guard = WRITER - .get() // gets Mutex<...> - .expect(\"Could not get CSV writer from OnceCell\") - .lock() - .await; - let mut serializer = std::mem::replace( - &mut *serializer_guard, - ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer( - ::marigold::marigold_impl::writer::Writer::vector() - .compat_write() - ) - ); - serializer - .into_inner() - .await - .expect(\"Could not get underlying writer from serializer\") - .get_mut() - .shutdown() - .await - .expect(\"Could not shut down underlying writer\"); - None - }) - ) - }".to_string(), - returning: false - } - }; - }, - "write_file(" "," "csv" "," "compression" "=" "none" ")" => nodes::OutputFunctionNode { - stream_prefix: format!("{{ - if let Some(parent) = ::std::path::Path::new({path}).parent() {{ - ::marigold::marigold_impl::tokio::fs::create_dir_all(parent) - .await - .expect(\"could not create parent directory for output file\"); - }} - - static WRITER: ::marigold::marigold_impl::once_cell::sync::OnceCell< - ::marigold::marigold_impl::tokio::sync::Mutex< - ::marigold::marigold_impl::csv_async::AsyncSerializer< - ::marigold::marigold_impl::tokio_util::compat::Compat< - ::marigold::marigold_impl::writer::Writer - > - > - > - > = ::marigold::marigold_impl::once_cell::sync::OnceCell::new(); - - WRITER.set( - ::marigold::marigold_impl::tokio::sync::Mutex::new( - ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer( - ::marigold::marigold_impl::writer::Writer::file( - ::marigold::marigold_impl::tokio::fs::File::create({path}) - .await - .expect(\"Could not write to file\") - ).compat_write() - ) - ) - ).expect(\"Could not put CSV writer into OnceCell\"); - - let mut stream_to_write = - - "), - stream_postfix: " - ; - stream_to_write.filter_map( - |v| async move { - WRITER - .get() - .expect(\"Could not get CSV writer from OnceCell\") - .lock() - .await - .serialize(v) - .await - .expect(\"could not write record to CSV\"); - None - } - ).chain( - // after the stream is complete, flush the writer. 
- ::marigold::marigold_impl::futures::stream::iter(0..1) - .filter_map(|_v| async { - let mut serializer_guard = WRITER - .get() // gets Mutex<...> - .expect(\"Could not get CSV writer from OnceCell\") - .lock() - .await; - let mut serializer = std::mem::replace( - &mut *serializer_guard, - ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer( - ::marigold::marigold_impl::writer::Writer::vector() - .compat_write() - ) - ); - serializer - .into_inner() - .await - .expect(\"Could not get underlying writer from serializer\") - .get_mut() - .shutdown() - .await - .expect(\"Could not shut down underlying writer\"); - None - }) - ) - }".to_string(), - returning: false, - }, - "write_file(" "," "csv" "," "compression" "=" "gz" ")" => nodes::OutputFunctionNode { - stream_prefix: format!("{{ - if let Some(parent) = ::std::path::Path::new({path}).parent() {{ - ::marigold::marigold_impl::tokio::fs::create_dir_all(parent) - .await - .expect(\"could not create parent directory for output file\"); - }} - - static WRITER: ::marigold::marigold_impl::once_cell::sync::OnceCell< - ::marigold::marigold_impl::tokio::sync::Mutex< - ::marigold::marigold_impl::csv_async::AsyncSerializer< - ::marigold::marigold_impl::tokio_util::compat::Compat< - ::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder< - ::marigold::marigold_impl::writer::Writer - > - > - > - > - > = ::marigold::marigold_impl::once_cell::sync::OnceCell::new(); - - WRITER.set( - ::marigold::marigold_impl::tokio::sync::Mutex::new( - ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer( - ::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder::new( - ::marigold::marigold_impl::writer::Writer::file( - ::marigold::marigold_impl::tokio::fs::File::create({path}) - .await - .expect(\"Could not write to file\") - ) - ) - .compat_write() - ) - ) - ).expect(\"Could not put CSV writer into OnceCell\"); - - let mut stream_to_write = - - "), - stream_postfix: " - ; - stream_to_write.filter_map( - |v| async move { - WRITER - .get() - .expect(\"Could not get CSV writer from OnceCell\") - .lock() - .await - .serialize(v) - .await - .expect(\"could not write record to CSV\"); - None - } - ).chain( - // after the stream is complete, flush the writer. 
-            ::marigold::marigold_impl::futures::stream::iter(0..1)
-                .filter_map(|_v| async {
-                    let mut serializer_guard = WRITER
-                        .get() // gets Mutex<...>
-                        .expect(\"Could not get CSV writer from OnceCell\")
-                        .lock()
-                        .await;
-                    let mut serializer = std::mem::replace(
-                        &mut *serializer_guard,
-                        ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
-                            ::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder::new(
-                                ::marigold::marigold_impl::writer::Writer::vector()
-                            )
-                            .compat_write()
-                        )
-                    );
-                    serializer
-                        .into_inner()
-                        .await
-                        .expect(\"Could not get underlying writer from serializer\")
-                        .get_mut()
-                        .shutdown()
-                        .await
-                        .expect(\"Could not shut down underlying writer\");
-                    None
-                })
-            )
-        }".to_string(),
-        returning: false,
-    }
-}
diff --git a/marigold-grammar/src/ast.rs b/marigold-grammar/src/ast.rs
new file mode 100644
index 0000000..46b7401
--- /dev/null
+++ b/marigold-grammar/src/ast.rs
@@ -0,0 +1,139 @@
+use core::fmt::Debug;
+use core::hash::Hash;
+use std::{collections::HashSet, ops::Deref};
+
+#[derive(Debug)]
+pub struct MarigoldProgram {
+    pub streams: Vec<Stream>,
+}
+
+impl PartialEq for MarigoldProgram {
+    /// unordered comparison of streams
+    fn eq(&self, other: &Self) -> bool {
+        self.streams.iter().collect::<HashSet<_>>() == other.streams.iter().collect::<HashSet<_>>()
+    }
+}
+
+impl Eq for MarigoldProgram {}
+
+pub struct Stream {
+    pub input: StreamInput,
+    pub transformations: Vec<Box<dyn StreamTransformation>>,
+    pub output: StreamOutput,
+}
+
+impl PartialEq for Stream {
+    fn eq(&self, other: &Self) -> bool {
+        self.input == other.input
+            && self.transformations == other.transformations
+            && self.output == other.output
+    }
+}
+
+impl Hash for Stream {
+    fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
+        self.input.hash(state);
+        self.transformations.hash(state);
+        self.output.hash(state);
+    }
+}
+
+impl Debug for Stream {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Stream")
+            .field("input", &self.input)
+            .field("transformations", &self.transformations)
+            .field("output", &self.output)
+            .finish()
+    }
+}
+
+impl PartialEq for dyn StreamTransformation {
+    fn eq(&self, other: &Self) -> bool {
+        format!("{:?}", self) == format!("{:?}", other) // hack to handle trait cycle with Box<dyn StreamTransformation>
+    }
+}
+
+impl Hash for dyn StreamTransformation {
+    fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
+        format!("{:?}", self).hash(state) // hack to handle trait cycle with Box<dyn StreamTransformation>
+    }
+}
+
+impl Eq for Stream {}
+
+#[derive(Debug)]
+pub struct StreamInput {
+    pub format: DataStreamFormat,
+    pub source: Box<dyn TransportOrStorageSite>,
+    pub type_ident: Option<String>,
+}
+
+impl PartialEq for StreamInput {
+    fn eq(&self, other: &Self) -> bool {
+        self.format == other.format
+            && *self.source == *other.source
+            && self.type_ident == other.type_ident
+    }
+}
+
+impl Hash for StreamInput {
+    fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
+        self.format.hash(state);
+        self.source.hash(state);
+        self.type_ident.hash(state);
+    }
+}
+
+impl PartialEq for dyn TransportOrStorageSite {
+    fn eq(&self, other: &Self) -> bool {
+        format!("{:?}", self) == format!("{:?}", other) // hack to handle trait cycle with Box<dyn TransportOrStorageSite>
+    }
+}
+
+impl Hash for dyn TransportOrStorageSite {
+    fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
+        format!("{:?}", self).hash(state) // hack to handle trait cycle with Box<dyn TransportOrStorageSite>
+    }
+}
+
+#[derive(Debug)]
+pub struct StreamOutput {
+    pub format: DataStreamFormat,
+    pub target: Box<dyn TransportOrStorageSite>,
+}
+
+impl PartialEq for StreamOutput {
+    fn eq(&self, other: &Self) -> bool {
+        self.format == other.format && *self.target == *other.target
+    }
+}
+
+impl Hash for StreamOutput {
+    fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
+        self.format.hash(state);
+        self.target.deref().hash(state);
+    }
+}
+
+#[derive(Debug, Hash, Eq, PartialEq, Clone)]
+pub enum DataStreamFormat {
+    CSV,
+    INFER,
+}
+
+pub trait TransportOrStorageSite: Debug {}
+
+#[derive(Debug, Eq, PartialEq, Clone)]
+pub struct File {
+    pub path: String,
+}
+
+impl TransportOrStorageSite for File {}
+
+pub trait StreamTransformation: Debug {}
+
+#[derive(Debug, Eq, PartialEq, Clone)]
+pub struct OkOrPanic {}
+
+impl StreamTransformation for OkOrPanic {}
diff --git a/marigold-grammar/src/lib.rs b/marigold-grammar/src/lib.rs
index ae9f5cb..dd2cd26 100644
--- a/marigold-grammar/src/lib.rs
+++ b/marigold-grammar/src/lib.rs
@@ -1,37 +1,320 @@
 #![forbid(unsafe_code)]
-#[macro_use]
-extern crate lalrpop_util;
+use std::collections::HashMap;
 
-#[macro_use]
-extern crate lazy_static;
-
-use lalrpop_util::ParseError;
-extern crate proc_macro;
-use crate::ast::Token;
+use itertools::Itertools;
+use num_traits::One;
+use pest::Parser;
+use pest_derive::Parser;
+use thiserror::Error;
 
 pub use itertools;
 
+pub mod ast;
+
+#[derive(Error, Debug)]
+pub enum GrammarError {
+    #[error("Parse error")]
+    ParseError(#[from] pest::error::Error<Rule>),
+}
+
+pub fn marigold_parse<'a>(s: &'a str) -> Result<ast::MarigoldProgram, GrammarError> {
+    let parsed = PARSER::parse(Rule::program, s.trim())?.next().unwrap();
+    parse_program(parsed)
+}
+
+fn parse_program(
+    program: pest::iterators::Pair<Rule>,
+) -> Result<ast::MarigoldProgram, GrammarError> {
+    assert_eq!(program.as_rule(), Rule::program);
+
+    let pairs = program.into_inner().into_iter();
+    let mut streams = Vec::with_capacity(pairs.len());
+    for pair in pairs {
+        match pair.as_rule() {
+            Rule::stream => streams.push(parse_stream(pair)?),
+            _ => unimplemented!(),
+        }
+    }
+
+    Ok(ast::MarigoldProgram { streams })
+}
+
+fn parse_stream(stream: pest::iterators::Pair<Rule>) -> Result<ast::Stream, GrammarError> {
+    assert_eq!(stream.as_rule(), Rule::stream);
+
+    let mut pairs = stream.into_inner().into_iter();
+    let n_transformations = pairs.len() - 2;
+
+    let input = parse_input(pairs.next().unwrap())?;
+
+    let mut transformations = Vec::with_capacity(n_transformations);
+    for _ in 0..n_transformations {
+        transformations.push(parse_transformation(pairs.next().unwrap())?);
+    }
+
+    let output = parse_output(pairs.next().unwrap())?;
+
+    Ok(ast::Stream {
+        input,
+        transformations,
+        output,
+    })
+}
+
+fn parse_input(input: pest::iterators::Pair<Rule>) -> Result<ast::StreamInput, GrammarError> {
+    assert_eq!(input.as_rule(), Rule::input);
 
-pub mod nodes;
-mod type_aggregation;
+    let specific_input = input.into_inner().next().unwrap();
+    match specific_input.as_rule() {
+        Rule::read_file => parse_read_file(specific_input),
+        _ => unimplemented!("no parser for input type {:?}", specific_input.as_rule()),
+    }
+}
+
+fn parse_read_file(
+    specific_input: pest::iterators::Pair<Rule>,
+) -> Result<ast::StreamInput, GrammarError> {
+    assert_eq!(specific_input.as_rule(), Rule::read_file);
+    let specific_input_rule: Rule = specific_input.as_rule();
+    let fields = specific_input.into_inner();
+    let mut params = HashMap::with_capacity(fields.len());
+    fields.into_iter().for_each(|field| {
+        let field_rule = field.as_rule();
+        params.insert(field.as_rule(), field).map(|_| {
+            unimplemented!(
+                "duplicate entry for rule {:?} while parsing {:?}",
+                field_rule,
+                specific_input_rule
+            )
+        });
+    });
 
-lalrpop_mod!(#[allow(clippy::all)] pub ast);
+    // generate stream input
+    let out = ast::StreamInput {
+        format: {
+            if params.contains_key(&Rule::input_format) {
+                let specific_format = params
+                    .remove(&Rule::input_format)
+                    .unwrap()
+                    .into_inner()
+                    .into_iter()
+                    .next()
+                    .unwrap()
+                    .into_inner()
+                    .into_iter()
+                    .next()
+                    .unwrap()
+                    .as_rule();
+                match specific_format {
+                    Rule::csv_data_stream_format => ast::DataStreamFormat::CSV,
+                    _ => unimplemented!("unknown data stream format {:?}", specific_format),
+                }
+            } else {
+                ast::DataStreamFormat::INFER
+            }
+        },
+        source: Box::new(ast::File {
+            path: params
+                .remove(&Rule::file_path)
+                .expect("file_path not found")
+                .into_inner()
+                .map(|character| character.as_str().chars().collect_vec())
+                .into_iter()
+                .flatten()
+                .collect(),
+        }),
+        type_ident: params.remove(&Rule::input_struct).map(|input_struct| {
+            input_struct
+                .into_inner()
+                .into_iter()
+                .map(|character| character.as_str().chars().collect_vec())
+                .flatten()
+                .collect()
+        }),
+    };
+
+    // check we didn't miss any parsed rules
+    if !params.is_empty() {
+        let unhandled_rules = params.keys().collect_vec();
+        if unhandled_rules.len().is_one() {
+            unimplemented!(
+                "rule not accounted for while parsing {:?}: {:?}",
+                specific_input_rule,
+                unhandled_rules.get(0).unwrap()
+            )
+        }
+        unimplemented!(
+            "{} rules not accounted for while parsing {:?}: {:?}",
+            params.len(),
+            specific_input_rule,
+            unhandled_rules
+        )
+    }
 
-lazy_static! {
-    static ref PARSER: ast::ProgramParser = ast::ProgramParser::new();
+    // if all rules were processed, return
+    Ok(out)
 }
 
-pub fn marigold_parse<'a>(
-    s: &'a str,
-) -> Result<String, ParseError<usize, Token<'a>, &'static str>> {
-    PARSER.parse(s)
+fn parse_output(output: pest::iterators::Pair<Rule>) -> Result<ast::StreamOutput, GrammarError> {
+    assert_eq!(output.as_rule(), Rule::output);
+    let specific_output = output.into_inner().next().unwrap();
+    match specific_output.as_rule() {
+        Rule::write_file => parse_write_file(specific_output),
+        _ => unimplemented!("no parser for input type {:?}", specific_output.as_rule()),
+    }
 }
 
+fn parse_write_file(
+    specific_output: pest::iterators::Pair<Rule>,
+) -> Result<ast::StreamOutput, GrammarError> {
+    assert_eq!(specific_output.as_rule(), Rule::write_file);
+    let specific_output_rule: Rule = specific_output.as_rule();
+    let fields = specific_output.into_inner();
+    let mut params = HashMap::with_capacity(fields.len());
+    fields.into_iter().for_each(|field| {
+        let field_rule = field.as_rule();
+        params.insert(field.as_rule(), field).map(|_| {
+            unimplemented!(
+                "duplicate entry for rule {:?} while parsing {:?}",
+                field_rule,
+                specific_output_rule
+            )
+        });
+    });
+
+    let output_format = {
+        let output_rule = params
+            .remove(&Rule::output_format)
+            .expect("output format is mandatory")
+            .into_inner()
+            .into_iter()
+            .next()
+            .unwrap()
+            .into_inner()
+            .into_iter()
+            .next()
+            .unwrap()
+            .as_rule();
+        match output_rule {
+            Rule::csv_data_stream_format => ast::DataStreamFormat::CSV,
+            _ => unimplemented!(
+                "unimplemented format for {:?}: {:?}",
+                specific_output_rule,
+                output_rule
+            ),
+        }
+    };
+
+    let target = Box::new(ast::File {
+        path: params
+            .remove(&Rule::file_path)
+            .expect("file_path not found")
+            .into_inner()
+            .map(|character| character.as_str().chars().collect_vec())
+            .into_iter()
+            .flatten()
+            .collect(),
+    });
+
+    // check we didn't miss any parsed rules
+    if !params.is_empty() {
+        let unhandled_rules = params.keys().collect_vec();
+        if unhandled_rules.len().is_one() {
+            unimplemented!(
+                "rule not accounted for while parsing {:?}: {:?}",
+                specific_output_rule,
+                unhandled_rules.get(0).unwrap()
+            )
+        }
+        unimplemented!(
+            "{} rules not accounted for while parsing {:?}: {:?}",
+            params.len(),
+            specific_output_rule,
+            unhandled_rules
+        )
+    }
+
+    Ok(ast::StreamOutput {
+        format: output_format,
+        target,
+    })
+}
+
+fn parse_transformation(
+    transformation: pest::iterators::Pair<Rule>,
+) -> Result<Box<dyn ast::StreamTransformation>, GrammarError> {
+    assert_eq!(transformation.as_rule(), Rule::transformation);
+    let specific_transformation = transformation.into_inner().next().unwrap();
+    match specific_transformation.as_rule() {
+        Rule::ok_or_panic => Ok(Box::new(ast::OkOrPanic {})),
+        _ => unimplemented!(
+            "no parser for input type {:?}",
+            specific_transformation.as_rule()
+        ),
+    }
+}
+
+#[derive(Parser)]
+#[grammar = "marigold.pest"]
+pub struct PARSER;
+
 #[cfg(test)]
 mod tests {
+    use super::*;
+    use ast::*;
+
     #[test]
-    fn it_works() {
-        let result = 2 + 2;
-        assert_eq!(result, 4);
+    fn csv_in_out() {
+        let parsed = marigold_parse(
+            r#"
+            read_file("./woof.csv", csv, struct=woof)
+                .ok_or_panic()
+                .write_file("miaow.csv", csv)
+
+            read_file("poof.csv")
+                .write_file("doof.csv", csv)
+            "#,
+        )
+        .unwrap();
+
+        assert_eq!(
+            parsed,
+            MarigoldProgram {
+                streams: vec![
+                    Stream {
+                        input: StreamInput {
+                            format: DataStreamFormat::CSV,
+                            source: Box::new(File {
+                                path: "./woof.csv".to_string()
+                            }),
+                            type_ident: Option::Some("woof".to_string())
+                        },
+                        transformations: vec![Box::new(OkOrPanic {})],
+                        output: StreamOutput {
+                            format: DataStreamFormat::CSV,
+                            target: Box::new(File {
+                                path: "miaow.csv".to_string()
+                            }),
+                        }
+                    },
+                    Stream {
+                        input: StreamInput {
+                            format: DataStreamFormat::INFER,
+                            source: Box::new(File {
+                                path: "poof.csv".to_string()
+                            }),
+                            type_ident: None
+                        },
+                        transformations: Vec::new(),
+                        output: StreamOutput {
+                            format: DataStreamFormat::CSV,
+                            target: Box::new(File {
+                                path: "doof.csv".to_string()
+                            }),
+                        }
+                    },
+                ]
+            }
+        )
     }
 }
diff --git a/marigold-grammar/src/marigold.pest b/marigold-grammar/src/marigold.pest
new file mode 100644
index 0000000..a0946ed
--- /dev/null
+++ b/marigold-grammar/src/marigold.pest
@@ -0,0 +1,31 @@
+// shared fields
+valid_ident = { (ASCII_ALPHA | "_") ~ (ASCII_ALPHANUMERIC | "_")+ }
+valid_file_path_character = { (ASCII_ALPHANUMERIC | "~" | "." | "_" | "/" | "-" | "\\") }
+file_path = ${ "\"" ~ (valid_file_path_character)+ ~ "\"" }
+csv_data_stream_format = { "csv" }
+data_stream_format = { csv_data_stream_format }
+
+// define stream inputs
+input_format = { data_stream_format }
+input_struct = { valid_ident }
+read_file = { "read_file" ~ "(" ~ file_path ~ ("," ~ input_format)? ~ ("," ~ "struct" ~ "=" ~ input_struct)? ~ ")" }
+input = { read_file }
+
+// define stream transformations
+ok_or_panic = { "ok_or_panic()" }
+transformation = { ok_or_panic }
+
+// define stream outputs
+output_format = { data_stream_format }
+write_file = { "write_file" ~ "(" ~ file_path ~ "," ~ output_format ~ ")" }
+output = { write_file }
+
+// define stream
+stream = { input ~ ("." ~ transformation)* ~ "." ~ output }
+
+// TODO: stream variables (impl: use `r#`-prefixed variable to avoid collisions with Rust reserved names)
+
+// programs are composed of streams
+program = { stream+ }
+
+WHITESPACE = _{ " " | "\t" | "\r" | "\n" }
\ No newline at end of file
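
Usage sketch (illustrative, not part of the patch itself): the crate's public
entry point is now `marigold_parse`, which returns the typed
`ast::MarigoldProgram` defined in src/ast.rs rather than the generated-code
string the LALRPOP parser produced. A minimal caller, assuming only the API
added above (the file names and the `row` struct identifier are invented for
the example):

    use marigold_grammar::{ast, marigold_parse};

    fn main() {
        // One stream: read a CSV into `row` structs, panic on any
        // deserialization error, and write the survivors back out as CSV.
        let program = marigold_parse(
            r#"read_file("in.csv", csv, struct=row).ok_or_panic().write_file("out.csv", csv)"#,
        )
        .expect("program should parse");

        assert_eq!(program.streams.len(), 1);
        assert_eq!(program.streams[0].input.format, ast::DataStreamFormat::CSV);
        assert_eq!(program.streams[0].transformations.len(), 1);
    }

The manual `Hash`/`PartialEq` implementations on the AST's boxed trait objects
are what make whole-program comparisons like the `csv_in_out` test possible,
and the unordered `PartialEq` on `MarigoldProgram` keeps those comparisons
insensitive to stream order.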
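Error-path sketch (same caveats): `GrammarError` wraps `pest::error::Error<Rule>`
via thiserror, so malformed input surfaces as an `Err` carrying pest's location
information. For example, the `write_file` rule in marigold.pest makes the
output format argument mandatory, so omitting it should fail to parse:

    use marigold_grammar::{marigold_parse, GrammarError};

    fn main() {
        // Missing the mandatory output format argument ("csv"), so the
        // `write_file` rule, and therefore the whole program, fails to match.
        let err = marigold_parse(r#"read_file("in.csv").write_file("out.csv")"#)
            .unwrap_err();
        assert!(matches!(err, GrammarError::ParseError(_)));
    }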