Skip to content

Commit

Permalink
fix: close #13
Browse files Browse the repository at this point in the history
  • Loading branch information
notdodo committed Sep 21, 2024
1 parent 11ba0e3 commit a9c1979
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 80 deletions.
4 changes: 2 additions & 2 deletions app/erfiume/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

from .apis import Stazione, Valore, enrich_data, fetch_latest_time, fetch_stations_data
from .logging import logger
from .storage import DynamoClient
from .storage import AsyncDynamoDB
from .tgbot import bot

__all__ = [
"AsyncDynamoDB",
"Stazione",
"Valore",
"DynamoClient",
"enrich_data",
"fetch_latest_time",
"fetch_stations_data",
Expand Down
74 changes: 37 additions & 37 deletions app/erfiume/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,74 +5,76 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Self

import aioboto3
from boto3.dynamodb.conditions import Key
import aioboto3.resources
from botocore.exceptions import ClientError

from .apis import Stazione
from .logging import logger

if TYPE_CHECKING:
from types_aiobotocore_dynamodb import DynamoDBServiceResource
from types import TracebackType


class DynamoClient:
class AsyncDynamoDB:
"""
Asynchronous DynamoDB client that can be used for various operations on DynamoDB tables.
This class is designed to be instantiated and used in other asynchronous methods.
"""

def __init__(self, client: DynamoDBServiceResource):
"""
Wrap the class in async context.
"""
self.client = client

@classmethod
async def create(cls) -> DynamoClient:
"""
Factory method to initialize the DynamoDB client.
This method is asynchronous and sets up the connection based on environment.
"""
def __init__(self, table_name: str) -> None:
environment = os.getenv("ENVIRONMENT", "staging")
session = aioboto3.Session()
self.endpoint_url = (
"http://localhost:4566" if environment != "production" else None
)
self.table_name = table_name

async def __aenter__(self) -> Self:
"""Set up the client and table."""
self.session = aioboto3.Session()
self.dynamodb = await self.session.resource(
service_name="dynamodb",
endpoint_url=self.endpoint_url,
).__aenter__()
self.table = await self.dynamodb.Table(self.table_name)
return self

async with session.resource(
"dynamodb",
endpoint_url=(
"http://localhost:4566" if environment != "production" else None
),
) as client:
return cls(client)
async def __aexit__(
self,
exc_type: type[Exception] | None, # noqa: PYI036
exc_val: Exception | None, # noqa: PYI036
exc_tb: TracebackType | None,
) -> None:
"""Close the client on exit."""
await self.dynamodb.__aexit__(exc_type, exc_val, exc_tb)

async def check_and_update_stazioni(self, station: Stazione) -> None:
"""
Check if the station data in DynamoDB is outdated compared to the given station object.
If outdated or non-existent, update it with the new data.
"""
try:
table = await self.client.Table("Stazioni")
response = await table.query(
KeyConditionExpression=Key("nomestaz").eq(station.nomestaz),
response = await self.table.get_item(
Key={"nomestaz": station.nomestaz},
)

# Get the latest timestamp from the DynamoDB response
latest_timestamp = (
int(response["Items"][0].get("timestamp")) # type: ignore[arg-type]
if response["Count"] > 0
int(response["Item"].get("timestamp")) # type: ignore[arg-type]
if response["Item"]
else 0
)

# If the provided station has newer data or the record doesn't exist, update DynamoDB
if station.timestamp > latest_timestamp or response["Count"] == 0:
if station.timestamp > latest_timestamp or not response["Item"]:
logger.info(
"Updating data for station %s (%s)",
station.nomestaz,
station.idstazione,
)
await table.put_item(Item=station.to_dict())
await self.table.put_item(Item=station.to_dict())
except ClientError as e:
logger.exception(
"Error while checking or updating station %s: %s", station.nomestaz, e
Expand All @@ -88,14 +90,12 @@ async def get_matching_station(self, station_name: str) -> Stazione | None:
Returns the station data as a dictionary, or None if not found.
"""
try:
table = await self.client.Table("Stazioni")
response = await table.query(
Limit=1,
KeyConditionExpression=Key("nomestaz").eq(station_name),
stazione = await self.table.get_item(
Key={"nomestaz": station_name},
)

if response["Count"] > 0:
return Stazione(**response["Items"][0]) # type: ignore[arg-type]
if stazione["Item"]:
return Stazione(**stazione["Item"]) # type: ignore[arg-type]
logger.info("Station %s not found in DynamoDB.", station_name)
except ClientError as e:
logger.exception("Error while retrieving station %s: %s", station_name, e)
Expand Down
48 changes: 21 additions & 27 deletions app/erfiume/tgbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
from __future__ import annotations

import json
import os
from datetime import datetime
from inspect import cleandoc
from typing import TYPE_CHECKING, Any

import boto3
from telegram import Update
from telegram.ext import (
Application,
Expand All @@ -24,21 +22,17 @@
if TYPE_CHECKING:
from .apis import Stazione

from aws_lambda_powertools.utilities import parameters

from .logging import logger
from .storage import DynamoClient
from .storage import AsyncDynamoDB


async def fetch_bot_token() -> str:
"""
Fetch the Telegram Bot token from AWS SM
"""
environment = os.getenv("ENVIRONMENT", "staging")
return boto3.client(
service_name="secretsmanager",
endpoint_url=("http://localhost:4566" if environment != "production" else None),
).get_secret_value(
SecretId="telegram-bot-token",
)["SecretString"]
return parameters.get_secret("telegram-bot-token")


async def start(update: Update, _: ContextTypes.DEFAULT_TYPE) -> None:
Expand Down Expand Up @@ -84,15 +78,15 @@ def create_station_message(station: Stazione) -> str:

async def cesena(update: Update, _: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /cesena is issued."""
db_client = await DynamoClient.create()
stazione = await db_client.get_matching_station("Cesena")
if stazione:
if update.message:
await update.message.reply_html(create_station_message(stazione))
elif update.message:
await update.message.reply_html(
"Nessun stazione trovata!",
)
async with AsyncDynamoDB(table_name="Stazioni") as dynamo:
stazione = await dynamo.get_matching_station("Cesena")
if stazione:
if update.message:
await update.message.reply_html(create_station_message(stazione))
elif update.message:
await update.message.reply_html(
"Nessun stazione trovata!",
)


async def handle_private_message(
Expand All @@ -109,14 +103,14 @@ async def handle_private_message(
)
if update.message and update.effective_chat and update.message.text:
logger.info("Received private message: %s", update.message.text)
db_client = await DynamoClient.create()
stazione = await db_client.get_matching_station(update.message.text)
if stazione and update.message:
message = create_station_message(stazione)
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=message,
)
async with AsyncDynamoDB(table_name="Stazioni") as dynamo:
stazione = await dynamo.get_matching_station(update.message.text)
if stazione and update.message:
message = create_station_message(stazione)
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=message,
)


async def bot(event: dict[str, Any]) -> None:
Expand Down
9 changes: 5 additions & 4 deletions app/erfiume_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from aws_lambda_powertools.utilities.typing import LambdaContext

from erfiume import (
DynamoClient,
AsyncDynamoDB,
enrich_data,
fetch_latest_time,
fetch_stations_data,
Expand All @@ -25,14 +25,15 @@ async def update() -> None:
"""
Run main.
"""
db_client = await DynamoClient.create()
async with httpx.AsyncClient() as http_client:
async with httpx.AsyncClient() as http_client, AsyncDynamoDB(
table_name="Stazioni"
) as dynamo:
try:
latest_time = await fetch_latest_time(http_client)
stations = await fetch_stations_data(http_client, latest_time)
await enrich_data(http_client, stations)
for stazione in stations:
await db_client.check_and_update_stazioni(stazione)
await dynamo.check_and_update_stazioni(stazione)
except httpx.HTTPStatusError as e:
logger.exception("HTTP error occurred: %d", e.response.status_code)
except httpx.ConnectTimeout:
Expand Down
2 changes: 1 addition & 1 deletion app/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ output-format = "github"

[tool.ruff.lint]
select = ["ALL"]
ignore = ["D2", "D4", "ANN", "COM812", "ISC001"]
ignore = ["D2", "D4", "ANN", "COM812", "ISC001", "D105", "D107"]
fixable = ["ALL"]
unfixable = []

Expand Down
9 changes: 5 additions & 4 deletions app/standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import httpx

from erfiume import (
DynamoClient,
AsyncDynamoDB,
bot,
enrich_data,
fetch_latest_time,
Expand All @@ -22,15 +22,16 @@ async def update() -> None:
"""
Run main.
"""
db_client = await DynamoClient.create()
async with httpx.AsyncClient() as http_client:
async with httpx.AsyncClient() as http_client, AsyncDynamoDB(
table_name="Stazioni"
) as dynamo:
while True:
try:
latest_time = await fetch_latest_time(http_client)
stations = await fetch_stations_data(http_client, latest_time)
await enrich_data(http_client, stations)
for stazione in stations:
await db_client.check_and_update_stazioni(stazione)
await dynamo.check_and_update_stazioni(stazione)
except httpx.HTTPStatusError as e:
logger.exception("HTTP error occurred: %d", e.response.status_code)
except httpx.ConnectTimeout:
Expand Down
7 changes: 2 additions & 5 deletions pulumi/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@
name="Stazioni",
billing_mode="PAY_PER_REQUEST",
hash_key="nomestaz",
range_key="ordinamento",
attributes=[
dynamodb.TableAttributeArgs(
name="nomestaz",
type="S",
),
dynamodb.TableAttributeArgs(
name="ordinamento",
type="N",
),
],
)

Expand Down Expand Up @@ -72,6 +67,7 @@
"Actions": [
"dynamodb:PutItem",
"dynamodb:Query",
"dynamodb:GetItem",
],
"Resources": [stazioni_table.arn],
}
Expand Down Expand Up @@ -110,6 +106,7 @@
"Effect": "Allow",
"Actions": [
"dynamodb:Query",
"dynamodb:GetItem",
],
"Resources": [
f"arn:aws:dynamodb:eu-west-1:{get_caller_identity().account_id}:table/Stazioni"
Expand Down

0 comments on commit a9c1979

Please sign in to comment.