diff --git a/Makefile b/Makefile
index 7933d28..297651c 100644
--- a/Makefile
+++ b/Makefile
@@ -265,3 +265,17 @@ wait-opensearch:
 publish-tag:
 	podman tag $(IMAGE_NAMESPACE)/$(IMAGE_NAME):${IMAGE_TAG} $(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(shell git describe --tags)
 	podman push $(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(shell git describe --tags)
+
+.PHONY: stop-aggregate-gazettes
+stop-aggregate-gazettes:
+	podman stop --ignore agg-gazettes
+	podman rm --force --ignore agg-gazettes
+
+.PHONY: aggregate-gazettes
+aggregate-gazettes: stop-aggregate-gazettes set-run-variable-values
+	podman run -ti --volume $(CURDIR):/mnt/code:rw \
+		--pod $(POD_NAME) \
+		--env PYTHONPATH=/mnt/code \
+		--env-file envvars \
+		--name agg-gazettes \
+		$(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(IMAGE_TAG) python main -p aggregates
diff --git a/contrib/sample.env b/contrib/sample.env
index 904214d..9ad30a2 100644
--- a/contrib/sample.env
+++ b/contrib/sample.env
@@ -23,3 +23,5 @@ QUERIDO_DIARIO_FILES_ENDPOINT=http://localhost:9000/queridodiariobucket
 
 # Options: ALL, DAILY, UNPROCESSED
 EXECUTION_MODE=DAILY
+
+SEED_HASH=querido-diario
diff --git a/main/__main__.py b/main/__main__.py
index e8dd5ec..5f9951c 100644
--- a/main/__main__.py
+++ b/main/__main__.py
@@ -1,4 +1,5 @@
 from os import environ
+import argparse
 import logging
 
 from data_extraction import create_apache_tika_text_extraction
@@ -6,7 +7,9 @@
 from storage import create_storage_interface
 from index import create_index_interface
 from tasks import (
+    create_aggregates,
     create_gazettes_index,
+    create_aggregates_table,
     create_themed_excerpts_index,
     embedding_rerank_excerpts,
     extract_text_from_gazettes,
@@ -35,9 +38,7 @@ def get_execution_mode():
     return environ.get("EXECUTION_MODE", "DAILY")
 
 
-def execute_pipeline():
-    enable_debug_if_necessary()
-
+def gazette_texts_pipeline():
     execution_mode = get_execution_mode()
     database = create_database_interface()
     storage = create_storage_interface()
@@ -61,5 +62,27 @@ def execute_pipeline():
     tag_entities_in_excerpts(theme, themed_excerpt_ids, index)
 
 
+def aggregates_pipeline():
+    database = create_database_interface()
+    storage = create_storage_interface()
+
+    create_aggregates_table(database)
+    create_aggregates(database, storage)
+
+
+def execute_pipeline(pipeline):
+    enable_debug_if_necessary()
+
+    if not pipeline or pipeline == "gazette_texts":
+        gazette_texts_pipeline()
+    elif pipeline == "aggregates":
+        aggregates_pipeline()
+    else:
+        raise ValueError("Pipeline inválido.")
+
+
 if __name__ == "__main__":
-    execute_pipeline()
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-p", "--pipeline", help="Qual pipeline deve ser executado.")
+    args = parser.parse_args()
+    execute_pipeline(args.pipeline)
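For reference, a small sketch of how the new flag parses, mirroring the parser built above (inside the container it is driven as "python main -p aggregates" by the Makefile target):

# Sketch only: reproduces the argparse setup from main/__main__.py to show the accepted forms.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-p", "--pipeline", help="Qual pipeline deve ser executado.")

print(parser.parse_args([]).pipeline)                               # None -> gazette_texts_pipeline()
print(parser.parse_args(["-p", "aggregates"]).pipeline)             # "aggregates" -> aggregates_pipeline()
print(parser.parse_args(["--pipeline", "gazette_texts"]).pipeline)  # "gazette_texts" -> gazette_texts_pipeline()
# Any other value makes execute_pipeline() raise ValueError("Pipeline inválido.").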
diff --git a/storage/digital_ocean_spaces.py b/storage/digital_ocean_spaces.py
index 3e18975..792abd5 100644
--- a/storage/digital_ocean_spaces.py
+++ b/storage/digital_ocean_spaces.py
@@ -1,7 +1,8 @@
 import logging
 import os
-from typing import Generator
+from typing import Generator, Union
 from io import BytesIO
+from pathlib import Path
 
 import boto3
 
@@ -68,18 +69,36 @@ def __init__(
             aws_secret_access_key=self._access_secret,
         )
 
-    def get_file(self, file_key: str, destination) -> None:
-        logging.debug(f"Getting {file_key}")
-        self._client.download_fileobj(self._bucket, file_key, destination)
+    def get_file(self, file_to_be_downloaded: Union[str, Path], destination) -> None:
+        logging.debug(f"Getting {file_to_be_downloaded}")
+        self._client.download_fileobj(self._bucket, str(file_to_be_downloaded), destination)
 
     def upload_content(
         self,
         file_key: str,
-        content_to_be_uploaded: str,
+        content_to_be_uploaded: Union[str, BytesIO],
         permission: str = "public-read",
     ) -> None:
         logging.debug(f"Uploading {file_key}")
-        f = BytesIO(content_to_be_uploaded.encode())
-        self._client.upload_fileobj(
-            f, self._bucket, file_key, ExtraArgs={"ACL": permission}
+
+        if isinstance(content_to_be_uploaded, str):
+            f = BytesIO(content_to_be_uploaded.encode())
+            self._client.upload_fileobj(
+                f, self._bucket, file_key, ExtraArgs={"ACL": permission}
+            )
+        else:
+            self._client.upload_fileobj(
+                content_to_be_uploaded, self._bucket, file_key, ExtraArgs={"ACL": permission}
+            )
+
+    def copy_file(self, source_file_key: str, destination_file_key: str) -> None:
+        logging.debug(f"Copying {source_file_key} to {destination_file_key}")
+        self._client.copy_object(
+            Bucket=self._bucket,
+            CopySource={'Bucket': self._bucket, 'Key': source_file_key},
+            Key=destination_file_key
         )
+
+    def delete_file(self, file_key: str) -> None:
+        logging.debug(f"Deleting {file_key}")
+        self._client.delete_object(Bucket=self._bucket, Key=file_key)
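A minimal usage sketch of the widened storage interface (bucket keys are illustrative; create_storage_interface is the existing factory already used in main/__main__.py):

from io import BytesIO
from pathlib import Path

from storage import create_storage_interface

storage = create_storage_interface()

# str payloads keep the old behaviour: encoded and wrapped in a BytesIO internally.
storage.upload_content("aggregates/example.txt", "plain text content")

# BytesIO payloads (e.g. an in-memory zip) are now streamed as-is.
storage.upload_content("aggregates/example.zip", BytesIO(b"zip bytes"))

# get_file() now also accepts pathlib.Path keys; they are converted with str().
buffer = BytesIO()
storage.get_file(Path("gazette/file.pdf").with_suffix(".txt"), buffer)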
diff --git a/tasks/__init__.py b/tasks/__init__.py
index 63fd625..643a257 100644
--- a/tasks/__init__.py
+++ b/tasks/__init__.py
@@ -1,9 +1,11 @@
 from .create_index import create_gazettes_index, create_themed_excerpts_index
+from .create_aggregates_table import create_aggregates_table
 from .gazette_excerpts_embedding_reranking import embedding_rerank_excerpts
 from .gazette_excerpts_entities_tagging import tag_entities_in_excerpts
 from .gazette_text_extraction import extract_text_from_gazettes
 from .gazette_themed_excerpts_extraction import extract_themed_excerpts_from_gazettes
 from .gazette_themes_listing import get_themes
+from .gazette_txt_to_xml import create_aggregates
 from .interfaces import (
     DatabaseInterface,
     StorageInterface,
diff --git a/tasks/create_aggregates_table.py b/tasks/create_aggregates_table.py
new file mode 100644
index 0000000..b28dc0e
--- /dev/null
+++ b/tasks/create_aggregates_table.py
@@ -0,0 +1,18 @@
+from .interfaces import DatabaseInterface
+
+
+def create_aggregates_table(database: DatabaseInterface):
+    database._commit_changes(
+        """
+        CREATE TABLE IF NOT EXISTS aggregates (
+            id SERIAL PRIMARY KEY,
+            territory_id VARCHAR,
+            state_code VARCHAR NOT NULL,
+            year INTEGER,
+            file_path VARCHAR(255) UNIQUE,
+            file_size_mb REAL,
+            hash_info VARCHAR(64),
+            last_updated TIMESTAMP
+        );
+        """
+    )
\ No newline at end of file
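Two things worth noting about this schema, with a small sketch below: CREATE TABLE IF NOT EXISTS makes the helper idempotent, so the aggregates pipeline can call it on every run, and the UNIQUE constraint on file_path is what the ON CONFLICT(file_path) upserts in tasks/gazette_txt_to_xml.py rely on. The create_database_interface import path is assumed here; main/__main__.py only shows its call site.

from database import create_database_interface  # assumed import path
from tasks import create_aggregates_table

database = create_database_interface()
create_aggregates_table(database)  # creates the aggregates table on first run
create_aggregates_table(database)  # no-op afterwards, safe to call unconditionally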
diff --git a/tasks/gazette_txt_to_xml.py b/tasks/gazette_txt_to_xml.py
new file mode 100644
index 0000000..80524ee
--- /dev/null
+++ b/tasks/gazette_txt_to_xml.py
@@ -0,0 +1,254 @@
+import logging
+import traceback
+import xml.etree.ElementTree as ET
+from datetime import datetime, timedelta
+from io import BytesIO
+from pathlib import Path
+from xml.dom import minidom
+from zipfile import ZipFile, ZIP_DEFLATED
+
+from botocore.exceptions import ClientError
+
+from .interfaces import DatabaseInterface, StorageInterface
+from .utils import get_territory_slug, hash_content, zip_needs_upsert
+
+logger = logging.getLogger(__name__)
+br_timezone = timedelta(hours=-3)
+
+
+def create_aggregates(database: DatabaseInterface, storage: StorageInterface):
+    """
+    Create the XML aggregates for all territories available in the database
+    """
+    logger.info("Agregando os arquivos TXT para XML de territórios e estados...")
+
+    results_query_states = list(database.select(
+        """
+        SELECT
+            t.state_code AS code,
+            json_agg(json_build_object('id', t.id, 'name', t.name))
+        FROM
+            territories t
+        WHERE
+            t.id IN (
+                SELECT DISTINCT territory_id FROM gazettes
+            )
+        GROUP BY
+            code
+        """
+    ))
+
+    for state, territories_list in results_query_states:
+        try:
+            create_aggregates_for_territories_and_states(territories_list, state, database, storage)
+        except Exception as e:
+            logger.error(f"Erro ao tentar processar municípios de {state}: {e}\n{traceback.format_exc()}")
+            continue
+
+
+def create_aggregates_for_territories_and_states(territories_list: list, state: str, database: DatabaseInterface, storage: StorageInterface):
+    """
+    Create the .xml files for each year of gazettes of a territory
+    """
+
+    xml_files_dict = {}
+    arr_years_update = []
+
+    for territory in territories_list:
+        query_content_gazettes_for_territory = list(database.select(
+            f"""
+            SELECT
+                date_part('Year', g.date) AS year,
+                json_agg(g.*)
+            FROM
+                gazettes g
+            WHERE
+                g.territory_id = '{territory['id']}'
+                AND g.processed = true
+            GROUP BY
+                year
+            """
+        ))
+
+        for year, list_gazettes_content in query_content_gazettes_for_territory:
+            year = str(int(year))
+
+            meta_xml = xml_content_generate(state, year, territory, list_gazettes_content, storage)
+
+            xml_files_dict.setdefault(year, []).append(meta_xml)
+
+            territory_slug = get_territory_slug(meta_xml['territory_name'], state)
+            zip_path = f"aggregates/{meta_xml['state_code']}/{territory_slug}_{meta_xml['territory_id']}_{meta_xml['year']}.zip"
+            hx = hash_content(meta_xml['xml'])
+
+            logger.debug(f"Content hash for xml file of {zip_path}: {hx}")
+
+            need_update_territory_zip = zip_needs_upsert(hx, zip_path, database)
+
+            if need_update_territory_zip:
+                if year not in arr_years_update:
+                    arr_years_update.append(year)
+
+                create_zip_for_territory(hx, zip_path, meta_xml, database, storage)
+
+    if arr_years_update:
+        create_zip_for_state(xml_files_dict, arr_years_update, state, database, storage)
+
+
+def create_zip_for_state(xmls_years_dict: dict, arr_years_update: list, state_code: str, database: DatabaseInterface, storage: StorageInterface):
+    """
+    Create the .zip files of a state, bundling the XMLs of all its territories
+    """
+
+    for year in arr_years_update:
+        logger.info(f"Gerando ZIP do estado {state_code} no ano {year}")
+
+        xmls = xmls_years_dict[year]
+
+        zip_path = f"aggregates/{state_code}/{state_code}_{year}.zip"
+
+        zip_buffer = BytesIO()
+
+        with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file:
+            for xml_file in xmls:
+                territory_slug = get_territory_slug(xml_file['territory_name'], xml_file['state_code'])
+                zip_file.writestr(f"{territory_slug}_{xml_file['territory_id']}_{xml_file['year']}.xml", xml_file['xml'])
+
+        zip_size = round(zip_buffer.getbuffer().nbytes / (1024 * 1024), 2)
+        zip_buffer.seek(0)
+        zip_buffer_copy = BytesIO(zip_buffer.getvalue())
+        zip_buffer_copy.seek(0)
+        storage.upload_content(zip_path, zip_buffer)
+
+        hx = hash_content(zip_buffer_copy.read())
+
+        logger.debug(f"Content hash for {zip_path}: {hx}")
+
+        dict_query_info = {
+            "state_code": state_code,
+            "territory_id": None,
+            "file_path": zip_path,
+            "year": year,
+            "hash_info": hx,
+            "file_size_mb": zip_size,
+            "last_updated": datetime.utcnow() + br_timezone,
+        }
+
+        database.insert("INSERT INTO aggregates \
+            (territory_id, state_code, year, file_path, \
+            file_size_mb, hash_info, last_updated) \
+            VALUES (%(territory_id)s, %(state_code)s, \
+            %(year)s, %(file_path)s, %(file_size_mb)s, \
+            %(hash_info)s, %(last_updated)s) \
+            ON CONFLICT(file_path) \
+            DO UPDATE \
+            SET state_code=EXCLUDED.state_code, last_updated=EXCLUDED.last_updated, \
+            hash_info=EXCLUDED.hash_info, file_size_mb=EXCLUDED.file_size_mb;", dict_query_info)
+
+        zip_buffer.close()
+
+
+def create_zip_for_territory(hx: str, zip_path: str, xml_file: dict, database: DatabaseInterface, storage: StorageInterface):
+    """
+    Create the .zip file for one year of a territory's gazettes
+    """
+
+    logger.info(f"Gerando ZIP do município {xml_file['territory_name']}-{xml_file['territory_id']} no ano {xml_file['year']}")
+
+    zip_buffer = BytesIO()
+
+    with ZipFile(zip_buffer, 'w', ZIP_DEFLATED) as zip_file:
+        territory_slug = get_territory_slug(xml_file['territory_name'], xml_file['state_code'])
+        zip_file.writestr(f"{territory_slug}_{xml_file['territory_id']}_{xml_file['year']}.xml", xml_file['xml'])
+
+    zip_size = round(zip_buffer.tell() / (1024 * 1024), 2)
+    zip_buffer.seek(0)
+
+    try:
+        storage.upload_content(zip_path, zip_buffer)
+    except ClientError:
+        logger.error(f"Não foi possível fazer o upload do zip do município {xml_file['territory_id']}:\n{traceback.format_exc()}")
+
+    zip_buffer.close()
+
+    dict_query_info = {
+        "state_code": xml_file['state_code'],
+        "territory_id": xml_file['territory_id'],
+        "file_path": zip_path,
+        "year": xml_file['year'],
+        "hash_info": hx,
+        "file_size_mb": zip_size,
+        "last_updated": datetime.utcnow() + br_timezone,
+    }
+
+    database.insert("INSERT INTO aggregates \
+        (territory_id, state_code, year, file_path, \
+        file_size_mb, hash_info, last_updated) \
+        VALUES (%(territory_id)s, %(state_code)s, \
+        %(year)s, %(file_path)s, %(file_size_mb)s, \
+        %(hash_info)s, %(last_updated)s) \
+        ON CONFLICT(file_path) \
+        DO UPDATE \
+        SET state_code=EXCLUDED.state_code, last_updated=EXCLUDED.last_updated, \
+        hash_info=EXCLUDED.hash_info, file_size_mb=EXCLUDED.file_size_mb;", dict_query_info)
+
+
+def xml_content_generate(state: str, year: str, territory: dict, list_gazettes_content: list, storage: StorageInterface):
+    """
+    Generate the XML content for one year of a territory's gazettes
+    """
+
+    root = ET.Element("root")
+    xml_file = BytesIO()
+
+    logger.info(f"Gerando XML para cidade {territory['name']}-{state} no ano {year}")
+
+    meta_info_tag = ET.SubElement(root, "meta")
+    ET.SubElement(meta_info_tag, "uf").text = state
+    ET.SubElement(meta_info_tag, "ano_publicacao").text = str(year)
+    ET.SubElement(meta_info_tag, "municipio").text = territory['name']
+    ET.SubElement(meta_info_tag, "municipio_codigo_ibge").text = territory['id']
+    all_gazettes_tag = ET.SubElement(root, "diarios")
+
+    for gazette in list_gazettes_content:
+        file_gazette_txt = BytesIO()
+        path_arq_bucket = Path(gazette['file_path']).with_suffix(".txt")
+
+        try:
+            storage.get_file(path_arq_bucket, file_gazette_txt)
+        except ClientError as e:
+            logger.warning(f"Erro na obtenção do conteúdo de texto do diário do território {path_arq_bucket}: {e}")
+            file_gazette_txt.close()
+            continue
+
+        gazette_tag = ET.SubElement(all_gazettes_tag, "diario")
+        meta_gazette = ET.SubElement(gazette_tag, "meta_diario")
+        ET.SubElement(meta_gazette, "url_arquivo_original").text = gazette['file_url']
+        ET.SubElement(meta_gazette, "poder").text = gazette['power']
+        ET.SubElement(meta_gazette, "edicao_extra").text = 'Sim' if gazette['is_extra_edition'] else 'Não'
+        ET.SubElement(meta_gazette, "numero_edicao").text = str(gazette['edition_number']) if gazette['edition_number'] is not None else "Não há"
+        ET.SubElement(meta_gazette, "data_publicacao").text = datetime.strptime(gazette['date'], "%Y-%m-%d").date().strftime("%d/%m")
+        ET.SubElement(gazette_tag, "conteudo").text = file_gazette_txt.getvalue().decode('utf-8')
+
+        file_gazette_txt.close()
+
+    # Format the XML with indentation before returning it
+    xml_str = ET.tostring(root, encoding='unicode')
+    format_xml = minidom.parseString(xml_str).toprettyxml(indent="  ")
+    xml_bytes = format_xml.encode('utf-8')
+
+    xml_file.write(xml_bytes)
+    xml_file.seek(0)
+
+    data = {
+        "xml": xml_file.getvalue(),
+        "territory_id": territory['id'],
+        "territory_name": territory['name'],
+        "state_code": state,
+        "year": year,
+    }
+
+    xml_file.close()
+
+    return data
\ No newline at end of file
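Illustrative only: the aggregate path scheme produced above, with made-up territory values (the exact slug comes from the existing get_territory_slug helper):

from tasks.utils import get_territory_slug

state_code = "EX"                                       # hypothetical state code
territory = {"id": "0000000", "name": "Example City"}   # hypothetical territory
year = "2024"

slug = get_territory_slug(territory["name"], state_code)
territory_zip = f"aggregates/{state_code}/{slug}_{territory['id']}_{year}.zip"  # one zip per territory/year
state_zip = f"aggregates/{state_code}/{state_code}_{year}.zip"                  # one zip per state/year, holding all territory XMLs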
diff --git a/tasks/interfaces.py b/tasks/interfaces.py
index 06b81cb..444e59b 100644
--- a/tasks/interfaces.py
+++ b/tasks/interfaces.py
@@ -1,5 +1,7 @@
-from typing import Dict, Iterable, Tuple
+from typing import Dict, Iterable, Tuple, Union
+from pathlib import Path
 import abc
+from io import BytesIO
 
 
 class DatabaseInterface(abc.ABC):
@@ -45,17 +47,29 @@ class StorageInterface(abc.ABC):
     """
 
     @abc.abstractmethod
-    def get_file(self, file_to_be_downloaded: str, destination) -> None:
+    def get_file(self, file_to_be_downloaded: Union[str, Path], destination) -> None:
         """
         Download the given file key in the destination on the host
         """
 
     @abc.abstractmethod
-    def upload_content(self, file_key: str, content_to_be_uploaded: str) -> None:
+    def upload_content(self, file_key: str, content_to_be_uploaded: Union[str, BytesIO]) -> None:
         """
         Upload the given content to the destination on the host
         """
 
+    @abc.abstractmethod
+    def copy_file(self, source_file_key: str, destination_file_key: str) -> None:
+        """
+        Copy the given source file to the destination place on the host
+        """
+
+    @abc.abstractmethod
+    def delete_file(self, file_key: str) -> None:
+        """
+        Delete a file on the host.
+        """
+
 
 class IndexInterface(abc.ABC):
     """
diff --git a/tasks/utils/__init__.py b/tasks/utils/__init__.py
index 129d85e..aa40fac 100644
--- a/tasks/utils/__init__.py
+++ b/tasks/utils/__init__.py
@@ -13,3 +13,9 @@
     get_territory_slug,
     get_territory_data,
 )
+from .hash import (
+    hash_content,
+)
+from .need_upsert import (
+    zip_needs_upsert,
+)
diff --git a/tasks/utils/hash.py b/tasks/utils/hash.py
new file mode 100644
index 0000000..f85536b
--- /dev/null
+++ b/tasks/utils/hash.py
@@ -0,0 +1,11 @@
+import hashlib
+
+
+def hash_content(content: bytes) -> str:
+    """
+    Receives content as bytes and returns its SHA-256 hex digest
+    """
+
+    result_hash = hashlib.sha256(content).hexdigest()
+
+    return result_hash
\ No newline at end of file
diff --git a/tasks/utils/need_upsert.py b/tasks/utils/need_upsert.py
new file mode 100644
index 0000000..5cc6a6b
--- /dev/null
+++ b/tasks/utils/need_upsert.py
@@ -0,0 +1,18 @@
+from typing import Union
+from ..interfaces import DatabaseInterface
+
+
+def zip_needs_upsert(hx: Union[str, bytes], zip_path: str, database: DatabaseInterface) -> bool:
+    """
+    Verifies whether the zip needs an upsert to the database (update or insert)
+    """
+
+    needs_update_or_inexists = True
+
+    query_existing_aggregate = list(database.select(f"SELECT hash_info FROM aggregates \
+        WHERE file_path='{zip_path}';"))
+
+    if query_existing_aggregate:
+        needs_update_or_inexists = hx != query_existing_aggregate[0][0]
+
+    return needs_update_or_inexists
\ No newline at end of file
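A sketch of how the two helpers above gate regeneration in the aggregation task (payload and file_path are illustrative; the database factory import path is assumed, as in the earlier sketches):

from database import create_database_interface  # assumed import path
from tasks.utils import hash_content, zip_needs_upsert

database = create_database_interface()
xml_bytes = b"<root>...</root>"  # illustrative XML payload
hx = hash_content(xml_bytes)     # SHA-256 hex digest of the new content

if zip_needs_upsert(hx, "aggregates/EX/example-city_0000000_2024.zip", database):
    # Either no aggregates row exists for this file_path or the stored hash_info
    # differs: rebuild the zip, upload it, and INSERT ... ON CONFLICT(file_path) DO UPDATE.
    ...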