diff --git a/src/io/avro/mod.rs b/src/io/avro/mod.rs index cd32ec2c880..aff28da0c6e 100644 --- a/src/io/avro/mod.rs +++ b/src/io/avro/mod.rs @@ -7,6 +7,9 @@ pub mod read; #[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))] pub mod read_async; pub mod write; +#[cfg(feature = "io_avro_async")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))] +pub mod write_async; /// Valid compressions #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] diff --git a/src/io/avro/write/mod.rs b/src/io/avro/write/mod.rs index cfb227d449e..158ec571a8d 100644 --- a/src/io/avro/write/mod.rs +++ b/src/io/avro/write/mod.rs @@ -6,7 +6,7 @@ use crate::error::Result; pub use super::Compression; mod header; -use header::serialize_header; +pub(super) use header::serialize_header; mod schema; pub use schema::to_avro_schema; mod serialize; @@ -14,12 +14,14 @@ pub use serialize::{can_serialize, new_serializer, BoxSerializer}; mod block; pub use block::*; mod compress; -mod util; +pub(super) mod util; pub use compress::compress; pub use super::{Block, CompressedBlock}; -const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4]; +pub(super) const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4]; +// * Four bytes, ASCII 'O', 'b', 'j', followed by 1. +pub(super) const AVRO_MAGIC: [u8; 4] = [b'O', b'b', b'j', 1u8]; /// Writes Avro's metadata to `writer`. pub fn write_metadata<W: std::io::Write>( @@ -27,20 +29,12 @@ fields: Vec<AvroField>, compression: Option<Compression>, ) -> Result<()> { - // * Four bytes, ASCII 'O', 'b', 'j', followed by 1. - let avro_magic = [b'O', b'b', b'j', 1u8]; - writer.write_all(&avro_magic)?; + writer.write_all(&AVRO_MAGIC)?; // * file metadata, including the schema. 
let schema = AvroSchema::Record(Record::new("", fields)); - let header = serialize_header(&schema, compression)?; - util::zigzag_encode(header.len() as i64, writer)?; - for (name, item) in header { - util::write_binary(name.as_bytes(), writer)?; - util::write_binary(&item, writer)?; - } - writer.write_all(&[0])?; + write_schema(writer, &schema, compression)?; // The 16-byte, randomly-generated sync marker for this file. writer.write_all(&SYNC_NUMBER)?; @@ -68,3 +62,19 @@ } } } + +pub(super) fn write_schema<W: std::io::Write>( + writer: &mut W, + schema: &AvroSchema, + compression: Option<Compression>, +) -> Result<()> { + let header = serialize_header(schema, compression)?; + + util::zigzag_encode(header.len() as i64, writer)?; + for (name, item) in header { + util::write_binary(name.as_bytes(), writer)?; + util::write_binary(&item, writer)?; + } + writer.write_all(&[0])?; + Ok(()) +} diff --git a/src/io/avro/write_async/block.rs b/src/io/avro/write_async/block.rs new file mode 100644 index 00000000000..6eba0a7fda4 --- /dev/null +++ b/src/io/avro/write_async/block.rs @@ -0,0 +1,26 @@ +use futures::{AsyncWrite, AsyncWriteExt}; + +use crate::error::Result; + +use super::super::write::{util::zigzag_encode, SYNC_NUMBER}; + +use super::super::CompressedBlock; + +/// Writes a [`CompressedBlock`] to `writer` +pub async fn write_block<W>(writer: &mut W, compressed_block: &CompressedBlock) -> Result<()> +where + W: AsyncWrite + Unpin, +{ + // write size and rows + let mut scratch = Vec::with_capacity(10); + zigzag_encode(compressed_block.number_of_rows as i64, &mut scratch)?; + writer.write_all(&scratch).await?; + scratch.clear(); + zigzag_encode(compressed_block.data.len() as i64, &mut scratch)?; + writer.write_all(&scratch).await?; + + writer.write_all(&compressed_block.data).await?; + + writer.write_all(&SYNC_NUMBER).await?; + + Ok(()) +} diff --git a/src/io/avro/write_async/mod.rs b/src/io/avro/write_async/mod.rs new file mode 
100644 index 00000000000..81340125d4f --- /dev/null +++ b/src/io/avro/write_async/mod.rs @@ -0,0 +1,38 @@ +//! Async write Avro +mod block; +pub use block::write_block; + +use avro_schema::{Field as AvroField, Record, Schema as AvroSchema}; +use futures::{AsyncWrite, AsyncWriteExt}; + +use crate::error::Result; + +use super::{ + write::{write_schema, AVRO_MAGIC, SYNC_NUMBER}, + Compression, +}; + +/// Writes Avro's metadata to `writer`. +pub async fn write_metadata<W>( + writer: &mut W, + fields: Vec<AvroField>, + compression: Option<Compression>, +) -> Result<()> +where + W: AsyncWrite + Unpin, +{ + writer.write_all(&AVRO_MAGIC).await?; + + // * file metadata, including the schema. + let schema = AvroSchema::Record(Record::new("", fields)); + + let mut scratch = vec![]; + write_schema(&mut scratch, &schema, compression)?; + + writer.write_all(&scratch).await?; + + // The 16-byte, randomly-generated sync marker for this file. + writer.write_all(&SYNC_NUMBER).await?; + + Ok(()) +} diff --git a/tests/it/io/avro/mod.rs b/tests/it/io/avro/mod.rs index 29b3c6b003a..1d2dd3c4bba 100644 --- a/tests/it/io/avro/mod.rs +++ b/tests/it/io/avro/mod.rs @@ -4,3 +4,5 @@ mod read; #[cfg(feature = "io_avro_async")] mod read_async; mod write; +#[cfg(feature = "io_avro_async")] +mod write_async; diff --git a/tests/it/io/avro/write.rs b/tests/it/io/avro/write.rs index dd2a88f1ed7..5321f72d207 100644 --- a/tests/it/io/avro/write.rs +++ b/tests/it/io/avro/write.rs @@ -4,10 +4,12 @@ use arrow2::array::*; use arrow2::chunk::Chunk; use arrow2::datatypes::*; use arrow2::error::Result; -use arrow2::io::avro::write; +use arrow2::io::avro::{write, CompressedBlock}; use arrow2::types::months_days_ns; -fn schema() -> Schema { +use super::read::read_avro; + +pub(super) fn schema() -> Schema { Schema::from(vec![ Field::new("a", DataType::Int64, false), Field::new("b", DataType::Utf8, false), @@ -21,7 +23,7 @@ ]) } -fn data() -> Chunk<Arc<dyn Array>> { +pub(super) fn data() -> Chunk<Arc<dyn Array>> { let columns = vec![ 
Arc::new(Int64Array::from_slice([27, 47])) as Arc<dyn Array>, Arc::new(Utf8Array::<i32>::from_slice(["foo", "bar"])) as Arc<dyn Array>, @@ -40,13 +42,11 @@ fn data() -> Chunk<Arc<dyn Array>> { Chunk::try_new(columns).unwrap() } -use super::read::read_avro; - -fn write_avro<R: AsRef<dyn Array>>( +pub(super) fn serialize_to_block<R: AsRef<dyn Array>>( columns: &Chunk<R>, schema: &Schema, compression: Option<write::Compression>, -) -> Result<Vec<u8>> { +) -> Result<CompressedBlock> { let avro_fields = write::to_avro_schema(schema)?; let mut serializers = columns @@ -64,9 +64,20 @@ fn write_avro<R: AsRef<dyn Array>>( write::compress(&mut block, &mut compressed_block, compression)?; + Ok(compressed_block) +} + +fn write_avro<R: AsRef<dyn Array>>( + columns: &Chunk<R>, + schema: &Schema, + compression: Option<write::Compression>, +) -> Result<Vec<u8>> { + let compressed_block = serialize_to_block(columns, schema, compression)?; + + let avro_fields = write::to_avro_schema(schema)?; let mut file = vec![]; - write::write_metadata(&mut file, avro_fields.clone(), compression)?; + write::write_metadata(&mut file, avro_fields, compression)?; write::write_block(&mut file, &compressed_block)?; diff --git a/tests/it/io/avro/write_async.rs b/tests/it/io/avro/write_async.rs new file mode 100644 index 00000000000..6b2bf2a39fd --- /dev/null +++ b/tests/it/io/avro/write_async.rs @@ -0,0 +1,47 @@ +use arrow2::array::*; +use arrow2::chunk::Chunk; +use arrow2::datatypes::*; +use arrow2::error::Result; +use arrow2::io::avro::write; +use arrow2::io::avro::write_async; + +use super::read::read_avro; +use super::write::{data, schema, serialize_to_block}; + +async fn write_avro<R: AsRef<dyn Array>>( + columns: &Chunk<R>, + schema: &Schema, + compression: Option<write::Compression>, +) -> Result<Vec<u8>> { + // usually done on a different thread pool + let compressed_block = serialize_to_block(columns, schema, compression)?; + + let avro_fields = write::to_avro_schema(schema)?; + let mut file = vec![]; + + write_async::write_metadata(&mut file, avro_fields.clone(), compression).await?; + + write_async::write_block(&mut file, &compressed_block).await?; + + Ok(file) +} + +async fn roundtrip(compression: Option<write::Compression>) -> Result<()> { + let expected = 
data(); + let expected_schema = schema(); + + let data = write_avro(&expected, &expected_schema, compression).await?; + + let (result, read_schema) = read_avro(&data)?; + + assert_eq!(expected_schema, read_schema); + for (c1, c2) in result.columns().iter().zip(expected.columns().iter()) { + assert_eq!(c1.as_ref(), c2.as_ref()); + } + Ok(()) +} + +#[tokio::test] +async fn no_compression() -> Result<()> { + roundtrip(None).await +}