From c0f8feea745a4a97f44898a68effccb2b99834df Mon Sep 17 00:00:00 2001
From: Christian Meesters
Date: Tue, 6 Aug 2024 09:49:26 +0200
Subject: [PATCH] feat: multicluster (#56)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Putative fix for issue #53

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Johannes Köster
---
 docs/further.md                             | 56 +++++++++++++++++++++
 snakemake_executor_plugin_slurm/__init__.py | 29 ++++++++---
 2 files changed, 79 insertions(+), 6 deletions(-)

diff --git a/docs/further.md b/docs/further.md
index dd763ab..4f10561 100644
--- a/docs/further.md
+++ b/docs/further.md
@@ -119,6 +119,7 @@ You can use the following specifications:
 | `--ntasks`        | `tasks`         | number of concurrent tasks / ranks |
 | `--cpus-per-task` | `cpus_per_task` | number of cpus per task (in case of SMP, rather use `threads`) |
 | `--nodes`         | `nodes`         | number of nodes |
+| `--clusters`      | `clusters`      | comma separated string of clusters |
 
 Each of these can be part of a rule, e.g.:
 
@@ -159,6 +160,10 @@ set-resources:
     cpus_per_task: 40
 ```
 
+## Multicluster Support
+
+For scheduling reasons, multicluster support is provided via the `clusters` flag in the resources section of a rule or profile. Note that you have to write `clusters`, not `cluster`!
+
 ## Additional Custom Job Configuration
 
 SLURM installations can support custom plugins, which may add support
@@ -323,6 +328,57 @@ Some environments provide a shell within a SLURM job, for instance, IDEs started
 
 If the plugin detects to be running within a job, it will therefore issue a warning and stop for 5 seconds.
 
+## Retries - Or Trying Again When a Job Fails
+
+Some cluster jobs may fail. In this case, Snakemake can be instructed to attempt another submission before the entire workflow fails, in this example up to 3 times:
+
+```console
+snakemake --retries=3
+```
+
+If a workflow fails entirely (e.g. because of cluster failures), it can be resumed like any other Snakemake workflow:
+
+```console
+snakemake --rerun-incomplete
+```
+
+To prevent failures due to faulty parameterization, we can dynamically adjust the runtime behaviour:
+
+## Dynamic Parameterization
+
+Using dynamic parameterization, we can react to different inputs and prevent our HPC jobs from failing.
+
+### Adjusting Memory Requirements
+
+The size of input files may vary. [If we have an estimate for the RAM requirement due to varying input file sizes, we can use this to dynamically adjust our jobs.](https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#dynamic-resources) A sketch of this approach is shown at the end of this section.
+
+### Adjusting Runtime
+
+Runtime adjustments can be made in a Snakefile:
+
+```Python
+def get_time(wildcards, attempt):
+    return f"{1 * attempt}h"
+
+rule foo:
+    input: ...
+    output: ...
+    resources:
+        runtime=get_time
+    ...
+```
+
+or in a workflow profile:
+
+```YAML
+set-resources:
+    foo:
+        runtime: f"{1 * attempt}h"
+```
+
+Be sure to use sensible settings for your cluster and make use of parallel execution (e.g. threads) and [global profiles](#using-profiles) to avoid I/O contention.
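+
+As an illustration of the memory adjustment mentioned above, here is a minimal sketch. The `input.size_mb` and `attempt` arguments are provided by Snakemake for resource callables (see the linked documentation); the function name `get_mem_mb`, the rule name `bar`, the factor of 2 and the 1000 MB floor are placeholders that should be tuned for your own workload:
+
+```Python
+def get_mem_mb(wildcards, input, attempt):
+    # assumed scaling: roughly twice the input size, grown on each retry,
+    # with a floor of 1000 MB - adjust for your own data
+    return max(1000, int(2 * input.size_mb * attempt))
+
+rule bar:
+    input: ...
+    output: ...
+    resources:
+        mem_mb=get_mem_mb
+    ...
+```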
+
+
 ## Summary:
 
 When put together, a frequent command line looks like:

diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py
index 2448365..703a071 100644
--- a/snakemake_executor_plugin_slurm/__init__.py
+++ b/snakemake_executor_plugin_slurm/__init__.py
@@ -7,6 +7,7 @@
 from io import StringIO
 import os
 import re
+import shlex
 import subprocess
 import time
 from dataclasses import dataclass, field
@@ -136,6 +137,9 @@ def run_job(self, job: JobExecutorInterface):
         call += self.get_account_arg(job)
         call += self.get_partition_arg(job)
 
+        if job.resources.get("clusters"):
+            call += f" --clusters {job.resources.clusters}"
+
         if job.resources.get("runtime"):
             call += f" -t {job.resources.runtime}"
         else:
@@ -200,7 +204,11 @@ def run_job(self, job: JobExecutorInterface):
                 f"SLURM job submission failed. The error message was {e.output}"
             )
 
-        slurm_jobid = out.split(" ")[-1]
+        # multicluster submissions yield submission messages like
+        # "Submitted batch job <id> on cluster <name>".
+        # To extract the job id in this case, we match the first number in the
+        # output - the exact wording might change in future versions of SLURM.
+        slurm_jobid = re.search(r"\d+", out).group()
         slurm_logfile = slurm_logfile.replace("%j", slurm_jobid)
         self.logger.info(
             f"Job {job.jobid} has been submitted with SLURM jobid {slurm_jobid} "
@@ -264,15 +272,22 @@ async def check_active_jobs(
         # in line 218 - once v20.11 is definitively not in use any more,
         # the more readable version ought to be re-adapted
 
+        # -X: only show main job, no substeps
+        sacct_command = f"""sacct -X --parsable2 \
+                        --clusters all \
+                        --noheader --format=JobIdRaw,State \
+                        --starttime {sacct_starttime} \
+                        --endtime now --name {self.run_uuid}"""
+
+        # for better readability in verbose output
+        sacct_command = " ".join(shlex.split(sacct_command))
+
         # this code is inspired by the snakemake profile:
         # /~https://github.com/Snakemake-Profiles/slurm
         for i in range(status_attempts):
             async with self.status_rate_limiter:
                 (status_of_jobs, sacct_query_duration) = await self.job_stati(
-                    # -X: only show main job, no substeps
-                    f"sacct -X --parsable2 --noheader --format=JobIdRaw,State "
-                    f"--starttime {sacct_starttime} "
-                    f"--endtime now --name {self.run_uuid}"
+                    sacct_command
                 )
                 if status_of_jobs is None and sacct_query_duration is None:
                     self.logger.debug(f"could not check status of job {self.run_uuid}")
@@ -364,8 +379,10 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
                 # about 30 sec, but can be longer in extreme cases.
                 # Under 'normal' circumstances, 'scancel' is executed in
                 # virtually no time.
+                scancel_command = f"scancel {jobids} --clusters=all"
+
                 subprocess.check_output(
-                    f"scancel {jobids}",
+                    scancel_command,
                     text=True,
                     shell=True,
                     timeout=60,
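
A note on the job id parsing introduced above: the following is a small, self-contained sketch of why the first run of digits is matched instead of taking the last whitespace-separated token. The example output strings are illustrative; the exact sbatch wording may vary between SLURM versions.

```Python
import re

# typical sbatch replies; with --clusters, the job id is no longer the last token
single_cluster = "Submitted batch job 123456"
multi_cluster = "Submitted batch job 123456 on cluster mycluster"

for out in (single_cluster, multi_cluster):
    # same extraction as in the patch: first run of digits in the output
    jobid = re.search(r"\d+", out).group()
    print(jobid)  # prints "123456" for both forms
```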