Added support to write Avro async (#736)
jorgecarleitao authored Jan 6, 2022
1 parent e001ba5 commit 299df30
Showing 7 changed files with 158 additions and 21 deletions.
3 changes: 3 additions & 0 deletions src/io/avro/mod.rs
@@ -7,6 +7,9 @@ pub mod read;
 #[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
 pub mod read_async;
 pub mod write;
+#[cfg(feature = "io_avro_async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
+pub mod write_async;
 
 /// Valid compressions
 #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
36 changes: 23 additions & 13 deletions src/io/avro/write/mod.rs
@@ -6,41 +6,35 @@ use crate::error::Result;
 pub use super::Compression;
 
 mod header;
-use header::serialize_header;
+pub(super) use header::serialize_header;
 mod schema;
 pub use schema::to_avro_schema;
 mod serialize;
 pub use serialize::{can_serialize, new_serializer, BoxSerializer};
 mod block;
 pub use block::*;
 mod compress;
-mod util;
+pub(super) mod util;
 pub use compress::compress;
 
 pub use super::{Block, CompressedBlock};
 
-const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
+pub(super) const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
+// * Four bytes, ASCII 'O', 'b', 'j', followed by 1.
+pub(super) const AVRO_MAGIC: [u8; 4] = [b'O', b'b', b'j', 1u8];
 
 /// Writes Avro's metadata to `writer`.
 pub fn write_metadata<W: std::io::Write>(
     writer: &mut W,
     fields: Vec<AvroField>,
     compression: Option<Compression>,
 ) -> Result<()> {
-    // * Four bytes, ASCII 'O', 'b', 'j', followed by 1.
-    let avro_magic = [b'O', b'b', b'j', 1u8];
-    writer.write_all(&avro_magic)?;
+    writer.write_all(&AVRO_MAGIC)?;
 
     // * file metadata, including the schema.
     let schema = AvroSchema::Record(Record::new("", fields));
-    let header = serialize_header(&schema, compression)?;
 
-    util::zigzag_encode(header.len() as i64, writer)?;
-    for (name, item) in header {
-        util::write_binary(name.as_bytes(), writer)?;
-        util::write_binary(&item, writer)?;
-    }
-    writer.write_all(&[0])?;
+    write_schema(writer, &schema, compression)?;
 
     // The 16-byte, randomly-generated sync marker for this file.
     writer.write_all(&SYNC_NUMBER)?;
@@ -68,3 +62,19 @@ pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) {
         }
     }
 }
+
+pub(super) fn write_schema<W: std::io::Write>(
+    writer: &mut W,
+    schema: &AvroSchema,
+    compression: Option<Compression>,
+) -> Result<()> {
+    let header = serialize_header(schema, compression)?;
+
+    util::zigzag_encode(header.len() as i64, writer)?;
+    for (name, item) in header {
+        util::write_binary(name.as_bytes(), writer)?;
+        util::write_binary(&item, writer)?;
+    }
+    writer.write_all(&[0])?;
+    Ok(())
+}
26 changes: 26 additions & 0 deletions src/io/avro/write_async/block.rs
@@ -0,0 +1,26 @@
+use futures::{AsyncWrite, AsyncWriteExt};
+
+use crate::error::Result;
+
+use super::super::write::{util::zigzag_encode, SYNC_NUMBER};
+use super::super::CompressedBlock;
+
+/// Writes a [`CompressedBlock`] to `writer`
+pub async fn write_block<W>(writer: &mut W, compressed_block: &CompressedBlock) -> Result<()>
+where
+    W: AsyncWrite + Unpin,
+{
+    // write size and rows
+    let mut scratch = Vec::with_capacity(10);
+    zigzag_encode(compressed_block.number_of_rows as i64, &mut scratch)?;
+    writer.write_all(&scratch).await?;
+    scratch.clear();
+    zigzag_encode(compressed_block.data.len() as i64, &mut scratch)?;
+    writer.write_all(&scratch).await?;
+
+    writer.write_all(&compressed_block.data).await?;
+
+    writer.write_all(&SYNC_NUMBER).await?;
+
+    Ok(())
+}
38 changes: 38 additions & 0 deletions src/io/avro/write_async/mod.rs
@@ -0,0 +1,38 @@
+//! Async write Avro
+mod block;
+pub use block::write_block;
+
+use avro_schema::{Field as AvroField, Record, Schema as AvroSchema};
+use futures::{AsyncWrite, AsyncWriteExt};
+
+use crate::error::Result;
+
+use super::{
+    write::{write_schema, AVRO_MAGIC, SYNC_NUMBER},
+    Compression,
+};
+
+/// Writes Avro's metadata to `writer`.
+pub async fn write_metadata<W>(
+    writer: &mut W,
+    fields: Vec<AvroField>,
+    compression: Option<Compression>,
+) -> Result<()>
+where
+    W: AsyncWrite + Unpin,
+{
+    writer.write_all(&AVRO_MAGIC).await?;
+
+    // * file metadata, including the schema.
+    let schema = AvroSchema::Record(Record::new("", fields));
+
+    let mut scratch = vec![];
+    write_schema(&mut scratch, &schema, compression)?;
+
+    writer.write_all(&scratch).await?;
+
+    // The 16-byte, randomly-generated sync marker for this file.
+    writer.write_all(&SYNC_NUMBER).await?;
+
+    Ok(())
+}
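
For reference, below is a minimal sketch of how the new write_async module composes with the existing synchronous serialization, mirroring the test added in this commit (tests/it/io/avro/write_async.rs): the CPU-bound serialization and compression stay synchronous, and only the IO goes through write_async. The block-construction calls (write::new_serializer, write::Block::new, write::CompressedBlock::default) are assumed from the crate's synchronous write path at this version, and running the async fn requires an executor such as tokio, as in the test.

use std::sync::Arc;

use arrow2::array::{Array, Int64Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::avro::{write, write_async};

async fn write_single_block() -> Result<Vec<u8>> {
    let schema = Schema::from(vec![Field::new("a", DataType::Int64, false)]);
    let columns = Chunk::try_new(vec![
        Arc::new(Int64Array::from_slice([1, 2, 3])) as Arc<dyn Array>
    ])?;
    let compression = None;

    // CPU-bound: serialize and compress synchronously
    // (in a real application this is usually done on a separate thread pool).
    let avro_fields = write::to_avro_schema(&schema)?;
    let mut serializers = columns
        .columns()
        .iter()
        .zip(avro_fields.iter())
        // assumed signature, following the synchronous write path of this version
        .map(|(array, field)| write::new_serializer(array.as_ref(), &field.schema))
        .collect::<Vec<_>>();
    let mut block = write::Block::new(columns.len(), vec![]);
    write::serialize(&mut serializers, &mut block);
    let mut compressed_block = write::CompressedBlock::default();
    write::compress(&mut block, &mut compressed_block, compression)?;

    // IO-bound: write the magic, schema and sync marker, then the block, asynchronously.
    let mut file = vec![];
    write_async::write_metadata(&mut file, avro_fields, compression).await?;
    write_async::write_block(&mut file, &compressed_block).await?;
    Ok(file)
}
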
2 changes: 2 additions & 0 deletions tests/it/io/avro/mod.rs
@@ -4,3 +4,5 @@ mod read;
 #[cfg(feature = "io_avro_async")]
 mod read_async;
 mod write;
+#[cfg(feature = "io_avro_async")]
+mod write_async;
27 changes: 19 additions & 8 deletions tests/it/io/avro/write.rs
@@ -4,10 +4,12 @@ use arrow2::array::*;
 use arrow2::chunk::Chunk;
 use arrow2::datatypes::*;
 use arrow2::error::Result;
-use arrow2::io::avro::write;
+use arrow2::io::avro::{write, CompressedBlock};
 use arrow2::types::months_days_ns;
 
-fn schema() -> Schema {
+use super::read::read_avro;
+
+pub(super) fn schema() -> Schema {
     Schema::from(vec![
         Field::new("a", DataType::Int64, false),
         Field::new("b", DataType::Utf8, false),
@@ -21,7 +23,7 @@ fn schema() -> Schema {
     ])
 }
 
-fn data() -> Chunk<Arc<dyn Array>> {
+pub(super) fn data() -> Chunk<Arc<dyn Array>> {
     let columns = vec![
         Arc::new(Int64Array::from_slice([27, 47])) as Arc<dyn Array>,
         Arc::new(Utf8Array::<i32>::from_slice(["foo", "bar"])) as Arc<dyn Array>,
@@ -40,13 +42,11 @@ fn data() -> Chunk<Arc<dyn Array>> {
     Chunk::try_new(columns).unwrap()
 }
 
-use super::read::read_avro;
-
-fn write_avro<R: AsRef<dyn Array>>(
+pub(super) fn serialize_to_block<R: AsRef<dyn Array>>(
     columns: &Chunk<R>,
     schema: &Schema,
     compression: Option<write::Compression>,
-) -> Result<Vec<u8>> {
+) -> Result<CompressedBlock> {
     let avro_fields = write::to_avro_schema(schema)?;
 
     let mut serializers = columns
@@ -64,9 +64,20 @@
 
     write::compress(&mut block, &mut compressed_block, compression)?;
 
+    Ok(compressed_block)
+}
+
+fn write_avro<R: AsRef<dyn Array>>(
+    columns: &Chunk<R>,
+    schema: &Schema,
+    compression: Option<write::Compression>,
+) -> Result<Vec<u8>> {
+    let compressed_block = serialize_to_block(columns, schema, compression)?;
+
+    let avro_fields = write::to_avro_schema(schema)?;
     let mut file = vec![];
 
-    write::write_metadata(&mut file, avro_fields.clone(), compression)?;
+    write::write_metadata(&mut file, avro_fields, compression)?;
 
     write::write_block(&mut file, &compressed_block)?;
 
47 changes: 47 additions & 0 deletions tests/it/io/avro/write_async.rs
@@ -0,0 +1,47 @@
+use arrow2::array::*;
+use arrow2::chunk::Chunk;
+use arrow2::datatypes::*;
+use arrow2::error::Result;
+use arrow2::io::avro::write;
+use arrow2::io::avro::write_async;
+
+use super::read::read_avro;
+use super::write::{data, schema, serialize_to_block};
+
+async fn write_avro<R: AsRef<dyn Array>>(
+    columns: &Chunk<R>,
+    schema: &Schema,
+    compression: Option<write::Compression>,
+) -> Result<Vec<u8>> {
+    // usually done on a different thread pool
+    let compressed_block = serialize_to_block(columns, schema, compression)?;
+
+    let avro_fields = write::to_avro_schema(schema)?;
+    let mut file = vec![];
+
+    write_async::write_metadata(&mut file, avro_fields.clone(), compression).await?;
+
+    write_async::write_block(&mut file, &compressed_block).await?;
+
+    Ok(file)
+}
+
+async fn roundtrip(compression: Option<write::Compression>) -> Result<()> {
+    let expected = data();
+    let expected_schema = schema();
+
+    let data = write_avro(&expected, &expected_schema, compression).await?;
+
+    let (result, read_schema) = read_avro(&data)?;
+
+    assert_eq!(expected_schema, read_schema);
+    for (c1, c2) in result.columns().iter().zip(expected.columns().iter()) {
+        assert_eq!(c1.as_ref(), c2.as_ref());
+    }
+    Ok(())
+}
+
+#[tokio::test]
+async fn no_compression() -> Result<()> {
+    roundtrip(None).await
+}
