shuffle fasta by seq index (#321)
phoenixAja authored Feb 5, 2024
1 parent aee5216 commit af541cd
Showing 4 changed files with 130 additions and 8 deletions.
11 changes: 9 additions & 2 deletions workflows/index-generation/index-generation.wdl
@@ -444,21 +444,28 @@ task CompressDatabase {
--logging-contained-in-chunk-fn ~{database_type}_contained_in_chunk.tsv
else
ncbi-compress fasta-compress-from-taxid-dir ~{if database_type == "nr" then "--is-protein-fasta" else ""} \
-                --input-fasta-dir $READS_BY_TAXID_PATH \
+                --input-fasta-dir $SORTED_TAXID_DIR_NAME \
--output-fasta ~{database_type}_compressed.fa \
--k ~{k} \
--scaled ~{scaled} \
--similarity-threshold ~{similarity_threshold} \
--split-apart-taxid-dir-name $SPLIT_APART_TAXID_DIR_NAME
fi

+        # shuffle the compressed fasta to distribute accessions evenly across the file.
+        # This is important for spreading SC2 accessions (and any other large taxid)
+        # over the chunked minimap2 and diamond indexes, which impacts alignment time.
+        ncbi-compress shuffle-fasta \
+            --input-fasta ~{database_type}_compressed.fa \
+            --output-fasta ~{database_type}_compressed_shuffled.fa

        # Remove to save space; intermediate files are not cleaned up within a run
# rm -rf $READS_BY_TAXID_PATH
# rm -rf $SPLIT_APART_TAXID_DIR_NAME
>>>

output {
-        File compressed = "~{database_type}_compressed.fa"
+        File compressed = "~{database_type}_compressed_shuffled.fa"
File? contained_in_tree = "~{database_type}_contained_in_tree.tsv"
File? contained_in_chunk = "~{database_type}_contained_in_chunk.tsv"
Directory split_apart_taxid_dir = "split_apart_taxid_~{database_type}"
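The comment in the added block above is the point of this commit: the compressed fasta is later split into chunks to build the minimap2 and diamond indexes, so a long contiguous run of accessions from one huge taxid (such as SC2) would be concentrated in a few chunks and make those chunks disproportionately slow to align against. A small self-contained sketch of the effect (the record counts, labels, and chunk count are invented for illustration; it uses the same rand crate as the Rust code below):

```rust
use rand::seq::SliceRandom;
use rand::thread_rng;

fn main() {
    // 10_000 records; the first 3_000 all belong to one large taxid ("big"),
    // mimicking a fasta in which a huge taxid's accessions sit contiguously.
    let mut records: Vec<&str> = Vec::new();
    records.extend(std::iter::repeat("big").take(3_000));
    records.extend(std::iter::repeat("other").take(7_000));

    let n_chunks = 10;
    let chunk_size = records.len() / n_chunks;
    let count_big = |recs: &[&str]| -> Vec<usize> {
        recs.chunks(chunk_size)
            .map(|c| c.iter().filter(|r| **r == "big").count())
            .collect()
    };

    // Unshuffled: the big taxid completely fills the first three chunks.
    println!("unshuffled: {:?}", count_big(&records));

    // Shuffled: roughly 300 "big" records land in every chunk.
    let mut shuffled = records.clone();
    shuffled.shuffle(&mut thread_rng());
    println!("shuffled:   {:?}", count_big(&shuffled));
}
```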
89 changes: 89 additions & 0 deletions workflows/index-generation/ncbi-compress/src/fasta_tools.rs
@@ -4,6 +4,9 @@ pub mod fasta_tools {
use std::io::Write;
use std::os::unix::prelude::FileExt;

use rand::thread_rng;
use rand::seq::SliceRandom;

use bio::io::fasta;
use rayon::slice::ParallelSliceMut;
use rayon::prelude::*;
@@ -72,6 +75,66 @@ pub mod fasta_tools {
Ok(())
}

pub fn shuffle_fasta_by_sequence_index(
input_fasta_path: &str,
output_fasta_path: &str,
) -> std::io::Result<()> {
let mut n_bytes_by_sequence_index = HashMap::new();
        // Reusable scratch buffer: writing a record here tells us how many bytes it
        // occupies on disk. Clearing and re-using the buffer avoids a fresh
        // allocation for every record.
let mut scratch = Vec::new();

let reader = fasta::Reader::from_file(&input_fasta_path).unwrap();

for (index, record) in reader.records().enumerate() {
let record = record.unwrap();
fasta::Writer::new(&mut scratch).write_record(&record)?;
n_bytes_by_sequence_index.insert(index, scratch.len() as u64);
// Clear the scratch space so we can re-use it for the next record
scratch.clear();
}

let mut shuffled_indexes: Vec<usize> = n_bytes_by_sequence_index.keys().cloned().collect();
        // thread_rng() is unseeded; accepting an RNG seed as a parameter would make
        // the shuffle reproducible for testing.
        let mut rng = thread_rng();
shuffled_indexes.shuffle(&mut rng);

// Save memory by creating our offset map in place on top of our n_bytes map
let mut offset_by_sequence_index = n_bytes_by_sequence_index;

let mut offset = 0;
for index in shuffled_indexes {
            // Look up how many bytes this record will need.
            // Safe to unwrap: shuffled_indexes are exactly the keys of this map.
let n_bytes = *offset_by_sequence_index.get(&index).unwrap();
// Overwrite the length with the offset
offset_by_sequence_index.insert(index, offset);
// Add the saved number of bytes to the offset, so the next offset starts at the end of
// the space that this sequence will occupy
offset += n_bytes;
}

let mut writer = fs::File::create(&output_fasta_path)?;

let reader = fasta::Reader::from_file(&input_fasta_path).unwrap();
for (index, record) in reader.records().enumerate() {
let record = record.unwrap();
// It is safe to unwrap here because all indexes in the file are in the map from the
// first pass
let offset = offset_by_sequence_index.get(&index).unwrap();
let mut offset_writer = OffsetWriter::new(&mut writer, *offset);

            // This scope borrows offset_writer to build a fasta_writer, then drops the
            // fasta_writer so we can use offset_writer again and read its local offset.
{
let mut fasta_writer = fasta::Writer::new(&mut offset_writer);
fasta_writer.write_record(&record)?;
}
}
Ok(())
}
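The shuffle is a two-pass scheme: the first pass records each record's serialized length, the indexes are shuffled, and a running sum turns the shuffled lengths into byte offsets. For example, with shuffled order [2, 0, 1] and lengths {0: 10, 1: 30, 2: 20}, record 2 is written at offset 0, record 0 at offset 20, and record 1 at offset 30. The OffsetWriter used in the second pass is not part of this diff; a minimal sketch of such an adapter, assuming it wraps positioned writes via the FileExt::write_at already imported above, could look like:

```rust
use std::fs::File;
use std::io::{self, Write};
use std::os::unix::prelude::FileExt;

/// Write adapter that writes at a fixed position in a file,
/// advancing its own cursor instead of the file's.
pub struct OffsetWriter<'a> {
    file: &'a mut File,
    offset: u64,
}

impl<'a> OffsetWriter<'a> {
    pub fn new(file: &'a mut File, offset: u64) -> Self {
        OffsetWriter { file, offset }
    }
}

impl<'a> Write for OffsetWriter<'a> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // write_at leaves the file's own cursor untouched, which is what
        // lets many records be written out of order into one file.
        let n = self.file.write_at(buf, self.offset)?;
        self.offset += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.file.flush()
    }
}
```

Because write_at never moves the file's own cursor, each record can be dropped into its pre-computed slot in any order.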

pub fn sort_fasta_by_sequence_length(
input_fasta_path: &str,
output_fasta_path: &str,
@@ -172,6 +235,7 @@ pub mod fasta_tools {
#[cfg(test)]
mod tests {
use std::cmp::Ordering;
use std::fs::File;

use tempfile::tempdir;

@@ -243,4 +307,29 @@ mod tests {
assert!(util::are_files_equal(output_fasta_file, &truth_fasta_file));
}
}

#[test]
    fn test_shuffle_fasta() {
        let input_fasta_file = "test_data/fasta_tools/inputs/nt";

        let temp_dir = tempdir().unwrap();
        let temp_file = temp_dir.path().join("shuffled.fa");
        let temp_file_path_str = temp_file.to_str().unwrap();

        fasta_tools::shuffle_fasta_by_sequence_index(
            input_fasta_file,
            temp_file_path_str,
        ).expect("shuffling the test fasta should succeed");

// make sure the shuffled file contains the same records as the original
util::compare_fasta_records_from_files(input_fasta_file, temp_file_path_str);

// test that the shuffled file has accessions out of order compared to the input
let input_records = util::create_fasta_records_from_file(input_fasta_file);
let shuffled_records = util::create_fasta_records_from_file(temp_file_path_str);
        // The test file has 201 records; a uniform shuffle reproduces the original
        // order with probability 1/201!, so a false failure is effectively impossible.
assert_ne!(input_records, shuffled_records);
}
}
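Since thread_rng() is unseeded, the assertion above is probabilistic rather than deterministic. One way to make it exact, as the comment inside shuffle_fasta_by_sequence_index suggests, is to thread a seed through to the shuffle. A hypothetical sketch (the seed parameter and helper name are illustrative, not part of this commit):

```rust
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::SeedableRng;

// Hypothetical helper: shuffle indexes with an optional fixed seed so a
// test can assert the exact output order instead of relying on chance.
fn shuffled_indexes(n: usize, seed: Option<u64>) -> Vec<usize> {
    let mut indexes: Vec<usize> = (0..n).collect();
    match seed {
        Some(s) => indexes.shuffle(&mut StdRng::seed_from_u64(s)),
        None => indexes.shuffle(&mut rand::thread_rng()),
    }
    indexes
}
```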
24 changes: 23 additions & 1 deletion workflows/index-generation/ncbi-compress/src/main.rs
@@ -7,7 +7,8 @@ use ncbi_compress::commands::commands::{
use ncbi_compress::fasta_tools::fasta_tools::{
sort_fasta_by_sequence_length,
count_accessions_by_taxid,
-    sort_taxid_dir_by_sequence_length
+    sort_taxid_dir_by_sequence_length,
+    shuffle_fasta_by_sequence_index
};
use ncbi_compress::ncbi_compress::ncbi_compress::split_accessions_by_taxid;

@@ -51,6 +52,22 @@ pub fn main() {
.required(true),
),
)
.subcommand(
Command::new("shuffle-fasta")
.about("shuffle-fasta")
.arg(
Arg::new("input_fasta")
.help("Input fasta file to be shuffled")
.long("input-fasta")
.required(true),
)
.arg(
Arg::new("output_fasta")
.help("Output fasta file for the shuffled fasta")
.long("output-fasta")
.required(true),
),
)
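For reference, this is exactly the invocation the CompressDatabase WDL task above performs (shown here with database_type = "nt"):

```sh
ncbi-compress shuffle-fasta \
    --input-fasta nt_compressed.fa \
    --output-fasta nt_compressed_shuffled.fa
```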
.subcommand(
Command::new("break-into-individual-taxids-only")
.about("break_into_individual_taxids_only")
@@ -362,6 +379,11 @@ pub fn main() {
let output_taxid_dir = sub_m.get_one::<String>("output_taxid_dir").unwrap();
sort_taxid_dir_by_sequence_length(&input_taxid_dir, &output_taxid_dir);
}
Some(("shuffle-fasta", sub_m)) => {
let input_fasta = sub_m.get_one::<String>("input_fasta").unwrap();
let output_fasta = sub_m.get_one::<String>("output_fasta").unwrap();
            shuffle_fasta_by_sequence_index(&input_fasta, &output_fasta)
                .expect("failed to shuffle fasta");
}
Some(("break-into-individual-taxids-only", sub_m)) => {
let input_fasta = sub_m.get_one::<String>("input_fasta").unwrap();
let accession_mapping_files: Vec<String> = sub_m
14 changes: 9 additions & 5 deletions workflows/index-generation/ncbi-compress/src/ncbi_compress.rs
Expand Up @@ -96,8 +96,7 @@ pub mod ncbi_compress {

log::info!("Creating accession to taxid db");

-        let taxid_path = Path::new(output_dir);
-        log::info!("Creating taxid dir {:?}", taxid_path);
+        log::info!("Creating taxid dir {:?}", output_dir);
let reader = fasta::Reader::from_file(&input_fasta_path).unwrap();
// Build a trie of the accessions in the input fasta
reader
@@ -164,6 +163,12 @@ pub mod ncbi_compress {
});
log::info!("Finished building accession to taxid db");

let outpath = write_accessions_to_taxid(input_fasta_path, &accession_to_taxid, output_dir);
fs::remove_dir_all(dir).expect("Error removing db");
outpath
}

pub fn write_accessions_to_taxid(input_fasta_path: &str, accession_to_taxid: &rocksdb::DB, output_dir: &str) -> std::path::PathBuf {
log::info!("Splitting accessions by taxid");
let reader = fasta::Reader::from_file(&input_fasta_path).unwrap();
// Split the input fasta accessions into one file per taxid
@@ -183,7 +188,7 @@
} else {
0 // no taxid found
};
-            let file_path = taxid_path.join(format!("{}.fasta", taxid));
+            let file_path = format!("{}/{}.fasta", output_dir, taxid);
let file = fs::OpenOptions::new()
.create(true)
.append(true)
@@ -192,9 +197,8 @@
let mut writer = fasta::Writer::new(file);
writer.write_record(&record).unwrap();
}
-        fs::remove_dir_all(dir).expect("Error removing db");
        log::info!("Finished splitting accessions by taxid");
-        taxid_path.to_path_buf()
+        Path::new(output_dir).to_path_buf()
}
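One stylistic note on the format!("{}/{}.fasta", ...) introduced here: it hard-codes the / separator. An equivalent built on the standard path API (an alternative sketch, not what this commit does; the taxid type is illustrative) would be:

```rust
use std::path::{Path, PathBuf};

// Builds output_dir/<taxid>.fasta without hand-rolling the separator.
fn taxid_fasta_path(output_dir: &str, taxid: u64) -> PathBuf {
    Path::new(output_dir).join(format!("{}.fasta", taxid))
}
```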

pub fn fasta_compress_taxid_w_logging(
