Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EXP] make lineage utils more flexible #2371

Closed
wants to merge 40 commits into from
Closed
Changes from 2 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
9089ba0
need taxid; classes help. Also think bout LINS while at it
bluegenes Nov 21, 2022
0542512
minor text fixes
bluegenes Nov 21, 2022
75571b7
Merge branch 'latest' into tax-explore
bluegenes Dec 19, 2022
dffceec
init tests
bluegenes Dec 19, 2022
8afb9a2
test lineage_str input; display lineage
bluegenes Dec 20, 2022
abec6ee
test display_taxid; lineage_match
bluegenes Dec 20, 2022
c04efbe
lineageD
bluegenes Dec 20, 2022
9d35af0
simplify methods a bit
bluegenes Dec 20, 2022
e11d293
tree and lca functionality
bluegenes Dec 20, 2022
62379f4
load from dict to allow csv loading
bluegenes Dec 20, 2022
67935da
more tests
bluegenes Dec 20, 2022
0c1e486
add classes for summarizing tax info
bluegenes Dec 22, 2022
c1574e4
Merge branch 'latest' into tax-explore
bluegenes Dec 22, 2022
ef82d66
summarize at each rank
bluegenes Dec 22, 2022
106f390
seen_perfect notification
bluegenes Dec 22, 2022
7d927b9
start trying to use new framework
bluegenes Dec 22, 2022
c8d2bce
rework writing fns for new framework
bluegenes Dec 23, 2022
5338268
rm note
bluegenes Dec 23, 2022
cd6d5c5
init tests for TaxResult,QueryTaxResult classes
bluegenes Dec 30, 2022
32bb504
taxcomparison tests
bluegenes Dec 31, 2022
9755b4d
rm old summarize_gather_at
bluegenes Dec 31, 2022
dcbe05e
agg lins; format for krona
bluegenes Jan 3, 2023
00b61a9
ksize to int; upd testing util
bluegenes Jan 3, 2023
3fef5bf
add second summarization test
bluegenes Jan 3, 2023
3871b51
test krona and agg lins
bluegenes Jan 3, 2023
c34df3d
init upd test_tax for new framework
bluegenes Jan 3, 2023
06333e6
whoops, we actually need the query names -- add back.
bluegenes Jan 3, 2023
e86b349
need to specify order=True to allow sorting
bluegenes Jan 3, 2023
88d0137
fix summarization for missing tax and skipped idents
bluegenes Jan 3, 2023
1a2b69f
use GatherRow class to make sure essential column names are present (…
bluegenes Jan 3, 2023
905c342
init annot changes
bluegenes Jan 3, 2023
0504cb2
init annot changes
bluegenes Jan 3, 2023
776f043
Merge branch 'latest' into tax-explore
bluegenes Jan 3, 2023
3e7b118
init genome classification changes
bluegenes Jan 4, 2023
3042d6f
init more methods into queryResult class
bluegenes Jan 4, 2023
98b8a0e
make frozen so can use RankLineageInfo as dict keys
bluegenes Jan 5, 2023
901e446
propagate modifications
bluegenes Jan 5, 2023
dae2a3c
rm classification logic from main
bluegenes Jan 5, 2023
3880249
work on writing methods
bluegenes Jan 5, 2023
a0a4516
write_summary
bluegenes Jan 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 226 additions & 0 deletions src/sourmash/tax/taxcomparsion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
"""
bluegenes marked this conversation as resolved.
Show resolved Hide resolved
Taxonomic Information Classes
"""
from dataclasses import dataclass, field

@dataclass
class LineagePair:
    """Name and rank for a single level of a lineage.

    Both fields default to ``None``.
    """
    name: str = None
    rank: str = None

    def is_empty(self):
        """Return True if either ``name`` or ``rank`` is ``None``."""
        # Plain `or` instead of any(a, b): any() accepts exactly one
        # iterable, so passing two positional arguments raises TypeError.
        return self.name is None or self.rank is None

@dataclass
class LineageTuple(LineagePair):
    """A LineagePair (name and rank) that additionally carries a taxid."""
    # taxid is optional and may stay unset (None)
    taxid: int = None


@dataclass
class BaseLineageInfo:
    """Lineage stored against an ordered list of ranks.

    An instance can be built in three ways:

    * ``lineage`` given: a sequence of LineageTuple/LineagePair objects.
      If ``ranks`` is also given the lineage is validated against it,
      otherwise ``ranks`` is derived from the lineage.
    * ``ranks`` and ``lineage_str`` given: the string is split into names
      and paired with ranks in order.
    * only ``ranks`` given: an empty lineage (one entry per rank) is built.

    Raises ValueError when neither ``lineage`` nor ``ranks`` is provided.
    """
    ranks: list = None
    lineage: tuple = None  # tuple of LineageTuples/LineagePairs
    lineage_str: str = None  # ';'- or ','-separated str of lineage names

    def __post_init__(self):
        "Initialize according to passed values"
        if self.lineage is None:
            if self.ranks is None:
                raise ValueError(f"Must provide ordered ranks for {self.lineage_str}")
            if self.lineage_str is not None:
                self.lineage = self.make_lineage(self.lineage_str)
            else:
                self.lineage = self.init_empty()
        elif self.ranks is not None:
            self.validate_lineage()
        else:
            self.ranks = [a.rank for a in self.lineage]

    @staticmethod
    def _drop_trailing_empty(values):
        "Remove trailing '' / None entries from `values` in place and return it."
        while values and values[-1] in ("", None):
            values.pop()
        return values

    def taxlist(self):
        "Return the ranks in their stored order."
        return self.ranks

    def ascending_taxlist(self):
        "Return the ranks in reverse of their stored order."
        return self.ranks[::-1]

    def init_empty(self):
        """Initialize an empty lineage: one LineageTuple per rank, no names.

        Sets ``self.lineage`` and also returns it, so that callers which
        assign the return value do not overwrite the lineage with None.
        Raises ValueError if a lineage is already set.
        """
        if self.lineage is not None:
            raise ValueError("lineage not empty")
        self.lineage = tuple(LineageTuple(rank=rank) for rank in self.ranks)
        return self.lineage

    def validate_lineage(self):
        "Check if all lineage ranks are in allowed ranks; check that they are in order"
        for taxrank, lin in zip(self.ranks, self.lineage):
            # Membership is tested first: once lin.rank == taxrank holds,
            # lin.rank is necessarily one of self.ranks, so the opposite
            # order would make this check unreachable.
            if lin.rank not in self.ranks:
                raise ValueError(f"Error: Lineage not valid. Rank {lin.rank} not in set ranks: {self.ranks}")
            if lin.rank != taxrank:
                raise ValueError(f'incomplete lineage at {taxrank} - is {lin.rank} instead')

    def make_lineage(self, lin):
        """Turn a ; or ,-separated set of lineages into a tuple of LineageTuple objs.

        ';' is tried first; ',' is used only if ';' produced a single item.
        Names are paired with ``self.ranks`` in order (zip stops at the
        shorter of the two). Sets ``self.lineage`` and returns it.
        """
        names = lin.split(';')
        if len(names) == 1:
            names = lin.split(',')
        self.lineage = tuple(
            LineageTuple(rank=rank, name=n) for rank, n in zip(self.ranks, names)
        )
        return self.lineage

    def zip_lineage(self, truncate_empty=False):
        """
        Return lineage names as a list.

        With `truncate_empty`, trailing empty ('' or None) names are dropped.
        """
        zipped = [a.name for a in self.lineage]
        if truncate_empty:
            self._drop_trailing_empty(zipped)
        return zipped

    def zip_taxid(self, truncate_empty=False):
        """
        Return taxids as a list.

        With `truncate_empty`, trailing empty ('' or None) taxids are dropped.
        """
        zipped = [a.taxid for a in self.lineage]
        if truncate_empty:
            self._drop_trailing_empty(zipped)
        return zipped

    def display_lineage(self, truncate_empty=False):
        "Return lineage names as ';'-separated list"
        names = self.zip_lineage(truncate_empty=truncate_empty)
        # unset names are rendered as empty strings so join() cannot fail
        return ";".join("" if n is None else str(n) for n in names)

    def display_taxid(self, truncate_empty=False):
        "Return lineage taxids as ';'-separated list"
        taxids = self.zip_taxid(truncate_empty=truncate_empty)
        # taxids may be ints or None; str.join() only accepts strings
        return ";".join("" if t is None else str(t) for t in taxids)

    def is_lineage_match(self, other, rank):
        """
        check to see if two lineages are a match down to given rank.

        Returns 1 on a match and 0 otherwise. Raises ValueError if the two
        lineages use different ranks or `rank` is not one of them.
        """
        if not other.ranks == self.ranks:  # check same ranks
            raise ValueError("Cannot compare lineages from taxonomies with different ranks.")
        if rank not in self.ranks:  # rank is available
            raise ValueError(f"Desired Rank {rank} not available for this lineage")
        for a, b in zip(self.lineage, other.lineage):
            assert a.rank == b.rank
            if a.rank == rank:
                if a == b:
                    return 1
            if a != b:
                return 0

        # `rank` was never reached in the paired lineages
        return 0

    def pop_to_rank(self, rank):
        """Remove lineage tuples from this lineage until `rank` is reached.

        ``self.lineage`` is replaced by the shortened tuple, which is also
        returned. If the lineage already ends above `rank` it is returned
        unchanged. Raises ValueError if `rank` is not in ``self.ranks``.
        """
        if rank not in self.ranks:
            raise ValueError(f"Desired Rank {rank} not available for this lineage")

        before_rank = self.ranks[:self.ranks.index(rank)]
        # are we already above rank?
        if self.lineage and self.lineage[-1].rank in before_rank:
            return tuple(self.lineage)

        # if not, get lineage at this rank; trim a list copy because the
        # stored lineage is a tuple and has no pop()
        trimmed = list(self.lineage)
        while trimmed and trimmed[-1].rank != rank:
            trimmed.pop()
        self.lineage = tuple(trimmed)
        return self.lineage


@dataclass
class LineageInfoRanks(BaseLineageInfo):
    """Class for storing multi-rank lineage information.

    ``ranks`` defaults to superkingdom..species; with ``include_strain``
    a trailing "strain" rank is added.
    """
    # default_factory gives every instance its own list; dataclasses refuse
    # a bare list as a field default (ValueError at class creation).
    ranks: list = field(
        default_factory=lambda: [
            'superkingdom', 'phylum', 'class', 'order', 'family', 'genus', 'species',
        ]
    )
    include_strain: bool = False

    def __post_init__(self):
        "Initialize according to passed values"
        if self.include_strain and self.ranks is not None and "strain" not in self.ranks:
            # build a new list rather than append(), so a list passed in by
            # the caller is never modified
            self.ranks = [*self.ranks, "strain"]
        # lineage construction / validation is shared with the base class
        super().__post_init__()


@dataclass
class LineageInfoLINS(BaseLineageInfo):
    """Lineage information for LINS; taxid output is not implemented."""
    ranks: list
    # TODO: what special considerations do we have here?

    def __post_init__(self):
        "Set up lineage and ranks from whichever values were supplied"
        # A lineage was passed in: validate it, or derive ranks from it.
        if self.lineage is not None:
            if self.ranks is None:
                self.ranks = [entry.rank for entry in self.lineage]
            else:
                self.validate_lineage()
            return

        # No lineage: ranks are mandatory from here on.
        if self.ranks is None:
            raise ValueError(f"Must provide ordered ranks for {self.lineage_str}")

        if self.lineage_str is None:
            self.lineage = self.init_empty()
        else:
            self.lineage = self.make_lineage(self.lineage_str)

    def zip_taxid(self, truncate_empty=False):
        "Not implemented for this class."
        raise NotImplementedError

    def display_taxid(self, truncate_empty=False):
        "Not implemented for this class."
        raise NotImplementedError


# not sure where to go here.. we already have MultiLineagesDB .. can we use/mod that instead?
#@dataclass
#class MultiLineages:
# # NOTE: explicitly allow any ranks so this will work with LINS
# """Class for manipulating groups of LineageInfo"""
# lineages: # list of LineageInfo??
#
# def build_tree(self):
# return self
# def find_lca(self):
# return self


@dataclass
class QueryInfo:
    """Holder for a single ``res`` list (None by default)."""
    # NOTE: probably not needed if all info lives in the base gather result
    res: list = None

@dataclass
class BaseGatherResult:
    """Base result container with a single ``res`` list (None by default)."""
    res: list = None

@dataclass
class SummarizedGatherResult(BaseGatherResult):
    """BaseGatherResult subclass; redeclares ``res`` with the same default."""
    res: list = None

@dataclass
class ClassificationResult(BaseGatherResult):
    """BaseGatherResult subclass; redeclares ``res`` with the same default."""
    res: list = None