Skip to content

Commit

Permalink
lz4 -> lz4_flex
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Apr 28, 2024
1 parent 3887144 commit 01b819c
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 103 deletions.
37 changes: 21 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ locustdb-derive = {path = "./locustdb-derive", version = "0.2.0"}
locustdb-serialization = {path = "./locustdb-serialization", version = "0.1.0"}
log = {features = ["max_level_trace", "release_max_level_debug"], version = "0.4"}
lru = "0.12"
lz4 = {version = "1.22.0", optional = true}
lz4_flex = { version = "0.11" }
num = "0.4"
num_cpus = "1.0"
ordered-float = { version = "3", features = ["serde"] }
Expand Down Expand Up @@ -74,7 +74,6 @@ pretty_assertions = "1"

[features]
default = []
enable_lz4 = ["lz4"]
python = ["pyo3"]


Expand Down
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,6 @@ cargo run --release --bin repl -- --load test_data/nyc-taxi.csv.gz --reduced-tri
`cargo bench`
### LZ4
Compile with `--features "enable_lz4"` to enable an additional lz4 compression pass which can significantly reduce data size both on disk and in-memory, at the cost of slightly slower in-memory queries.
[nyc-taxi-trips]: https://www.dropbox.com/sh/4xm5vf1stnf7a0h/AADRRVLsqqzUNWEPzcKnGN_Pa?dl=0
[blogpost]: https://clemenswinter.com/2018/07/09/how-to-analyze-billions-of-records-per-second-on-a-single-desktop-pc/
Expand Down
1 change: 0 additions & 1 deletion src/engine/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ mod unhexpack_strings;
mod unpack_strings;
mod val_rows_pack;
mod val_rows_unpack;
#[cfg(feature = "enable_lz4")]
mod lz4_decode;
mod merge_deduplicate_partitioned;
mod partition;
Expand Down
10 changes: 0 additions & 10 deletions src/engine/operators/vector_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,6 @@ pub mod operator {
]
}

#[cfg(feature = "enable_lz4")]
pub fn lz4_decode<'a>(
encoded: BufferRef<u8>,
decoded_len: usize,
Expand All @@ -525,15 +524,6 @@ pub mod operator {
}
}

/// Stand-in for `lz4_decode` that is compiled only when the `enable_lz4`
/// feature is turned off.
///
/// All three arguments are ignored. The function never returns a value:
/// it panics unconditionally with a message telling the user to rebuild
/// with LZ4 support.
///
/// # Panics
///
/// Always.
#[cfg(not(feature = "enable_lz4"))]
pub fn lz4_decode<'a>(
_: BufferRef<u8>,
_: usize,
_: TypedBufferRef,
) -> Result<BoxedOperator<'a>, QueryError> {
panic!("LZ4 is not enabled in this build of LocustDB. Recompile with `features enable_lz4`")
}

pub fn unpack_strings<'a>(
packed: BufferRef<u8>,
unpacked: BufferRef<&'a str>,
Expand Down
12 changes: 4 additions & 8 deletions src/mem_store/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,13 @@ impl Column {
}

pub fn lz4_encode(&mut self) {
if cfg!(feature = "enable_lz4") {
let (encoded, worth_it) = self.data[0].lz4_encode();
if worth_it {
self.codec = self.codec.with_lz4(self.data[0].len());
self.data[0] = encoded;
}
let (encoded, worth_it) = self.data[0].lz4_encode();
if worth_it {
self.codec = self.codec.with_lz4(self.data[0].len());
self.data[0] = encoded;
}
}

#[cfg(feature = "enable_lz4")]
pub fn lz4_decode(&mut self) {
if let Some(CodecOp::LZ4(decoded_type, _)) = self.codec.ops().first().copied() {
trace!("lz4_decode before: {:?}", self);
Expand Down Expand Up @@ -322,7 +319,6 @@ impl DataSection {
}
}

#[cfg(feature = "enable_lz4")]
pub fn lz4_decode(&self, decoded_type: EncodingType, len: usize) -> DataSection {
match self {
DataSection::U8(encoded) => match decoded_type {
Expand Down
82 changes: 52 additions & 30 deletions src/mem_store/column_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ use std::sync::Arc;

use ordered_float::OrderedFloat;

use crate::mem_store::integers::*;
use crate::mem_store::column::*;
use crate::mem_store::integers::*;
use crate::mem_store::strings::*;
use crate::stringpack::*;

use super::floats::FloatColumn;


pub trait ColumnBuilder<T: ?Sized>: Default {
fn push(&mut self, elem: &T);
fn finalize(self, name: &str, present: Option<Vec<u8>>) -> Arc<Column>;
Expand Down Expand Up @@ -45,12 +44,18 @@ impl<T: AsRef<str>> ColumnBuilder<T> for StringColBuilder {
}

/// Consumes the builder and assembles the string column.
///
/// Forwards `name`, an iterator over the collected values together with
/// their count, the `lhex` / `uhex` flags, the `string_bytes` total and the
/// optional `present` buffer to `fast_build_string_column`.
fn finalize(self, name: &str, present: Option<Vec<u8>>) -> Arc<Column> {
    fast_build_string_column(
        name,
        self.values.iter(),
        self.values.len(),
        self.lhex,
        self.uhex,
        self.string_bytes,
        present,
    )
}
}


pub struct IntColBuilder {
data: Vec<i64>,
min: i64,
Expand Down Expand Up @@ -91,14 +96,9 @@ impl ColumnBuilder<Option<i64>> for IntColBuilder {

/// Consumes the builder and assembles the integer column.
///
/// Delta encoding is requested only when the builder allows it
/// (`allow_delta_encode`) and `increasing * 10 > len * 9`, i.e. the
/// `increasing` counter exceeds 90% of the number of collected values.
/// The comparison is kept in integer arithmetic.
/// NOTE(review): `increasing` is presumably the number of values that did
/// not decrease relative to their predecessor - confirm against `push`.
fn finalize(self, name: &str, present: Option<Vec<u8>>) -> Arc<Column> {
    // PERF: heuristic for deciding delta encoding could probably be improved
    let delta_encode =
        self.allow_delta_encode && (self.increasing * 10 > self.data.len() as u64 * 9);
    IntegerColumn::new_boxed(name, self.data, self.min, self.max, delta_encode, present)
}
}

Expand All @@ -116,28 +116,50 @@ impl ColumnBuilder<Option<f64>> for FloatColBuilder {
}

/// Consumes the builder and assembles the float column.
///
/// Hands `name`, the collected `data` and the optional `present` buffer
/// straight to `FloatColumn::new_boxed`.
fn finalize(self, name: &str, present: Option<Vec<u8>>) -> Arc<Column> {
    FloatColumn::new_boxed(name, self.data, present)
}
}


/// Returns `true` when `string` has an even byte length and every character
/// is one of `0`-`9` or `a`-`f`.
///
/// The empty string meets both conditions and therefore yields `true`.
/// Uppercase hex digits and any non-ASCII characters are rejected.
fn is_lowercase_hex(string: &str) -> bool {
    // Cheap length test first so odd-length strings skip the character scan.
    string.len() & 1 == 0
        && string
            .chars()
            .all(|c| matches!(c, '0'..='9' | 'a'..='f'))
}

/// Returns `true` when `string` has an even byte length and every character
/// is one of `0`-`9` or `A`-`F`.
///
/// The empty string meets both conditions and therefore yields `true`.
/// Lowercase hex digits and any non-ASCII characters are rejected.
fn is_uppercase_hex(string: &str) -> bool {
    // Cheap length test first so odd-length strings skip the character scan.
    string.len() & 1 == 0
        && string
            .chars()
            .all(|c| matches!(c, '0'..='9' | 'A'..='F'))
}

10 changes: 4 additions & 6 deletions src/mem_store/lz4.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
extern crate lz4;

use std::io::{Read, Write};
use std::mem;
use std::slice::{from_raw_parts, from_raw_parts_mut};
use std::fmt::Debug;


pub fn decoder(data: &[u8]) -> lz4::Decoder<&[u8]> {
lz4::Decoder::new(data).unwrap()
pub fn decoder(data: &[u8]) -> lz4_flex::frame::FrameDecoder<&[u8]> {
lz4_flex::frame::FrameDecoder::new(data)
}

pub fn encode<T: Debug>(data: &[T]) -> Vec<u8> {
Expand All @@ -20,9 +18,9 @@ pub fn encode<T: Debug>(data: &[T]) -> Vec<u8> {

let mut result = Vec::new();
{
let mut encoder = lz4::EncoderBuilder::new().build(&mut result).unwrap();
let mut encoder = lz4_flex::frame::FrameEncoder::new(&mut result);
encoder.write_all(data_u8).unwrap();
encoder.finish().1.unwrap();
encoder.finish().unwrap();
}
result
}
Expand Down
12 changes: 1 addition & 11 deletions src/mem_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub mod column_builder;
pub mod floats;
pub mod integers;
pub(crate) mod lru;
#[cfg(feature = "enable_lz4")]
pub mod lz4;
mod mixed_column;
pub mod partition;
Expand All @@ -19,13 +18,4 @@ pub use self::column::{Column, DataSection, DataSource};
pub use self::lru::Lru;
pub use self::table::TableStats;
pub use self::tree::*;
pub use self::value::Val;

/// Fallback `lz4` module that is compiled only when the `enable_lz4` feature
/// is turned off, so that callers of `lz4::encode` still compile.
#[cfg(not(feature = "enable_lz4"))]
pub mod lz4 {
use std::fmt::Debug;

/// Placeholder for the real encoder: ignores its input and panics
/// unconditionally with a message asking for a rebuild with
/// `--features enable_lz4`.
pub fn encode<T: Debug>(_: &[T]) -> Vec<u8> {
panic!("lz4 not supported in this build of LocustDB. Recompile with --features enable_lz4.")
}
}
pub use self::value::Val;
23 changes: 8 additions & 15 deletions src/scheduler/disk_read_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub struct DiskReadScheduler {
task_queue: Mutex<VecDeque<DiskRun>>,
reader_semaphore: Semaphore,
lru: Lru,
#[allow(dead_code)]
lz4_decode: bool,

background_load_wait_queue: Condvar,
Expand Down Expand Up @@ -120,14 +119,11 @@ impl DiskReadScheduler {
if handle.is_resident() {
let mut maybe_column = handle.try_get();
if let Some(ref mut column) = *maybe_column {
#[cfg(feature = "enable_lz4")]
{
if self.lz4_decode {
if let Some(c) = Arc::get_mut(column) {
c.lz4_decode()
};
handle.update_size_bytes(column.heap_size_of_children());
}
if self.lz4_decode {
if let Some(c) = Arc::get_mut(column) {
c.lz4_decode()
};
handle.update_size_bytes(column.heap_size_of_children());
}
self.lru.touch(handle.key());
return column.clone();
Expand Down Expand Up @@ -157,12 +153,9 @@ impl DiskReadScheduler {
let mut maybe_column = _handle.try_get();
// TODO: if not main handle, put it at back of lru
self.lru.put(_handle.key().clone());
#[cfg(feature = "enable_lz4")]
{
if self.lz4_decode {
column.lz4_decode();
_handle.update_size_bytes(column.heap_size_of_children());
}
if self.lz4_decode {
column.lz4_decode();
_handle.update_size_bytes(column.heap_size_of_children());
}
let column = Arc::new(column);
*maybe_column = Some(column.clone());
Expand Down

0 comments on commit 01b819c

Please sign in to comment.