-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathobo_tools.py
470 lines (404 loc) · 18.9 KB
/
obo_tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
import os
import obonet
from collections import defaultdict
from typing import List, Tuple, Dict
import sys
import pickle
import numpy as np
import shutil
import requests
# default paths
file_dir = os.path.dirname(os.path.abspath(__file__))
user_home = os.path.expanduser("~")
tmp_dir = os.path.join(user_home, ".obo_tools")
default_go_obo = os.path.join(tmp_dir, "go-basic.obo")
default_obo_pkl = os.path.join(tmp_dir, "obo.pkl")
class ObOTools:
def __init__(self,
go_obo: str = default_go_obo, # Path to the go.obo file
obo_url: str = "http://purl.obolibrary.org/obo/go/go-basic.obo", # URL to the go.obo file
obo_pkl: str = default_obo_pkl, # Path to the obo.pkl file
add_part_of:bool = True, # whether to add the part_of relationship to the is_a.csv file
verbose:bool = True, # whether to print the warning message
tmp_dir: str = tmp_dir, # the tmp dir to store the go.obo and obo.pkl file
forced: bool = False # whether to force to download the obo file
):
self.go_obo = go_obo
self.obo_url = obo_url
self.obo_pkl = obo_pkl
self.add_part_of = add_part_of
self.tmp_dir = tmp_dir
self.verbose = verbose
self.aspects = ['BPO', 'CCO', 'MFO']
if forced:
self.clear_obo()
self.is_a_dict, self.alt_id_dict, self.term2aspect, self.net \
= self.init_obo(go_obo=self.go_obo, obo_url=self.obo_url)
self.obo_version = self.net.graph['data-version']
def clear_obo(self):
if os.path.exists(self.obo_pkl):
os.remove(self.obo_pkl)
if self.go_obo is not None and os.path.exists(self.go_obo):
os.remove(self.go_obo)
@staticmethod
def fetch_release(release:str="current", add_part_of:bool=True, verbose:bool=True, tmp_dir:str=tmp_dir):
if release == "current":
return ObOTools(
go_obo=None,
obo_url='http://purl.obolibrary.org/obo/go/go-basic.obo',
add_part_of=add_part_of,
verbose=verbose,
tmp_dir=tmp_dir,
forced=True
)
else:
url = f"https://release.geneontology.org/{release}/ontology/go-basic.obo"
# first check if the url is valid
r = requests.head(url)
if r.status_code != 200:
print(f"Error: {url} is not valid")
return None
return ObOTools(
go_obo=None,
obo_url=url,
add_part_of=add_part_of,
verbose=verbose,
tmp_dir=tmp_dir,
forced=True
)
def init_obo(self, go_obo:str=None, obo_url:str=None):
"""
Initialize the obo file
"""
# check if the obo.pkl file exists
if not os.path.exists(self.tmp_dir):
os.makedirs(self.tmp_dir)
if os.path.exists(self.obo_pkl):
is_a_dict, alt_id_dict, term2aspect, net = self.load_obo_pkl()
return is_a_dict, alt_id_dict, term2aspect, net
# load from the obo file
obo_path = go_obo
if obo_path is None or not os.path.exists(obo_path):
obo_path = obo_url
if self.verbose:
print(f"Loading the obo file from {obo_path}")
obo = obonet.read_obo(obo_path)
alt_id_dict = self.creat_alt_id_dict(obo)
term2aspect = self.create_ns_dict(obo)
is_a_dict = self.create_is_a_dict(obo, term2aspect, alt_id_dict, self.add_part_of)
net = obo
# save the obo.pkl file
if self.verbose:
print(f"Saving the obo.pkl file to {self.obo_pkl} for future use")
obo_pkl = {"is_a_dict":is_a_dict, "alt_id_dict":alt_id_dict, "term2aspect":term2aspect, "net":net}
pickle.dump(obo_pkl, open(self.obo_pkl, "wb"))
return is_a_dict, alt_id_dict, term2aspect, net
def update_obo(self, go_obo:str=None, obo_url:str=None, newest:bool=False):
if go_obo is not None and os.path.exists(go_obo):
# remove the old go_obo file
if os.path.exists(go_obo) and go_obo != self.go_obo:
shutil.copyfile(go_obo, self.go_obo)
if os.path.exists(self.obo_pkl):
os.remove(self.obo_pkl)
self.is_a_dict, self.alt_id_dict, self.term2aspect, self.net \
= self.init_obo(go_obo=self.go_obo, obo_url=None)
return
if newest:
obo_url = "http://purl.obolibrary.org/obo/go/go-basic.obo"
if obo_url is not None:
if os.path.exists(self.obo_pkl):
os.remove(self.obo_pkl)
obo_path = obo_url
self.is_a_dict, self.alt_id_dict, self.term2aspect, self.net \
= self.init_obo(go_obo=None, obo_url=obo_path)
return
def load_obo_pkl(self):
"""
Load the obo.pkl file
"""
obo_pkl = pickle.load(open(self.obo_pkl, "rb"))
is_a_dict = obo_pkl["is_a_dict"]
alt_id_dict = obo_pkl["alt_id_dict"]
term2aspect = obo_pkl["term2aspect"]
net = obo_pkl["net"]
version = net.graph['data-version']
if self.verbose:
print(f"Loading the obo.pkl file from {self.obo_pkl}\n{version}")
return is_a_dict, alt_id_dict, term2aspect, net
def creat_alt_id_dict(self, obo):
alt_id_dict = dict()
for term_id, data in obo.nodes(data=True):
if term_id not in alt_id_dict:
alt_id_dict[term_id] = term_id
if 'alt_id' in data:
for alt_id in data['alt_id']:
alt_id_dict[alt_id] = term_id
return alt_id_dict
def create_ns_dict(self, obo):
term2aspect = dict()
aspect_map = {'biological_process': 'BPO', 'molecular_function': 'MFO', 'cellular_component': 'CCO'}
for term_id, data in obo.nodes(data=True):
if 'namespace' in data:
term2aspect[term_id] = aspect_map[data['namespace']]
return term2aspect
def find_all_parents(self, term_id:str, is_a_dict:dict, cache=dict())->dict:
parents = set()
if term_id in cache:
return cache[term_id]
if term_id in is_a_dict:
parents.update(is_a_dict[term_id])
for parent in is_a_dict[term_id]:
parents.update(self.find_all_parents(parent, is_a_dict))
cache[term_id] = parents
return parents
def create_is_a_dict(self, obo, aspect_dict:dict, alt_id_dict:dict=None, add_part_of:bool=True)->dict:
is_a_dict = dict()
for term_id, data in obo.nodes(data=True):
# replace term_id with its main id if it is an alt_id
if alt_id_dict is not None:
if term_id in alt_id_dict:
term_id = alt_id_dict[term_id]
term_aspect = aspect_dict[term_id]
if term_id not in is_a_dict:
is_a_dict[term_id] = set()
# add is_a parent
if 'is_a' in data:
for parent_term_id in data['is_a']:
parent_term_aspect = aspect_dict[parent_term_id]
if term_aspect == parent_term_aspect:
is_a_dict[term_id].add(parent_term_id)
# add part_of parent
if add_part_of:
if 'relationship' in data:
for record in data['relationship']:
rel, parent_term_id = record.split(' ')
if rel == 'part_of':
parent_term_aspect = aspect_dict[parent_term_id]
if term_aspect == parent_term_aspect:
is_a_dict[term_id].add(parent_term_id)
# create a dict of all ancestors
is_a_dict_all = dict()
for term_id in is_a_dict:
is_a_dict_all[term_id] = self.find_all_parents(term_id, is_a_dict)
return is_a_dict_all
def update_parent(self, protein_name:str, cur_terms:set)->Tuple[str,set]:
"""
Update the cur_terms to the parents of the cur_terms
Args:
protein_name: the name of the protein
cur_terms: the current terms of the protein
Returns:
protein_name: the name of the protein
final_terms: the updated terms of the protein
"""
final_terms = set()
for term in cur_terms:
if term not in self.alt_id_dict:
# warning: the term is not in the alt_id_dict, the obo file may be outdated
if self.verbose:
print(f"Warning: {term} is not in the alt_id_dict, the obo file may be outdated, please check protein: {protein_name}")
final_terms.add(term)
else:
all_parent = self.is_a_dict[self.alt_id_dict[term]].copy()
all_parent.add(self.alt_id_dict[term])
final_terms.update(all_parent)
return protein_name, final_terms
def backprop_terms(self, cur_terms:set)->set:
if isinstance(cur_terms, List):
cur_terms = set(cur_terms)
final_terms = set()
for term in cur_terms:
if term not in self.alt_id_dict:
# warning: the term is not in the alt_id_dict, the obo file may be outdated
if self.verbose:
print(f"Warning: skipping {term} because it was not in the alt_id_dict, the obo file may be outdated.", file=sys.stderr)
final_terms.add(term)
else:
all_parent = self.is_a_dict[self.alt_id_dict[term]].copy()
all_parent.add(self.alt_id_dict[term])
final_terms.update(all_parent)
return final_terms
def backprop_cscore(self, go_cscore:dict, min_cscore:float=None, sorting=True)->Dict[str, float]:
"""
backproprate the child cscore to the parents
parent cscore should not be smaller than the child cscore
if the parent cscore is smaller than the child cscore, update the child cscore to the parent cscore
Args:
go_cscore: the go_cscore dict where the key is the go term and the value is the cscore
min_cscore: the minimum cscore, if the cscore is smaller than the min_cscore, the cscore will be set to 0
return:
go_cscore: the updated go_cscore dict
"""
if min_cscore is not None:
filter_score = {term:cscore for term, cscore in go_cscore.items() if cscore > min_cscore}
backprop_cscore = filter_score.copy()
else:
filter_score = go_cscore
backprop_cscore = go_cscore.copy()
for term in filter_score:
if term not in self.alt_id_dict:
# warning: the term is not in the alt_id_dict, the obo file may be outdated
if self.verbose:
print(f"Warning: skipping {term} because it was not in the alt_id_dict, the obo file may be outdated.", file=sys.stderr)
continue
# replace the alt_id with the main_id, and get all the parents for the term
all_parent = self.is_a_dict[self.alt_id_dict[term]].copy()
# loop through all the parents, if the parent is not in the backprop_cscore or the parent cscore is smaller than the child cscore
# update the child cscore to the parent cscore
for parent in all_parent:
if parent not in backprop_cscore or backprop_cscore[parent] < backprop_cscore[term]:
backprop_cscore[parent] = backprop_cscore[term]
if sorting:
# high to low
backprop_cscore = {k: v for k, v in sorted(backprop_cscore.items(), key=lambda item: item[1], reverse=True)}
return backprop_cscore
def get_aspect_terms(self, aspect:str):
"""
Get all the go terms for the aspect
Args:
aspect: the aspect, BP, MF, CC
Returns:
go_terms: a set of go terms
"""
go_terms = set()
for term in self.term2aspect:
if self.term2aspect[term] == aspect:
go_terms.add(term)
return go_terms
def get_aspect(self,term:str):
"""
Get the aspect of the term
Args:
term: the go term
Returns:
aspect: the aspect of the term
"""
aspect = self.term2aspect.get(term, None)
return aspect
def generate_child_matrix(self, term_list:List[str]):
"""
Generate the child matrix for the aspect
Args:
go2vec: the go2vec dict where the key is the go term and the value is the index of the go term in the embedding matrix
Returns:
child_matrix: the child matrix for the aspect where child_matrix[i][j] = 1 if the jth GO term is a subclass of the ith GO term else 0
"""
training_terms = term_list
#CM_ij = 1 if the jth GO term is a subclass of the ith GO term
child_matrix = np.zeros((len(training_terms), len(training_terms)))
# fill diagonal with 1
np.fill_diagonal(child_matrix, 1)
for i, term in enumerate(training_terms):
for j, child in enumerate(training_terms):
if i == j:
continue
if term in self.is_a_dict[child]:
child_matrix[i][j] = 1
return child_matrix
def toplogical_child_matrix(self, term_list:List[str]):
"""
Generate the child matrix for the aspect
the input term_list should be topologically sorted
Then we can ignore these i rows on the bottom of the matrix where the sum of the row is 1 (itself)
Args:
term_list (List[str]): selected go terms for the aspect
return:
child_matrix: the child matrix for the aspect where child_matrix[i][j] = 1 if the jth GO term is a subclass of the ith GO term else 0
"""
# check if the term_list is topologically sorted
# if not, raise the error
sorted_terms,leafs = self.top_sort(term_list, return_leaf=True)
assert len(sorted_terms) == len(term_list), "The input term_list is not topologically sorted"
for i, term in enumerate(sorted_terms):
assert term == term_list[i], f"The {i} idx term:{term_list[i]} is not the same as the sorted term:{term}"
training_terms = term_list
#CM_ij = 1 if the jth GO term is a subclass of the ith GO term
child_matrix = np.zeros((len(training_terms), len(training_terms)))
# fill diagonal with 1
np.fill_diagonal(child_matrix, 1)
for i, term in enumerate(training_terms):
for j, child in enumerate(training_terms):
if i == j:
continue
if term in self.is_a_dict[child]:
child_matrix[i][j] = 1
num_leaf = len(leafs)
child_matrix = child_matrix[:-num_leaf,:]
return child_matrix
def get_num_leaf(self, term_list:List[str]):
"""
Get the number of leaf go terms for the aspect
Args:
term_list (List[str]): selected go terms for the aspect
Returns:
num_leaf (int): the number of leaf go terms for the aspect
"""
term_list = sorted(term_list)
set_term_list = set(term_list)
# reverse the is_a_dict to get the child dict
child_dict = defaultdict(set)
for child, parent_set in self.is_a_dict.items():
for parent in parent_set:
if parent in set_term_list and child in set_term_list:
child_dict[child].add(parent)
# create a dict to store the indegree of each go term
indegree = {term:0 for term in term_list}
# sort the indegree dict by the key
indegree = dict(sorted(indegree.items(), key=lambda item: item[0]))
# loop through the child dict to get the indegree of each go term
for parent, child_set in child_dict.items():
for child in child_set:
indegree[child] += 1
queue = [ term for term in term_list if indegree[term] == 0 ]
return len(queue)
def top_sort(self, term_list:List[str], return_leaf:bool=False)->List[str]:
"""
Topological sort the go terms
Args:
term_list (List[str]): selected go terms for the aspect
Returns:
sorted_terms (List[str]): sorted go terms
"""
term_list = sorted(term_list)
set_term_list = set(term_list)
# reverse the is_a_dict to get the child dict
child_dict = defaultdict(set)
for child, parent_set in self.is_a_dict.items():
for parent in parent_set:
if parent in set_term_list and child in set_term_list:
child_dict[child].add(parent)
# create a dict to store the indegree of each go term
indegree = {term:0 for term in term_list}
# sort the indegree dict by the key
indegree = dict(sorted(indegree.items(), key=lambda item: item[0]))
# loop through the child dict to get the indegree of each go term
for parent, child_set in child_dict.items():
for child in child_set:
indegree[child] += 1
# find the go terms with indegree 0 (leaves) and add them to the queue
indexes = []
visited = set()
queue = [ term for term in term_list if indegree[term] == 0 ]
leafs = queue.copy()
# for each element of the queue increment visits, add them to the list of ordered nodes
# and decrease the in-degree of the neighbor nodes
# and add them to the queue if they reach in-degree == 0
while queue:
node = queue.pop(0)
visited.add(node)
indexes.append(node)
parents = self.is_a_dict[node].copy()
parents = parents.intersection(set_term_list)
# sort the parents
parents = sorted(parents)
if parents:
for parent in parents:
indegree[parent] -= 1
if indegree[parent] == 0:
queue.append(parent)
if len(visited) != len(term_list):
print("Warning: the graph is not a DAG, the topological sort may not be correct", file=sys.stderr)
else:
if return_leaf:
return indexes[::-1], leafs
return indexes[::-1]