Skip to content

Commit

Permalink
cram/io/reader/header: Add reader
Browse files Browse the repository at this point in the history
  • Loading branch information
zaeleus committed Jan 8, 2025
1 parent 2048a97 commit 0770b2a
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 264 deletions.
3 changes: 1 addition & 2 deletions noodles-cram/src/async/io/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ where
/// # }
/// ```
pub async fn read_file_header(&mut self) -> io::Result<sam::Header> {
use self::header::container::read_header_container;
read_header_container(&mut self.inner).await
header::read_file_header(&mut self.inner).await
}

/// Reads the SAM header.
Expand Down
116 changes: 109 additions & 7 deletions noodles-cram/src/async/io/reader/header.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,138 @@
pub(super) mod container;
mod container;
mod file_id;
mod format_version;
mod magic_number;

use noodles_sam as sam;
use tokio::io::{self, AsyncRead};
use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, BufReader};

use self::{
file_id::read_file_id, format_version::read_format_version, magic_number::read_magic_number,
};
use crate::FileDefinition;
use crate::{file_definition::Version, FileDefinition, MAGIC_NUMBER};

struct Reader<R> {
inner: R,
}

impl<R> Reader<R>
where
R: AsyncRead + Unpin,
{
fn new(inner: R) -> Self {
Self { inner }
}

async fn read_magic_number(&mut self) -> io::Result<[u8; MAGIC_NUMBER.len()]> {
read_magic_number(&mut self.inner).await
}

async fn read_format_version(&mut self) -> io::Result<Version> {
read_format_version(&mut self.inner).await
}

async fn read_file_id(&mut self) -> io::Result<[u8; 20]> {
read_file_id(&mut self.inner).await
}

async fn container_reader(&mut self) -> io::Result<container::Reader<&mut R>> {
let len = container::read_header(&mut self.inner).await?;
Ok(container::Reader::new(&mut self.inner, len))
}
}

pub(super) async fn read_header<R>(reader: &mut R) -> io::Result<sam::Header>
where
R: AsyncRead + Unpin,
{
read_file_definition(reader).await?;
container::read_header_container(reader).await
read_file_header(reader).await
}

pub(super) async fn read_file_definition<R>(reader: &mut R) -> io::Result<FileDefinition>
where
R: AsyncRead + Unpin,
{
let mut header_reader = Reader::new(reader);
read_file_definition_inner(&mut header_reader).await
}

async fn read_file_definition_inner<R>(reader: &mut Reader<R>) -> io::Result<FileDefinition>
where
R: AsyncRead + Unpin,
{
use crate::io::reader::header::magic_number;

read_magic_number(reader)
reader
.read_magic_number()
.await
.and_then(magic_number::validate)?;

let version = read_format_version(reader).await?;
let file_id = read_file_id(reader).await?;
let version = reader.read_format_version().await?;
let file_id = reader.read_file_id().await?;

Ok(FileDefinition::new(version, file_id))
}

pub(super) async fn read_file_header<R>(reader: &mut R) -> io::Result<sam::Header>
where
R: AsyncRead + Unpin,
{
let mut header_reader = Reader::new(reader);
read_file_header_inner(&mut header_reader).await
}

async fn read_file_header_inner<R>(reader: &mut Reader<R>) -> io::Result<sam::Header>
where
R: AsyncRead + Unpin,
{
let mut container_reader = reader.container_reader().await?;

let mut raw_sam_header_reader = container_reader.raw_sam_header_reader().await?;
let header = read_sam_header(&mut raw_sam_header_reader).await?;

Ok(header)
}

async fn read_sam_header<R>(reader: &mut R) -> io::Result<sam::Header>
where
R: AsyncRead + Unpin,
{
let mut parser = sam::header::Parser::default();

let mut header_reader = BufReader::new(reader);
let mut buf = Vec::new();

while read_line(&mut header_reader, &mut buf).await? != 0 {
parser
.parse_partial(&buf)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
}

Ok(parser.finish())
}

async fn read_line<R>(reader: &mut R, dst: &mut Vec<u8>) -> io::Result<usize>
where
R: AsyncBufRead + Unpin,
{
const LINE_FEED: u8 = b'\n';
const CARRIAGE_RETURN: u8 = b'\r';

dst.clear();

match reader.read_until(LINE_FEED, dst).await? {
0 => Ok(0),
n => {
if dst.ends_with(&[LINE_FEED]) {
dst.pop();

if dst.ends_with(&[CARRIAGE_RETURN]) {
dst.pop();
}
}

Ok(n)
}
}
}
140 changes: 17 additions & 123 deletions noodles-cram/src/async/io/reader/header/container.rs
Original file line number Diff line number Diff line change
@@ -1,140 +1,34 @@
mod block;
mod header;

use noodles_sam as sam;
use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
use tokio::io::{self, AsyncRead, AsyncReadExt, Take};

use self::{block::read_block, header::read_header};
use self::block::read_block;
pub(super) use self::header::read_header;

pub async fn read_header_container<R>(reader: &mut R) -> io::Result<sam::Header>
where
R: AsyncRead + Unpin,
{
let len = read_header(reader).await?;

let mut reader = reader.take(len);
let header = read_sam_header(&mut reader).await?;
io::copy(&mut reader, &mut io::sink()).await?;

Ok(header)
pub(super) struct Reader<R> {
inner: Take<R>,
}

async fn read_sam_header<R>(reader: &mut R) -> io::Result<sam::Header>
impl<R> Reader<R>
where
R: AsyncRead + Unpin,
{
let mut block_reader = read_block(reader).await?;

let len = block_reader.read_i32_le().await.and_then(|n| {
u64::try_from(n).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
})?;

let mut parser = sam::header::Parser::default();

let mut header_reader = BufReader::new(block_reader.take(len));
let mut buf = Vec::new();

while read_line(&mut header_reader, &mut buf).await? != 0 {
parser
.parse_partial(&buf)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
}

Ok(parser.finish())
}

async fn read_line<R>(reader: &mut R, dst: &mut Vec<u8>) -> io::Result<usize>
where
R: AsyncBufRead + Unpin,
{
const LINE_FEED: u8 = b'\n';
const CARRIAGE_RETURN: u8 = b'\r';

dst.clear();

match reader.read_until(LINE_FEED, dst).await? {
0 => Ok(0),
n => {
if dst.ends_with(&[LINE_FEED]) {
dst.pop();

if dst.ends_with(&[CARRIAGE_RETURN]) {
dst.pop();
}
}

Ok(n)
pub(super) fn new(inner: R, len: u64) -> Self {
Self {
inner: inner.take(len),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_read_raw_sam_header() -> io::Result<()> {
use sam::header::record::value::{
map::{self, header::Version},
Map,
};

const RAW_HEADER: &str = "@HD\tVN:1.6\n";

let mut src = vec![
0x00, // compression method = none (0)
0x00, // content type = file header (0)
0x00, // block content ID
0x0f, // compressed size = 15
0x0f, // uncompressed size = 15
];
src.extend(11i32.to_le_bytes());
src.extend(RAW_HEADER.as_bytes());
src.extend([0x00, 0x00, 0x00, 0x00]); // CRC32

let mut reader = &src[..];
let actual = read_sam_header(&mut reader).await?;

let expected = sam::Header::builder()
.set_header(Map::<map::Header>::new(Version::new(1, 6)))
.build();

assert_eq!(actual, expected);

Ok(())
}

#[tokio::test]
async fn test_read_raw_sam_header_with_invalid_compression_method() {
let src = [
0x03, // compression method = LZMA (3)
0x00, // content type = file header (0)
0x00, // block content ID
0x0f, // compressed size = 15
0x0f, // uncompressed size = 15
// ...
0x00, 0x00, 0x00, 0x00, // CRC32
];

let mut reader = &src[..];
assert!(matches!(
read_sam_header(&mut reader).await,
Err(e) if e.kind() == io::ErrorKind::InvalidData
));
}

#[tokio::test]
async fn test_read_raw_sam_header_with_invalid_content_type() {
let src = [
0x00, // compression method = none (0)
0x04, // content type = external data (4)
];
pub(super) async fn raw_sam_header_reader(
&mut self,
) -> io::Result<impl AsyncRead + Unpin + '_> {
let mut reader = read_block(&mut self.inner).await?;

let mut reader = &src[..];
let len = reader.read_i32_le().await.and_then(|n| {
u64::try_from(n).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
})?;

assert!(matches!(
read_sam_header(&mut reader).await,
Err(e) if e.kind() == io::ErrorKind::InvalidData
));
Ok(reader.take(len))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::r#async::io::reader::{
CrcReader,
};

pub async fn read_header<R>(reader: &mut R) -> io::Result<u64>
pub(crate) async fn read_header<R>(reader: &mut R) -> io::Result<u64>
where
R: AsyncRead + Unpin,
{
Expand Down
3 changes: 1 addition & 2 deletions noodles-cram/src/io/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ where
/// # Ok::<(), io::Error>(())
/// ```
pub fn read_file_header(&mut self) -> io::Result<sam::Header> {
use self::header::container::read_header_container;
read_header_container(&mut self.inner)
header::read_file_header(&mut self.inner)
}

/// Reads the SAM header.
Expand Down
Loading

0 comments on commit 0770b2a

Please sign in to comment.