Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NCBI Compress - parallel split by taxid #314

Merged
merged 8 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 10 additions & 50 deletions workflows/index-generation/index-generation.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,11 @@ workflow index_generation {
}

if (!skip_protein_compression) {
call SortFasta as SortFastaNR {
input:
fasta = DownloadNR.nr,
combined_sorted_path = "combined_sorted_nr.fa",
cpu = 64,
docker_image_id = docker_image_id
}

call CompressDatabase as CompressNR {
input:
database_type = "nr",
sorted_fasta = SortFastaNR.sorted,
fasta = DownloadNR.nr,
accession2taxid_files = [DownloadAccession2Taxid.pdb, DownloadAccession2Taxid.prot],
k = nr_compression_k,
scaled = nr_compression_scaled,
Expand All @@ -77,18 +70,10 @@ workflow index_generation {
File nr_or_compressed = select_first([CompressNR.compressed, DownloadNR.nr])

if (!skip_nuc_compression) {
call SortFasta as SortFastaNT {
input:
fasta = DownloadNT.nt,
combined_sorted_path = "combined_sorted_nt.fa",
cpu = 64,
docker_image_id = docker_image_id
}

call CompressDatabase as CompressNT {
input:
database_type = "nt",
sorted_fasta = SortFastaNT.sorted,
fasta = DownloadNT.nt,
accession2taxid_files = [DownloadAccession2Taxid.nucl_wgs, DownloadAccession2Taxid.nucl_gb],
k = nt_compression_k,
scaled = nt_compression_scaled,
Expand Down Expand Up @@ -617,40 +602,10 @@ task GenerateIndexMinimap2 {
}
}

task SortFasta {
input {
File fasta
String combined_sorted_path
Int cpu
Int threads = if cpu * 0.6 < 1 then 1 else floor(cpu * 0.6)
String docker_image_id
}

command <<<
# Sort NT/NR by length with the longer sequences first
# This is needed because the downstream compression algorithm iterates through NT/NR in order only emitting
# sequences if they are not contained by what it has already seen. If a shorter sequence is
# contained by a longer sequence, and the shorter sequence were to come first, it would be emitted
# even though it is redundant to the longer sequence.
set -euxo pipefail

ncbi-compress sort-fasta-by-sequence-length \
--input-fasta ~{fasta} \
--output ~{combined_sorted_path}
>>>
output {
File sorted = combined_sorted_path
}
runtime {
docker: docker_image_id
cpu: cpu
}
}

task CompressDatabase {
input {
String database_type = "nt" # nt or nr
File sorted_fasta
File fasta
Array[File] accession2taxid_files
Int k
Int scaled
Expand All @@ -664,19 +619,24 @@ task CompressDatabase {

READS_BY_TAXID_PATH=reads_by_taxid
SPLIT_APART_TAXID_DIR_NAME=split_apart_taxid
SORTED_TAXID_DIR_NAME=sorted_taxid

# It is critical that this split happens in the same step as the compression
# If the directory is a step output it will be uploaded, which takes an enormous amount of time
ncbi-compress break-into-individual-taxids-only \
--input-fasta ~{sorted_fasta} \
--input-fasta ~{fasta} \
--accession-mapping-files ~{sep=" " accession2taxid_files} \
--output-dir $READS_BY_TAXID_PATH

ncbi-compress sort-taxid-dir-by-sequence-length \
--input-taxid-dir $READS_BY_TAXID_PATH \
--output-taxid-dir $SORTED_TAXID_DIR_NAME

mkdir $SPLIT_APART_TAXID_DIR_NAME

if [ "~{logging_enabled}" == "true" ]; then
ncbi-compress fasta-compress-from-taxid-dir ~{if database_type == "nr" then "--is-protein-fasta" else ""} \
--input-fasta-dir $READS_BY_TAXID_PATH \
--input-fasta-dir $SORTED_TAXID_DIR_NAME \
--output-fasta ~{database_type}_compressed.fa \
--k ~{k} \
--scaled ~{scaled} \
Expand Down
27 changes: 14 additions & 13 deletions workflows/index-generation/ncbi-compress/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod commands {
use tempdir::TempDir;

use crate::ncbi_compress::ncbi_compress;
use crate::fasta_tools::fasta_tools;

fn fasta_compress_w_logging_option(
input_fasta_path: &str,
Expand Down Expand Up @@ -254,19 +255,21 @@ pub mod commands {
fs::create_dir_all(&temp_file_output_dir).expect("Error creating output directory");
let temp_taxid_dir = TempDir::new_in(temp_file_output_dir, "accessions_by_taxid")
.expect("Error creating tempdir");
let temp_taxid_dir_str = temp_taxid_dir.path().to_str().unwrap();

// let taxid_dir_str = format!("{}/accessions_by_taxid", temp_file_output_dir);
let taxid_dir = ncbi_compress::split_accessions_by_taxid(
let temp_taxid_dir_str = format!("{}/accessions_by_taxid", temp_file_output_dir);
;
ncbi_compress::split_accessions_by_taxid(
input_fasta_path,
accession_mapping_files,
&temp_taxid_dir_str,
);

let sorted_taxid_dir = format!("{}/sorted_taxid_dir", temp_file_output_dir);
fasta_tools::sort_taxid_dir_by_sequence_length(&temp_taxid_dir_str, &sorted_taxid_dir);

let mut accession_count = 0;
let mut unique_accession_count = 0;
let mut writer = fasta::Writer::to_file(output_fasta_path).unwrap();
for (_i, entry) in fs::read_dir(taxid_dir).unwrap().enumerate() {
for (_i, entry) in fs::read_dir(sorted_taxid_dir).unwrap().enumerate() {
let entry = entry.unwrap();
let path = entry.path();
let input_fasta_path = path.to_str().unwrap();
Expand Down Expand Up @@ -300,8 +303,7 @@ mod tests {
#[test]
fn test_fasta_compress_from_fasta_skip_split_by_taxid() {
let input_fasta_path = "test_data/fasta_tools/inputs/nt";
let truth_fasta_path =
"test_data/commands/fasta_compress_from_fasta_skip_split_by_taxid/truth-ouputs/nt_out.fa";
let truth_fasta_path = "test_data/commands/common_truth_output/nt_out.fa";
let test_directory = tempdir().unwrap();
let temp_dir_path_str = test_directory.path().to_str().unwrap();
let test_fasta_path = format!("{}/nt.fa", temp_dir_path_str);
Expand Down Expand Up @@ -337,6 +339,7 @@ mod tests {

#[test]
fn test_fasta_compress_from_fasta_end_to_end() {

let input_fasta_path = "test_data/fasta_tools/inputs/nt";
let truth_fasta_path = "test_data/commands/common_truth_output/nt_out.fa";
let test_directory = tempdir().unwrap();
Expand Down Expand Up @@ -365,7 +368,6 @@ mod tests {
let enable_sequence_retention_logging = false;
let logging_contained_in_tree_fn = "";
let logging_contained_in_chunk_fn = "";

commands::fasta_compress_end_to_end(
input_fasta_path,
input_mapping_file_paths,
Expand All @@ -382,8 +384,8 @@ mod tests {
logging_contained_in_tree_fn,
logging_contained_in_chunk_fn,
);
util::compare_fasta_records_from_files(&truth_fasta_path, &test_fasta_path);

assert!(util::are_files_equal(truth_fasta_path, &test_fasta_path));
}

#[test]
Expand Down Expand Up @@ -422,12 +424,11 @@ mod tests {
logging_contained_in_chunk_fn,
);

assert!(util::are_files_equal(truth_fasta_path, &output_fasta_path));
util::compare_fasta_records_from_files(truth_fasta_path, &output_fasta_path);
}

#[test]
fn test_split_fasta_into_chunks() {
use crate::util::util::create_fasta_records_from_file;
use bio::io::fasta;

use crate::commands::commands::count_fasta_reads;
Expand Down Expand Up @@ -469,9 +470,9 @@ mod tests {
.unwrap();

let mut chunk_1_reader =
create_fasta_records_from_file(format!("{}/123_1.fa", temp_dir_path_str).as_str());
util::create_fasta_records_from_file(format!("{}/123_1.fa", temp_dir_path_str).as_str());
let mut chunk_2_reader =
create_fasta_records_from_file(format!("{}/123_2.fa", temp_dir_path_str).as_str());
util::create_fasta_records_from_file(format!("{}/123_2.fa", temp_dir_path_str).as_str());

let mut expected_chunk_1_records = vec![
fasta::Record::with_attrs("1", None, b"ACGT"),
Expand Down
62 changes: 48 additions & 14 deletions workflows/index-generation/ncbi-compress/src/fasta_tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub mod fasta_tools {

use bio::io::fasta;
use rayon::slice::ParallelSliceMut;
use rayon::prelude::*;


struct OffsetWriter<'a> {
file: &'a mut fs::File,
Expand Down Expand Up @@ -83,6 +85,8 @@ pub mod fasta_tools {
let mut records = fasta::Reader::from_file(&input_fasta_path)
.unwrap() // unwrap left here because this is an anyhow error
.records();


while let Some(Ok(record)) = records.next() {
fasta::Writer::new(&mut scratch).write_record(&record)?;
let sequence_length = record.seq().len();
Expand Down Expand Up @@ -142,6 +146,27 @@ pub mod fasta_tools {
}
Ok(())
}

/// Sorts every per-taxid FASTA file in `input_taxid_dir` by sequence length
/// (via `sort_fasta_by_sequence_length`) and writes the sorted copy under the
/// same basename in `output_taxid_dir`, creating that directory if needed.
///
/// Each taxid file is independent, so files are processed in parallel with
/// rayon.
///
/// # Panics
/// Panics if the input directory cannot be read, the output directory cannot
/// be created, a path is not valid UTF-8, or sorting an individual file fails.
pub fn sort_taxid_dir_by_sequence_length(input_taxid_dir: &str, output_taxid_dir: &str) {
    fs::create_dir_all(output_taxid_dir).expect("Error creating output directory");

    // Collect the directory entries up front so rayon can partition the
    // workload; unreadable entries are skipped just like in the original scan.
    let entries: Vec<_> = fs::read_dir(input_taxid_dir)
        .expect("Failed to read directory")
        .filter_map(Result::ok)
        .collect();

    entries.par_iter().for_each(|entry| {
        let path = entry.path();
        let input_fasta_path = path.to_str().expect("input path is not valid UTF-8");
        let input_fasta_basename = path
            .file_name()
            .expect("directory entry has no file name")
            .to_str()
            .expect("file name is not valid UTF-8");
        let output_fasta_path = format!("{}/{}", output_taxid_dir, input_fasta_basename);
        // Fail loudly instead of discarding the Result: a silently dropped
        // sort error here would surface later as sequences missing from the
        // compressed index, which is much harder to diagnose.
        sort_fasta_by_sequence_length(input_fasta_path, &output_fasta_path)
            .unwrap_or_else(|e| panic!("Failed to sort {}: {}", input_fasta_path, e));
    });
}
}

#[cfg(test)]
Expand All @@ -151,6 +176,7 @@ mod tests {
use tempfile::tempdir;

use crate::fasta_tools::fasta_tools;
use crate::util::util;

// Define a struct to hold both ID and sequence
#[derive(Eq, PartialEq)]
Expand All @@ -174,29 +200,19 @@ mod tests {

#[test]
fn test_sort_fasta_by_sequence_length() {
use crate::util::util::create_fasta_records_from_file;
let temp_dir = tempdir().unwrap();
let temp_file = temp_dir.path().join("sorted.fa");
let output_truth_fasta_file = "test_data/fasta_tools/truth_outputs/nt.sorted";
let input_fasta_file = "test_data/fasta_tools/inputs/nt";
let output_truth_fasta_file = "test_data/fasta_tools/truth_outputs/sorted_taxids/9771.fasta";
let input_fasta_file = "test_data/fasta_tools/inputs/unordered_taxids/9771.fasta";

let temp_file_path_str = temp_file.to_str().unwrap();
fasta_tools::sort_fasta_by_sequence_length(input_fasta_file, temp_file_path_str).unwrap();

let mut test_data_records = create_fasta_records_from_file(&temp_file_path_str);
let mut truth_data_records = create_fasta_records_from_file(&output_truth_fasta_file);

assert_eq!(
truth_data_records.sort(),
test_data_records.sort(),
"Files do not match: {:?}",
temp_file_path_str
);
util::compare_fasta_records_from_files(output_truth_fasta_file, temp_file_path_str);
}

#[test]
fn test_count_accessions_by_taxid() {
use crate::util::util::are_files_equal;
let output_truth_tsv_file = "test_data/fasta_tools/truth_outputs/count_accessions_by_taxid/output_counts.tsv";

let test_truth_tsv_file = tempfile::NamedTempFile::new().unwrap();
Expand All @@ -207,7 +223,25 @@ mod tests {
test_truth_tsv_file_path_str,
);

are_files_equal(output_truth_tsv_file, test_truth_tsv_file_path_str);
assert!(util::are_files_equal(output_truth_tsv_file, test_truth_tsv_file_path_str))
}

#[test]
fn test_sort_taxid_dir_by_sequence_length() {
    // Sort a directory of unordered per-taxid FASTA files into a temp dir,
    // then compare every produced file against the checked-in truth file
    // with the same basename.
    let temp_dir = tempdir().unwrap();
    let temp_dir_path_str = temp_dir.path().to_str().unwrap();
    let output_truth_taxid_dir = "test_data/fasta_tools/truth_outputs/sorted_taxids";
    let input_taxid_dir = "test_data/fasta_tools/inputs/unordered_taxids";

    fasta_tools::sort_taxid_dir_by_sequence_length(input_taxid_dir, temp_dir_path_str);

    // Count comparisons so the test cannot pass vacuously: if the function
    // produced no output files, the loop body would never run.
    let mut compared = 0;
    for entry in std::fs::read_dir(temp_dir_path_str).unwrap() {
        let entry = entry.unwrap();
        let path = entry.path();
        let output_fasta_file = path.to_str().unwrap();
        let truth_fasta_file = format!(
            "{}/{}",
            output_truth_taxid_dir,
            path.file_name().unwrap().to_str().unwrap()
        );
        assert!(util::are_files_equal(output_fasta_file, &truth_fasta_file));
        compared += 1;
    }
    assert!(compared > 0, "no sorted output files were produced");
}
}
27 changes: 26 additions & 1 deletion workflows/index-generation/ncbi-compress/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use ncbi_compress::commands::commands::{
fasta_compress_end_to_end, fasta_compress_from_fasta_skip_split_by_taxid,
fasta_compress_from_taxid_dir,
};
use ncbi_compress::fasta_tools::fasta_tools::{sort_fasta_by_sequence_length, count_accessions_by_taxid};
use ncbi_compress::fasta_tools::fasta_tools::{
sort_fasta_by_sequence_length,
count_accessions_by_taxid,
sort_taxid_dir_by_sequence_length
};
use ncbi_compress::ncbi_compress::ncbi_compress::split_accessions_by_taxid;

use ncbi_compress::logging::logging;
Expand All @@ -31,6 +35,22 @@ pub fn main() {
.required(true),
),
)
.subcommand(
Command::new("sort-taxid-dir-by-sequence-length")
.about("sort_taxid_dir_by_sequence_length")
.arg(
Arg::new("input_taxid_dir")
.help("Input directory of taxid fasta files")
.long("input-taxid-dir")
.required(true),
)
.arg(
Arg::new("output_taxid_dir")
.help("Output directory for the sorted taxid fasta files")
.long("output-taxid-dir")
.required(true),
),
)
.subcommand(
Command::new("break-into-individual-taxids-only")
.about("break_into_individual_taxids_only")
Expand Down Expand Up @@ -337,6 +357,11 @@ pub fn main() {
let output_fasta = sub_m.get_one::<String>("output_fasta").unwrap();
sort_fasta_by_sequence_length(input_fasta, output_fasta).unwrap();
}
Some(("sort-taxid-dir-by-sequence-length", sub_m)) => {
let input_taxid_dir = sub_m.get_one::<String>("input_taxid_dir").unwrap();
let output_taxid_dir = sub_m.get_one::<String>("output_taxid_dir").unwrap();
sort_taxid_dir_by_sequence_length(&input_taxid_dir, &output_taxid_dir);
}
Some(("break-into-individual-taxids-only", sub_m)) => {
let input_fasta = sub_m.get_one::<String>("input_fasta").unwrap();
let accession_mapping_files: Vec<String> = sub_m
Expand Down
Loading
Loading