From 93376c76d268a6e54585036bcf0af733d191ed99 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Mon, 18 Oct 2021 07:00:35 +0200 Subject: [PATCH] Simplified codec API (#63) --- src/compression.rs | 324 +++++++++++--------------------------- src/page/page_dict/mod.rs | 7 +- src/read/compression.rs | 26 +-- src/write/compression.rs | 18 ++- 4 files changed, 118 insertions(+), 257 deletions(-) diff --git a/src/compression.rs b/src/compression.rs index c073ff0db..d892cc601 100644 --- a/src/compression.rs +++ b/src/compression.rs @@ -1,174 +1,25 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. +use std::io::{Read, Write}; pub use super::parquet_bridge::Compression; use crate::error::{ParquetError, Result}; -/// Parquet compression codec interface. -pub trait Codec: std::fmt::Debug { - /// Compresses data stored in slice `input_buf` and writes the compressed result - /// to `output_buf`. - /// Note that you'll need to call `clear()` before reusing the same `output_buf` - /// across different `compress` calls. - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()>; - - /// Decompresses data stored in slice `input_buf` and writes output to `output_buf`. - /// Returns the total number of bytes written. - fn decompress(&mut self, input_buf: &[u8], output_buf: &mut [u8]) -> Result<()>; -} - -/// Given the compression type `codec`, returns a codec used to compress and decompress -/// bytes for the compression type. -/// This returns `None` if the codec type is `UNCOMPRESSED`. -pub fn create_codec(codec: &Compression) -> Result>> { - match *codec { +/// Compresses data stored in slice `input_buf` and writes the compressed result +/// to `output_buf`. +/// Note that you'll need to call `clear()` before reusing the same `output_buf` +/// across different `compress` calls. +pub fn compress( + compression: Compression, + input_buf: &[u8], + output_buf: &mut Vec, +) -> Result<()> { + match compression { #[cfg(feature = "brotli")] - Compression::Brotli => Ok(Some(Box::new(BrotliCodec::new()))), - #[cfg(feature = "gzip")] - Compression::Gzip => Ok(Some(Box::new(GZipCodec::new()))), - #[cfg(feature = "snappy")] - Compression::Snappy => Ok(Some(Box::new(SnappyCodec::new()))), - #[cfg(feature = "lz4")] - Compression::Lz4 => Ok(Some(Box::new(Lz4Codec::new()))), - #[cfg(feature = "zstd")] - Compression::Zstd => Ok(Some(Box::new(ZstdCodec::new()))), - Compression::Uncompressed => Ok(None), - _ => Err(general_err!("Compression {:?} is not installed", codec)), - } -} - -#[cfg(feature = "snappy")] -mod snappy_codec { - use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder}; - - use crate::compression::Codec; - use crate::error::Result; - - /// Codec for Snappy compression format. - #[derive(Debug)] - pub struct SnappyCodec { - decoder: Decoder, - encoder: Encoder, - } - - impl SnappyCodec { - /// Creates new Snappy compression codec. - pub(crate) fn new() -> Self { - Self { - decoder: Decoder::new(), - encoder: Encoder::new(), - } - } - } - - impl Codec for SnappyCodec { - fn decompress(&mut self, input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> { - let len = decompress_len(input_buf)?; - assert!(len <= output_buf.len()); - self.decoder - .decompress(input_buf, output_buf) - .map_err(|e| e.into()) - .map(|_| ()) - } - - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { - let output_buf_len = output_buf.len(); - let required_len = max_compress_len(input_buf.len()); - output_buf.resize(output_buf_len + required_len, 0); - let n = self - .encoder - .compress(input_buf, &mut output_buf[output_buf_len..])?; - output_buf.truncate(output_buf_len + n); - Ok(()) - } - } -} -#[cfg(feature = "snappy")] -pub use snappy_codec::*; - -#[cfg(feature = "gzip")] -mod gzip_codec { - - use std::io::{Read, Write}; - - use flate2::{read, write, Compression}; - - use crate::compression::Codec; - use crate::error::Result; - - /// Codec for GZIP compression algorithm. - #[derive(Debug)] - pub struct GZipCodec {} - - impl GZipCodec { - /// Creates new GZIP compression codec. - pub(crate) fn new() -> Self { - Self {} - } - } - - impl Codec for GZipCodec { - fn decompress(&mut self, input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> { - let mut decoder = read::GzDecoder::new(input_buf); - decoder.read_exact(output_buf).map_err(|e| e.into()) - } - - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { - let mut encoder = write::GzEncoder::new(output_buf, Compression::default()); - encoder.write_all(input_buf)?; - encoder.try_finish().map_err(|e| e.into()) - } - } -} -#[cfg(feature = "gzip")] -pub use gzip_codec::*; - -#[cfg(feature = "brotli")] -mod brotli_codec { - - use std::io::{Read, Write}; - - use crate::compression::Codec; - use crate::error::Result; - - const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096; - const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1; // supported levels 0-9 - const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22 - - /// Codec for Brotli compression algorithm. - #[derive(Debug)] - pub struct BrotliCodec {} - - impl BrotliCodec { - /// Creates new Brotli compression codec. - pub(crate) fn new() -> Self { - Self {} - } - } - - impl Codec for BrotliCodec { - fn decompress(&mut self, input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> { - brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE) - .read_exact(output_buf) - .map_err(|e| e.into()) - } + Compression::Brotli => { + const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096; + const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1; // supported levels 0-9 + const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22 - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { let mut encoder = brotli::CompressorWriter::new( output_buf, BROTLI_DEFAULT_BUFFER_SIZE, @@ -178,38 +29,26 @@ mod brotli_codec { encoder.write_all(input_buf)?; encoder.flush().map_err(|e| e.into()) } - } -} -#[cfg(feature = "brotli")] -pub use brotli_codec::*; - -#[cfg(feature = "lz4")] -mod lz4_codec { - use std::io::{Read, Write}; - - use crate::compression::Codec; - use crate::error::Result; - - const LZ4_BUFFER_SIZE: usize = 4096; - - /// Codec for LZ4 compression algorithm. - #[derive(Debug)] - pub struct Lz4Codec {} - - impl Lz4Codec { - /// Creates new LZ4 compression codec. - pub(crate) fn new() -> Self { - Self {} + #[cfg(feature = "gzip")] + Compression::Gzip => { + let mut encoder = flate2::write::GzEncoder::new(output_buf, Default::default()); + encoder.write_all(input_buf)?; + encoder.try_finish().map_err(|e| e.into()) } - } + #[cfg(feature = "snappy")] + Compression::Snappy => { + use snap::raw::{max_compress_len, Encoder}; - impl Codec for Lz4Codec { - fn decompress(&mut self, input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> { - let mut decoder = lz4::Decoder::new(input_buf)?; - decoder.read_exact(output_buf).map_err(|e| e.into()) + let output_buf_len = output_buf.len(); + let required_len = max_compress_len(input_buf.len()); + output_buf.resize(output_buf_len + required_len, 0); + let n = Encoder::new().compress(input_buf, &mut output_buf[output_buf_len..])?; + output_buf.truncate(output_buf_len + n); + Ok(()) } - - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { + #[cfg(feature = "lz4")] + Compression::Lz4 => { + const LZ4_BUFFER_SIZE: usize = 4096; let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?; let mut from = 0; loop { @@ -222,72 +61,91 @@ mod lz4_codec { } encoder.finish().1.map_err(|e| e.into()) } - } -} -#[cfg(feature = "lz4")] -pub use lz4_codec::*; - -#[cfg(feature = "zstd")] -mod zstd_codec { - use std::io::Read; - use std::io::Write; - - use crate::compression::Codec; - use crate::error::Result; - - /// Codec for Zstandard compression algorithm. - #[derive(Debug)] - pub struct ZstdCodec {} + #[cfg(feature = "zstd")] + Compression::Zstd => { + /// Compression level (1-21) for ZSTD. Choose 1 here for better compression speed. + const ZSTD_COMPRESSION_LEVEL: i32 = 1; - impl ZstdCodec { - /// Creates new Zstandard compression codec. - pub(crate) fn new() -> Self { - Self {} + let mut encoder = zstd::Encoder::new(output_buf, ZSTD_COMPRESSION_LEVEL)?; + encoder.write_all(input_buf)?; + match encoder.finish() { + Ok(_) => Ok(()), + Err(e) => Err(e.into()), + } + } + Compression::Uncompressed => { + Err(general_err!("Compressing without compression is not valid")) } + _ => Err(general_err!( + "Compression {:?} is not installed", + compression + )), } +} - /// Compression level (1-21) for ZSTD. Choose 1 here for better compression speed. - const ZSTD_COMPRESSION_LEVEL: i32 = 1; +/// Decompresses data stored in slice `input_buf` and writes output to `output_buf`. +/// Returns the total number of bytes written. +pub fn decompress(compression: Compression, input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> { + match compression { + #[cfg(feature = "brotli")] + Compression::Brotli => { + const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096; + brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE) + .read_exact(output_buf) + .map_err(|e| e.into()) + } + #[cfg(feature = "gzip")] + Compression::Gzip => { + let mut decoder = flate2::read::GzDecoder::new(input_buf); + decoder.read_exact(output_buf).map_err(|e| e.into()) + } + #[cfg(feature = "snappy")] + Compression::Snappy => { + use snap::raw::{decompress_len, Decoder}; - impl Codec for ZstdCodec { - fn decompress(&mut self, input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> { + let len = decompress_len(input_buf)?; + assert!(len <= output_buf.len()); + Decoder::new() + .decompress(input_buf, output_buf) + .map_err(|e| e.into()) + .map(|_| ()) + } + #[cfg(feature = "lz4")] + Compression::Lz4 => { + let mut decoder = lz4::Decoder::new(input_buf)?; + decoder.read_exact(output_buf).map_err(|e| e.into()) + } + #[cfg(feature = "zstd")] + Compression::Zstd => { let mut decoder = zstd::Decoder::new(input_buf)?; decoder.read_exact(output_buf).map_err(|e| e.into()) } - - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { - let mut encoder = zstd::Encoder::new(output_buf, ZSTD_COMPRESSION_LEVEL)?; - encoder.write_all(input_buf)?; - match encoder.finish() { - Ok(_) => Ok(()), - Err(e) => Err(e.into()), - } + Compression::Uncompressed => { + Err(general_err!("Compressing without compression is not valid")) } + _ => Err(general_err!( + "Compression {:?} is not installed", + compression + )), } } -#[cfg(any(feature = "zstd"))] -pub use zstd_codec::*; #[cfg(test)] mod tests { use super::*; fn test_roundtrip(c: Compression, data: &[u8]) { - let mut c1 = create_codec(&c).unwrap().unwrap(); - let offset = 2; // Compress to a buffer that already has data is possible let mut compressed = vec![2; offset]; - c1.compress(data, &mut compressed) - .expect("Error when compressing"); + compress(c, data, &mut compressed).expect("Error when compressing"); // data is compressed... assert!(compressed.len() - 2 < data.len()); let mut decompressed = vec![0; data.len()]; - c1.decompress(&compressed[offset..], &mut decompressed) - .expect("Error when decompressing"); + decompress(c, &compressed[offset..], &mut decompressed).expect("Error when decompressing"); assert_eq!(data, decompressed.as_slice()); } diff --git a/src/page/page_dict/mod.rs b/src/page/page_dict/mod.rs index 3d3df1166..b1bb14e42 100644 --- a/src/page/page_dict/mod.rs +++ b/src/page/page_dict/mod.rs @@ -8,7 +8,7 @@ pub use primitive::PrimitivePageDict; use std::{any::Any, sync::Arc}; -use crate::compression::{create_codec, Compression}; +use crate::compression::{decompress, Compression}; use crate::error::{ParquetError, Result}; use crate::schema::types::PhysicalType; @@ -51,10 +51,9 @@ pub fn read_dict_page( is_sorted: bool, physical_type: &PhysicalType, ) -> Result> { - let decompressor = create_codec(&compression.0)?; - if let Some(mut decompressor) = decompressor { + if compression.0 != Compression::Uncompressed { let mut decompressed = vec![0; compression.1]; - decompressor.decompress(&page.buffer, &mut decompressed)?; + decompress(compression.0, &page.buffer, &mut decompressed)?; deserialize(&decompressed, page.num_values, is_sorted, physical_type) } else { deserialize(&page.buffer, page.num_values, is_sorted, physical_type) diff --git a/src/read/compression.rs b/src/read/compression.rs index c304e1d9d..a861f643c 100644 --- a/src/read/compression.rs +++ b/src/read/compression.rs @@ -1,22 +1,21 @@ use parquet_format_async_temp::DataPageHeaderV2; use streaming_decompression; -use crate::compression::{create_codec, Codec}; +use crate::compression::{self, Compression}; use crate::error::{ParquetError, Result}; use crate::page::{CompressedDataPage, DataPage, DataPageHeader}; -use crate::parquet_bridge::Compression; use crate::FallibleStreamingIterator; use super::PageIterator; -fn decompress_v1(compressed: &[u8], decompressor: &mut dyn Codec, buffer: &mut [u8]) -> Result<()> { - decompressor.decompress(compressed, buffer) +fn decompress_v1(compressed: &[u8], compression: Compression, buffer: &mut [u8]) -> Result<()> { + compression::decompress(compression, compressed, buffer) } fn decompress_v2( compressed: &[u8], page_header: &DataPageHeaderV2, - decompressor: &mut dyn Codec, + compression: Compression, buffer: &mut [u8], ) -> Result<()> { // When processing data page v2, depending on enabled compression for the @@ -33,7 +32,7 @@ fn decompress_v2( if can_decompress { (&mut buffer[..offset]).copy_from_slice(&compressed[..offset]); - decompressor.decompress(&compressed[offset..], &mut buffer[offset..])?; + compression::decompress(compression, &compressed[offset..], &mut buffer[offset..])?; } else { buffer.copy_from_slice(compressed); } @@ -47,19 +46,22 @@ pub fn decompress_buffer( compressed_page: &mut CompressedDataPage, buffer: &mut Vec, ) -> Result { - let codec = create_codec(&compressed_page.compression())?; - - if let Some(mut codec) = codec { + if compressed_page.compression() != Compression::Uncompressed { let compressed_buffer = &compressed_page.buffer; // prepare the compression buffer buffer.clear(); buffer.resize(compressed_page.uncompressed_size(), 0); match compressed_page.header() { - DataPageHeader::V1(_) => decompress_v1(compressed_buffer, codec.as_mut(), buffer)?, - DataPageHeader::V2(header) => { - decompress_v2(compressed_buffer, header, codec.as_mut(), buffer)? + DataPageHeader::V1(_) => { + decompress_v1(compressed_buffer, compressed_page.compression(), buffer)? } + DataPageHeader::V2(header) => decompress_v2( + compressed_buffer, + header, + compressed_page.compression(), + buffer, + )?, } Ok(true) } else { diff --git a/src/write/compression.rs b/src/write/compression.rs index bfa7b6e35..13e627481 100644 --- a/src/write/compression.rs +++ b/src/write/compression.rs @@ -3,7 +3,7 @@ use crate::page::{CompressedDictPage, CompressedPage, DataPageHeader, EncodedDic use crate::parquet_bridge::Compression; use crate::FallibleStreamingIterator; use crate::{ - compression::create_codec, + compression, page::{CompressedDataPage, DataPage, EncodedPage}, }; @@ -20,18 +20,21 @@ fn compress_data( descriptor, } = page; let uncompressed_page_size = buffer.len(); - let codec = create_codec(&compression)?; - if let Some(mut codec) = codec { + if compression != Compression::Uncompressed { match &header { DataPageHeader::V1(_) => { - codec.compress(&buffer, &mut compressed_buffer)?; + compression::compress(compression, &buffer, &mut compressed_buffer)?; } DataPageHeader::V2(header) => { let levels_byte_length = (header.repetition_levels_byte_length + header.definition_levels_byte_length) as usize; compressed_buffer.extend_from_slice(&buffer[..levels_byte_length]); - codec.compress(&buffer[levels_byte_length..], &mut compressed_buffer)?; + compression::compress( + compression, + &buffer[levels_byte_length..], + &mut compressed_buffer, + )?; } }; } else { @@ -56,9 +59,8 @@ fn compress_dict( mut buffer, num_values, } = page; - let codec = create_codec(&compression)?; - if let Some(mut codec) = codec { - codec.compress(&buffer, &mut compressed_buffer)?; + if compression != Compression::Uncompressed { + compression::compress(compression, &buffer, &mut compressed_buffer)?; } else { std::mem::swap(&mut buffer, &mut compressed_buffer); }