Skip to content

Commit

Permalink
bam/async/io/reader/header: Add reader
Browse files Browse the repository at this point in the history
  • Loading branch information
zaeleus committed Dec 17, 2024
1 parent ebf28d5 commit d09e4af
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 17 deletions.
56 changes: 45 additions & 11 deletions noodles-bam/src/async/io/reader/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,61 @@ mod magic_number;
mod reference_sequences;
mod sam_header;

use noodles_sam as sam;
use noodles_sam::{self as sam, header::ReferenceSequences};
use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt};

use self::{magic_number::read_magic_number, reference_sequences::read_reference_sequences};
use crate::io::reader::header::reference_sequences_eq;
use crate::{io::reader::header::reference_sequences_eq, MAGIC_NUMBER};

struct Reader<'r, R> {
inner: &'r mut R,
}

impl<'r, R> Reader<'r, R>
where
R: AsyncRead + Unpin,
{
fn new(inner: &'r mut R) -> Self {
Self { inner }
}

async fn read_magic_number(&mut self) -> io::Result<[u8; MAGIC_NUMBER.len()]> {
read_magic_number(self.inner).await
}

async fn raw_sam_header_reader(&mut self) -> io::Result<sam_header::Reader<R>> {
let len = self.inner.read_u32_le().await.map(u64::from)?;
Ok(sam_header::Reader::new(self.inner, len))
}

async fn read_reference_sequences(&mut self) -> io::Result<ReferenceSequences> {
read_reference_sequences(self.inner).await
}
}

pub(super) async fn read_header<R>(reader: &mut R) -> io::Result<sam::Header>
where
R: AsyncRead + Unpin,
{
let mut header_reader = Reader::new(reader);
read_header_inner(&mut header_reader).await
}

async fn read_header_inner<R>(reader: &mut Reader<'_, R>) -> io::Result<sam::Header>
where
R: AsyncRead + Unpin,
{
use crate::io::reader::header::magic_number;

read_magic_number(reader)
reader
.read_magic_number()
.await
.and_then(magic_number::validate)?;

let mut header = read_header_inner(reader).await?;
let reference_sequences = read_reference_sequences(reader).await?;
let mut raw_sam_header_reader = reader.raw_sam_header_reader().await?;
let mut header = read_sam_header(&mut raw_sam_header_reader).await?;

let reference_sequences = reader.read_reference_sequences().await?;

if header.reference_sequences().is_empty() {
*header.reference_sequences_mut() = reference_sequences;
Expand All @@ -33,24 +70,21 @@ where
Ok(header)
}

async fn read_header_inner<R>(reader: &mut R) -> io::Result<sam::Header>
async fn read_sam_header<R>(reader: &mut sam_header::Reader<'_, R>) -> io::Result<sam::Header>
where
R: AsyncRead + Unpin,
{
let l_text = reader.read_u32_le().await.map(u64::from)?;

let mut parser = sam::header::Parser::default();

let mut header_reader = sam_header::Reader::new(reader, l_text);
let mut buf = Vec::new();

while read_line(&mut header_reader, &mut buf).await? != 0 {
while read_line(reader, &mut buf).await? != 0 {
parser
.parse_partial(&buf)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
}

header_reader.discard_to_end().await?;
reader.discard_to_end().await?;

Ok(parser.finish())
}
Expand Down
12 changes: 6 additions & 6 deletions noodles-bam/src/async/io/reader/header/sam_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ use tokio::io::{
};

pin_project! {
pub(super) struct Reader<R> {
pub(super) struct Reader<'r, R> {
#[pin]
inner: BufReader<Take<R>>,
inner: BufReader<Take<&'r mut R>>,
is_eol: bool,
}
}

impl<R> Reader<R>
impl<'r, R> Reader<'r, R>
where
R: AsyncRead + Unpin,
{
pub(super) fn new(inner: R, len: u64) -> Self {
pub(super) fn new(inner: &'r mut R, len: u64) -> Self {
Self {
inner: BufReader::new(inner.take(len)),
is_eol: true,
Expand All @@ -46,7 +46,7 @@ where
}
}

impl<R> AsyncRead for Reader<R>
impl<R> AsyncRead for Reader<'_, R>
where
R: AsyncRead + Unpin,
{
Expand All @@ -66,7 +66,7 @@ where
}
}

impl<R> AsyncBufRead for Reader<R>
impl<R> AsyncBufRead for Reader<'_, R>
where
R: AsyncRead + Unpin,
{
Expand Down

0 comments on commit d09e4af

Please sign in to comment.