Skip to content

Commit

Permalink
feat: sleep now happens in thread (#653)
Browse files Browse the repository at this point in the history
Avoid having a worker stalled for x seconds; this is important in the case of massive updates.
  • Loading branch information
alexgarel authored Mar 18, 2022
1 parent a40d755 commit d4489cd
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 9 deletions.
13 changes: 11 additions & 2 deletions robotoff/app/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,10 @@ def on_post(self, req: falcon.Request, resp: falcon.Response):


class WebhookProductResource:
"""This handles requests from product opener
that act as webhooks on product update or deletion.
"""

def on_post(self, req: falcon.Request, resp: falcon.Response):
barcode = req.get_param("barcode", required=True)
action = req.get_param("action", required=True)
Expand All @@ -810,7 +814,6 @@ def on_post(self, req: falcon.Request, resp: falcon.Response):
"New webhook event received for product {} (action: {}, "
"domain: {})".format(barcode, action, server_domain)
)

if action not in ("updated", "deleted"):
raise falcon.HTTPBadRequest(
title="invalid_action",
Expand All @@ -819,7 +822,13 @@ def on_post(self, req: falcon.Request, resp: falcon.Response):

if action == "updated":
send_ipc_event(
"product_updated", {"barcode": barcode, "server_domain": server_domain}
"product_updated",
{
"barcode": barcode,
"server_domain": server_domain,
# add some latency
"task_delay": settings.UPDATED_PRODUCT_WAIT,
},
)

elif action == "deleted":
Expand Down
24 changes: 21 additions & 3 deletions robotoff/workers/listener.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import threading
import time
from multiprocessing.connection import Listener
from multiprocessing.pool import Pool
from typing import Dict
Expand All @@ -13,7 +15,17 @@
logger = get_logger()


def send_task_to_pool(pool, event_type, event_kwargs, delay):
    """Hand a task off to a worker in the pool, optionally sleeping first.

    :param pool: the multiprocessing Pool that executes ``run_task``
    :param event_type: task type identifier forwarded to ``run_task``
    :param event_kwargs: keyword payload forwarded to ``run_task``
    :param delay: seconds to sleep before dispatching; falsy means dispatch
        immediately (the caller runs this in a thread when a delay is set,
        so the sleep never blocks the listener's main loop)
    """
    if delay:
        time.sleep(delay)
    logger.debug("Sending task to pool...")
    task_args = (event_type, event_kwargs)
    pool.apply_async(run_task, task_args)
    logger.debug("Task sent")


def run():
"""This is the event listener, it will receive task requests and launch them"""
pool: Pool = Pool(settings.WORKER_COUNT, maxtasksperchild=30)

logger.info("Starting listener server on {}:{}".format(*settings.IPC_ADDRESS))
Expand All @@ -32,8 +44,14 @@ def run():
logger.info(f"New '{event_type}' event received")
event_kwargs: Dict = event.get("meta", {})

logger.debug("Sending task to pool...")
pool.apply_async(run_task, (event_type, event_kwargs))
logger.debug("Task sent")
delay = event_kwargs.pop("task_delay", None)
args = [pool, event_type, event_kwargs, delay]
if delay:
# we have a delay, so spend it in a thread instead of listener main thread
threading.Thread(target=send_task_to_pool, args=args).start()
else:
# direct call, it's fast
send_task_to_pool(*args)

except Exception:
capture_exception()
4 changes: 0 additions & 4 deletions robotoff/workers/tasks/product_updated.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import time

import requests

from robotoff import settings
from robotoff.elasticsearch.category.predict import (
predict_from_product as predict_category_from_product_es,
)
Expand All @@ -25,7 +22,6 @@ def update_insights(barcode: str, server_domain: str):
# to finish
logger.info(f"Running `update_insights` for product {barcode} ({server_domain})")

time.sleep(settings.UPDATED_PRODUCT_WAIT)
product_dict = get_product(barcode)

if product_dict is None:
Expand Down

0 comments on commit d4489cd

Please sign in to comment.