Adjust checks for removal from existing_jobs dict + add more logging + only one scheduled job for a connector at a time (#739)
Weves authored Nov 19, 2023
1 parent 4fd55b8 commit b258ec1
Showing 4 changed files with 75 additions and 12 deletions.
30 changes: 30 additions & 0 deletions backend/danswer/background/indexing/dask_utils.py
@@ -0,0 +1,30 @@
import time

import psutil
from dask.distributed import WorkerPlugin
from distributed import Worker

from danswer.utils.logger import setup_logger

logger = setup_logger()


class ResourceLogger(WorkerPlugin):
def __init__(self, log_interval: int = 60 * 5):
self.log_interval = log_interval

def setup(self, worker: Worker) -> None:
"""This method will be called when the plugin is attached to a worker."""
self.worker = worker
worker.loop.add_callback(self.log_resources)

def log_resources(self) -> None:
"""Periodically log CPU and memory usage."""
while True:
cpu_percent = psutil.cpu_percent(interval=None)
memory_available_gb = psutil.virtual_memory().available / (1024.0**3)
# You can now log these values or send them to a monitoring service
logger.debug(
f"Worker {self.worker.address}: CPU usage {cpu_percent}%, Memory available {memory_available_gb}GB"
)
time.sleep(self.log_interval)
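
For context on how the new WorkerPlugin takes effect: Dask only runs it once it is registered on a Client, which then invokes setup() on every current and future worker (update.py below does exactly this when LOG_LEVEL is set to debug). A minimal standalone sketch of that wiring, with an assumed worker count and log interval rather than values from this commit:

from dask.distributed import Client, LocalCluster

from danswer.background.indexing.dask_utils import ResourceLogger

# illustrative two-worker local cluster; update.py configures its own cluster
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
# after registration, Dask calls ResourceLogger.setup() on each worker
client.register_worker_plugin(ResourceLogger(log_interval=60))  # assumed 1-minute interval
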
55 changes: 43 additions & 12 deletions backend/danswer/background/update.py
@@ -9,10 +9,12 @@
from distributed import LocalCluster
from sqlalchemy.orm import Session

from danswer.background.indexing.dask_utils import ResourceLogger
from danswer.background.indexing.job_client import SimpleJob
from danswer.background.indexing.job_client import SimpleJobClient
from danswer.background.indexing.run_indexing import run_indexing_entrypoint
from danswer.configs.app_configs import EXPERIMENTAL_SIMPLE_JOB_CLIENT_ENABLED
from danswer.configs.app_configs import LOG_LEVEL
from danswer.configs.app_configs import MODEL_SERVER_HOST
from danswer.configs.app_configs import NUM_INDEXING_WORKERS
from danswer.configs.model_configs import MIN_THREADS_ML_MODELS
@@ -44,26 +46,44 @@
)


"""Util funcs"""


def _get_num_threads() -> int:
"""Get # of "threads" to use for ML models in an indexing job. By default uses
the torch implementation, which returns the # of physical cores on the machine.
"""
return max(MIN_THREADS_ML_MODELS, torch.get_num_threads())


def should_create_new_indexing(
def _should_create_new_indexing(
connector: Connector, last_index: IndexAttempt | None, db_session: Session
) -> bool:
if connector.refresh_freq is None:
return False
if not last_index:
return True

# only one scheduled job per connector at a time
if last_index.status == IndexingStatus.NOT_STARTED:
return False

current_db_time = get_db_current_time(db_session)
time_since_index = current_db_time - last_index.time_updated
return time_since_index.total_seconds() >= connector.refresh_freq


def mark_run_failed(
def _is_indexing_job_marked_as_finished(index_attempt: IndexAttempt | None) -> bool:
if index_attempt is None:
return False

return (
index_attempt.status == IndexingStatus.FAILED
or index_attempt.status == IndexingStatus.SUCCESS
)


def _mark_run_failed(
db_session: Session, index_attempt: IndexAttempt, failure_reason: str
) -> None:
"""Marks the `index_attempt` row as failed + updates the `
@@ -89,6 +109,9 @@ def mark_run_failed(
)


"""Main funcs"""


def create_indexing_jobs(
db_session: Session, existing_jobs: dict[int, Future | SimpleJob]
) -> None:
@@ -118,7 +141,7 @@ def create_indexing_jobs(
continue

last_attempt = get_last_attempt(connector.id, credential.id, db_session)
if not should_create_new_indexing(connector, last_attempt, db_session):
if not _should_create_new_indexing(connector, last_attempt, db_session):
continue
create_index_attempt(connector.id, credential.id, db_session)

@@ -137,18 +160,20 @@ def cleanup_indexing_jobs(

# clean up completed jobs
for attempt_id, job in existing_jobs.items():
# do nothing for ongoing jobs
if not job.done():
index_attempt = get_index_attempt(
db_session=db_session, index_attempt_id=attempt_id
)

# do nothing for ongoing jobs that haven't been stopped
if not job.done() and not _is_indexing_job_marked_as_finished(index_attempt):
continue

if job.status == "error":
logger.error(job.exception())

job.release()
del existing_jobs_copy[attempt_id]
index_attempt = get_index_attempt(
db_session=db_session, index_attempt_id=attempt_id
)

if not index_attempt:
logger.error(
f"Unable to find IndexAttempt for ID '{attempt_id}' when cleaning "
@@ -157,7 +182,7 @@
continue

if index_attempt.status == IndexingStatus.IN_PROGRESS or job.status == "error":
mark_run_failed(
_mark_run_failed(
db_session=db_session,
index_attempt=index_attempt,
failure_reason=_UNEXPECTED_STATE_FAILURE_REASON,
@@ -171,23 +196,23 @@
)
for index_attempt in in_progress_indexing_attempts:
if index_attempt.id in existing_jobs:
# check to see if the job has been updated in the 3 hours, if not
# check to see if the job has been updated in last hour, if not
# assume it to frozen in some bad state and just mark it as failed. Note: this relies
# on the fact that the `time_updated` field is constantly updated every
# batch of documents indexed
current_db_time = get_db_current_time(db_session=db_session)
time_since_update = current_db_time - index_attempt.time_updated
if time_since_update.total_seconds() > 60 * 60:
existing_jobs[index_attempt.id].cancel()
mark_run_failed(
_mark_run_failed(
db_session=db_session,
index_attempt=index_attempt,
failure_reason="Indexing run frozen - no updates in an hour. "
"The run will be re-attempted at next scheduled indexing time.",
)
else:
# If job isn't known, simply mark it as failed
mark_run_failed(
_mark_run_failed(
db_session=db_session,
index_attempt=index_attempt,
failure_reason=_UNEXPECTED_STATE_FAILURE_REASON,
@@ -261,6 +286,8 @@ def update_loop(delay: int = 10, num_workers: int = NUM_INDEXING_WORKERS) -> None
silence_logs=logging.ERROR,
)
client = Client(cluster)
if LOG_LEVEL.lower() == "debug":
client.register_worker_plugin(ResourceLogger())

existing_jobs: dict[int, Future | SimpleJob] = {}
engine = get_sqlalchemy_engine()
@@ -274,6 +301,10 @@ def update_loop(delay: int = 10, num_workers: int = NUM_INDEXING_WORKERS) -> None
start = time.time()
start_time_utc = datetime.utcfromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"Running update, current UTC time: {start_time_utc}")
logger.debug(
"Found existing indexing jobs: "
f"{[(attempt_id, job.status) for attempt_id, job in existing_jobs.items()]}"
)
try:
with Session(engine, expire_on_commit=False) as db_session:
existing_jobs = cleanup_indexing_jobs(
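
The heart of the existing_jobs change above is the removal predicate: an entry is now dropped either when the Dask/Simple job reports done, or when its IndexAttempt row has already been marked FAILED or SUCCESS out of band. A condensed, hypothetical restatement of that predicate (standalone names, not the actual update.py signatures):

from enum import Enum


class IndexingStatus(str, Enum):
    # stand-in for the danswer statuses referenced in the diff above
    NOT_STARTED = "not_started"
    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    FAILED = "failed"


def should_remove_from_existing_jobs(
    job_done: bool, attempt_status: IndexingStatus | None
) -> bool:
    """Hypothetical restatement of the check at the top of cleanup_indexing_jobs."""
    finished_in_db = attempt_status in (IndexingStatus.FAILED, IndexingStatus.SUCCESS)
    return job_done or finished_in_db


# A still-running job whose attempt was marked FAILED elsewhere is now cleaned up;
# before this commit it would stay in existing_jobs until the future finished.
assert should_remove_from_existing_jobs(False, IndexingStatus.FAILED) is True
assert should_remove_from_existing_jobs(False, IndexingStatus.IN_PROGRESS) is False
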
1 change: 1 addition & 0 deletions backend/requirements/default.txt
@@ -30,6 +30,7 @@ docx2txt==0.8
openai==0.27.6
oauthlib==3.2.2
playwright==1.37.0
psutil==5.9.5
psycopg2==2.9.6
psycopg2-binary==2.9.6
pycryptodome==3.19.0
1 change: 1 addition & 0 deletions backend/requirements/dev.txt
@@ -10,6 +10,7 @@ types-beautifulsoup4==4.12.0.3
types-html5lib==1.1.11.13
types-oauthlib==3.2.0.9
types-setuptools==68.0.0.3
types-psutil==5.9.5.17
types-psycopg2==2.9.21.10
types-python-dateutil==2.8.19.13
types-regex==2023.3.23.1