Skip to content

Commit

Permalink
feat: Add wrapper of BGZF writer (#349)
Browse files Browse the repository at this point in the history
* Add wrapper of BGZF writer
  • Loading branch information
jemma-nelson authored Jul 5, 2022
1 parent b130634 commit 965ed88
Show file tree
Hide file tree
Showing 3 changed files with 360 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/bam/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ impl Writer {
///
/// * Uncompressed: No compression, zlib level 0
/// * Fastest: Lowest compression level, zlib level 1
/// * Maxium: Highest compression level, zlib level 9
/// * Maximum: Highest compression level, zlib level 9
/// * Level(i): Custom compression level in the range [0, 9]
#[derive(Debug, Clone, Copy)]
pub enum CompressionLevel {
Expand Down
356 changes: 356 additions & 0 deletions src/bgzf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,177 @@ impl std::io::Read for Reader {
}
}

/// Compression levels understood by the BGZF [`Writer`].
///
/// Every variant except `NoCompression` selects a zlib level for the
/// underlying GZIP writer; `NoCompression` bypasses the GZIP writer entirely.
#[derive(Debug, Clone, Copy)]
pub enum CompressionLevel {
    /// Default compression level, zlib level 6.
    Default,
    /// No compression, zlib not used. Output will be identical to input.
    NoCompression,
    /// No compression, zlib level 0.
    Uncompressed,
    /// Lowest compression level, zlib level 1.
    Fastest,
    /// Highest compression level, zlib level 9.
    Maximum,
    /// Custom compression level in the range [0, 9].
    ///
    /// The crate-internal conversion also lets `-2` and `-1` through; they
    /// select the same modes as `NoCompression` and `Default` respectively.
    Level(i8),
}
impl CompressionLevel {
// Convert and check the variants of the `CompressionLevel` enum to a numeric level
fn convert(self) -> Result<i8> {
match self {
CompressionLevel::NoCompression => Ok(-2),
CompressionLevel::Default => Ok(-1),
CompressionLevel::Uncompressed => Ok(0),
CompressionLevel::Fastest => Ok(1),
CompressionLevel::Maximum => Ok(9),
CompressionLevel::Level(i @ -2..=9) => Ok(i),
CompressionLevel::Level(i) => Err(Error::BgzfInvalidCompressionLevel { level: i }),
}
}
}

/// Writer producing uncompressed, gzip, or bgzip output through htslib's BGZF
/// layer.
#[derive(Debug)]
pub struct Writer {
    /// Raw BGZF handle returned by `htslib::bgzf_open`; closed when the
    /// writer is dropped.
    inner: *mut htslib::BGZF,
    /// Clone of the pool passed to `set_thread_pool`, if one was set.
    // NOTE(review): presumably retained so the pool outlives the handle that
    // uses it — confirm against `ThreadPool`'s ownership semantics.
    tpool: Option<ThreadPool>,
}

impl Writer {
/// Create a new Writer to write to stdout with default compression.
pub fn from_stdout() -> Result<Self, Error> {
Self::from_stdout_with_compression(CompressionLevel::Default)
}

/// Create a new Writer to write to stdout with specific compression
///
/// # Arguments
///
/// * `level` the compression level to use
pub fn from_stdout_with_compression(level: CompressionLevel) -> Result<Self, Error> {
Self::new(b"-", level)
}

/// Create a new Writer from a path with default compression.
///
/// # Arguments
///
/// * `path` - the path to open.
pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
Self::from_path_with_level(path, CompressionLevel::Default)
}

/// Create a new Writer from a path with a specific compression level.
///
/// # Arguments
///
/// * `path` - the path to open.
pub fn from_path_with_level<P: AsRef<Path>>(
path: P,
level: CompressionLevel,
) -> Result<Self, Error> {
Self::new(&path_as_bytes(path, false)?, level)
}

/// Internal function to create a Writer from a file path
///
/// # Arguments
///
/// * `path` - the path or URL to open
fn new(path: &[u8], level: CompressionLevel) -> Result<Self, Error> {
let mode = Self::get_open_mode(level)?;
let cpath = ffi::CString::new(path).unwrap();
let inner = unsafe { htslib::bgzf_open(cpath.as_ptr(), mode.as_ptr()) };
Ok(Self { inner, tpool: None })
}

/// Internal function to convert compression level to "mode"
/// bgzf.c expects mode for writers to be one of: 'w', 'wu', 'w#', where # is 0-9.
/// # Arguments
///
/// * `level` - the level of compression to use
fn get_open_mode(level: CompressionLevel) -> Result<ffi::CString, Error> {
let write_string = match level.convert() {
Ok(-2) => "wu".to_string(),
Ok(-1) => "w".to_string(),
Ok(n @ 0..=9) => format!("w{}", n),
Err(e) => return Err(e),
// This should be unreachable
Ok(i) => return Err(Error::BgzfInvalidCompressionLevel { level: i }),
};
return Ok(ffi::CString::new(write_string).unwrap());
}

/// Set the thread pool to use for parallel compression.
///
/// # Arguments
///
/// * `tpool` - the thread-pool to use
pub fn set_thread_pool(&mut self, tpool: &ThreadPool) -> Result<()> {
self.tpool = Some(tpool.clone());
let b = tpool.handle.borrow_mut();
let r = unsafe {
htslib::bgzf_thread_pool(self.inner, b.inner.pool as *mut _, 0) // let htslib decide on the queue-size
};

if r != 0 {
Err(Error::ThreadPool)
} else {
Ok(())
}
}
}

impl std::io::Write for Writer {
    /// Hand `buf` to `bgzf_write` and report how many bytes it accepted.
    ///
    /// A negative return value from htslib is turned into an
    /// `std::io::Error` of kind `Other`.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let data = buf.as_ptr() as *mut libc::c_void;
        let length = buf.len() as u64;

        // SAFETY: `data` points at `length` readable bytes borrowed from
        // `buf` for the duration of the call.
        let written = unsafe { htslib::bgzf_write(self.inner, data, length) };

        if written >= 0 {
            return Ok(written as usize);
        }
        Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            "Can not write",
        ))
    }

    /// Flush the BGZF handle via `bgzf_flush`.
    ///
    /// Any non-zero status from htslib is turned into an `std::io::Error` of
    /// kind `Other`.
    fn flush(&mut self) -> std::io::Result<()> {
        // SAFETY: `self.inner` is the handle stored at construction.
        let status: i32 = unsafe { htslib::bgzf_flush(self.inner) };
        match status {
            0 => Ok(()),
            _ => Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "Can not flush",
            )),
        }
    }
}

impl std::ops::Drop for Writer {
    /// Close the BGZF handle when the writer goes out of scope.
    fn drop(&mut self) {
        // The status returned by `bgzf_close` is discarded: `drop` has no way
        // to report a failure to the caller.
        // SAFETY: `self.inner` is the handle stored at construction and is
        // not used again after this call.
        let _ = unsafe { htslib::bgzf_close(self.inner) };
    }
}

#[cfg(test)]
mod tests {
use super::*;
use std::io::Read;
use std::io::Write;

// Define paths to the test files
const FN_PLAIN: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/test/bgzip/plain.vcf");
Expand Down Expand Up @@ -248,4 +415,193 @@ mod tests {
"Reading bgzip file with correct content using a threadpool"
);
}

/// Writing with `NoCompression` must yield a plain file whose bytes equal the
/// input and which is not recognised as BGZIP.
#[test]
fn test_write_plain() {
    let scratch = tempfile::Builder::new()
        .prefix("rust-htslib")
        .tempdir()
        .expect("Cannot create temp dir");
    let target = scratch.path().join("test.vcf");
    println!("{:?}", target);

    {
        let opened = Writer::from_path_with_level(&target, CompressionLevel::NoCompression);
        if let Err(err) = opened.as_ref() {
            println!("w_result is {}", err);
        }
        assert!(opened.is_ok(), "Create plain file with Bgzip writer");
        assert!(target.exists(), "Plain file is created with Bgzip writer");

        let mut writer = opened.unwrap();
        let outcome = writer.write_all(CONTENT.as_bytes());
        assert!(outcome.is_ok(), "Plain file can write with Bgzip writer");
    } // the writer is dropped here, which closes the file

    let looks_like_bgzip = is_bgzip(&target).unwrap();
    assert!(
        !looks_like_bgzip,
        "NoCompression file should not be detected as BGZIP"
    );

    let round_trip = std::fs::read_to_string(&target).unwrap();
    assert_eq!(
        round_trip, CONTENT,
        "Writing bgzip file with no compression"
    );

    scratch.close().expect("Failed to delete temp dir");
}

/// Writing with the default level must yield a BGZIP file that reads back to
/// the original content.
#[test]
fn test_write_default() {
    let scratch = tempfile::Builder::new()
        .prefix("rust-htslib")
        .tempdir()
        .expect("Cannot create temp dir");
    let target = scratch.path().join("test.vcf.bgzf");
    println!("{:?}", target);

    {
        let opened = Writer::from_path(&target);
        if let Err(err) = opened.as_ref() {
            println!("w_result is {}", err);
        }
        assert!(opened.is_ok(), "Create bgzip file with Bgzip writer");
        assert!(
            std::path::Path::new(&target).exists(),
            "Bgzip file is created with Bgzip writer"
        );

        let mut writer = opened.unwrap();
        let outcome = writer.write_all(CONTENT.as_bytes());
        assert!(outcome.is_ok(), "Bgzip file can write with Bgzip writer");
    } // the writer is dropped here, which closes the file

    // Round-trip through the BGZF reader; the reader is a temporary so it is
    // released before the directory is removed.
    let mut round_trip = String::new();
    Reader::from_path(&target)
        .unwrap()
        .read_to_string(&mut round_trip)
        .unwrap();
    assert_eq!(
        round_trip, CONTENT,
        "Writing bgzip file with default compression"
    );

    let looks_like_bgzip = is_bgzip(&target).unwrap();
    assert!(looks_like_bgzip, "Default BGZIP file detected as BGZIP");

    scratch.close().expect("Failed to delete temp dir");
}

/// Every compressing level must yield a BGZIP file that reads back to the
/// original content.
#[test]
fn test_write_compression_levels() {
    let scratch = tempfile::Builder::new()
        .prefix("rust-htslib")
        .tempdir()
        .expect("Cannot create temp dir");
    let target = scratch.path().join("test.vcf.bgzf");

    // Named levels first, then every numeric level from -1 through 9.
    // NoCompression is left out of this list.
    let mut levels = vec![
        CompressionLevel::Fastest,
        CompressionLevel::Maximum,
        CompressionLevel::Uncompressed,
    ];
    levels.extend((-1..=9_i8).map(CompressionLevel::Level));

    for level in levels {
        {
            let opened = Writer::from_path_with_level(&target, level);
            if let Err(err) = opened.as_ref() {
                println!("w_result is {}", err);
            }
            assert!(opened.is_ok(), "Create bgzip file with Bgzip writer");
            assert!(
                std::path::Path::new(&target).exists(),
                "Bgzip file is created with Bgzip writer"
            );

            let mut writer = opened.unwrap();
            let outcome = writer.write_all(CONTENT.as_bytes());
            assert!(outcome.is_ok(), "Bgzip file can write with Bgzip writer");
        } // the writer is dropped here, which closes the file

        // Round-trip through the BGZF reader (a temporary, released at the
        // end of the statement).
        let mut round_trip = String::new();
        Reader::from_path(&target)
            .unwrap()
            .read_to_string(&mut round_trip)
            .unwrap();
        assert_eq!(
            round_trip, CONTENT,
            "Writing bgzip file with {:?} compression",
            level
        );

        let looks_like_bgzip = is_bgzip(&target).unwrap();
        assert!(
            looks_like_bgzip,
            "Writing BGZIP file with {:?} compression detected as BGZIP",
            level
        );
    }

    scratch.close().expect("Failed to delete temp dir");
}

/// Writing through a writer with an attached thread pool must yield a BGZIP
/// file that reads back to the original content.
#[test]
fn test_write_with_threadpool() {
    let scratch = tempfile::Builder::new()
        .prefix("rust-htslib")
        .tempdir()
        .expect("Cannot create temp dir");
    let target = scratch.path().join("test.vcf.bgzf");

    let payload = CONTENT.as_bytes();
    println!("{:?}", target);

    {
        let opened = Writer::from_path(&target);
        if let Err(err) = opened.as_ref() {
            println!("w_result is {}", err);
        }
        assert!(opened.is_ok(), "Create bgzip file with Bgzip threadpool");
        assert!(
            std::path::Path::new(&target).exists(),
            "Bgzip file is created with Bgzip threadpool"
        );

        // The writer is declared before the pool, so the pool binding is
        // dropped first when this scope ends.
        let mut writer = opened.unwrap();
        let pool_attempt = ThreadPool::new(5);
        assert!(pool_attempt.is_ok(), "Creating thread pool");
        let pool = pool_attempt.unwrap();

        let attach_outcome = writer.set_thread_pool(&pool);
        assert!(attach_outcome.is_ok(), "Setting thread pool");

        let outcome = writer.write_all(payload);
        assert!(
            outcome.is_ok(),
            "Bgzip file can write with Bgzip threadpool"
        );
    } // the writer is dropped here, which closes the file

    // Round-trip through the BGZF reader (a temporary, released at the end of
    // the statement).
    let mut round_trip = String::new();
    Reader::from_path(&target)
        .unwrap()
        .read_to_string(&mut round_trip)
        .unwrap();
    assert_eq!(round_trip, CONTENT, "Writing bgzip file with threadpool");

    let looks_like_bgzip = is_bgzip(&target).unwrap();
    assert!(looks_like_bgzip, "Threadpool BGZIP file detected as BGZIP");

    scratch.close().expect("Failed to delete temp dir");
}
}
3 changes: 3 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,7 @@ pub enum Error {
BcfSetValues,
#[error("failed to remove alleles in BCF/VCF record")]
BcfRemoveAlleles,

#[error("invalid compression level {level}")]
BgzfInvalidCompressionLevel { level: i8 },
}

0 comments on commit 965ed88

Please sign in to comment.