Skip to content

Commit

Permalink
feat: add events API requests (#677)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgarel authored Apr 14, 2022
1 parent 3248e7d commit 1f212fd
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 4 deletions.
3 changes: 2 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ STATIC_OFF_DOMAIN=https://openfoodfacts.net
ROBOTOFF_SCHEME=http
# for dev only on localhost
ROBOTOFF_EXPOSE=127.0.0.1:5500
EVENTS_API_URL=

# ElasticSearch
ELASTICSEARCH_HOSTS=elasticsearch.webnet
Expand All @@ -41,7 +42,7 @@ INFLUXDB_PASSWORD=admin123
# on linux, this will work if you have an influxdb listening on 0.0.0.0
# INFLUXDB_HOST=host.docker.internal

# MongoDB (dev setting, using product owner network)
# MongoDB (dev setting, using product opener network)
MONGO_URI=mongodb://mongodb.po_default:27017

# OpenFoodFacts API
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/container-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ jobs:
echo "INFLUXDB_PASSWORD=${{ secrets.INFLUXDB_PASSWORD }}" >> .env
echo "SLACK_TOKEN=${{ secrets.SLACK_TOKEN }}" >> .env
echo "GUNICORN_NUM_WORKERS=8"
echo "EVENTS_API_URL=https://event.${{ secrets.ROBOTOFF_DOMAIN }}" >> .env
- name: Create Docker volumes
Expand Down
37 changes: 36 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ types-protobuf = "^3.17.3"
types-certifi = "^2021.10.8"
types-setuptools = "^57.4.8"
types-toml = "^0.10.3"
pytest-httpserver = "^1.0.4"

[tool.poetry.scripts]
robotoff-cli = 'robotoff.cli.main:main'
1 change: 0 additions & 1 deletion robotoff/app/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,6 @@ def on_post(self, req: falcon.Request, resp: falcon.Response):
barcode = req.get_param("barcode", required=True)
action = req.get_param("action", required=True)
server_domain = req.get_param("server_domain", required=True)

if server_domain != settings.OFF_SERVER_DOMAIN:
logger.info("Rejecting webhook event from {}".format(server_domain))
resp.media = {
Expand Down
8 changes: 7 additions & 1 deletion robotoff/app/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import peewee

from robotoff import settings
from robotoff.app import events
from robotoff.insights.annotate import (
ALREADY_ANNOTATED_RESULT,
SAVED_ANNOTATION_VOTE_RESULT,
Expand Down Expand Up @@ -208,4 +209,9 @@ def save_annotation(
return SAVED_ANNOTATION_VOTE_RESULT

annotator = InsightAnnotatorFactory.get(insight.type)
return annotator.annotate(insight, annotation, update, data=data, auth=auth)
result = annotator.annotate(insight, annotation, update, data=data, auth=auth)
username = auth.get_username() if auth else "unknown annotator"
events.event_processor.send_async(
"question_answered", username, device_id, insight.barcode
)
return result
62 changes: 62 additions & 0 deletions robotoff/app/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from multiprocessing import Process, SimpleQueue

import requests

from robotoff import settings
from robotoff.utils import get_logger

logger = get_logger(__name__)


class EventProcessor:
"""Send events in an outside process"""

# the process and queue to send events
process = None
queue = None

def get(self):
"""Start a process to handle events, but only when needed,
and return communication pipe
"""
if self.process is None:
self.queue = SimpleQueue()
# Create a daemonic process
self.process = Process(target=send_events, args=(self.queue,), daemon=True)
self.process.start()
return self.queue

def send_async(self, *args, **kwargs):
if settings.EVENTS_API_URL:
queue = self.get()
queue.put((settings.EVENTS_API_URL, args, kwargs))


# a singleton for event processor
event_processor = EventProcessor()


def send_events(queue):
"""Loop to send events in a specific process"""
while True:
api_url, args, kwargs = queue.get()
send_event(api_url, *args, **kwargs)


def send_event(
api_url: str,
event_type: str,
user_id: str,
device_id: str,
barcode: str = None,
):
event = {
"event_type": event_type,
"user_id": user_id,
"device_id": device_id,
"barcode": barcode,
}
logger.debug(f"Event: {event}")
response = requests.post(api_url, json=event)
logger.debug(f"Event API response: {response}")
return response
3 changes: 3 additions & 0 deletions robotoff/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ def off_credentials() -> Dict[str, str]:


OFF_SERVER_DOMAIN = "api." + BaseURLProvider().domain
EVENTS_API_URL = os.environ.get(
"EVENTS_API_URL", "https://events." + BaseURLProvider().domain
)

# Taxonomies are huge JSON files that describe many concepts in OFF, in many languages, with synonyms. Those are the full version of taxos.

Expand Down
32 changes: 32 additions & 0 deletions tests/integration/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import pytest
from falcon import testing

from robotoff import settings
from robotoff.app import events
from robotoff.app.api import api
from robotoff.models import AnnotationVote, ProductInsight

Expand Down Expand Up @@ -354,3 +356,33 @@ def test_annotate_insight_majority_vote_overridden(client):
# The insight should be annoted with '0', with a None username since this was resolved with an
# anonymous vote.
assert insight.items() > {"annotation": 0, "username": None, "n_votes": 5}.items()


def test_annotation_event(client, monkeypatch, httpserver):
"""Test that annotation sends an event"""
monkeypatch.setattr(settings, "EVENTS_API_URL", httpserver.url_for("/"))
# setup a new event_processor, to be sure settings is taken into account
monkeypatch.setattr(events, "event_processor", events.EventProcessor())
# We expect to have a call to events server
expected_event = {
"event_type": "question_answered",
"user_id": "a",
"device_id": "test-device",
"barcode": "1",
}
httpserver.expect_oneshot_request(
"/", method="POST", json=expected_event
).respond_with_data("Done")
with httpserver.wait(raise_assertions=True, stop_on_nohandler=True, timeout=2):
result = client.simulate_post(
"/api/v1/insights/annotate",
params={
"insight_id": insight_id,
"annotation": -1,
"device_id": "test-device",
},
headers={
"Authorization": "Basic " + base64.b64encode(b"a:b").decode("ascii")
},
)
assert result.status_code == 200

0 comments on commit 1f212fd

Please sign in to comment.