-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtia_portal_translator.py
163 lines (136 loc) · 6.69 KB
/
tia_portal_translator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import os
import multiprocessing as mp
import argparse
from openpyxl import load_workbook
from googletrans import Translator # use 3.1.0a0 or later
import openai
from deepl import Translator as DeepLTranslator
import textwrap
# Input workbook and the name of the sheet that holds the texts to translate.
my_excel = 'TIAProjectTexts.xlsx'
my_excel_sheet_name = 'User Texts'

# Number of worker processes. os.cpu_count() returns None when the CPU count
# cannot be determined, so fall back to a single process instead of letting
# min() fail on None. 64 is the upper bound the original author chose for
# Windows; pushing the process count to that limit can hurt system stability.
n_processes = min(os.cpu_count() or 1, 64)

# Output workbook: the input file name with "_translated" before the extension.
result_excel = f'{os.path.splitext(my_excel)[0]}_translated.xlsx'
def parse_arguments(argv=None):
    """Parse the command-line options of the translator script.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads the process command line, which is
            what the original zero-argument call did.

    Returns:
        argparse.Namespace with the attributes ``service`` (one of
        ``'google'``, ``'gpt'``, ``'deepl'``), ``source`` and ``dest``.

    Raises:
        SystemExit: raised by argparse when a required option is missing or
            ``--service`` is not one of the allowed choices.
    """
    parser = argparse.ArgumentParser(description='Translate an Excel file using Google Translate, GPT, or DeepL.')
    parser.add_argument('--service', choices=['google', 'gpt', 'deepl'], required=True, help='Choose the translation service (google, gpt, or deepl).')
    parser.add_argument('--source', required=True, help='Source language and region (e.g., en-US, fr-FR, de-DE).')
    parser.add_argument('--dest', required=True, help='Destination language and region (e.g., en-US, fr-FR, de-DE).')
    return parser.parse_args(argv)
class TranslationService:
    """Common base for the translation backends.

    Stores the credentials and target language that the concrete services
    read in their own ``translate`` implementations.
    """

    def __init__(self, api_key=None, destination_language=None):
        """Remember the API key and the destination language (both optional)."""
        self.api_key, self.destination_language = api_key, destination_language

    def translate(self, text):
        """Placeholder to be overridden by subclasses; does nothing and returns None."""
        return None
class GoogleTranslationService(TranslationService):
    """Translation backend that delegates to the googletrans ``Translator``."""

    def translate(self, text):
        """Translate ``text`` into ``self.destination_language`` and return the result's text.

        A new ``Translator`` is created on every call.
        """
        result = Translator().translate(text, dest=self.destination_language)
        return result.text
class GPTTranslationService(TranslationService):
    """Translation backend that asks an OpenAI completion model to translate."""

    def translate(self, text):
        """Send a translation prompt for ``text`` and return the first completion, stripped.

        The module-level ``openai.api_key`` is set from ``self.api_key`` on
        every call before the request is made.
        """
        openai.api_key = self.api_key
        # NOTE(review): max_tokens=100 caps the completion length, so long
        # translations are presumably cut off - confirm against real inputs.
        request = {
            'engine': 'text-davinci-002',
            'prompt': f'Translate the following text to "{self.destination_language}" language:\n{text}',
            'max_tokens': 100,
            'n': 1,
            'stop': None,
            'temperature': 0.5,
        }
        completion = openai.Completion.create(**request)
        first_choice = completion.choices[0]
        return first_choice.text.strip()
class DeepLTranslationService(TranslationService):
    """Translation backend that delegates to the DeepL client library."""

    def translate(self, text):
        """Translate ``text`` into ``self.destination_language`` and return it as a string.

        ``translate_text`` hands back a result object rather than a plain
        string; its ``text`` attribute is returned so that callers get a
        ``str``, matching what the Google and GPT services return.
        """
        translator = DeepLTranslator(self.api_key)
        result = translator.translate_text(text, target_lang=self.destination_language)
        return result.text
def translation_service_factory(service, api_key=None, destination_language=None):
    """Build the translation backend selected by ``service``.

    Args:
        service: ``'google'``, ``'gpt'`` or ``'deepl'``.
        api_key: Forwarded to the backend's constructor.
        destination_language: Forwarded to the backend's constructor.

    Returns:
        A new instance of the matching service class.

    Raises:
        ValueError: if ``service`` is not one of the three known names.
    """
    # Reject unknown names up front so the lookup below cannot miss.
    if service not in ('google', 'gpt', 'deepl'):
        raise ValueError(f'Invalid service: {service}')

    backends = {
        'google': GoogleTranslationService,
        'gpt': GPTTranslationService,
        'deepl': DeepLTranslationService,
    }
    backend_cls = backends[service]
    return backend_cls(api_key, destination_language)
def process_frame(chunk_tuple, translator_instance, ws, destination_to_translation_col):
    """Translate one chunk of source cells, line by line.

    Args:
        chunk_tuple: ``(index, chunk)`` where ``index`` is the chunk's position
            in the overall split and ``chunk`` is an iterable of cells that
            expose ``value`` and ``row``.
        translator_instance: Object whose ``translate(text)`` returns the
            translated string.
        ws: Worksheet-like object; ``ws['<column letter><row>']`` must return
            the cell at that address.
        destination_to_translation_col: Column letter of the destination column.

    Returns:
        ``(index, translated_chunk)`` with one entry per cell of ``chunk``:
        the re-wrapped translation, or the value already present in the
        destination column of the same row when the source cell is empty.
    """
    index, chunk = chunk_tuple
    print(f'Translating chunk {index+1}...')
    translated_chunk = []
    for cell in chunk:
        if not cell.value:
            # Nothing to translate: keep what the destination column already
            # holds for THIS row. (Looking it up by the chunk index would
            # fetch an unrelated row.)
            existing = ws[f'{destination_to_translation_col}{cell.row}'].value
            translated_chunk.append(existing)
            continue

        wrapped_translated_lines = []
        # Translate each line separately so the line structure is preserved.
        for source_line in cell.value.split('\n'):
            if not source_line.strip():
                # Blank line: keep it as is and skip the translation request.
                wrapped_translated_lines.append(source_line)
                continue
            translated_line = translator_instance.translate(source_line)
            # Re-wrap the translation to the width of the source line.
            wrapped_translated_lines.extend(
                textwrap.wrap(translated_line, width=len(source_line))
            )
        translated_chunk.append('\n'.join(wrapped_translated_lines))
    return index, translated_chunk
def find_column_letter(column_name, ws):
    """Return the column letter of the first header cell equal to ``column_name``.

    The header is taken to be row 1 of ``ws``. ``None`` is returned when no
    cell in that row matches.
    """
    header_row = ws[1]
    matches = (header.column_letter for header in header_row if header.value == column_name)
    return next(matches, None)
if __name__ == '__main__':
    # Script entry point: read the workbook, translate the source column in
    # parallel and save the result to a new workbook.
    try:
        # Parse command-line arguments
        args = parse_arguments()
        translator_service = args.service
        source_to_translation = args.source
        destination_to_translation = args.dest

        # The language is the part of the destination name before the first
        # dash (e.g. "de-DE" -> "de").
        destination_language = destination_to_translation.split('-')[0]

        # GPT and DeepL need an API key from the environment; Google does not.
        api_key = None
        if translator_service in ('gpt', 'deepl'):
            api_key_env_var = 'OPENAI_API_KEY' if translator_service == 'gpt' else 'DEEPL_API_KEY'
            try:
                api_key = os.environ[api_key_env_var]
            except KeyError:
                print(f'Error: {translator_service.upper()} translation requires the {api_key_env_var} environment variable.')
                # SystemExit is not an Exception subclass, so it passes
                # through the handler at the bottom of this block.
                raise SystemExit(1)

        # Read Excel file
        wb = load_workbook(my_excel)
        ws = wb[my_excel_sheet_name]

        # The --source/--dest values are matched against the header row to
        # find the column letters.
        source_to_translation_col = find_column_letter(source_to_translation, ws)
        destination_to_translation_col = find_column_letter(destination_to_translation, ws)
        if not source_to_translation_col or not destination_to_translation_col:
            print('Could not find column names.')
            raise SystemExit(1)

        # Split the data rows (everything below the header) into chunks.
        # Ceiling division keeps the remainder rows, and the lower bound of 1
        # keeps the split working when there are fewer rows than processes.
        row_count = ws.max_row - 1
        chunk_size = max(1, -(-row_count // n_processes))
        # Read the column once; index 0 is the header cell, so skip it.
        source_cells = ws[source_to_translation_col][1:]
        data_chunks = [
            (i, source_cells[start:start + chunk_size])
            for i, start in enumerate(range(0, len(source_cells), chunk_size))
        ]

        # Instantiate the translator
        translator_instance = translation_service_factory(translator_service, api_key, destination_language)

        # Use multiprocessing to translate chunks. No more workers than
        # chunks are started; the pool size must be at least 1.
        worker_count = min(n_processes, max(1, len(data_chunks)))
        with mp.Pool(worker_count) as pool:
            # starmap blocks until every chunk is done and returns the
            # results in the order of the submitted chunks.
            result_list = pool.starmap(
                process_frame,
                [(chunk_tuple, translator_instance, ws, destination_to_translation_col) for chunk_tuple in data_chunks],
            )

        # Write translations back to the worksheet. Chunk `index` starts at
        # data row index * chunk_size; "+ 2" skips the header and converts to
        # 1-based row numbers.
        for index, chunk in result_list:
            for idx, cell in enumerate(chunk):
                ws[f'{destination_to_translation_col}{index * chunk_size + idx + 2}'].value = cell

        # Save the workbook, replacing a result file left by an earlier run.
        if os.path.exists(result_excel):
            os.remove(result_excel)
            print(f'Removed file {result_excel}')
        wb.save(result_excel)
        print(f'Created new file {result_excel}')
        print('Translating finished!')
    except Exception as e:
        print(f'An error occurred: {e}')
        # Report the failure through the exit status as well.
        raise SystemExit(1)