From fc3665529c96265339fd78c8ae8db604d8c9b246 Mon Sep 17 00:00:00 2001
From: Todd Morse
Date: Mon, 24 Jun 2024 07:43:37 -0700
Subject: [PATCH 01/13] rough pass

---
 .../idseq_utils/batch_run_helpers.py | 52 +++++++++++++++----
 1 file changed, 43 insertions(+), 9 deletions(-)

diff --git a/lib/idseq_utils/idseq_utils/batch_run_helpers.py b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
index 0aa081c1..6057ab4b 100644
--- a/lib/idseq_utils/idseq_utils/batch_run_helpers.py
+++ b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -1,3 +1,4 @@
+import hashlib
 import json
 import logging
 import os
@@ -9,7 +10,7 @@
 from os import listdir
 from multiprocessing import Pool
 from subprocess import run
-from typing import Dict, List
+from typing import Dict, List, Optional
 from urllib.parse import urlparse

 from idseq_utils.diamond_scatter import blastx_join
@@ -83,25 +84,51 @@ def _get_job_status(job_id, use_batch_api=False):
             raise e


+def _hash_dict(d: dict) -> str:
+    hash = hashlib.sha256()
+    hash.update(json.dumps(d, sort_keys=True).encode())
+    return hash.hexdigest()
+
+
+def _get_cached_job_id(bucket: str, prefix: str, d: dict) -> Optional[str]:
+    try:
+        _s3_client.get_object(Bucket=bucket, Key=f"{prefix}/{_hash_dict(d)}")["Body"].read()
+    except ClientError as e:
+        if e.response["Error"]["Code"] == "NoSuchKey":
+            return None
+        else:
+            raise e
+
+
+def _put_cached_job_id(bucket: str, prefix: str, d: dict, job_id: str):
+    _s3_client.put_object(Bucket=bucket, Key=f"{prefix}/{_hash_dict(d)}", Body=job_id.encode())
+
+
 def _run_batch_job(
     job_name: str,
     job_queue: str,
     job_definition: str,
     environment: Dict[str, str],
     retries: int,
+    cache_bucket: str,
+    cache_prefix: str,
 ):
-    response = _batch_client.submit_job(
-        jobName=job_name,
-        jobQueue=job_queue,
-        jobDefinition=job_definition,
-        containerOverrides={
+    submit_args = {
+        "jobName": job_name,
+        "jobQueue": job_queue,
+        "jobDefinition": job_definition,
+        "containerOverrides": {
             "environment": [{"name": k, "value": v} for k, v in environment.items()],
             "memory": 130816,
             "vcpus": 24,
         },
-        retryStrategy={"attempts": retries},
-    )
-    job_id = response["jobId"]
+        "retryStrategy": {"attempts": retries},
+    }
+    job_id = _get_cached_job_id(cache_bucket, cache_prefix, submit_args)
+    if not job_id:
+        response = _batch_client.submit_job(**submit_args)
+        job_id = response["jobId"]
+        _put_cached_job_id(cache_bucket, cache_prefix, submit_args, job_id)

     def _log_status(status: str):
         level = logging.INFO if status != "FAILED" else logging.ERROR
@@ -198,6 +225,9 @@ def _job_queue(provisioning_model: str):

     wdl_workflow_uri = f"s3://idseq-workflows/{aligner}-{aligner_wdl_version}/{aligner}.wdl"

+    cache_prefix_uri = os.path.join(chunk_dir, "batch_job_cache/")
+    cache_bucket, cache_prefix = _bucket_and_key(cache_prefix_uri)
+
     # if this job fails we don't want to re-run chunks that have already been processed
     # the presence of the output file means the chunk has already been processed
     try:
@@ -231,6 +261,8 @@ def _job_queue(provisioning_model: str):
             job_definition=job_definition,
             environment=environment,
             retries=2,
+            cache_bucket=cache_bucket,
+            cache_prefix=cache_prefix,
         )
     except BatchJobFailed:
         _run_batch_job(
@@ -239,6 +271,8 @@ def _job_queue(provisioning_model: str):
             job_definition=job_definition,
             environment=environment,
             retries=1,
+            cache_bucket=cache_bucket,
+            cache_prefix=cache_prefix,
         )


From 76dc1ca54e3676fc46bcffd5cb357c75f4000de8 Mon Sep 17 00:00:00 2001
From: Todd Morse
Date: Mon, 24 Jun 2024 09:09:23 -0700
Subject: [PATCH 02/13] added inputs

---
 .../idseq_utils/batch_run_helpers.py | 72 +++++++++----------
 1 file changed, 35 insertions(+), 37 deletions(-)

diff --git a/lib/idseq_utils/idseq_utils/batch_run_helpers.py b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
index 6057ab4b..752b4c1a 100644
--- a/lib/idseq_utils/idseq_utils/batch_run_helpers.py
+++ b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -84,24 +84,36 @@ def _get_job_status(job_id, use_batch_api=False):
             raise e


-def _hash_dict(d: dict) -> str:
-    hash = hashlib.sha256()
-    hash.update(json.dumps(d, sort_keys=True).encode())
-    return hash.hexdigest()
-
-
-def _get_cached_job_id(bucket: str, prefix: str, d: dict) -> Optional[str]:
-    try:
-        _s3_client.get_object(Bucket=bucket, Key=f"{prefix}/{_hash_dict(d)}")["Body"].read()
-    except ClientError as e:
-        if e.response["Error"]["Code"] == "NoSuchKey":
-            return None
-        else:
-            raise e
-
+class BatchJobCache:
+    """
+    BatchJobCache saves job IDs so the coordinator can re-attach to running batch jobs when the coordinator fails
+
+    The output should always be the same if the inputs are the same, however we also incorporate the batch_args
+    into the cache because a retry on spot vs on demand will result in a different batch queue.
+    """
+    def __init__(self, bucket: str, prefix: str, inputs: dict[str, str]):
+        self.bucket = bucket
+        self.prefix = prefix
+        self.inputs = inputs
+
+
+    def _key(self, batch_args: dict) -> str:
+        hash = hashlib.sha256()
+        cache_dict = { "inputs": self.inputs, "batch_args": batch_args }
+        hash.update(json.dumps(cache_dict, sort_keys=True).encode())
+        return os.path.join(self.prefix, hash.hexdigest())
+
+    def get(self, batch_args: dict) -> Optional[str]:
+        try:
+            _s3_client.get_object(Bucket=self.bucket, Key=self._key(batch_args))["Body"].read()
+        except ClientError as e:
+            if e.response["Error"]["Code"] == "NoSuchKey":
+                return None
+            else:
+                raise e

-def _put_cached_job_id(bucket: str, prefix: str, d: dict, job_id: str):
-    _s3_client.put_object(Bucket=bucket, Key=f"{prefix}/{_hash_dict(d)}", Body=job_id.encode())
+    def put(self, batch_args: dict, job_id: str):
+        _s3_client.put_object(Bucket=self.bucket, Key=self._key(batch_args), Body=job_id.encode())


 def _run_batch_job(
@@ -110,8 +122,7 @@ def _run_batch_job(
     job_definition: str,
     environment: Dict[str, str],
     retries: int,
-    cache_bucket: str,
-    cache_prefix: str,
+    cache: BatchJobCache,
 ):
     submit_args = {
         "jobName": job_name,
         "jobQueue": job_queue,
         "jobDefinition": job_definition,
         "containerOverrides": {
@@ -124,11 +135,11 @@ def _run_batch_job(
         },
         "retryStrategy": {"attempts": retries},
     }
-    job_id = _get_cached_job_id(cache_bucket, cache_prefix, submit_args)
+    job_id = cache.get(submit_args)
     if not job_id:
         response = _batch_client.submit_job(**submit_args)
         job_id = response["jobId"]
-        _put_cached_job_id(cache_bucket, cache_prefix, submit_args, job_id)
+        cache.put(submit_args, job_id)

     def _log_status(status: str):
         level = logging.INFO if status != "FAILED" else logging.ERROR
@@ -221,23 +232,12 @@ def _job_queue(provisioning_model: str):
     input_bucket, input_key = _bucket_and_key(wdl_input_uri)

     wdl_output_uri = os.path.join(chunk_dir, f"{chunk_id}-output.json")
-    output_bucket, output_key = _bucket_and_key(wdl_output_uri)

     wdl_workflow_uri = f"s3://idseq-workflows/{aligner}-{aligner_wdl_version}/{aligner}.wdl"

     cache_prefix_uri = os.path.join(chunk_dir, "batch_job_cache/")
     cache_bucket, cache_prefix = _bucket_and_key(cache_prefix_uri)
-
-    # if this job fails we don't want to re-run chunks that have already been processed
-    # the presence of the output file means the chunk has already been processed
-    try:
-        _s3_client.head_object(Bucket=output_bucket, Key=output_key)
-        log.info(f"skipping chunk, output already exists: {wdl_output_uri}")
-        return
-    except ClientError as e:
-        # raise the error if it is anything other than "not found"
-        if e.response["Error"]["Code"] != "404":
-            raise e
+    cache = BatchJobCache(cache_bucket, cache_prefix, inputs)

     _s3_client.put_object(
         Bucket=input_bucket,
@@ -261,8 +261,7 @@ def _job_queue(provisioning_model: str):
             job_definition=job_definition,
             environment=environment,
             retries=2,
-            cache_bucket=cache_bucket,
-            cache_prefix=cache_prefix,
+            cache=cache,
         )
     except BatchJobFailed:
         _run_batch_job(
@@ -271,8 +270,7 @@ def _job_queue(provisioning_model: str):
             job_definition=job_definition,
             environment=environment,
             retries=1,
-            cache_bucket=cache_bucket,
-            cache_prefix=cache_prefix,
+            cache=cache,
         )


From 99215160cc81a26355e4a3ffab5654f4ecfc9d46 Mon Sep 17 00:00:00 2001
From: Todd Morse
Date: Mon, 24 Jun 2024 09:20:00 -0700
Subject: [PATCH 03/13] tagging and lint

---
 lib/idseq_utils/idseq_utils/batch_run_helpers.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/lib/idseq_utils/idseq_utils/batch_run_helpers.py b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
index 752b4c1a..2cb56f13 100644
--- a/lib/idseq_utils/idseq_utils/batch_run_helpers.py
+++ b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -96,16 +96,16 @@ def __init__(self, bucket: str, prefix: str, inputs: dict[str, str]):
         self.prefix = prefix
         self.inputs = inputs

-
     def _key(self, batch_args: dict) -> str:
         hash = hashlib.sha256()
-        cache_dict = { "inputs": self.inputs, "batch_args": batch_args }
+        cache_dict = {"inputs": self.inputs, "batch_args": batch_args}
         hash.update(json.dumps(cache_dict, sort_keys=True).encode())
         return os.path.join(self.prefix, hash.hexdigest())

     def get(self, batch_args: dict) -> Optional[str]:
         try:
-            _s3_client.get_object(Bucket=self.bucket, Key=self._key(batch_args))["Body"].read()
+            resp = _s3_client.get_object(Bucket=self.bucket, Key=self._key(batch_args))
+            resp["Body"].read().decode()
         except ClientError as e:
             if e.response["Error"]["Code"] == "NoSuchKey":
                 return None
@@ -113,7 +113,12 @@ def get(self, batch_args: dict) -> Optional[str]:
                 raise e

     def put(self, batch_args: dict, job_id: str):
-        _s3_client.put_object(Bucket=self.bucket, Key=self._key(batch_args), Body=job_id.encode())
+        _s3_client.put_object(
+            Bucket=self.bucket,
+            Key=self._key(batch_args),
+            Body=job_id.encode(),
+            Tagging="AlignmentCoordination=True",
+        )


 def _run_batch_job(
@@ -244,6 +249,7 @@ def _job_queue(provisioning_model: str):
         Key=input_key,
         Body=json.dumps(inputs).encode(),
         ContentType="application/json",
+        Tagging="AlignmentCoordination=True",
     )

     environment = {

From 06008f0f40c32b33af48076a2951208c954a1db5 Mon Sep 17 00:00:00 2001
From: Todd Morse
Date: Mon, 24 Jun 2024 09:25:40 -0700
Subject: [PATCH 04/13] complete tags

---
 lib/idseq_utils/idseq_utils/batch_run_helpers.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/lib/idseq_utils/idseq_utils/batch_run_helpers.py b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
index 2cb56f13..49ff63d1 100644
--- a/lib/idseq_utils/idseq_utils/batch_run_helpers.py
+++ b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -319,9 +319,16 @@ def run_alignment(
     run(["s3parcp", "--recursive", chunk_dir, "chunks"], check=True)
     if os.path.exists(os.path.join("chunks", "cache")):
         shutil.rmtree(os.path.join("chunks", "cache"))
+    if os.path.exists(os.path.join("chunks", "batch_job_cache")):
+        shutil.rmtree(os.path.join("chunks", "batch_job_cache"))
     for fn in listdir("chunks"):
         if fn.endswith("json"):
             os.remove(os.path.join("chunks", fn))
+            _s3_client.put_object_tagging(
+                Bucket=bucket,
+                Key=os.path.join(chunk_dir, fn),
+                Tagging={"TagSet": [{"Key": "AlignmentCoordination", "Value": "True"}]},
+            )
     if aligner == "diamond":
         blastx_join("chunks", result_path, aligner_args, *queries)
     else:

From 20e5630f0b920730a60bbc19eecb2075bca939db Mon Sep 17 00:00:00 2001
From: Todd Morse
Date: Mon, 1 Jul 2024 10:14:39 -0700
Subject: [PATCH 05/13] python version issues

---
 lib/idseq_utils/idseq_utils/batch_run_helpers.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/lib/idseq_utils/idseq_utils/batch_run_helpers.py b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
index 49ff63d1..2e0ba359 100644
--- a/lib/idseq_utils/idseq_utils/batch_run_helpers.py
+++ b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -91,18 +91,18 @@ class BatchJobCache:
     The output should always be the same if the inputs are the same, however we also incorporate the batch_args
     into the cache because a retry on spot vs on demand will result in a different batch queue.
     """
-    def __init__(self, bucket: str, prefix: str, inputs: dict[str, str]):
+    def __init__(self, bucket: str, prefix: str, inputs: Dict[str, str]):
         self.bucket = bucket
         self.prefix = prefix
         self.inputs = inputs

-    def _key(self, batch_args: dict) -> str:
+    def _key(self, batch_args: Dict) -> str:
         hash = hashlib.sha256()
         cache_dict = {"inputs": self.inputs, "batch_args": batch_args}
         hash.update(json.dumps(cache_dict, sort_keys=True).encode())
         return os.path.join(self.prefix, hash.hexdigest())

-    def get(self, batch_args: dict) -> Optional[str]:
+    def get(self, batch_args: Dict) -> Optional[str]:
         try:
             resp = _s3_client.get_object(Bucket=self.bucket, Key=self._key(batch_args))
             resp["Body"].read().decode()
@@ -112,7 +112,7 @@ def get(self, batch_args: dict) -> Optional[str]:
             else:
                 raise e

-    def put(self, batch_args: dict, job_id: str):
+    def put(self, batch_args: Dict, job_id: str):
         _s3_client.put_object(
             Bucket=self.bucket,
             Key=self._key(batch_args),

From 5c806532da45793eb1f741a2ac8a65f884205f1b Mon Sep 17 00:00:00 2001
From: Todd Morse
Date: Mon, 1 Jul 2024 10:57:04 -0700
Subject: [PATCH 06/13] exp

---
 lib/idseq_utils/idseq_utils/batch_run_helpers.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/lib/idseq_utils/idseq_utils/batch_run_helpers.py b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
index 2e0ba359..9f46413d 100644
--- a/lib/idseq_utils/idseq_utils/batch_run_helpers.py
+++ b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -21,6 +21,7 @@
 from botocore.config import Config

 log = logging.getLogger(__name__)
+log.setLevel(logging.DEBUG)

 MAX_CHUNKS_IN_FLIGHT = 30  # TODO: remove this constant, currently does nothing since we have at most 30 index chunks
 # mitigation for TooManyRequestExceptions
@@ -324,6 +325,7 @@ def run_alignment(
     for fn in listdir("chunks"):
         if fn.endswith("json"):
             os.remove(os.path.join("chunks", fn))
+            log.debug(f"deleting from S3: {os.path.join(chunk_dir, fn)} ({chunk_dir}, {fn})")
             _s3_client.put_object_tagging(
                 Bucket=bucket,
                 Key=os.path.join(chunk_dir, fn),

From c08cedd2d87f0c24a72a30c080b881c620391687 Mon Sep 17 00:00:00 2001
From: Todd Morse
Date: Fri, 5 Jul 2024 11:54:41 -0700
Subject: [PATCH 07/13] exp

---
 .../idseq_utils/batch_run_helpers.py | 22 +++++++++----------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git a/lib/idseq_utils/idseq_utils/batch_run_helpers.py b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
index 9f46413d..13f2ce47 100644
--- a/lib/idseq_utils/idseq_utils/batch_run_helpers.py
+++ b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -7,6 +7,7 @@
 import requests
 import shutil
 import time
+import sys
 from os import listdir
 from multiprocessing import Pool
 from subprocess import run
@@ -20,9 +21,6 @@
 from botocore.exceptions import ClientError
 from botocore.config import Config

-log = logging.getLogger(__name__)
-log.setLevel(logging.DEBUG)
-
 MAX_CHUNKS_IN_FLIGHT = 30  # TODO: remove this constant, currently does nothing since we have at most 30 index chunks
 # mitigation for TooManyRequestExceptions

@@ -65,7 +63,7 @@ def _get_job_status(job_id, use_batch_api=False):
     if use_batch_api:
         jobs = _batch_client.describe_jobs(jobs=[job_id])["jobs"]
         if not jobs:
-            log.debug(f"missing_job_description_from_api: {job_id}")
+            print(f"missing_job_description_from_api: {job_id}", file=sys.stderr)
             return "SUBMITTED"
         return jobs[0]["status"]
     batch_job_desc_bucket = boto3.resource("s3").Bucket(
@@ -78,7 +76,7 @@ def _get_job_status(job_id, use_batch_api=False):
     except ClientError as e:
         if e.response["Error"]["Code"] == "NoSuchKey":
             # Warn that the object is missing so any issue with the s3 mechanism can be identified
-            log.debug(f"missing_job_description_object key: {key}")
+            print(f"missing_job_description_object key: {key}", file=sys.stderr)
             # Return submitted because a missing job status probably means it hasn't been added yet
             return "SUBMITTED"
         else:
@@ -148,9 +146,7 @@ def _run_batch_job(
         cache.put(submit_args, job_id)

     def _log_status(status: str):
-        level = logging.INFO if status != "FAILED" else logging.ERROR
-        log.log(
-            level,
+        print(
             "batch_job_status " + json.dumps(
                 {
                     "job_id": job_id,
@@ -161,6 +157,7 @@ def _run_batch_job(
                     "environment": environment,
                 }
             ),
+            file=sys.stderr,
         )

     _log_status("SUBMITTED")
@@ -177,11 +174,12 @@ def _run_batch_job(
         except ClientError as e:
             # If we get throttled, randomly wait to de-synchronize the requests
             if e.response["Error"]["Code"] == "TooManyRequestsException":
-                log.warn(f"describe_jobs_rate_limit_error for job_id: {job_id}")
+                print(f"describe_jobs_rate_limit_error for job_id: {job_id}", file=sys.stderr)
                 # Possibly implement a backoff here if throttling becomes an issue
             else:
-                log.error(
+                print(
                     f"unexpected_client_error_while_polling_job_status for job_id: {job_id}",
+                    file=sys.stderr,
                 )
                 raise e
@@ -284,7 +282,7 @@
 def _db_chunks(bucket: str, prefix):
     s3_client = boto3.client("s3")
     paginator = s3_client.get_paginator("list_objects_v2")
-    log.debug("db chunks")
+    print("db chunks", file=sys.stderr)

     for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
         for obj in page["Contents"]:
@@ -325,7 +323,7 @@ def run_alignment(
     for fn in listdir("chunks"):
         if fn.endswith("json"):
             os.remove(os.path.join("chunks", fn))
-            log.debug(f"deleting from S3: {os.path.join(chunk_dir, fn)} ({chunk_dir}, {fn})")
+            print(f"deleting from S3: {os.path.join(chunk_dir, fn)} ({chunk_dir}, {fn})", file=sys.stderr)
             _s3_client.put_object_tagging(
                 Bucket=bucket,
                 Key=os.path.join(chunk_dir, fn),

From ca5b912909b0df65c570b010da79d4d9eee479ec Mon Sep 17 00:00:00 2001
From: Todd Morse
Date: Fri, 5 Jul 2024 11:57:42 -0700
Subject: [PATCH 08/13] Revert "exp"

This reverts commit c08cedd2d87f0c24a72a30c080b881c620391687.
---
 .../idseq_utils/batch_run_helpers.py | 22 ++++++++++---------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/lib/idseq_utils/idseq_utils/batch_run_helpers.py b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
index 13f2ce47..9f46413d 100644
--- a/lib/idseq_utils/idseq_utils/batch_run_helpers.py
+++ b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -7,7 +7,6 @@
 import requests
 import shutil
 import time
-import sys
 from os import listdir
 from multiprocessing import Pool
 from subprocess import run
@@ -21,6 +20,9 @@
 from botocore.exceptions import ClientError
 from botocore.config import Config

+log = logging.getLogger(__name__)
+log.setLevel(logging.DEBUG)
+
 MAX_CHUNKS_IN_FLIGHT = 30  # TODO: remove this constant, currently does nothing since we have at most 30 index chunks
 # mitigation for TooManyRequestExceptions

@@ -63,7 +65,7 @@ def _get_job_status(job_id, use_batch_api=False):
     if use_batch_api:
         jobs = _batch_client.describe_jobs(jobs=[job_id])["jobs"]
         if not jobs:
-            print(f"missing_job_description_from_api: {job_id}", file=sys.stderr)
+            log.debug(f"missing_job_description_from_api: {job_id}")
             return "SUBMITTED"
         return jobs[0]["status"]
     batch_job_desc_bucket = boto3.resource("s3").Bucket(
@@ -76,7 +78,7 @@ def _get_job_status(job_id, use_batch_api=False):
     except ClientError as e:
         if e.response["Error"]["Code"] == "NoSuchKey":
             # Warn that the object is missing so any issue with the s3 mechanism can be identified
-            print(f"missing_job_description_object key: {key}", file=sys.stderr)
+            log.debug(f"missing_job_description_object key: {key}")
             # Return submitted because a missing job status probably means it hasn't been added yet
             return "SUBMITTED"
         else:
@@ -146,7 +148,9 @@ def _run_batch_job(
         cache.put(submit_args, job_id)

     def _log_status(status: str):
-        print(
+        level = logging.INFO if status != "FAILED" else logging.ERROR
+        log.log(
+            level,
             "batch_job_status " + json.dumps(
                 {
                     "job_id": job_id,
@@ -157,7 +161,6 @@ def _run_batch_job(
                     "environment": environment,
                 }
             ),
-            file=sys.stderr,
         )

     _log_status("SUBMITTED")
@@ -174,12 +177,11 @@ def _run_batch_job(
         except ClientError as e:
             # If we get throttled, randomly wait to de-synchronize the requests
             if e.response["Error"]["Code"] == "TooManyRequestsException":
-                print(f"describe_jobs_rate_limit_error for job_id: {job_id}", file=sys.stderr)
+                log.warn(f"describe_jobs_rate_limit_error for job_id: {job_id}")
                 # Possibly implement a backoff here if throttling becomes an issue
             else:
-                print(
+                log.error(
                     f"unexpected_client_error_while_polling_job_status for job_id: {job_id}",
-                    file=sys.stderr,
                 )
                 raise e
@@ -282,7 +284,7 @@
 def _db_chunks(bucket: str, prefix):
     s3_client = boto3.client("s3")
     paginator = s3_client.get_paginator("list_objects_v2")
-    print("db chunks", file=sys.stderr)
+    log.debug("db chunks")

     for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
         for obj in page["Contents"]:
@@ -323,7 +325,7 @@ def run_alignment(
     for fn in listdir("chunks"):
         if fn.endswith("json"):
             os.remove(os.path.join("chunks", fn))
-            print(f"deleting from S3: {os.path.join(chunk_dir, fn)} ({chunk_dir}, {fn})", file=sys.stderr)
+            log.debug(f"deleting from S3: {os.path.join(chunk_dir, fn)} ({chunk_dir}, {fn})")
             _s3_client.put_object_tagging(
                 Bucket=bucket,
                 Key=os.path.join(chunk_dir, fn),

From 7cc5e32ae279e9b435942a37c07554d79744814a Mon Sep 17 00:00:00 2001
From: Todd Morse
Date: Fri, 5 Jul 2024 12:01:10 -0700
Subject: [PATCH 09/13] log config

---
 lib/idseq_utils/idseq_utils/batch_run_helpers.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/lib/idseq_utils/idseq_utils/batch_run_helpers.py b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
index 9f46413d..44c2d917 100644
--- a/lib/idseq_utils/idseq_utils/batch_run_helpers.py
+++ b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -20,8 +20,12 @@
 from botocore.exceptions import ClientError
 from botocore.config import Config

+logging.basicConfig(
+    level=logging.DEBUG,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    datefmt='%Y-%m-%d %H:%M:%S',
+)
 log = logging.getLogger(__name__)
-log.setLevel(logging.DEBUG)

 MAX_CHUNKS_IN_FLIGHT = 30  # TODO: remove this constant, currently does nothing since we have at most 30 index chunks
 # mitigation for TooManyRequestExceptions

From 248af3ebf841609dffa9cc2d3755717dbca68b77 Mon Sep 17 00:00:00 2001
From: Todd Morse
Date: Fri, 5 Jul 2024 12:41:45 -0700
Subject: [PATCH 10/13] fixes

---
 .../idseq_utils/batch_run_helpers.py | 20 ++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/lib/idseq_utils/idseq_utils/batch_run_helpers.py b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
index 44c2d917..b6052396 100644
--- a/lib/idseq_utils/idseq_utils/batch_run_helpers.py
+++ b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -21,7 +21,7 @@ from botocore.config import Config

 logging.basicConfig(
-    level=logging.DEBUG,
+    level=logging.INFO,
     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
     datefmt='%Y-%m-%d %H:%M:%S',
 )
 log = logging.getLogger(__name__)
@@ -145,11 +145,6 @@ def _run_batch_job(
         },
         "retryStrategy": {"attempts": retries},
     }
-    job_id = cache.get(submit_args)
-    if not job_id:
-        response = _batch_client.submit_job(**submit_args)
-        job_id = response["jobId"]
-        cache.put(submit_args, job_id)

     def _log_status(status: str):
         level = logging.INFO if status != "FAILED" else logging.ERROR
@@ -167,7 +162,14 @@ def _log_status(status: str):
             ),
         )

-    _log_status("SUBMITTED")
+    job_id = cache.get(submit_args)
+    if job_id:
+        log.info(f"reattach to batch job: {job_id}")
+    else:
+        response = _batch_client.submit_job(**submit_args)
+        job_id = response["jobId"]
+        cache.put(submit_args, job_id)
+        _log_status("SUBMITTED")

     delay = 60 + random.randint(
         -60 // 2, 60 // 2
@@ -306,6 +308,7 @@ def run_alignment(
 ):
     bucket, prefix = _bucket_and_key(db_path)
     chunk_dir = os.path.join(input_dir, f"{aligner}-chunks")
+    _, chunk_prefix = _bucket_and_key(chunk_dir)
     chunks = (
         [
             input_dir,
@@ -329,10 +332,9 @@ def run_alignment(
     for fn in listdir("chunks"):
         if fn.endswith("json"):
             os.remove(os.path.join("chunks", fn))
-            log.debug(f"deleting from S3: {os.path.join(chunk_dir, fn)} ({chunk_dir}, {fn})")
             _s3_client.put_object_tagging(
                 Bucket=bucket,
-                Key=os.path.join(chunk_dir, fn),
+                Key=os.path.join(chunk_prefix, fn),
                 Tagging={"TagSet": [{"Key": "AlignmentCoordination", "Value": "True"}]},
             )
     if aligner == "diamond":

From 5410ba5e92bf13fa6e5b2b7e22f24579d4ce47bd Mon Sep 17 00:00:00 2001
From: Todd Morse
Date: Fri, 5 Jul 2024 13:03:16 -0700
Subject: [PATCH 11/13] exp

---
 lib/idseq_utils/idseq_utils/batch_run_helpers.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/lib/idseq_utils/idseq_utils/batch_run_helpers.py b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
index b6052396..f045f70a 100644
--- a/lib/idseq_utils/idseq_utils/batch_run_helpers.py
+++ b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -101,15 +101,19 @@ def __init__(self, bucket: str, prefix: str, inputs: Dict[str, str]):
         self.prefix = prefix
         self.inputs = inputs

+    def _cache_value(self, batch_args: Dict) -> str:
+        return json.dumps({"inputs": self.inputs, "batch_args": batch_args}, sort_keys=True)
+
     def _key(self, batch_args: Dict) -> str:
         hash = hashlib.sha256()
-        cache_dict = {"inputs": self.inputs, "batch_args": batch_args}
-        hash.update(json.dumps(cache_dict, sort_keys=True).encode())
+        hash.update(self._cache_value(batch_args).encode())
         return os.path.join(self.prefix, hash.hexdigest())

     def get(self, batch_args: Dict) -> Optional[str]:
+        key = self._key(batch_args)
+        log.info(f"cache_get ({key}): {self._cache_value(batch_args)}")
         try:
-            resp = _s3_client.get_object(Bucket=self.bucket, Key=self._key(batch_args))
+            resp = _s3_client.get_object(Bucket=self.bucket, Key=key)
             resp["Body"].read().decode()
         except ClientError as e:
             if e.response["Error"]["Code"] == "NoSuchKey":
@@ -118,9 +122,11 @@ def get(self, batch_args: Dict) -> Optional[str]:
                 raise e

     def put(self, batch_args: Dict, job_id: str):
+        key = self._key(batch_args)
+        log.info(f"cache_put ({key}, {job_id}): {self._cache_value(batch_args)}")
         _s3_client.put_object(
             Bucket=self.bucket,
-            Key=self._key(batch_args),
+            Key=key,
             Body=job_id.encode(),
             Tagging="AlignmentCoordination=True",
         )

From 903fe014c9f797b32c3b2b56a0c516888bad3a38 Mon Sep 17 00:00:00 2001
From: Todd Morse
Date: Fri, 5 Jul 2024 13:19:28 -0700
Subject: [PATCH 12/13] Revert "exp"

This reverts commit 5410ba5e92bf13fa6e5b2b7e22f24579d4ce47bd.
---
 lib/idseq_utils/idseq_utils/batch_run_helpers.py | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git a/lib/idseq_utils/idseq_utils/batch_run_helpers.py b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
index f045f70a..b6052396 100644
--- a/lib/idseq_utils/idseq_utils/batch_run_helpers.py
+++ b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -101,19 +101,15 @@ def __init__(self, bucket: str, prefix: str, inputs: Dict[str, str]):
         self.prefix = prefix
         self.inputs = inputs

-    def _cache_value(self, batch_args: Dict) -> str:
-        return json.dumps({"inputs": self.inputs, "batch_args": batch_args}, sort_keys=True)
-
     def _key(self, batch_args: Dict) -> str:
         hash = hashlib.sha256()
-        hash.update(self._cache_value(batch_args).encode())
+        cache_dict = {"inputs": self.inputs, "batch_args": batch_args}
+        hash.update(json.dumps(cache_dict, sort_keys=True).encode())
         return os.path.join(self.prefix, hash.hexdigest())

     def get(self, batch_args: Dict) -> Optional[str]:
-        key = self._key(batch_args)
-        log.info(f"cache_get ({key}): {self._cache_value(batch_args)}")
         try:
-            resp = _s3_client.get_object(Bucket=self.bucket, Key=key)
+            resp = _s3_client.get_object(Bucket=self.bucket, Key=self._key(batch_args))
             resp["Body"].read().decode()
         except ClientError as e:
             if e.response["Error"]["Code"] == "NoSuchKey":
@@ -122,11 +118,9 @@ def get(self, batch_args: Dict) -> Optional[str]:
                 raise e

     def put(self, batch_args: Dict, job_id: str):
-        key = self._key(batch_args)
-        log.info(f"cache_put ({key}, {job_id}): {self._cache_value(batch_args)}")
         _s3_client.put_object(
             Bucket=self.bucket,
-            Key=key,
+            Key=self._key(batch_args),
             Body=job_id.encode(),
             Tagging="AlignmentCoordination=True",
         )

From a204fda8fa09a550c970c8184e44855a31c86ccc Mon Sep 17 00:00:00 2001
From: Todd Morse
Date: Fri, 5 Jul 2024 13:21:25 -0700
Subject: [PATCH 13/13] fix

---
 lib/idseq_utils/idseq_utils/batch_run_helpers.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/idseq_utils/idseq_utils/batch_run_helpers.py b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
index b6052396..313f93dc 100644
--- a/lib/idseq_utils/idseq_utils/batch_run_helpers.py
+++ b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -110,7 +110,7 @@ def _key(self, batch_args: Dict) -> str:
     def get(self, batch_args: Dict) -> Optional[str]:
         try:
             resp = _s3_client.get_object(Bucket=self.bucket, Key=self._key(batch_args))
-            resp["Body"].read().decode()
+            return resp["Body"].read().decode()
         except ClientError as e:
             if e.response["Error"]["Code"] == "NoSuchKey":
                 return None
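
For orientation after the back-and-forth of the commits above, here is a minimal sketch (not part of the patch series) of the job-ID caching pattern that `BatchJobCache` ends up implementing: hash the chunk inputs together with the Batch submit arguments into a stable S3 key, and reuse a previously stored job ID when the coordinator restarts. It assumes boto3 credentials are available; the bucket and prefix are placeholders supplied by the caller, and error handling is trimmed to the `NoSuchKey` case shown in the diffs.

```python
import hashlib
import json
from typing import Dict, Optional

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")


def cache_key(prefix: str, inputs: Dict[str, str], batch_args: Dict) -> str:
    # Same idea as BatchJobCache._key: a stable hash of inputs + submit args.
    digest = hashlib.sha256(
        json.dumps({"inputs": inputs, "batch_args": batch_args}, sort_keys=True).encode()
    ).hexdigest()
    return f"{prefix}/{digest}"


def get_cached_job_id(bucket: str, key: str) -> Optional[str]:
    # Return the stored job ID, or None if this submission has not been cached yet.
    try:
        return s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode()
    except ClientError as e:
        if e.response["Error"]["Code"] == "NoSuchKey":
            return None
        raise


def put_cached_job_id(bucket: str, key: str, job_id: str) -> None:
    # Store the job ID so a restarted coordinator can re-attach instead of re-submitting.
    s3.put_object(Bucket=bucket, Key=key, Body=job_id.encode())
```

Used around `submit_job`, the coordinator would first call `get_cached_job_id` and only submit a new job (followed by `put_cached_job_id`) when it returns `None`, which mirrors the re-attach behavior the series settles on in patch 10.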