-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathutil.py
210 lines (165 loc) · 7.63 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
from basic import *
from all_genes import all_genes
from scipy.cluster import hierarchy
from scipy.spatial import distance
import os
import html_colors
import parse_tsv
verbose = __name__ == '__main__'
# these helper functions used to do more work, now it's a little silly...
def get_rep( gene, organism ):
assert gene.startswith('TR')
return all_genes[organism][gene].rep
def get_mm1_rep( gene, organism ):
assert gene.startswith('TR')
return all_genes[organism][gene].mm1_rep
def get_rep_ignoring_allele( gene, organism ):
rep = get_rep( gene, organism )
rep = rep[:rep.index('*')]
return rep
def get_mm1_rep_gene_for_counting( allele, organism ):
return all_genes[organism][allele].count_rep
def countreps_from_genes( genes, organism ):
return set( ( all_genes[organism][x].count_rep for x in genes ) )
def tree_sort( old_l, distances, return_leaves=True ): ## average linkage
assert len(distances) == len(old_l)
if len(old_l)==1:
leaves = [0]
else:
y = distance.squareform( distances, checks=True )
Z = hierarchy.average( y )
#c,coph_dists = hierarchy.cophenet(Z,y)
leaves = hierarchy.leaves_list( Z )
new_l = [ old_l[x] for x in leaves ]
if not return_leaves:
return new_l
else:
return new_l, leaves
def get_top_genes( blast_hits_string ):
hits = dict( [ ( x.split(':')[0], int( x.split(':')[1] ) ) for x in blast_hits_string.split(';') ] )
top_score = max( hits.values() )
return set( [ x for x,y in hits.iteritems() if y >= top_score ] )
def get_top_reps( blast_hits_string, organism ):
hits = dict( [ ( x.split(':')[0], int( x.split(':')[1] ) ) for x in blast_hits_string.split(';') ] )
top_score = max( hits.values() )
return set( [ all_genes[organism][x].rep for x,y in hits.iteritems() if y >= top_score ] )
def reps_from_genes( genes, organism, mm1=False, trim_allele=False ):
reps = set( ( all_genes[organism][x].mm1_rep for x in genes ) ) if mm1 else \
set( ( all_genes[organism][x].rep for x in genes ) )
if trim_allele:
reps = set( ( x[:x.index('*')] for x in reps ) )
return reps
def readme( pngfile, text ):
"""Generate some readme text associated to an image file, that will be incorporated into the
big html results file by run_basic_analysis.py"""
out = open(pngfile+'.readme','w')
cmd = ' '.join(argv)
out.write("""
<u>Command</u>:
{}
<br><br>
<u>Filename</u>:
{}
<br><br>
<u>Readme</u>:
{}
<br><br>
""".format(cmd, pngfile, text))
out.close()
## setup a mapping that we can use for counting when allowing mm1s and also ignoring alleles
# allele2mm1_rep_gene_for_counting = {}
# def get_mm1_rep_ignoring_allele( gene, organism ): # helper fxn
# rep = get_mm1_rep( gene, organism )
# rep = rep[:rep.index('*')]
# return rep
# for organism in ['human','mouse']:
# allele2mm1_rep_gene_for_counting[ organism ] = {}
# for chain in 'AB':
# ## look at gene/allele maps
# vj_alleles = { 'V': [ id for (id,g) in all_genes[organism].iteritems() if g.chain==chain and g.region=='V'],
# 'J': [ id for (id,g) in all_genes[organism].iteritems() if g.chain==chain and g.region=='J'] }
# for vj, alleles in vj_alleles.iteritems():
# gene2rep = {}
# gene2alleles = {}
# rep_gene2alleles = {}
# for allele in alleles:
# #assert allele[2] == chain
# gene = allele[:allele.index('*')]
# rep_gene = get_mm1_rep_ignoring_allele( allele, organism )
# if rep_gene not in rep_gene2alleles:
# rep_gene2alleles[ rep_gene ] = []
# rep_gene2alleles[ rep_gene ].append( allele )
# if gene not in gene2rep:
# gene2rep[gene] = set()
# gene2alleles[gene] = []
# gene2rep[ gene ].add( rep_gene )
# gene2alleles[gene].append( allele )
# merge_rep_genes = {}
# for gene,reps in gene2rep.iteritems():
# if len(reps)>1:
# assert vj=='V'
# if verbose:
# print 'multireps:',organism, gene, reps
# for allele in gene2alleles[gene]:
# print ' '.join(all_genes[organism][allele].cdrs), allele, \
# get_rep(allele,organism), get_mm1_rep(allele,organism)
# ## we are going to merge these reps
# ## which one should we choose?
# l = [ (len(rep_gene2alleles[rep]), rep ) for rep in reps ]
# l.sort()
# l.reverse()
# assert l[0][0] > l[1][0]
# toprep = l[0][1]
# for (count,rep) in l:
# if rep in merge_rep_genes:
# assert rep == toprep and merge_rep_genes[rep] == rep
# merge_rep_genes[ rep ] = toprep
# for allele in alleles:
# count_rep = get_mm1_rep_ignoring_allele( allele, organism )
# if count_rep in merge_rep_genes:
# count_rep = merge_rep_genes[ count_rep ]
# allele2mm1_rep_gene_for_counting[ organism ][ allele] = count_rep
# if verbose:
# print 'allele2mm1_rep_gene_for_counting:',organism, allele, count_rep
def assign_label_reps_and_colors_based_on_most_common_genes_in_repertoire( tcr_infos, organism ):
## assumes that each element of tcr_infos is a dictionary with fields that would have come from parse_tsv_line
## uses the *_countreps info that was filled in by read_pair_seqs.py
## the _label_rep* fields get over-written if they were present
for segtype in segtypes_lowercase:
countreps_tag = segtype+'_countreps'
rep_tag = segtype+'_label_rep'
color_tag = segtype+'_label_rep_color' ## where we will store the rep info
counts = {}
for tcr_info in tcr_infos:
reps = tcr_info[countreps_tag].split(';')
for rep in reps:
counts[rep] = counts.get(rep,0)+1
newcounts = {}
for tcr_info in tcr_infos:
reps = tcr_info[countreps_tag].split(';')
toprep = max( [ ( counts[x],x) for x in reps ] )[1]
tcr_info[rep_tag] = toprep ## doesnt have allele info anymore
newcounts[toprep] = newcounts.get(toprep,0)+1
l = [(y,x) for x,y in newcounts.iteritems()]
l.sort()
l.reverse()
rep_colors = dict( zip( [x[1] for x in l], html_colors.get_rank_colors_no_lights(len(l)) ) )
for tcr_info in tcr_infos:
tcr_info[ color_tag ] = rep_colors[ tcr_info[ rep_tag ] ]
return ## we modified the elements of the tcr_infos list in place
## this is not exactly perfect, but probably OK to start with...
##
def detect_fake_chains( clones_file, Achain='A', Bchain='B' ):
tcrs = parse_tsv.parse_tsv_file( clones_file, key_fields = [], store_fields = ['va_gene','cdr3a','vb_gene','cdr3b'] )
fake_chains = []
if len( set( [ (x[0],x[1]) for x in tcrs ] ) )==1:
fake_chains.append( Achain )
if len( set( [ (x[2],x[3]) for x in tcrs ] ) )==1:
fake_chains.append( Bchain )
if fake_chains:
print 'Fake sequence data detected for chains: {}'.format( ' '.join( fake_chains ) )
return fake_chains
# if __name__ == '__main__':
# for organism in allele2mm1_rep_gene_for_counting:
# for allele in allele2mm1_rep_gene_for_counting[ organism ]:
# print 'get_mm1_rep_gene_for_counting