shuffle fasta by seq index #321

Merged · 4 commits · Feb 5, 2024
11 changes: 9 additions & 2 deletions workflows/index-generation/index-generation.wdl
@@ -444,21 +444,28 @@ task CompressDatabase {
--logging-contained-in-chunk-fn ~{database_type}_contained_in_chunk.tsv
else
ncbi-compress fasta-compress-from-taxid-dir ~{if database_type == "nr" then "--is-protein-fasta" else ""} \
--input-fasta-dir $READS_BY_TAXID_PATH \
--input-fasta-dir $SORTED_TAXID_DIR_NAME \
--output-fasta ~{database_type}_compressed.fa \
--k ~{k} \
--scaled ~{scaled} \
--similarity-threshold ~{similarity_threshold} \
--split-apart-taxid-dir-name $SPLIT_APART_TAXID_DIR_NAME
fi

# Shuffle the compressed fasta to distribute accessions evenly across the file.
# This is important for spreading SC2 accessions (and any other large taxid)
# over the chunked minimap2 and diamond indexes, which impacts alignment time.
ncbi-compress shuffle-fasta \
--input-fasta ~{database_type}_compressed.fa \
--output-fasta ~{database_type}_compressed_shuffled.fa

# Remove to save space; intermediate files are not cleaned up within a run
# rm -rf $READS_BY_TAXID_PATH
# rm -rf $SPLIT_APART_TAXID_DIR_NAME
>>>

output {
File compressed = "~{database_type}_compressed.fa"
File compressed = "~{database_type}_compressed_shuffled.fa"
File? contained_in_tree = "~{database_type}_contained_in_tree.tsv"
File? contained_in_chunk = "~{database_type}_contained_in_chunk.tsv"
Directory split_apart_taxid_dir = "split_apart_taxid_~{database_type}"
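A note on the new shuffle step (the sketch below is illustrative, not code from this PR): the chunked minimap2 and diamond indexes are built from contiguous slices of the FASTA, so a long run of records from one taxid would concentrate in a handful of chunks; shuffling spreads those records roughly uniformly. The record count and chunk size below are made-up numbers.

```rust
use std::collections::BTreeSet;

use rand::seq::SliceRandom;
use rand::thread_rng;

// Illustration: chunked index builders take contiguous slices of the
// FASTA, so a record's chunk is just its position / chunk_size.
fn occupied_chunks(positions: &[usize], chunk_size: usize) -> BTreeSet<usize> {
    positions.iter().map(|p| p / chunk_size).collect()
}

fn main() {
    let n_records = 1_000; // hypothetical database size
    let chunk_size = 100; // hypothetical chunk size -> 10 chunks

    // Unshuffled: 300 consecutive records from one large taxid (e.g. SC2)
    // sit at positions 200..500 and land in only 3 of the 10 chunks.
    let sc2_positions: Vec<usize> = (200..500).collect();
    assert_eq!(occupied_chunks(&sc2_positions, chunk_size).len(), 3);

    // Shuffled: the same records' new positions are spread roughly
    // uniformly over the file, so they touch nearly every chunk.
    let mut order: Vec<usize> = (0..n_records).collect();
    order.shuffle(&mut thread_rng());
    let new_positions: Vec<usize> = order
        .iter()
        .enumerate()
        .filter(|(_, rec)| (200..500).contains(*rec))
        .map(|(pos, _)| pos)
        .collect();
    println!(
        "chunks touched after shuffle: {}",
        occupied_chunks(&new_positions, chunk_size).len() // almost certainly 10
    );
}
```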
89 changes: 89 additions & 0 deletions workflows/index-generation/ncbi-compress/src/fasta_tools.rs
@@ -4,6 +4,9 @@ pub mod fasta_tools {
use std::io::Write;
use std::os::unix::prelude::FileExt;

use rand::thread_rng;
use rand::seq::SliceRandom;

use bio::io::fasta;
use rayon::slice::ParallelSliceMut;
use rayon::prelude::*;
@@ -72,6 +75,66 @@ pub mod fasta_tools {
Ok(())
}

pub fn shuffle_fasta_by_sequence_index(
input_fasta_path: &str,
output_fasta_path: &str,
) -> std::io::Result<()> {
let mut n_bytes_by_sequence_index = HashMap::new();
// Pre-allocate scratch space to serialize each record into so we can measure
// how many bytes it occupies when written; clearing and re-using the buffer
// avoids a fresh allocation for every record.
let mut scratch = Vec::new();

let reader = fasta::Reader::from_file(&input_fasta_path).unwrap();

for (index, record) in reader.records().enumerate() {
let record = record.unwrap();
fasta::Writer::new(&mut scratch).write_record(&record)?;
n_bytes_by_sequence_index.insert(index, scratch.len() as u64);
// Clear the scratch space so we can re-use it for the next record
scratch.clear();
}

let mut shuffled_indexes: Vec<usize> = n_bytes_by_sequence_index.keys().cloned().collect();
// TODO: thread_rng is unseeded; the seed should be a parameter so the
// shuffle can be made deterministic for testing.
let mut rng = thread_rng();
shuffled_indexes.shuffle(&mut rng);

// Save memory by creating our offset map in place on top of our n_bytes map
let mut offset_by_sequence_index = n_bytes_by_sequence_index;

let mut offset = 0;
for index in shuffled_indexes {
// Look up how many bytes this record will need.
// It is safe to unwrap here because shuffled_indexes came from this map's keys.
let n_bytes = *offset_by_sequence_index.get(&index).unwrap();
// Overwrite the length with the offset
offset_by_sequence_index.insert(index, offset);
// Add the saved number of bytes to the offset, so the next offset starts at the end of
// the space that this sequence will occupy
offset += n_bytes;
}

let mut writer = fs::File::create(&output_fasta_path)?;

let reader = fasta::Reader::from_file(&input_fasta_path).unwrap();
for (index, record) in reader.records().enumerate() {
let record = record.unwrap();
// It is safe to unwrap here because all indexes in the file are in the map from the
// first pass
let offset = offset_by_sequence_index.get(&index).unwrap();
let mut offset_writer = OffsetWriter::new(&mut writer, *offset);

// This scope borrows offset_writer to create fasta_writer, then drops
// fasta_writer so offset_writer can be used again and its local offset read.
{
let mut fasta_writer = fasta::Writer::new(&mut offset_writer);
fasta_writer.write_record(&record)?;
}
}
Ok(())
}

pub fn sort_fasta_by_sequence_length(
input_fasta_path: &str,
output_fasta_path: &str,
@@ -172,6 +235,7 @@
#[cfg(test)]
mod tests {
use std::cmp::Ordering;
use std::fs::File;

use tempfile::tempdir;

@@ -243,4 +307,29 @@
assert!(util::are_files_equal(output_fasta_file, &truth_fasta_file));
}
}

#[test]
fn test_shuffle_fasta() {
let input_fasta_file = "test_data/fasta_tools/inputs/nt";

let temp_dir = tempdir().unwrap();
let temp_file = temp_dir.path().join("shuffled.fa");
let temp_file_path_str = temp_file.to_str().unwrap();

fasta_tools::shuffle_fasta_by_sequence_index(input_fasta_file, temp_file_path_str)
.unwrap();

// make sure the shuffled file contains the same records as the original
util::compare_fasta_records_from_files(input_fasta_file, temp_file_path_str);

// test that the shuffled file has accessions out of order compared to the input
let input_records = util::create_fasta_records_from_file(input_fasta_file);
let shuffled_records = util::create_fasta_records_from_file(temp_file_path_str);
// the test file has 201 records, so the chance that a random shuffle
// reproduces the original order (1 in 201!) is negligible
assert_ne!(input_records, shuffled_records);
}
}
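For readers following shuffle_fasta_by_sequence_index: pass one records each record's serialized byte length by index, the shuffled-order walk turns those lengths into output offsets via a running sum, and pass two writes each record at its offset through OffsetWriter. OffsetWriter itself is defined outside this diff; the sketch below is a guess at its shape, assuming it wraps the positional FileExt::write_at that the file's imports point to — treat the names and details as assumptions, not the actual implementation.

```rust
use std::fs::File;
use std::io::{self, Write};
use std::os::unix::prelude::FileExt;

// Hypothetical sketch of OffsetWriter (not shown in this diff): a Write
// adapter that writes into a File starting at a given byte offset,
// advancing its own cursor without touching the File's seek position.
struct OffsetWriter<'a> {
    file: &'a mut File,
    offset: u64,
}

impl<'a> OffsetWriter<'a> {
    fn new(file: &'a mut File, offset: u64) -> Self {
        OffsetWriter { file, offset }
    }
}

impl<'a> Write for OffsetWriter<'a> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // write_at is positional I/O from std::os::unix::prelude::FileExt.
        let n = self.file.write_at(buf, self.offset)?;
        self.offset += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.file.flush()
    }
}
```

Positional writes are what make the second pass safe: each record owns a disjoint `[offset, offset + n_bytes)` span from the prefix sum, so records cannot clobber one another regardless of write order.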
24 changes: 23 additions & 1 deletion workflows/index-generation/ncbi-compress/src/main.rs
@@ -7,7 +7,8 @@ use ncbi_compress::commands::commands::{
use ncbi_compress::fasta_tools::fasta_tools::{
sort_fasta_by_sequence_length,
count_accessions_by_taxid,
sort_taxid_dir_by_sequence_length
sort_taxid_dir_by_sequence_length,
shuffle_fasta_by_sequence_index
};
use ncbi_compress::ncbi_compress::ncbi_compress::split_accessions_by_taxid;

@@ -51,6 +52,22 @@ pub fn main() {
.required(true),
),
)
.subcommand(
Command::new("shuffle-fasta")
.about("Shuffle the records of a fasta file into a random order")
.arg(
Arg::new("input_fasta")
.help("Input fasta file to be shuffled")
.long("input-fasta")
.required(true),
)
.arg(
Arg::new("output_fasta")
.help("Output fasta file for the shuffled fasta")
.long("output-fasta")
.required(true),
),
)
.subcommand(
Command::new("break-into-individual-taxids-only")
.about("break_into_individual_taxids_only")
@@ -362,6 +379,11 @@ pub fn main() {
let output_taxid_dir = sub_m.get_one::<String>("output_taxid_dir").unwrap();
sort_taxid_dir_by_sequence_length(&input_taxid_dir, &output_taxid_dir);
}
Some(("shuffle-fasta", sub_m)) => {
let input_fasta = sub_m.get_one::<String>("input_fasta").unwrap();
let output_fasta = sub_m.get_one::<String>("output_fasta").unwrap();
shuffle_fasta_by_sequence_index(&input_fasta, &output_fasta)
.expect("failed to shuffle fasta");
}
Some(("break-into-individual-taxids-only", sub_m)) => {
let input_fasta = sub_m.get_one::<String>("input_fasta").unwrap();
let accession_mapping_files: Vec<String> = sub_m
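On the "should be a parameter" TODO in shuffle_fasta_by_sequence_index: a minimal sketch of how an optional seed could be threaded through for deterministic tests, assuming rand's StdRng (the helper and the idea of a `--seed` flag are hypothetical, not part of this PR):

```rust
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::SeedableRng;

// Hypothetical helper: a fixed seed gives reproducible shuffles in tests,
// while None keeps production behavior random.
fn make_rng(seed: Option<u64>) -> StdRng {
    match seed {
        Some(s) => StdRng::seed_from_u64(s),
        None => StdRng::from_entropy(),
    }
}

fn main() {
    let mut indexes: Vec<usize> = (0..10).collect();
    // e.g. populated from a hypothetical --seed CLI flag
    let mut rng = make_rng(Some(42));
    indexes.shuffle(&mut rng);
    println!("{:?}", indexes); // same order on every run with the same seed
}
```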
14 changes: 9 additions & 5 deletions workflows/index-generation/ncbi-compress/src/ncbi_compress.rs
@@ -50,8 +50,7 @@ pub mod ncbi_compress {

log::info!("Creating accession to taxid db");

let taxid_path = Path::new(output_dir);
log::info!("Creating taxid dir {:?}", taxid_path);
log::info!("Creating taxid dir {:?}", output_dir);
let reader = fasta::Reader::from_file(&input_fasta_path).unwrap();
// Build a trie of the accessions in the input fasta
reader.records().enumerate().par_bridge().for_each(|(i, result)| {
@@ -114,6 +113,12 @@
});
log::info!("Finished building accession to taxid db");

let outpath = write_accessions_to_taxid(input_fasta_path, &accession_to_taxid, output_dir);
fs::remove_dir_all(dir).expect("Error removing db");
outpath
}

pub fn write_accessions_to_taxid(input_fasta_path: &str, accession_to_taxid: &rocksdb::DB, output_dir: &str) -> std::path::PathBuf {
log::info!("Splitting accessions by taxid");
let reader = fasta::Reader::from_file(&input_fasta_path).unwrap();
// Split the input fasta accessions into one file per taxid
@@ -133,7 +138,7 @@
} else {
0 // no taxid found
};
let file_path = taxid_path.join(format!("{}.fasta", taxid));
let file_path = format!("{}/{}.fasta", output_dir, taxid);
let file = fs::OpenOptions::new()
.create(true)
.append(true)
@@ -142,9 +147,8 @@
let mut writer = fasta::Writer::new(file);
writer.write_record(&record).unwrap();
}
fs::remove_dir_all(dir).expect("Error removing db");
log::info!("Finished splitting accessions by taxid");
taxid_path.to_path_buf()
Path::new(output_dir).to_path_buf()
}

pub fn fasta_compress_taxid_w_logging(
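One small, non-blocking observation on the refactor: write_accessions_to_taxid now builds the per-taxid path with format!("{}/{}.fasta", ...), which hard-codes the separator. A sketch of a Path-based equivalent, if that line is ever revisited (the helper name and taxid type are our assumptions):

```rust
use std::path::{Path, PathBuf};

// Hypothetical equivalent of format!("{}/{}.fasta", output_dir, taxid):
// Path::join handles separators and keeps the code portable.
fn taxid_fasta_path(output_dir: &str, taxid: u64) -> PathBuf {
    Path::new(output_dir).join(format!("{}.fasta", taxid))
}

fn main() {
    let p = taxid_fasta_path("reads_by_taxid", 2697049); // SC2 taxid
    assert_eq!(p, Path::new("reads_by_taxid/2697049.fasta"));
}
```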