Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move transaction module into actions/ and rename to set_transaction #386

Merged
merged 13 commits into from
Oct 14, 2024
26 changes: 17 additions & 9 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
//! Provides parsing and manipulation of the various actions defined in the [Delta
//! specification](https://github.com/delta-io/delta/blob/master/PROTOCOL.md)

pub mod deletion_vector;
pub(crate) mod schemas;
pub(crate) mod visitors;

use delta_kernel_derive::Schema;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
Expand All @@ -16,23 +12,30 @@ use crate::actions::schemas::GetStructField;
use crate::features::{ReaderFeatures, WriterFeatures};
use crate::{schema::StructType, DeltaResult, EngineData};

pub mod deletion_vector;
pub mod set_transaction;

pub(crate) mod schemas;
pub(crate) mod visitors;

pub(crate) const ADD_NAME: &str = "add";
pub(crate) const REMOVE_NAME: &str = "remove";
pub(crate) const METADATA_NAME: &str = "metaData";
pub(crate) const PROTOCOL_NAME: &str = "protocol";
pub(crate) const TRANSACTION_NAME: &str = "txn";
zachschuermann marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";

static LOG_SCHEMA: LazyLock<StructType> = LazyLock::new(|| {
StructType::new(vec![
Option::<Add>::get_struct_field(ADD_NAME),
Option::<Remove>::get_struct_field(REMOVE_NAME),
Option::<Metadata>::get_struct_field(METADATA_NAME),
Option::<Protocol>::get_struct_field(PROTOCOL_NAME),
Option::<Transaction>::get_struct_field(TRANSACTION_NAME),
Option::<SetTransaction>::get_struct_field(TRANSACTION_NAME),
zachschuermann marked this conversation as resolved.
Show resolved Hide resolved
Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME),
// We don't support the following actions yet
//Option<Cdc>::get_field(CDC_NAME),
//Option<CommitInfo>::get_field(COMMIT_INFO_NAME),
//Option<DomainMetadata>::get_field(DOMAIN_METADATA_NAME),
//Option::<Cdc>::get_struct_field(CDC_NAME),
//Option::<DomainMetadata>::get_struct_field(DOMAIN_METADATA_NAME),
])
});

Expand Down Expand Up @@ -128,6 +131,11 @@ impl Protocol {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
pub struct CommitInfo {
pub kernel_version: Option<String>,
zachschuermann marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
pub struct Add {
/// A relative path to a data file from the root of the table or an absolute path to a file
Expand Down Expand Up @@ -237,7 +245,7 @@ impl Remove {
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
pub struct Transaction {
pub struct SetTransaction {
/// A unique identifier for the application performing the transaction.
pub app_id: String,

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use crate::actions::visitors::TransactionVisitor;
use crate::actions::{get_log_schema, Transaction, TRANSACTION_NAME};
use crate::actions::{get_log_schema, SetTransaction, TRANSACTION_NAME};
use crate::snapshot::Snapshot;
use crate::{DeltaResult, Engine, EngineData, SchemaRef};

Expand Down Expand Up @@ -58,7 +58,7 @@ impl TransactionScanner {
&self,
engine: &dyn Engine,
application_id: &str,
) -> DeltaResult<Option<Transaction>> {
) -> DeltaResult<Option<SetTransaction>> {
let mut transactions = self.scan_application_transactions(engine, Some(application_id))?;
Ok(transactions.remove(application_id))
}
Expand All @@ -78,7 +78,10 @@ mod tests {
use crate::Table;
use itertools::Itertools;

fn get_latest_transactions(path: &str, app_id: &str) -> (TransactionMap, Option<Transaction>) {
fn get_latest_transactions(
path: &str,
app_id: &str,
) -> (TransactionMap, Option<SetTransaction>) {
let path = std::fs::canonicalize(PathBuf::from(path)).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();
Expand All @@ -105,7 +108,7 @@ mod tests {
assert_eq!(txns.get("my-app"), txn.as_ref());
assert_eq!(
txns.get("my-app2"),
Some(Transaction {
Some(SetTransaction {
app_id: "my-app2".to_owned(),
version: 2,
last_updated: None
Expand All @@ -119,7 +122,7 @@ mod tests {
assert_eq!(txns.get("my-app"), txn.as_ref());
assert_eq!(
txns.get("my-app2"),
Some(Transaction {
Some(SetTransaction {
app_id: "my-app2".to_owned(),
version: 2,
last_updated: None
Expand Down
13 changes: 7 additions & 6 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use crate::{
};

use super::{
deletion_vector::DeletionVectorDescriptor, Add, Format, Metadata, Protocol, Remove, Transaction,
deletion_vector::DeletionVectorDescriptor, Add, Format, Metadata, Protocol, Remove,
SetTransaction,
};

#[derive(Default)]
Expand Down Expand Up @@ -230,7 +231,7 @@ impl DataVisitor for RemoveVisitor {
}
}

pub type TransactionMap = HashMap<String, Transaction>;
pub type TransactionMap = HashMap<String, SetTransaction>;

/// Extract application transaction actions from the log into a map
///
Expand Down Expand Up @@ -259,10 +260,10 @@ impl TransactionVisitor {
row_index: usize,
app_id: String,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Transaction> {
) -> DeltaResult<SetTransaction> {
zachschuermann marked this conversation as resolved.
Show resolved Hide resolved
let version: i64 = getters[1].get(row_index, "txn.version")?;
let last_updated: Option<i64> = getters[2].get_long(row_index, "txn.lastUpdated")?;
Ok(Transaction {
Ok(SetTransaction {
app_id,
version,
last_updated,
Expand Down Expand Up @@ -490,15 +491,15 @@ mod tests {
let mut actual = txn_visitor.transactions;
assert_eq!(
actual.remove("myApp2"),
Some(Transaction {
Some(SetTransaction {
app_id: "myApp2".to_string(),
version: 4,
last_updated: Some(1670892998177),
},)
);
assert_eq!(
actual.remove("myApp"),
Some(Transaction {
Some(SetTransaction {
app_id: "myApp".to_string(),
version: 3,
last_updated: None,
Expand Down
1 change: 0 additions & 1 deletion kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ pub mod scan;
pub mod schema;
pub mod snapshot;
pub mod table;
pub mod transaction;
pub(crate) mod utils;

pub use engine_data::{DataVisitor, EngineData};
Expand Down
Loading