diff --git a/backend/danswer/background/indexing/dask_utils.py b/backend/danswer/background/indexing/dask_utils.py
new file mode 100644
index 00000000000..aae91e24db5
--- /dev/null
+++ b/backend/danswer/background/indexing/dask_utils.py
@@ -0,0 +1,30 @@
+import time
+
+import psutil
+from dask.distributed import WorkerPlugin
+from distributed import Worker
+
+from danswer.utils.logger import setup_logger
+
+logger = setup_logger()
+
+
+class ResourceLogger(WorkerPlugin):
+    def __init__(self, log_interval: int = 60 * 5):
+        self.log_interval = log_interval
+
+    def setup(self, worker: Worker) -> None:
+        """This method will be called when the plugin is attached to a worker."""
+        self.worker = worker
+        worker.loop.add_callback(self.log_resources)
+
+    def log_resources(self) -> None:
+        """Periodically log CPU and memory usage."""
+        while True:
+            cpu_percent = psutil.cpu_percent(interval=None)
+            memory_available_gb = psutil.virtual_memory().available / (1024.0**3)
+            # You can now log these values or send them to a monitoring service
+            logger.debug(
+                f"Worker {self.worker.address}: CPU usage {cpu_percent}%, Memory available {memory_available_gb}GB"
+            )
+            time.sleep(self.log_interval)
diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py
index bbb6fee3835..941fe4f8387 100755
--- a/backend/danswer/background/update.py
+++ b/backend/danswer/background/update.py
@@ -9,10 +9,12 @@
 from distributed import LocalCluster
 from sqlalchemy.orm import Session
 
+from danswer.background.indexing.dask_utils import ResourceLogger
 from danswer.background.indexing.job_client import SimpleJob
 from danswer.background.indexing.job_client import SimpleJobClient
 from danswer.background.indexing.run_indexing import run_indexing_entrypoint
 from danswer.configs.app_configs import EXPERIMENTAL_SIMPLE_JOB_CLIENT_ENABLED
+from danswer.configs.app_configs import LOG_LEVEL
 from danswer.configs.app_configs import MODEL_SERVER_HOST
 from danswer.configs.app_configs import NUM_INDEXING_WORKERS
 from danswer.configs.model_configs import MIN_THREADS_ML_MODELS
@@ -44,6 +46,9 @@
 )
 
 
+"""Util funcs"""
+
+
 def _get_num_threads() -> int:
     """Get # of "threads" to use for ML models in an indexing job. By default uses
     the torch implementation, which returns the # of physical cores on the machine.
@@ -51,19 +56,34 @@ def _get_num_threads() -> int:
     return max(MIN_THREADS_ML_MODELS, torch.get_num_threads())
 
 
-def should_create_new_indexing(
+def _should_create_new_indexing(
     connector: Connector, last_index: IndexAttempt | None, db_session: Session
 ) -> bool:
     if connector.refresh_freq is None:
         return False
     if not last_index:
         return True
+
+    # only one scheduled job per connector at a time
+    if last_index.status == IndexingStatus.NOT_STARTED:
+        return False
+
     current_db_time = get_db_current_time(db_session)
     time_since_index = current_db_time - last_index.time_updated
     return time_since_index.total_seconds() >= connector.refresh_freq
 
 
-def mark_run_failed(
+def _is_indexing_job_marked_as_finished(index_attempt: IndexAttempt | None) -> bool:
+    if index_attempt is None:
+        return False
+
+    return (
+        index_attempt.status == IndexingStatus.FAILED
+        or index_attempt.status == IndexingStatus.SUCCESS
+    )
+
+
+def _mark_run_failed(
     db_session: Session, index_attempt: IndexAttempt, failure_reason: str
 ) -> None:
     """Marks the `index_attempt` row as failed + updates the `
@@ -89,6 +109,9 @@ def mark_run_failed(
     )
 
 
+"""Main funcs"""
+
+
 def create_indexing_jobs(
     db_session: Session, existing_jobs: dict[int, Future | SimpleJob]
 ) -> None:
@@ -118,7 +141,7 @@ def create_indexing_jobs(
                 continue
 
             last_attempt = get_last_attempt(connector.id, credential.id, db_session)
-            if not should_create_new_indexing(connector, last_attempt, db_session):
+            if not _should_create_new_indexing(connector, last_attempt, db_session):
                 continue
 
             create_index_attempt(connector.id, credential.id, db_session)
@@ -137,8 +160,12 @@ def cleanup_indexing_jobs(
 
     # clean up completed jobs
     for attempt_id, job in existing_jobs.items():
-        # do nothing for ongoing jobs
-        if not job.done():
+        index_attempt = get_index_attempt(
+            db_session=db_session, index_attempt_id=attempt_id
+        )
+
+        # do nothing for ongoing jobs that haven't been stopped
+        if not job.done() and not _is_indexing_job_marked_as_finished(index_attempt):
             continue
 
         if job.status == "error":
@@ -146,9 +173,7 @@
 
         job.release()
         del existing_jobs_copy[attempt_id]
-        index_attempt = get_index_attempt(
-            db_session=db_session, index_attempt_id=attempt_id
-        )
+
        if not index_attempt:
             logger.error(
                 f"Unable to find IndexAttempt for ID '{attempt_id}' when cleaning "
@@ -157,7 +182,7 @@
             continue
         if index_attempt.status == IndexingStatus.IN_PROGRESS or job.status == "error":
-            mark_run_failed(
+            _mark_run_failed(
                 db_session=db_session,
                 index_attempt=index_attempt,
                 failure_reason=_UNEXPECTED_STATE_FAILURE_REASON,
             )
@@ -171,7 +196,7 @@
     )
     for index_attempt in in_progress_indexing_attempts:
         if index_attempt.id in existing_jobs:
-            # check to see if the job has been updated in the 3 hours, if not
+            # check to see if the job has been updated in last hour, if not
             # assume it to frozen in some bad state and just mark it as failed. Note: this relies
             # on the fact that the `time_updated` field is constantly updated every
             # batch of documents indexed
@@ -179,7 +204,7 @@
             time_since_update = current_db_time - index_attempt.time_updated
             if time_since_update.total_seconds() > 60 * 60:
                 existing_jobs[index_attempt.id].cancel()
-                mark_run_failed(
+                _mark_run_failed(
                     db_session=db_session,
                     index_attempt=index_attempt,
                     failure_reason="Indexing run frozen - no updates in an hour. "
@@ -187,7 +212,7 @@
                 )
         else:
             # If job isn't known, simply mark it as failed
-            mark_run_failed(
+            _mark_run_failed(
                 db_session=db_session,
                 index_attempt=index_attempt,
                 failure_reason=_UNEXPECTED_STATE_FAILURE_REASON,
             )
@@ -261,6 +286,8 @@ def update_loop(delay: int = 10, num_workers: int = NUM_INDEXING_WORKERS) -> Non
         silence_logs=logging.ERROR,
     )
     client = Client(cluster)
+    if LOG_LEVEL.lower() == "debug":
+        client.register_worker_plugin(ResourceLogger())
 
     existing_jobs: dict[int, Future | SimpleJob] = {}
     engine = get_sqlalchemy_engine()
@@ -274,6 +301,10 @@ def update_loop(delay: int = 10, num_workers: int = NUM_INDEXING_WORKERS) -> Non
         start = time.time()
         start_time_utc = datetime.utcfromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S")
         logger.info(f"Running update, current UTC time: {start_time_utc}")
+        logger.debug(
+            "Found existing indexing jobs: "
+            f"{[(attempt_id, job.status) for attempt_id, job in existing_jobs.items()]}"
+        )
         try:
             with Session(engine, expire_on_commit=False) as db_session:
                 existing_jobs = cleanup_indexing_jobs(
diff --git a/backend/requirements/default.txt b/backend/requirements/default.txt
index 9c7be4ce64e..96e8b06b5ca 100644
--- a/backend/requirements/default.txt
+++ b/backend/requirements/default.txt
@@ -30,6 +30,7 @@ docx2txt==0.8
 openai==0.27.6
 oauthlib==3.2.2
 playwright==1.37.0
+psutil==5.9.5
 psycopg2==2.9.6
 psycopg2-binary==2.9.6
 pycryptodome==3.19.0
diff --git a/backend/requirements/dev.txt b/backend/requirements/dev.txt
index 113a7d47736..f0a338dedeb 100644
--- a/backend/requirements/dev.txt
+++ b/backend/requirements/dev.txt
@@ -10,6 +10,7 @@ types-beautifulsoup4==4.12.0.3
 types-html5lib==1.1.11.13
 types-oauthlib==3.2.0.9
 types-setuptools==68.0.0.3
+types-psutil==5.9.5.17
 types-psycopg2==2.9.21.10
 types-python-dateutil==2.8.19.13
 types-regex==2023.3.23.1