Skip to content

Commit

Permalink
MRG: Use core manifest utils (#217)
Browse files Browse the repository at this point in the history
This PR updates manifest generation in `manysketch` to use `Record` and `Manifest` utils from core. This also makes the code significantly simpler, since we can generate manifest information directly from the signature, rather than saving sketch parameters alongside sketches. 

Required `0.13.0` core due to these fixes:
- sourmash-bio/sourmash#3007
- sourmash-bio/sourmash#3019

* Fixes #203
  • Loading branch information
bluegenes authored Feb 23, 2024
1 parent 01cce21 commit 43caeba
Show file tree
Hide file tree
Showing 10 changed files with 299 additions and 226 deletions.
361 changes: 255 additions & 106 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ crate-type = ["cdylib"]
pyo3 = { version = "0.20.2", features = ["extension-module", "anyhow"] }
rayon = "1.8.1"
serde = { version = "1.0.196", features = ["derive"] }
sourmash = { version = "0.12.1", features = ["branchwater"] }
sourmash = { version = "0.13.0", features = ["branchwater"] }
serde_json = "1.0.113"
niffler = "2.4.0"
log = "0.4.14"
Expand Down
2 changes: 1 addition & 1 deletion src/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub fn check(index: camino::Utf8PathBuf, quick: bool) -> Result<(), Box<dyn std:
}

println!("Opening DB");
let db = RevIndex::open(index, true)?;
let db = RevIndex::open(index, true, None)?;

println!("Starting check");
db.check(quick);
Expand Down
1 change: 1 addition & 0 deletions src/fastgather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::utils::{
ReportType,
};

#[allow(clippy::too_many_arguments)]
pub fn fastgather(
query_filepath: String,
against_filepath: String,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ fn do_manysearch(
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
fn do_fastgather(
query_filename: String,
siglist_path: String,
Expand Down
57 changes: 22 additions & 35 deletions src/manysketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,8 @@ fn parse_params_str(params_strs: String) -> Result<Vec<Params>, String> {
Ok(unique_params.into_iter().collect())
}

fn build_siginfo(
params: &[Params],
moltype: &str,
name: &str,
filename: &Path,
) -> (Vec<Signature>, Vec<Params>) {
fn build_siginfo(params: &[Params], moltype: &str) -> Vec<Signature> {
let mut sigs = Vec::new();
let mut params_vec = Vec::new();

for param in params.iter().cloned() {
match moltype {
Expand All @@ -112,20 +106,11 @@ fn build_siginfo(
.track_abundance(param.track_abundance)
.build();

// let sig = Signature::from_params(&cp); // cant set name with this
let template = sourmash::cmd::build_template(&cp);
let sig = Signature::builder()
.hash_function("0.murmur64")
.name(Some(name.to_string()))
.filename(Some(filename.to_string()))
.signatures(template)
.build();
let sig = Signature::from_params(&cp);
sigs.push(sig);

params_vec.push(param);
}

(sigs, params_vec)
sigs
}

pub fn manysketch(
Expand All @@ -144,7 +129,7 @@ pub fn manysketch(
bail!("No files to load, exiting.");
}

// if output doesnt end in zip, bail
// if output doesn't end in zip, bail
if Path::new(&output)
.extension()
.map_or(true, |ext| ext != "zip")
Expand Down Expand Up @@ -195,7 +180,7 @@ pub fn manysketch(
}

// build sig templates from params
let (mut sigs, sig_params) = build_siginfo(&params_vec, moltype, name, filename);
let mut sigs = build_siginfo(&params_vec, moltype);
// if no sigs to build, skip
if sigs.is_empty() {
let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
Expand All @@ -212,36 +197,38 @@ pub fn manysketch(
}
};
// parse fasta and add to signature
let mut set_name = false;
while let Some(record_result) = reader.next() {
match record_result {
Ok(record) => {
// do we need to normalize to make sure all the bases are consistently capitalized?
// let norm_seq = record.normalize(false);
for sig in &mut sigs {
sigs.iter_mut().for_each(|sig| {
if !set_name {
sig.set_name(name);
sig.set_filename(filename.as_str());
set_name = true;
};
if moltype == "protein" {
sig.add_protein(&record.seq()).unwrap();
sig.add_protein(&record.seq())
.expect("Failed to add protein");
} else {
sig.add_sequence(&record.seq(), true).unwrap();
sig.add_sequence(&record.seq(), true)
.expect("Failed to add sequence");
// if not force, panics with 'N' in dna sequence
}
}
}
Err(err) => {
eprintln!("Error while processing record: {:?}", err);
});
}
Err(err) => eprintln!("Error while processing record: {:?}", err),
}
}
Some((sigs, sig_params, filename))

Some(sigs)
})
.try_for_each_with(
send.clone(),
|s: &mut std::sync::Arc<std::sync::mpsc::SyncSender<ZipMessage>>,
(sigs, sig_params, filename)| {
if let Err(e) = s.send(ZipMessage::SignatureData(
sigs,
sig_params,
filename.clone(),
)) {
|s: &mut std::sync::Arc<std::sync::mpsc::SyncSender<ZipMessage>>, sigs| {
if let Err(e) = s.send(ZipMessage::SignatureData(sigs)) {
Err(format!("Unable to send internal data: {:?}", e))
} else {
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/mastiff_manygather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub fn mastiff_manygather(
bail!("'{}' is not a valid RevIndex database", index);
}
// Open database once
let db = RevIndex::open(index, true)?;
let db = RevIndex::open(index, true, None)?;
println!("Loaded DB");

let query_collection = load_collection(
Expand Down
2 changes: 1 addition & 1 deletion src/mastiff_manysearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub fn mastiff_manysearch(
bail!("'{}' is not a valid RevIndex database", index);
}
// Open database once
let db = RevIndex::open(index, true)?;
let db = RevIndex::open(index, true, None)?;

println!("Loaded DB");

Expand Down
7 changes: 5 additions & 2 deletions src/python/tests/test_sketch.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ def get_test_data(filename):


def make_file_csv(filename, genome_paths, protein_paths = []):
# equalize path lengths by adding "".
names = [os.path.basename(x).split('.fa')[0] for x in genome_paths]
# Check if the number of protein paths is less than genome paths
# and fill in the missing paths with "".
if len(protein_paths) < len(genome_paths):
protein_paths.extend(["" for _ in range(len(genome_paths) - len(protein_paths))])
elif len(genome_paths) < len(protein_paths):
genome_paths.extend(["" for _ in range(len(protein_paths) - len(genome_paths))])
names = [os.path.basename(x).split('.fa')[0] for x in protein_paths]

with open(filename, 'wt') as fp:
fp.write("name,genome_filename,protein_filename\n")
for name, genome_path, protein_path in zip(names, genome_paths, protein_paths):
Expand Down
90 changes: 11 additions & 79 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/// Utility functions for sourmash_plugin_branchwater.
use rayon::prelude::*;
use sourmash::encodings::HashFunctions;
use sourmash::manifest::Manifest;
use sourmash::selection::Select;

use anyhow::{anyhow, Context, Result};
Expand All @@ -19,12 +18,11 @@ use std::sync::atomic;
use std::sync::atomic::AtomicUsize;

use sourmash::collection::Collection;
use sourmash::manifest::Record;
use sourmash::manifest::{Manifest, Record};
use sourmash::selection::Selection;
use sourmash::signature::{Signature, SigsTrait};
use sourmash::sketch::minhash::KmerMinHash;
use sourmash::storage::{FSStorage, InnerStorage, SigStore};

/// Track a name/minhash.
pub struct SmallSignature {
Expand Down Expand Up @@ -377,11 +375,11 @@ fn collection_from_pathlist(
let n_failed = AtomicUsize::new(0);
let records: Vec<Record> = lines
.par_iter()
.filter_map(|path| match Signature::from_path(&path) {
.filter_map(|path| match Signature::from_path(path) {
Ok(signatures) => {
let recs: Vec<Record> = signatures
.into_iter()
.flat_map(|v| Record::from_sig(&v, &path))
.flat_map(|v| Record::from_sig(&v, path))
.collect();
Some(recs)
}
Expand Down Expand Up @@ -760,47 +758,6 @@ impl Serialize for BoolPython {
}
}

pub fn make_manifest_row(
sig: &Signature,
filename: &Path,
internal_location: &str,
scaled: u64,
num: u32,
abund: bool,
is_dna: bool,
is_protein: bool,
) -> ManifestRow {
if is_dna && is_protein {
panic!("Both is_dna and is_protein cannot be true at the same time.");
} else if !is_dna && !is_protein {
panic!("Either is_dna or is_protein must be true.");
}
let moltype = if is_dna {
"DNA".to_string()
} else {
"protein".to_string()
};
let sketch = &sig.sketches()[0];
let ksize: u32 = if is_dna {
sketch.ksize() as u32
} else {
sketch.ksize() as u32 / 3
};
ManifestRow {
internal_location: internal_location.to_string(),
md5: sig.md5sum(),
md5short: sig.md5sum()[0..8].to_string(),
ksize: ksize,
moltype,
num,
scaled,
n_hashes: sketch.size(),
with_abundance: BoolPython(abund),
name: sig.name().to_string(),
filename: filename.to_string(),
}
}

pub fn open_stdout_or_file(output: Option<String>) -> Box<dyn Write + Send + 'static> {
// if output is a file, use open_output_file
if let Some(path) = output {
Expand Down Expand Up @@ -845,7 +802,7 @@ impl Hash for Params {
}

pub enum ZipMessage {
SignatureData(Vec<Signature>, Vec<Params>, PathBuf),
SignatureData(Vec<Signature>),
WriteManifest,
}

Expand All @@ -863,18 +820,15 @@ pub fn sigwriter(
.compression_method(zip::CompressionMethod::Stored)
.large_file(true);
let mut zip = zip::ZipWriter::new(file_writer);
let mut manifest_rows: Vec<ManifestRow> = Vec::new();
let mut manifest_rows: Vec<Record> = Vec::new();
// keep track of md5sum occurrences to prevent overwriting duplicates
let mut md5sum_occurrences: std::collections::HashMap<String, usize> =
std::collections::HashMap::new();

while let Ok(message) = recv.recv() {
match message {
ZipMessage::SignatureData(sigs, params, filename) => {
if sigs.len() != params.len() {
bail!("Mismatched lengths of signatures and parameters");
}
for (sig, param) in sigs.iter().zip(params.iter()) {
ZipMessage::SignatureData(sigs) => {
for sig in sigs.iter() {
let md5sum_str = sig.md5sum();
let count = md5sum_occurrences.entry(md5sum_str.clone()).or_insert(0);
*count += 1;
Expand All @@ -884,38 +838,16 @@ pub fn sigwriter(
format!("signatures/{}.sig.gz", md5sum_str)
};
write_signature(sig, &mut zip, options, &sig_filename);
manifest_rows.push(make_manifest_row(
sig,
&filename,
&sig_filename,
param.scaled,
param.num,
param.track_abundance,
param.is_dna,
param.is_protein,
));
let records: Vec<Record> = Record::from_sig(sig, sig_filename.as_str());
manifest_rows.extend(records);
}
}
ZipMessage::WriteManifest => {
println!("Writing manifest");
// Start the CSV file inside the zip
zip.start_file("SOURMASH-MANIFEST.csv", options).unwrap();
// write manifest version line
writeln!(&mut zip, "# SOURMASH-MANIFEST-VERSION: 1.0").unwrap();
// scoped block for csv writing
{
let mut csv_writer = Writer::from_writer(&mut zip);

for row in &manifest_rows {
if let Err(e) = csv_writer.serialize(row) {
eprintln!("Error writing item: {:?}", e);
}
}
// CSV writer must be manually flushed to ensure all data is written
if let Err(e) = csv_writer.flush() {
eprintln!("Error flushing CSV writer: {:?}", e);
}
} // drop csv writer here
let manifest: Manifest = manifest_rows.clone().into();
manifest.to_writer(&mut zip)?;

// Properly finish writing to the ZIP file
if let Err(e) = zip.finish() {
Expand Down

0 comments on commit 43caeba

Please sign in to comment.