Skip to content

Commit

Permalink
Implementação da funcionalidade de agregação de diários oficiais por …
Browse files Browse the repository at this point in the history
…ano e município/estado (#78)

Implementação da rotina de agregação, responsável por fazer a extração
dos conteúdos txt de todos os diários oficiais de um município em um
período de um ano e transformá-los em um arquivo XML para compactação em
formato ZIP e disponibilização no sistema de arquivos. A rotina também é
responsável por juntar todos os arquivos XML de todos os municípios de
um determinado estado em um período de um ano para agregação e
compactação em formato ZIP para disponibilização no sistema de arquivos.
Após as agregações e a armazenagem dos arquivos ZIP, são disponibilizados
metadados dos arquivos no banco de dados para buscas otimizadas na API.
  • Loading branch information
ogecece authored Aug 16, 2024
2 parents 119ca53 + 5593ac8 commit 5535062
Show file tree
Hide file tree
Showing 11 changed files with 396 additions and 15 deletions.
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,17 @@ wait-opensearch:
publish-tag:
podman tag $(IMAGE_NAMESPACE)/$(IMAGE_NAME):${IMAGE_TAG} $(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(shell git describe --tags)
podman push $(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(shell git describe --tags)

# Stop and then force-remove the container named "agg-gazettes".
# Both commands pass --ignore; presumably so the target still succeeds when
# no such container exists (confirm against the podman documentation).
.PHONY: stop-aggregate-gazettes
stop-aggregate-gazettes:
podman stop --ignore agg-gazettes
podman rm --force --ignore agg-gazettes

# Run the "aggregates" pipeline (`python main -p aggregates`) in a container
# named "agg-gazettes" inside pod $(POD_NAME).
# Prerequisites: stop-aggregate-gazettes and set-run-variable-values.
# The current directory is mounted read-write at /mnt/code, which is also set
# as PYTHONPATH; remaining environment variables come from the `envvars` file.
.PHONY: aggregate-gazettes
aggregate-gazettes: stop-aggregate-gazettes set-run-variable-values
podman run -ti --volume $(CURDIR):/mnt/code:rw \
--pod $(POD_NAME) \
--env PYTHONPATH=/mnt/code \
--env-file envvars \
--name agg-gazettes \
$(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(IMAGE_TAG) python main -p aggregates
2 changes: 2 additions & 0 deletions contrib/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ QUERIDO_DIARIO_FILES_ENDPOINT=http://localhost:9000/queridodiariobucket

# Options: ALL, DAILY, UNPROCESSED
EXECUTION_MODE=DAILY

SEED_HASH=querido-diario
31 changes: 27 additions & 4 deletions main/__main__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from os import environ
import argparse
import logging

from data_extraction import create_apache_tika_text_extraction
from database import create_database_interface
from storage import create_storage_interface
from index import create_index_interface
from tasks import (
create_aggregates,
create_gazettes_index,
create_aggregates_table,
create_themed_excerpts_index,
embedding_rerank_excerpts,
extract_text_from_gazettes,
Expand Down Expand Up @@ -35,9 +38,7 @@ def get_execution_mode():
return environ.get("EXECUTION_MODE", "DAILY")


def execute_pipeline():
enable_debug_if_necessary()

def gazette_texts_pipeline():
execution_mode = get_execution_mode()
database = create_database_interface()
storage = create_storage_interface()
Expand All @@ -61,5 +62,27 @@ def execute_pipeline():
tag_entities_in_excerpts(theme, themed_excerpt_ids, index)


def aggregates_pipeline():
    """Run the aggregates pipeline.

    Builds the database and storage interfaces, makes sure the aggregates
    table exists, and then generates the aggregates using both interfaces.
    """
    db_interface = create_database_interface()
    storage_interface = create_storage_interface()

    # The table must exist before the aggregation task runs.
    create_aggregates_table(db_interface)
    create_aggregates(db_interface, storage_interface)


def execute_pipeline(pipeline):
    """Run the pipeline selected by name.

    Args:
        pipeline: Name of the pipeline to run. ``"aggregates"`` runs the
            aggregates pipeline; ``"gazette_texts"`` or any falsy value
            (``None``, empty string) runs the gazette texts pipeline.

    Raises:
        ValueError: If ``pipeline`` is a non-empty value that names no
            known pipeline.
    """
    # Debug configuration is applied before any pipeline is chosen.
    enable_debug_if_necessary()

    if pipeline == "aggregates":
        aggregates_pipeline()
        return

    if pipeline and pipeline != "gazette_texts":
        raise ValueError("Pipeline inválido.")

    # Default: no pipeline given, or "gazette_texts" requested explicitly.
    gazette_texts_pipeline()


if __name__ == "__main__":
    # Command-line entry point. The pipeline name comes from -p/--pipeline;
    # when the option is omitted argparse yields None, which
    # execute_pipeline() treats as the default "gazette_texts" pipeline.
    # execute_pipeline() is called exactly once, after the arguments have
    # been parsed, because it requires the pipeline argument.
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--pipeline", help="Qual pipeline deve ser executado.")
    args = parser.parse_args()
    execute_pipeline(args.pipeline)
35 changes: 27 additions & 8 deletions storage/digital_ocean_spaces.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import logging
import os
from typing import Generator
from typing import Generator, Union
from io import BytesIO
from pathlib import Path

import boto3

Expand Down Expand Up @@ -68,18 +69,36 @@ def __init__(
aws_secret_access_key=self._access_secret,
)

def get_file(self, file_to_be_downloaded: Union[str, Path], destination) -> None:
    """Download an object from the configured bucket into ``destination``.

    Args:
        file_to_be_downloaded: Key of the object to fetch. A ``Path`` is
            accepted and converted to ``str`` before being used as the key.
        destination: Target handed to the client's ``download_fileobj``;
            presumably a writable binary file-like object (confirm against
            the boto3 documentation).
    """
    logging.debug(f"Getting {file_to_be_downloaded}")
    object_key = str(file_to_be_downloaded)
    self._client.download_fileobj(self._bucket, object_key, destination)

def upload_content(
    self,
    file_key: str,
    content_to_be_uploaded: Union[str, BytesIO],
    permission: str = "public-read",
) -> None:
    """Upload text or an in-memory binary stream to the configured bucket.

    Args:
        file_key: Key under which the object is stored.
        content_to_be_uploaded: Either a ``str``, which is encoded with the
            default encoding of ``str.encode`` and wrapped in a ``BytesIO``,
            or an object that is passed to the client unchanged (annotated
            as ``BytesIO``).
        permission: Value sent as the ``ACL`` extra argument of the upload.
    """
    logging.debug(f"Uploading {file_key}")

    # Normalise the payload first so there is a single upload call site.
    if isinstance(content_to_be_uploaded, str):
        payload = BytesIO(content_to_be_uploaded.encode())
    else:
        payload = content_to_be_uploaded

    self._client.upload_fileobj(
        payload, self._bucket, file_key, ExtraArgs={"ACL": permission}
    )

def copy_file(self, source_file_key: str, destination_file_key: str) -> None:
    """Copy an object to a new key inside the same bucket.

    Args:
        source_file_key: Key of the existing object.
        destination_file_key: Key the copy is written to.
    """
    logging.debug(f"Copying {source_file_key} to {destination_file_key}")
    # Source and destination both live in the bucket held by this instance.
    origin = {'Bucket': self._bucket, 'Key': source_file_key}
    self._client.copy_object(
        Bucket=self._bucket,
        CopySource=origin,
        Key=destination_file_key,
    )

def delete_file(self, file_key: str) -> None:
    """Delete the object stored under ``file_key`` from the configured bucket."""
    logging.debug(f"Deleting {file_key}")
    target = {"Bucket": self._bucket, "Key": file_key}
    self._client.delete_object(**target)
2 changes: 2 additions & 0 deletions tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from .create_index import create_gazettes_index, create_themed_excerpts_index
from .create_aggregates_table import create_aggregates_table
from .gazette_excerpts_embedding_reranking import embedding_rerank_excerpts
from .gazette_excerpts_entities_tagging import tag_entities_in_excerpts
from .gazette_text_extraction import extract_text_from_gazettes
from .gazette_themed_excerpts_extraction import extract_themed_excerpts_from_gazettes
from .gazette_themes_listing import get_themes
from .gazette_txt_to_xml import create_aggregates
from .interfaces import (
DatabaseInterface,
StorageInterface,
Expand Down
18 changes: 18 additions & 0 deletions tasks/create_aggregates_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from .interfaces import DatabaseInterface


def create_aggregates_table(database: DatabaseInterface):
    """Create the ``aggregates`` table if it does not exist yet.

    The statement is sent through ``database._commit_changes``. Columns:
    ``id`` (serial primary key), ``territory_id``, ``state_code`` (not null),
    ``year``, ``file_path`` (unique, up to 255 chars), ``file_size_mb``,
    ``hash_info`` (up to 64 chars) and ``last_updated``.
    """
    # The statement text is kept flush-left so the string sent to the
    # database is unchanged.
    create_table_statement = """
CREATE TABLE IF NOT EXISTS aggregates (
id SERIAL PRIMARY KEY ,
territory_id VARCHAR,
state_code VARCHAR NOT NULL,
year INTEGER,
file_path VARCHAR(255) UNIQUE,
file_size_mb REAL,
hash_info VARCHAR(64),
last_updated TIMESTAMP
); """
    database._commit_changes(create_table_statement)


Loading

0 comments on commit 5535062

Please sign in to comment.