-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathUtils.py
170 lines (144 loc) · 5.06 KB
/
Utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# =============================================================================
# bmle
# G4Pipeline: Utils.py
# Utilities for manipulating GFF, FASTA, and SAM files
# =============================================================================
def load(filePath):
"""Load the contents of a GFF file.
:param filePath: the absolute path to the GFF file
:return: a list of lists representing the contents of the GFF file
"""
import os
import errno
from operator import itemgetter
from natsort import natsorted
headerList = []
seqregList = []
dataList = []
try:
with open(filePath) as file:
for line in file:
temp = line.split('\t')
if len(temp) == 9:
temp[8] = temp[8].split(';')
dataList.append(temp)
elif temp[0].startswith('##sequence-region'):
seqregList.append(line)
elif temp[0].startswith('#'):
headerList.append(line)
else:
raise AssertionError('Unknown line in GFF file: ' + line)
except IOError:
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), filePath)
# Sorts by: seqid -> start position -> end position
dataList = natsorted(dataList, key=itemgetter(0,3,4))
seqregList = natsorted(seqregList)
return [headerList, seqregList, dataList]
def writeEntry(line):
"""Convert a GFF-formatted entry into a string.
GFF-formatted entry: [seqid, source, ..., strand, phase, [attributes]]
:param line: A GFF-formatted entry
:return: A string representation of the GFF-formatted entry
"""
toReturn = ''
for item in line:
if type(item) is list:
for item2 in item: toReturn += str(item2) + ';'
else:
toReturn += str(item) + '\t'
return toReturn[:-1]
def writeFile(filePath, header, data):
"""Write data to a GFF file.
:param filePath: the absolute path to the file to write to
:param header: the headers of the GFF file
:param data: the data for the file
:return: nothing
"""
import os
from natsort import natsorted
from itertools import groupby
os.makedirs(os.path.dirname(filePath), exist_ok=True)
with open(filePath, 'w') as file:
for line in header: file.write(line)
data = natsorted(data)
dataFiltered = list(l for l,_ in groupby(data)) # removes duplicates from data
for line in dataFiltered: file.write(writeEntry(line))
def generateSeqRegs(fastaPath):
"""Generate sequence headers from a FASTA file.
:param fastaPath: the absolute path to the FASTA file
:return: a list of sequence-regions (formatted as strings)
"""
import os
import errno
import re
from natsort import natsorted
tempList = []
pattern = '[>\|,\s]+'
try:
with open(fastaPath, 'r') as f:
# Prompts the user to specify the location of the sequence region name
s = f.readline().strip()
fline = re.split(pattern, s)
print(fline)
index = int(input('Index of position that contains sequence label: '))
tempList.append([fline[index], 0])
# Iterates over the rest of the strings
for line in f:
if not line.startswith('>'):
tempList[-1][1] += len(line.strip())
else:
tempList.append([re.split(pattern, line.strip())[index], 0])
except IOError:
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), fastaPath)
# Builds the strings
toReturn = []
for pair in tempList:
toReturn.append('##sequence-region ' + pair[0] + ' 1 ' + str(pair[1]))
return natsorted(toReturn)
def reformatSAM(samPath, genomePath):
"""Reformat a blastn-outputted SAM file to replace the 'Query_#' sequence names with the actual sequence names.
:param samPath: path to the SAM-formatted blastn file
:param genomePath: path to the FASTA-formatted query genome file that 'samPath' was based off of
:return: nothing
"""
import fileinput
import re
print('Reformatting SAM file...')
seqList = [seq.split()[1] for seq in generateSeqRegs(genomePath)]
seqList.insert(0, 'null') # offsets the list by one because there doesn't exist a "Query_0"
with fileinput.FileInput(samPath, inplace=True) as f:
pattern = r'Query_[0-9]+'
for line in f:
temp = re.search(pattern, line)
if temp is not None:
num = int(temp.group()[6:])
line = re.sub(pattern, seqList[num], line)
print(line.strip())
print('Finished!\n')
def reformatGFF(gffPath, fastaPath):
"""Reformat a GFF annotation file to a GFF3 file.
:param gffPath: path to the GFF-formatted annotation file
:param fastaPath: path to the FASTA-formatted genome file that gffPath is based off of
:return: writes a GFF3-formatted file to the same directory as gffPath
"""
print('Reformatting GFF file...')
toWritePath = gffPath + '3'
seqs = generateSeqRegs(fastaPath)
dataToWrite = []
# Reformat each line of gff file
with open(gffPath, 'r') as gffFile:
for line in gffFile:
line = line.strip().split('\t')
temp = ''
line[8] = line[8].split(';')
for item in line[8]:
item = item.split()
temp += item[0].capitalize() + '=' + ' '.join(item[1:]) + ';'
line[8] = temp
dataToWrite.append('\t'.join(line))
# Write everything to output file
with open(toWritePath, 'w') as f:
f.write('##gff-version 3\n')
for s in seqs: f.write(s + '\n')
for datum in dataToWrite: f.write(datum + '\n')
print('Finished!\n')