Simplified codec API (#63)
jorgecarleitao authored Oct 18, 2021
1 parent 7fbf8e4 commit 93376c7
Showing 4 changed files with 118 additions and 257 deletions.
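
For orientation, a minimal sketch of how the simplified API introduced here is called, mirroring the round-trip test further down in this diff. The crate path `parquet2::compression`, the `roundtrip_snappy` helper name, and an enabled `snappy` feature are assumptions for illustration, not part of the commit:

    use parquet2::compression::{compress, decompress, Compression};
    use parquet2::error::Result;

    fn roundtrip_snappy(data: &[u8]) -> Result<Vec<u8>> {
        // compress() appends to the Vec, so clear() it before reusing the
        // same buffer across calls.
        let mut compressed = Vec::new();
        compress(Compression::Snappy, data, &mut compressed)?;

        // The caller provides an output buffer of the uncompressed length;
        // in Parquet that length is known from the page header.
        let mut decompressed = vec![0u8; data.len()];
        decompress(Compression::Snappy, &compressed, &mut decompressed)?;
        Ok(decompressed)
    }

Compared to the removed `create_codec`/`Codec` trait, call sites no longer hold a boxed codec object; they pass the `Compression` value directly to the two free functions.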
324 changes: 91 additions & 233 deletions src/compression.rs
@@ -1,174 +1,25 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::io::{Read, Write};

pub use super::parquet_bridge::Compression;

use crate::error::{ParquetError, Result};

/// Parquet compression codec interface.
pub trait Codec: std::fmt::Debug {
/// Compresses data stored in slice `input_buf` and writes the compressed result
/// to `output_buf`.
/// Note that you'll need to call `clear()` before reusing the same `output_buf`
/// across different `compress` calls.
fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;

/// Decompresses data stored in slice `input_buf` and writes output to `output_buf`.
/// Returns the total number of bytes written.
fn decompress(&mut self, input_buf: &[u8], output_buf: &mut [u8]) -> Result<()>;
}

/// Given the compression type `codec`, returns a codec used to compress and decompress
/// bytes for the compression type.
/// This returns `None` if the codec type is `UNCOMPRESSED`.
pub fn create_codec(codec: &Compression) -> Result<Option<Box<dyn Codec>>> {
match *codec {
/// Compresses data stored in slice `input_buf` and writes the compressed result
/// to `output_buf`.
/// Note that you'll need to call `clear()` before reusing the same `output_buf`
/// across different `compress` calls.
pub fn compress(
compression: Compression,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
) -> Result<()> {
match compression {
#[cfg(feature = "brotli")]
Compression::Brotli => Ok(Some(Box::new(BrotliCodec::new()))),
#[cfg(feature = "gzip")]
Compression::Gzip => Ok(Some(Box::new(GZipCodec::new()))),
#[cfg(feature = "snappy")]
Compression::Snappy => Ok(Some(Box::new(SnappyCodec::new()))),
#[cfg(feature = "lz4")]
Compression::Lz4 => Ok(Some(Box::new(Lz4Codec::new()))),
#[cfg(feature = "zstd")]
Compression::Zstd => Ok(Some(Box::new(ZstdCodec::new()))),
Compression::Uncompressed => Ok(None),
_ => Err(general_err!("Compression {:?} is not installed", codec)),
}
}

#[cfg(feature = "snappy")]
mod snappy_codec {
use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder};

use crate::compression::Codec;
use crate::error::Result;

/// Codec for Snappy compression format.
#[derive(Debug)]
pub struct SnappyCodec {
decoder: Decoder,
encoder: Encoder,
}

impl SnappyCodec {
/// Creates new Snappy compression codec.
pub(crate) fn new() -> Self {
Self {
decoder: Decoder::new(),
encoder: Encoder::new(),
}
}
}

impl Codec for SnappyCodec {
fn decompress(&mut self, input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> {
let len = decompress_len(input_buf)?;
assert!(len <= output_buf.len());
self.decoder
.decompress(input_buf, output_buf)
.map_err(|e| e.into())
.map(|_| ())
}

fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
let output_buf_len = output_buf.len();
let required_len = max_compress_len(input_buf.len());
output_buf.resize(output_buf_len + required_len, 0);
let n = self
.encoder
.compress(input_buf, &mut output_buf[output_buf_len..])?;
output_buf.truncate(output_buf_len + n);
Ok(())
}
}
}
#[cfg(feature = "snappy")]
pub use snappy_codec::*;

#[cfg(feature = "gzip")]
mod gzip_codec {

use std::io::{Read, Write};

use flate2::{read, write, Compression};

use crate::compression::Codec;
use crate::error::Result;

/// Codec for GZIP compression algorithm.
#[derive(Debug)]
pub struct GZipCodec {}

impl GZipCodec {
/// Creates new GZIP compression codec.
pub(crate) fn new() -> Self {
Self {}
}
}

impl Codec for GZipCodec {
fn decompress(&mut self, input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> {
let mut decoder = read::GzDecoder::new(input_buf);
decoder.read_exact(output_buf).map_err(|e| e.into())
}

fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
let mut encoder = write::GzEncoder::new(output_buf, Compression::default());
encoder.write_all(input_buf)?;
encoder.try_finish().map_err(|e| e.into())
}
}
}
#[cfg(feature = "gzip")]
pub use gzip_codec::*;

#[cfg(feature = "brotli")]
mod brotli_codec {

use std::io::{Read, Write};

use crate::compression::Codec;
use crate::error::Result;

const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1; // supported levels 0-9
const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22

/// Codec for Brotli compression algorithm.
#[derive(Debug)]
pub struct BrotliCodec {}

impl BrotliCodec {
/// Creates new Brotli compression codec.
pub(crate) fn new() -> Self {
Self {}
}
}

impl Codec for BrotliCodec {
fn decompress(&mut self, input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> {
brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)
.read_exact(output_buf)
.map_err(|e| e.into())
}
Compression::Brotli => {
const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1; // supported levels 0-9
const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22

fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
let mut encoder = brotli::CompressorWriter::new(
output_buf,
BROTLI_DEFAULT_BUFFER_SIZE,
@@ -178,38 +29,26 @@ mod brotli_codec {
encoder.write_all(input_buf)?;
encoder.flush().map_err(|e| e.into())
}
}
}
#[cfg(feature = "brotli")]
pub use brotli_codec::*;

#[cfg(feature = "lz4")]
mod lz4_codec {
use std::io::{Read, Write};

use crate::compression::Codec;
use crate::error::Result;

const LZ4_BUFFER_SIZE: usize = 4096;

/// Codec for LZ4 compression algorithm.
#[derive(Debug)]
pub struct Lz4Codec {}

impl Lz4Codec {
/// Creates new LZ4 compression codec.
pub(crate) fn new() -> Self {
Self {}
#[cfg(feature = "gzip")]
Compression::Gzip => {
let mut encoder = flate2::write::GzEncoder::new(output_buf, Default::default());
encoder.write_all(input_buf)?;
encoder.try_finish().map_err(|e| e.into())
}
}
#[cfg(feature = "snappy")]
Compression::Snappy => {
use snap::raw::{max_compress_len, Encoder};

impl Codec for Lz4Codec {
fn decompress(&mut self, input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> {
let mut decoder = lz4::Decoder::new(input_buf)?;
decoder.read_exact(output_buf).map_err(|e| e.into())
let output_buf_len = output_buf.len();
let required_len = max_compress_len(input_buf.len());
output_buf.resize(output_buf_len + required_len, 0);
let n = Encoder::new().compress(input_buf, &mut output_buf[output_buf_len..])?;
output_buf.truncate(output_buf_len + n);
Ok(())
}

fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
#[cfg(feature = "lz4")]
Compression::Lz4 => {
const LZ4_BUFFER_SIZE: usize = 4096;
let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
let mut from = 0;
loop {
@@ -222,72 +61,91 @@
}
encoder.finish().1.map_err(|e| e.into())
}
}
}
#[cfg(feature = "lz4")]
pub use lz4_codec::*;

#[cfg(feature = "zstd")]
mod zstd_codec {
use std::io::Read;
use std::io::Write;

use crate::compression::Codec;
use crate::error::Result;

/// Codec for Zstandard compression algorithm.
#[derive(Debug)]
pub struct ZstdCodec {}
#[cfg(feature = "zstd")]
Compression::Zstd => {
/// Compression level (1-21) for ZSTD. Choose 1 here for better compression speed.
const ZSTD_COMPRESSION_LEVEL: i32 = 1;

impl ZstdCodec {
/// Creates new Zstandard compression codec.
pub(crate) fn new() -> Self {
Self {}
let mut encoder = zstd::Encoder::new(output_buf, ZSTD_COMPRESSION_LEVEL)?;
encoder.write_all(input_buf)?;
match encoder.finish() {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}
Compression::Uncompressed => {
Err(general_err!("Compressing without compression is not valid"))
}
_ => Err(general_err!(
"Compression {:?} is not installed",
compression
)),
}
}

/// Compression level (1-21) for ZSTD. Choose 1 here for better compression speed.
const ZSTD_COMPRESSION_LEVEL: i32 = 1;
/// Decompresses data stored in slice `input_buf` and writes output to `output_buf`.
/// Returns the total number of bytes written.
pub fn decompress(compression: Compression, input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> {
match compression {
#[cfg(feature = "brotli")]
Compression::Brotli => {
const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)
.read_exact(output_buf)
.map_err(|e| e.into())
}
#[cfg(feature = "gzip")]
Compression::Gzip => {
let mut decoder = flate2::read::GzDecoder::new(input_buf);
decoder.read_exact(output_buf).map_err(|e| e.into())
}
#[cfg(feature = "snappy")]
Compression::Snappy => {
use snap::raw::{decompress_len, Decoder};

impl Codec for ZstdCodec {
fn decompress(&mut self, input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> {
let len = decompress_len(input_buf)?;
assert!(len <= output_buf.len());
Decoder::new()
.decompress(input_buf, output_buf)
.map_err(|e| e.into())
.map(|_| ())
}
#[cfg(feature = "lz4")]
Compression::Lz4 => {
let mut decoder = lz4::Decoder::new(input_buf)?;
decoder.read_exact(output_buf).map_err(|e| e.into())
}
#[cfg(feature = "zstd")]
Compression::Zstd => {
let mut decoder = zstd::Decoder::new(input_buf)?;
decoder.read_exact(output_buf).map_err(|e| e.into())
}

fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
let mut encoder = zstd::Encoder::new(output_buf, ZSTD_COMPRESSION_LEVEL)?;
encoder.write_all(input_buf)?;
match encoder.finish() {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
Compression::Uncompressed => {
Err(general_err!("Compressing without compression is not valid"))
}
_ => Err(general_err!(
"Compression {:?} is not installed",
compression
)),
}
}
#[cfg(any(feature = "zstd"))]
pub use zstd_codec::*;

#[cfg(test)]
mod tests {
use super::*;

fn test_roundtrip(c: Compression, data: &[u8]) {
let mut c1 = create_codec(&c).unwrap().unwrap();

let offset = 2;

// Compress to a buffer that already has data is possible
let mut compressed = vec![2; offset];
c1.compress(data, &mut compressed)
.expect("Error when compressing");
compress(c, data, &mut compressed).expect("Error when compressing");

// data is compressed...
assert!(compressed.len() - 2 < data.len());

let mut decompressed = vec![0; data.len()];
c1.decompress(&compressed[offset..], &mut decompressed)
.expect("Error when decompressing");
decompress(c, &compressed[offset..], &mut decompressed).expect("Error when decompressing");
assert_eq!(data, decompressed.as_slice());
}

7 changes: 3 additions & 4 deletions src/page/page_dict/mod.rs
@@ -8,7 +8,7 @@ pub use primitive::PrimitivePageDict;

use std::{any::Any, sync::Arc};

use crate::compression::{create_codec, Compression};
use crate::compression::{decompress, Compression};
use crate::error::{ParquetError, Result};
use crate::schema::types::PhysicalType;

@@ -51,10 +51,9 @@ pub fn read_dict_page(
is_sorted: bool,
physical_type: &PhysicalType,
) -> Result<Arc<dyn DictPage>> {
let decompressor = create_codec(&compression.0)?;
if let Some(mut decompressor) = decompressor {
if compression.0 != Compression::Uncompressed {
let mut decompressed = vec![0; compression.1];
decompressor.decompress(&page.buffer, &mut decompressed)?;
decompress(compression.0, &page.buffer, &mut decompressed)?;
deserialize(&decompressed, page.num_values, is_sorted, physical_type)
} else {
deserialize(&page.buffer, page.num_values, is_sorted, physical_type)