-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpipeline_categorizacao.py
393 lines (336 loc) · 13.5 KB
/
pipeline_categorizacao.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
print("Importando bibliotecas...\n")
from Canonica import Canonica
lexico_json = "LexicoPoetisa.json"
lexico = Canonica(lexico_json)
import os
import shutil
import sys
import json
from tqdm import tqdm
from nltk import sent_tokenize
import nltk
nltk.download('punkt')
nltk.download('stopwords')
stopwords = nltk.corpus.stopwords.words('portuguese')
from symspellpy import SymSpell, Verbosity
from itertools import islice
symsp = SymSpell()
symsp.load_dictionary('formas.totalbr.txt', term_index=1, count_index=0, separator='\t', encoding='ISO-8859-1')
from math import log
# dictionary with pipeline's feedback generated by human in the loop
f = open('dicionario-especifico-corretor.json', encoding="utf8")
dicionario_correcoes = json.load(f)
# Build a cost dictionary, assuming Zipf's law and cost = -math.log(probability).
words = open("word_frequency_linguateca.txt", encoding="utf8").read().split()
wordcost = dict((k, log((i+1)*log(len(words)))) for i,k in enumerate(words))
maxword = max(len(x) for x in words)
# INFER SPACES ******************************************************************************
def infer_spaces(s):
"""Uses dynamic programming to infer the location of spaces in a string without spaces."""
#******************************************************************************
# Find the best match for the i first characters, assuming cost has
# been built for the i-1 first characters.
# Returns a pair (match_cost, match_length).
def best_match(i):
candidates = enumerate(reversed(cost[max(0, i-maxword):i]))
return min((c + wordcost.get(s[i-k-1:i], 9e999), k+1) for k,c in candidates)
#fim def
#******************************************************************************
# Build the cost array.
cost = [0]
for i in range(1,len(s)+1):
c,k = best_match(i)
cost.append(c)
#fim for
# Backtrack to recover the minimal-cost string.
out = []
i = len(s)
while i>0:
c,k = best_match(i)
assert c == cost[i]
out.append(s[i-k:i])
i -= k
#fim for
return " ".join(reversed(out))
#fim def
# GET PARAMETERS ******************************************************************************
#print('\nDigite o nome da pasta que contém os textos: ')
#pasta = input()
qtde_parametros = len(sys.argv)
if (qtde_parametros >= 2):
# read folder name in which the texts are in
pasta = sys.argv[1]
anotado = False
arquivo_texto = False
# indicates if the texts are annotated or not
if qtde_parametros >= 3 and sys.argv[2] == "anotado":
print('--CORPUS ANOTADO--')
anotado = True
else:
print('--CORPUS SEM ANOTAÇÃO--')
# optionally run the pipeline in only one text file
if qtde_parametros >= 4:
arquivo_texto = sys.argv[3]
else:
print("Erro de sintaxe!\n")
print("Comande: python3 pipeline_categorizacao.py <pasta-de-arquivos-txt-corpus>/ | <anotado> | <arquivo-especifico>")
print("\tExemplo: python3 pipeline_categorizacao.py /home/corpus/ anotado arquivo.txt")
print("\tExemplo: python3 pipeline_categorizacao.py /home/corpus/ nao arquivo.txt")
print("\tExemplo: python3 pipeline_categorizacao.py /home/corpus/ anotado")
print("\tExemplo: python3 pipeline_categorizacao.py /home/corpus/ nao")
sys.exit()
#fim if
if not arquivo_texto:
try:
textos = os.listdir(pasta)
except:
print("Nome de pasta inválida")
sys.exit()
#fim try
else:
textos = [arquivo_texto]
#fim if
# GENERATE TOKENS ******************************************************************************
sentencas = []
for filename in tqdm(textos):
f = open(os.path.join(pasta,filename), encoding="utf8")
projeto = f.read()
# separate sentences
frases = sent_tokenize(projeto)
for frase in frases:
# verify words in <...>
if anotado:
tokens = frase.split(" ")
processada = [w.lower() for w in tokens if not w.lower() in stopwords and w.isalnum() and "<" not in w]
# tokenize sentences
else:
tokens = nltk.word_tokenize(frase)
processada = [w.lower() for w in tokens if not w.lower() in stopwords and w.isalnum()]
#fim if
sentencas.append(processada)
#fim for
#fim for
from nltk.lm.preprocessing import flatten
# concatenate all tokens from texts in a list
tokens = list(flatten(sentencas))
print("Total de Tokens:",len(tokens))
# filter non repeated words (types)
types = list(set(tokens))
print("Total de Types:",len(types))
print("Token/Type:",str(len(tokens)/len(types)))
# IDENTIFY UNKNOWN WORDS ******************************************************************************
# verify if token exists in lexic
desconhecidos = []
for word in types:
if not lexico.existePalavra(word) and (word not in dicionario_correcoes):
desconhecidos.append(word)
#fim if
#fim for
print("\n Preparando funções para categorização das desconhecidas...")
import pandas as pd
# GENERATE DATAFRAME ******************************************************************************
# starts dataframe to categorize unknown words
def create_clear_df():
df = pd.DataFrame(columns=['palavra', 'sugestão', 'classe'])
df['palavra'] = desconhecidos
df['sugestão'] = '-'
df['classe'] = 'desconhecida'
return df
#fim def
# READ LEXIC ******************************************************************************
df = create_clear_df()
import re
import unidecode
# read lexic json to string
arquivo_obj = open(lexico_json,encoding='utf-8')
str_json_lexico = arquivo_obj.read()
arquivo_obj.close()
# capture all json keys
palavras_lexico = re.findall('(?<=")(\S*)(?=":)', str_json_lexico)
# ACCENTUATION ERRORS ******************************************************************************
# unaccentuate lexic words
def desacentuando_palavra(palavra):
return unidecode.unidecode(palavra)
#fim def
# decode words from unicode
def tirando_palavra_do_unicode(palavra_unicode):
return palavra_unicode.encode().decode('unicode-escape')
#fim def
# unaccentuate lexic words that are in unicode
def desacentuando_palavra_unicode(palavra_unicode):
palavra_sem_unicode = tirando_palavra_do_unicode(palavra_unicode)
return desacentuando_palavra(palavra_sem_unicode)
#fim def
dic_palavras_lexico_sem_acento = {}
# creates a new dictionary with unaccentuated words from lexic
for palavra_acentuada in palavras_lexico:
palavra_sem_acento = desacentuando_palavra_unicode(palavra_acentuada)
dic_palavras_lexico_sem_acento[palavra_sem_acento] = tirando_palavra_do_unicode(palavra_acentuada)
#fim for
#PLACES FROM IBGE AND GOOGLE MAPS ******************************************************************************
from ibge.localidades import *
dados_regioes = Regioes().getNome()
dados_estados = Estados().getNome()
dados_municipios = Municipios().getNome()
#concatenate data from regions, states and cities from Brazil
localidades_ibge = dados_regioes + dados_municipios + dados_estados
# words with places from Brazil
# in lower case and without accent
localidades_ibge_lower_sem_acento = []
for localidade in localidades_ibge:
localidade_sem_acento = desacentuando_palavra(localidade)
localidades_ibge_lower_sem_acento.append(localidade_sem_acento.lower())
#fim for
json_paises = open("paises-gentilicos-google-maps.json", encoding="utf8")
paises = json.load(json_paises)
json_paises.close()
# list with country names and gentiles
# in lower case and without accent
nome_paises_gentilicos_lower_sem_acento = []
for pais in paises:
pais_sem_acento = desacentuando_palavra(pais['nome_pais'])
gentilico_sem_acento = desacentuando_palavra(pais['gentilico'])
nome_paises_gentilicos_lower_sem_acento.append(pais_sem_acento.lower())
nome_paises_gentilicos_lower_sem_acento.append(gentilico_sem_acento.lower())
#fim for
from googletrans import Translator
translator = Translator()
df = create_clear_df()
# SIMILAR SPELL ******************************************************************************
# function to find if word has another word with similar spell
def verifica_symspell(palavra):
sugestao = symsp.lookup(palavra, Verbosity.CLOSEST, max_edit_distance=1, transfer_casing=True, include_unknown=True)[0].term
if (sugestao != palavra) and lexico.existePalavra(sugestao):
return sugestao
else:
return -1
#fim if
#fim def
# AGGLUTINATED WORD ******************************************************************************
# function to find if word is a concatenation of many words
# it uses a function to infer spaces
def palavra_aglutinada_existe(palavra):
sugestao = infer_spaces(palavra)
lista = (sugestao).split(' ')
ok = False
for termo in lista:
if (len(termo)>1):
ok = True
if not lexico.existePalavra(termo):
ok = False
#fim for
if ok:
return [sugestao]
else:
return ''
#fim if
#fim def
# TRANSLATION ******************************************************************************
# translate word if there is a translation
def traduz(palavra,lingua):
try:
translation = translator.translate(palavra, src=lingua, dest='pt')
except:
print("Erro traducao ",lingua,":",palavra)
else:
return translation.text
#fim def
# PIPELINE FUNCTION ******************************************************************************
def categorizacao(palavra):
palavra_sem_acento = desacentuando_palavra(palavra)
if palavra.isnumeric():
return ['numerico', False]
elif (palavra_sem_acento in localidades_ibge_lower_sem_acento) or (palavra_sem_acento in nome_paises_gentilicos_lower_sem_acento):
return ['local', False]
elif palavra in dic_palavras_lexico_sem_acento:
#troca pela palavra acetuada
return ['falta_acento', dic_palavras_lexico_sem_acento[palavra]] #vai trocar pela palavra acetuada
elif palavra_sem_acento in dic_palavras_lexico_sem_acento:
#troca pela palavra acetuada de forma correta
return ['acent_err', dic_palavras_lexico_sem_acento[palavra_sem_acento]]
elif verifica_symspell(palavra) != -1:
#troca pela palavra corrigida
return ['symspell', verifica_symspell(palavra)]
elif (len(palavra_aglutinada_existe(palavra)) != 0):
palavras_desaglutinadas = palavra_aglutinada_existe(palavra)
palavras_desaglutinadas.insert(0, 'aglutinada')
return palavras_desaglutinadas
elif traduz(palavra,'en') != palavra:
return ['traduzida', False]
elif traduz(palavra,'es') != palavra:
return ['traduzida', False]
elif traduz(palavra,'fr') != palavra:
return ['traduzida', False]
else:
return ['desconhecida', False]
#fim if
#fim def
# EXECUTE PIPELINE ******************************************************************************
print("Começando categorização")
indice = 0
for palavra in tqdm(desconhecidos):
palavra_categorizada = categorizacao(palavra)
df.at[indice, 'classe'] = palavra_categorizada[0]
if (palavra_categorizada[1]):
df.at[indice, 'sugestão'] = palavra_categorizada[1]
#fim if
indice = indice + 1
#fim for
folder = "desconhecidas"
if not os.path.exists(folder):
os.makedirs(folder)
else:
shutil.rmtree(folder)
os.makedirs(folder)
#fim if
print("Exportando o csv de desconhecidas para a pasta desconhecidas...")
pipeline_order = ['numerico', 'local', 'falta_acento', 'acent_err', 'symspell', 'aglutinada', 'traduzida', 'desconhecida']
df['classe'] = pd.Categorical(df['classe'], pipeline_order)
df.sort_values(['classe', 'palavra'], inplace=True, ignore_index=True)
df.to_csv(folder+'/desconhecidas.csv', index=False)
print("\n--FIM DO PIPELINE--")
# REWRITE TEXTS WITH PIPELINE SUGGESTIONS ******************************************************************************
print("\n--COMEÇANDO A REMONTAR OS TEXTOS--")
path_corrigidos = "textos-corrigidos"
print("Os textos remontados ficarão na pasta", path_corrigidos)
if not os.path.exists(path_corrigidos):
os.makedirs(path_corrigidos)
else:
shutil.rmtree(path_corrigidos)
os.makedirs(path_corrigidos)
#fim if
subsdf = df[(df.sugestão != '-')]
subsdict = subsdf.set_index('palavra').T.to_dict('list')
error_occurr_dict = {}
mapping = {'falta_acento': 0, 'acent_err': 1, 'symspell':2, 'aglutinada':3}
for filename in tqdm(textos):
name = filename.split('-')[0]
error_occurr_dict[name] = [0, 0, 0, 0]
f = open(os.path.join(pasta,filename), encoding="utf8")
projeto = f.read()
if anotado:
proj_corrig = projeto.replace("<", "").replace(">", "")
else:
proj_corrig = projeto
#fim if
for palavra in nltk.word_tokenize(proj_corrig):
pattern = r'\b' + re.escape(palavra) + r'\b(?!\[)'
if palavra in dicionario_correcoes:
proj_corrig = re.sub(pattern, dicionario_correcoes[palavra], proj_corrig)
elif (palavra not in stopwords) and (palavra in subsdict):
sugestao, classe = subsdict[palavra]
proj_corrig = re.sub(pattern, palavra+'['+sugestao+']'+'{'+classe+'}', proj_corrig)
idx = mapping[classe]
error_occurr_dict[name][idx] += 1
#fim if
#fim for
f = open(os.path.join(path_corrigidos,filename[:-4]+'-corrigido.txt',), 'w', encoding="utf8")
f.write(proj_corrig)
f.close()
#fim for
error_occurr_df = pd.DataFrame.from_dict(error_occurr_dict, orient='index')
inv_map = {v: k for k, v in mapping.items()}
error_occurr_df.rename(columns=inv_map, inplace=True)
print("Exportando err_occur para a pasta desconhecidas:")
error_occurr_df.to_csv(os.path.join(folder,'err_occurr.csv'), index_label='ID do Projeto')
print('\n--FIM--')