Skip to content

Commit

Permalink
feat: add two kind of workers: worker_high and worker_low
Browse files Browse the repository at this point in the history
for high and low priority jobs
  • Loading branch information
raphael0202 committed Dec 9, 2022
1 parent 6c3184e commit f5eddf0
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 21 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -173,26 +173,26 @@ health:
i18n-compile:
@echo "🥫 Compiling translations …"
# Note it's important to have --no-deps, to avoid launching a concurrent postgres instance
${DOCKER_COMPOSE} run --rm --entrypoint bash --no-deps worker -c "cd i18n && . compile.sh"
${DOCKER_COMPOSE} run --rm --entrypoint bash --no-deps worker_high -c "cd i18n && . compile.sh"

unit-tests:
@echo "🥫 Running tests …"
# run tests in worker to have more memory
# also, change project name to run in isolation
${DOCKER_COMPOSE_TEST} run --rm worker poetry run pytest --cov-report xml --cov=robotoff tests/unit
${DOCKER_COMPOSE_TEST} run --rm worker_high poetry run pytest --cov-report xml --cov=robotoff tests/unit

integration-tests:
@echo "🥫 Running integration tests …"
# run tests in worker to have more memory
# also, change project name to run in isolation
${DOCKER_COMPOSE_TEST} run --rm worker poetry run pytest -vv --cov-report xml --cov=robotoff --cov-append tests/integration
${DOCKER_COMPOSE_TEST} run --rm worker_high poetry run pytest -vv --cov-report xml --cov=robotoff --cov-append tests/integration
( ${DOCKER_COMPOSE_TEST} down -v || true )

# interactive testings
# usage: make pytest args='test/unit/my-test.py --pdb'
pytest: guard-args
@echo "🥫 Running test: ${args}"
${DOCKER_COMPOSE_TEST} run --rm worker poetry run pytest ${args}
${DOCKER_COMPOSE_TEST} run --rm worker_high poetry run pytest ${args}

#------------#
# Production #
Expand Down
27 changes: 19 additions & 8 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,33 @@ services:
<<: *robotoff-base
environment: *robotoff-base-env
mem_limit: 2g
depends_on:
- worker
ports:
- "${ROBOTOFF_EXPOSE:-5500}:5500"
networks:
- webnet

worker:
worker_high:
<<: *robotoff-base
deploy:
mode: replicated
replicas: 8
command: poetry run robotoff-cli run-worker
environment:
<<: *robotoff-base-env
REAL_TIME_IMAGE_PREDICTION: 1
replicas: 6
command: poetry run robotoff-cli run-worker robotoff-high
environment: *robotoff-base-env
depends_on:
- postgres
mem_limit: 8g
networks:
- webnet
extra_hosts:
- host.docker.internal:host-gateway

worker_low:
<<: *robotoff-base
deploy:
mode: replicated
replicas: 2
command: poetry run robotoff-cli run-worker robotoff-low robotoff-high
environment: *robotoff-base-env
depends_on:
- postgres
mem_limit: 8g
Expand Down
9 changes: 8 additions & 1 deletion docker/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,14 @@ services:
- robotoff.openfoodfacts.localhost
- api
webnet:
worker:
worker_high:
<<: *robotoff-dev
<<: *networks-productopener-local
deploy:
mode: replicated
# Only 1 replica is easier to deal with for local dev
replicas: 1
worker_low:
<<: *robotoff-dev
<<: *networks-productopener-local
deploy:
Expand Down
13 changes: 11 additions & 2 deletions robotoff/cli/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import enum
import pathlib
import sys
from pathlib import Path
Expand All @@ -19,15 +20,23 @@ def run_scheduler():
scheduler.run()


class WorkerQueue(enum.Enum):
robotoff_high = "robotoff-high"
robotoff_low = "robotoff-low"


@app.command()
def run_worker(
queues: list[WorkerQueue] = typer.Argument(
..., help="Names of the queues to listen to"
),
burst: bool = typer.Option(
False, help="Run in burst mode (quit after all work is done)"
)
),
):
from robotoff.workers.main import run

run(burst=burst)
run(queues=[x.value for x in queues], burst=burst)


@app.command()
Expand Down
6 changes: 3 additions & 3 deletions robotoff/workers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from robotoff import settings
from robotoff.models import with_db
from robotoff.utils import get_logger
from robotoff.workers.queues import queue_names, redis_conn
from robotoff.workers.queues import redis_conn

logger = get_logger()
settings.init_sentry()
Expand Down Expand Up @@ -41,11 +41,11 @@ def run_maintenance_tasks(self):
load_resources(refresh=True)


def run(burst: bool = False):
def run(queues: list[str], burst: bool = False):
load_resources()
try:
with Connection(connection=redis_conn):
w = CustomWorker(queues=queue_names)
w = CustomWorker(queues=queues)
w.work(logging_level="INFO", burst=burst)
except ConnectionError as e:
print(e)
Expand Down
12 changes: 9 additions & 3 deletions robotoff/workers/queues.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import enum
import threading
import time
from typing import Callable, Optional
Expand All @@ -7,9 +8,14 @@

from robotoff.redis import redis_conn

queue_names = ["robotoff-high", "robotoff-low"]
high_queue = Queue("robotoff-high", connection=redis_conn)
low_queue = Queue("robotoff-low", connection=redis_conn)

class AvailableQueue(enum.Enum):
robotoff_high = "robotoff-high"
robotoff_low = "robotoff-low"


high_queue = Queue(AvailableQueue.robotoff_high.value, connection=redis_conn)
low_queue = Queue(AvailableQueue.robotoff_low.value, connection=redis_conn)


def enqueue_in_job(
Expand Down

0 comments on commit f5eddf0

Please sign in to comment.