Commit

initial commit
milenkovicm committed Jan 21, 2025
0 parents commit c382d4f
Showing 21 changed files with 579 additions and 0 deletions.
10 changes: 10 additions & 0 deletions .gitattributes
@@ -0,0 +1,10 @@
#
# git add --renormalize .
#
* text=auto eol=lf
*.{cmd,[cC][mM][dD]} text eol=crlf
*.{bat,[bB][aA][tT]} text eol=crlf
#
# LFS can be used as well
#
# *.zip filter=lfs diff=lfs merge=lfs -text
25 changes: 25 additions & 0 deletions .github/dependabot.yml
@@ -0,0 +1,25 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates

version: 2
updates:
- package-ecosystem: "cargo" # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "daily"
# group of dependencies to be updated together
groups:
# all datafusion deps should be updated together
datafusion-dependencies:
# A pattern can be...
patterns:
- "datafusion*"
- "datafusion-*"
- "ballista*"
- "ballista-*"
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
54 changes: 54 additions & 0 deletions .github/workflows/basic.yml
@@ -0,0 +1,54 @@
name: Default Features

on:
  push:
    branches: [master, ci_*]
    paths-ignore:
      - "**.md"
      - "**.yaml"
  pull_request:
    branches: [master]
    paths-ignore:
      - "**.md"
      - "**.yaml"
  workflow_dispatch:

env:
  CARGO_TERM_COLOR: always
  # this one speeds up builds, they say
  CARGO_INCREMENTAL: 0

jobs:
  build:
    runs-on: ubuntu-latest
    timeout-minutes: 15
    steps:
      - uses: actions/checkout@v4
      - uses: dtolnay/rust-toolchain@stable
        with:
          toolchain: stable
          components: rustfmt
      # make cargo fmt fail first, before bringing in
      # other things and spending time compiling
      - name: Cargo Format Check
        run: cargo fmt --all --check --
      - name: Update Packages (apt-get cache)
        run: sudo apt-get update
      - name: Install Required Libraries (using apt-get)
        run: sudo apt-get install -y protobuf-compiler
      # use the cache only if the format check passes
      - uses: Swatinem/rust-cache@v2
        with:
          cache-on-failure: true
      - name: Update gitconfig (Access to private repo)
        run: git config --global url."https://${{ secrets.GIT_CREDENTIALS }}@github.com".insteadOf ssh://git@github.com
      # - name: Cargo Compile (with tests)
      #   # should be faster than compiling tests again in
      #   # the test phase
      #   # run: cargo test --no-run --locked --all-features
      #   run: cargo test --no-run --all-features
      - name: Cargo Run Tests
        run: cargo test -- --nocapture --quiet
      # - name: Cargo Run Tests (Client Executor Feature Enabled)
      #   run: cargo test --features client_executor -- --nocapture --quiet
55 changes: 55 additions & 0 deletions .gitignore
@@ -0,0 +1,55 @@
### macOS ###
# General
.DS_Store
.AppleDouble
.LSOverride

# Icon must end with two \r
Icon

# Thumbnails
._*

# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent

# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk

### Rust ###
# Generated by Cargo
# will have compiled files and executables
target/

# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock

# These are backup files generated by rustfmt
**/*.rs.bk

### VisualStudioCode ###
.vscode/*
.devcontainer
*.code-workspace


.idea
**/*.iml

secrets_plain.yaml
**/*secrets_plain.yaml

node_modules
.aws-sam
.vscode
20 changes: 20 additions & 0 deletions Cargo.toml
@@ -0,0 +1,20 @@
[package]
name = "ballista_delta"
version = "0.1.0"
edition = "2021"

[dependencies]

ballista = "43"
ballista-core = { version = "43", default-features = false }
ballista-executor = { version = "43", default-features = false }
ballista-scheduler = { version = "43", default-features = false }
datafusion = { version = "43" }
datafusion-proto = { version = "43" }
deltalake = { version = "0.22.0", features = ["datafusion"] }
log = "0.4"
tokio = { version = "1", features = ["full"] }
url = "2"

[dev-dependencies]
env_logger = "0.11"
48 changes: 48 additions & 0 deletions README.md
@@ -0,0 +1,48 @@
# Datafusion Ballista Support for Delta.RS

Since version `43.0.0`, [datafusion ballista](/~https://github.com/apache/datafusion-ballista) supports extending its core components and functionality.

This example demonstrates extending [datafusion ballista](/~https://github.com/apache/datafusion-ballista) to support [delta.rs](https://delta-io.github.io/delta-rs/) read operations.

> [!IMPORTANT]
> This is just a showcase project and it is not meant to be maintained.

Configuring [standalone ballista](examples/standalone.rs):

```rust
use ballista::prelude::{SessionConfigExt, SessionContextExt};
use ballista_delta::{BallistaDeltaLogicalCodec, BallistaDeltaPhysicalCodec};
use datafusion::{
    common::Result,
    execution::SessionStateBuilder,
    prelude::{SessionConfig, SessionContext},
};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
    let config = SessionConfig::new_with_ballista()
        .with_ballista_logical_extension_codec(Arc::new(BallistaDeltaLogicalCodec::default()))
        .with_ballista_physical_extension_codec(Arc::new(BallistaDeltaPhysicalCodec::default()));

    let state = SessionStateBuilder::new()
        .with_config(config)
        .with_default_features()
        .build();

    let table = deltalake::open_table("./data/people_countries_delta_dask")
        .await
        .unwrap();

    let ctx = SessionContext::standalone_with_state(state).await?;

    ctx.register_table("demo", Arc::new(table)).unwrap();

    ctx.sql("select * from demo").await?.show().await?;

    Ok(())
}
```

Other examples show how to extend the [client](examples/cluster_client.rs), the [scheduler](examples/cluster_scheduler.rs), and the [executor](examples/cluster_executor.rs).
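The cluster examples can presumably be run from the repository root in three terminals, e.g. `cargo run --example cluster_scheduler`, then `cargo run --example cluster_executor`, and finally `cargo run --example cluster_client`; the client example connects to the scheduler at `localhost:50050`.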
Binary file not shown.
(Delta transaction log JSON — file name not shown in this view)
@@ -0,0 +1,6 @@
{"commitInfo":{"timestamp":1706278148531,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[\"country\"]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"3","numOutputRows":"5","numOutputBytes":"3045"},"engineInfo":"Apache-Spark/3.4.0 Delta-Lake/2.4.0","txnId":"1cbc9537-63eb-4799-8647-2d947ae8fa41"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"1f110132-a652-4be9-815e-348f294515cf","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"first_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"last_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"country\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"continent\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["country"],"configuration":{},"createdTime":1706278146762}}
{"add":{"path":"country=Argentina/part-00000-8d0390a3-f797-4265-b9c2-da1c941680a3.c000.snappy.parquet","partitionValues":{"country":"Argentina"},"size":1018,"modificationTime":1706278148083,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"first_name\":\"Ernesto\",\"last_name\":\"Guevara\",\"continent\":\"NaN\"},\"maxValues\":{\"first_name\":\"Ernesto\",\"last_name\":\"Guevara\",\"continent\":\"NaN\"},\"nullCount\":{\"first_name\":0,\"last_name\":0,\"continent\":0}}"}}
{"add":{"path":"country=China/part-00000-88fba1af-b28d-4303-9c85-9a97be631d40.c000.snappy.parquet","partitionValues":{"country":"China"},"size":1002,"modificationTime":1706278148138,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"first_name\":\"Bruce\",\"last_name\":\"Lee\",\"continent\":\"Asia\"},\"maxValues\":{\"first_name\":\"Jack\",\"last_name\":\"Ma\",\"continent\":\"Asia\"},\"nullCount\":{\"first_name\":0,\"last_name\":0,\"continent\":0}}"}}
{"add":{"path":"country=Germany/part-00000-030076e1-5ec9-47c2-830a-1569f823b6ee.c000.snappy.parquet","partitionValues":{"country":"Germany"},"size":1025,"modificationTime":1706278148185,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"first_name\":\"Soraya\",\"last_name\":\"Jala\",\"continent\":\"NaN\"},\"maxValues\":{\"first_name\":\"Wolfgang\",\"last_name\":\"Manche\",\"continent\":\"NaN\"},\"nullCount\":{\"first_name\":0,\"last_name\":0,\"continent\":0}}"}}
Binary files not shown (6 files).
40 changes: 40 additions & 0 deletions examples/cluster_client.rs
@@ -0,0 +1,40 @@
use ballista::prelude::{SessionConfigExt, SessionContextExt};
use ballista_delta::{BallistaDeltaLogicalCodec, BallistaDeltaPhysicalCodec};
use datafusion::{
    common::Result,
    execution::SessionStateBuilder,
    prelude::{SessionConfig, SessionContext},
};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
    let _ = env_logger::builder()
        .filter_level(log::LevelFilter::Info)
        .parse_filters("ballista=debug,ballista_scheduler=debug,ballista_executor=debug")
        .is_test(true)
        .try_init();

    let config = SessionConfig::new_with_ballista()
        .with_ballista_logical_extension_codec(Arc::new(BallistaDeltaLogicalCodec::default()))
        .with_ballista_physical_extension_codec(Arc::new(BallistaDeltaPhysicalCodec::default()));

    let state = SessionStateBuilder::new()
        .with_config(config)
        .with_default_features()
        .build();

    let table = deltalake::open_table("./data/people_countries_delta_dask")
        .await
        .unwrap();

    // connect to a ballista scheduler running at localhost:50050
    let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

    ctx.register_table("demo", Arc::new(table)).unwrap();

    let df = ctx.sql("select * from demo").await?;

    df.show().await?;

    Ok(())
}
37 changes: 37 additions & 0 deletions examples/cluster_executor.rs
@@ -0,0 +1,37 @@
use ballista_delta::{BallistaDeltaLogicalCodec, BallistaDeltaPhysicalCodec, CustomObjectStoreRegistry};
use ballista_executor::executor_process::{start_executor_process, ExecutorProcessConfig};
use datafusion::{
    execution::runtime_env::{RuntimeConfig, RuntimeEnv},
    prelude::SessionConfig,
};
use std::sync::Arc;

///
/// # Custom Ballista Executor
///
/// This example demonstrates how to create custom ballista executors with support
/// for custom logical and physical codecs.
///
#[tokio::main]
async fn main() -> ballista_core::error::Result<()> {
    let _ = env_logger::builder()
        .filter_level(log::LevelFilter::Info)
        .is_test(true)
        .try_init();

    let config: ExecutorProcessConfig = ExecutorProcessConfig {
        // plug in the delta-aware codecs so extension plans can be (de)serialized
        override_logical_codec: Some(Arc::new(BallistaDeltaLogicalCodec::default())),
        override_physical_codec: Some(Arc::new(BallistaDeltaPhysicalCodec::default())),

        // provide a runtime whose object store registry is customized
        override_runtime_producer: Some(Arc::new(|_: &SessionConfig| {
            let runtime_config =
                RuntimeConfig::new().with_object_store_registry(Arc::new(CustomObjectStoreRegistry::default()));
            let runtime_env = RuntimeEnv::try_new(runtime_config)?;

            Ok(Arc::new(runtime_env))
        })),

        ..Default::default()
    };

    start_executor_process(Arc::new(config)).await
}
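The `CustomObjectStoreRegistry` used above is defined in the crate's library code, which is not shown in this excerpt. As a rough idea of the shape such a type could take, here is a minimal, hypothetical sketch that simply delegates to DataFusion's `DefaultObjectStoreRegistry`; it assumes DataFusion's `ObjectStoreRegistry` trait and a direct dependency on the `object_store` and `url` crates, none of which is confirmed by this commit.

```rust
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry};
// `object_store` is pulled in transitively by DataFusion, but naming the
// `ObjectStore` trait directly would require it as an explicit dependency.
use object_store::ObjectStore;
use url::Url;

/// Hypothetical registry: delegates everything to DataFusion's default
/// registry, which resolves stores by URL scheme and host. A real
/// implementation could, for example, lazily create stores for remote
/// schemes such as `s3://` before falling back to the default lookup.
#[derive(Debug, Default)]
pub struct ExampleObjectStoreRegistry {
    inner: DefaultObjectStoreRegistry,
}

impl ObjectStoreRegistry for ExampleObjectStoreRegistry {
    fn register_store(&self, url: &Url, store: Arc<dyn ObjectStore>) -> Option<Arc<dyn ObjectStore>> {
        self.inner.register_store(url, store)
    }

    fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
        self.inner.get_store(url)
    }
}
```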
37 changes: 37 additions & 0 deletions examples/cluster_scheduler.rs
@@ -0,0 +1,37 @@
use ballista_core::error::BallistaError;
use ballista_delta::{BallistaDeltaLogicalCodec, BallistaDeltaPhysicalCodec};
use ballista_scheduler::cluster::BallistaCluster;
use ballista_scheduler::config::SchedulerConfig;
use ballista_scheduler::scheduler_process::start_server;
use std::net::AddrParseError;
use std::sync::Arc;

///
/// # Custom Ballista Scheduler
///
/// This example demonstrates how to create custom ballista schedulers with support
/// for custom logical and physical codecs.
///
#[tokio::main]
async fn main() -> ballista_core::error::Result<()> {
    let _ = env_logger::builder()
        .filter_level(log::LevelFilter::Info)
        .is_test(true)
        .try_init();

    let config: SchedulerConfig = SchedulerConfig {
        // plug in the delta-aware codecs so extension plans can be (de)serialized
        override_logical_codec: Some(Arc::new(BallistaDeltaLogicalCodec::default())),
        override_physical_codec: Some(Arc::new(BallistaDeltaPhysicalCodec::default())),
        ..Default::default()
    };

    // bind address comes from the scheduler configuration
    let addr = format!("{}:{}", config.bind_host, config.bind_port);
    let addr = addr
        .parse()
        .map_err(|e: AddrParseError| BallistaError::Configuration(e.to_string()))?;

    let cluster = BallistaCluster::new_from_config(&config).await?;
    start_server(cluster, addr, Arc::new(config)).await?;

    Ok(())
}
40 changes: 40 additions & 0 deletions examples/standalone.rs
@@ -0,0 +1,40 @@
use ballista::prelude::{SessionConfigExt, SessionContextExt};
use ballista_delta::{BallistaDeltaLogicalCodec, BallistaDeltaPhysicalCodec};
use datafusion::{
    common::Result,
    execution::SessionStateBuilder,
    prelude::{SessionConfig, SessionContext},
};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
    let _ = env_logger::builder()
        .filter_level(log::LevelFilter::Info)
        .parse_filters("ballista=debug,ballista_scheduler=debug,ballista_executor=debug")
        .is_test(true)
        .try_init();

    let config = SessionConfig::new_with_ballista()
        .with_ballista_logical_extension_codec(Arc::new(BallistaDeltaLogicalCodec::default()))
        .with_ballista_physical_extension_codec(Arc::new(BallistaDeltaPhysicalCodec::default()));

    let state = SessionStateBuilder::new()
        .with_config(config)
        .with_default_features()
        .build();

    let table = deltalake::open_table("./data/people_countries_delta_dask")
        .await
        .unwrap();

    let ctx = SessionContext::standalone_with_state(state).await?;

    ctx.register_table("demo", Arc::new(table)).unwrap();

    let df = ctx.sql("select * from demo").await?;

    df.show().await?;

    Ok(())
}
3 changes: 3 additions & 0 deletions rustfmt.toml
@@ -0,0 +1,3 @@
# https://rust-lang.github.io/rustfmt/?version=master&search=
max_width = 120
reorder_imports = true
