From 9623ff3c4932a1589b582f49cf8afd46ea7d46a9 Mon Sep 17 00:00:00 2001
From: don
Date: Wed, 28 Feb 2018 12:12:48 -0800
Subject: [PATCH] Revert changes to scancel as part of #726.

This partially reverts commit 5a73c86
---
 kive/fleet/slurmlib.py | 89 +++++++++---------------------------------
 1 file changed, 19 insertions(+), 70 deletions(-)

diff --git a/kive/fleet/slurmlib.py b/kive/fleet/slurmlib.py
index e670a7b64..e4385c250 100755
--- a/kive/fleet/slurmlib.py
+++ b/kive/fleet/slurmlib.py
@@ -449,32 +449,8 @@ def submit_job(cls,

     @classmethod
     def job_cancel(cls, jobhandle):
         """Cancel a given job given its jobhandle.
-        Log a warning if an error occurs, return nothing.
-        NOTE: For the version of kive using a wrapper script to launch docker images,
-        special consideration to cancelling jobs must be made.
-        The way scancel works is different depending on whether the -s option is used or not
-        (see slurm docs for scancel). Without -s option: scancel talks to slurmctld, with -s option
-        scancel talks to slurmd on the compute node that the job is running on.
-
-        For us this means that:
-        If the slurm job is in the running state:
-           issuing 'scancel -sint' is required, because the wrapper script needs this to
-           clean up its docker volumes.
-        If the slurm job is in the pending state:
-           issuing 'scancel -sint' will hang until the job starts to run, so we must issue
-           scancel without the -s option.
-
-        In order to ensure that the job of interest does not change state between the time
-        that we determine its status and cancel it, we first issue an 'scontrol hold' command.
-        (a running job will stay running, a pending job will stay pending).
-        The steps therefore are:
-        1) scontrol hold XYZ
-        2) runstate := 'get state of job XYZ'
-        3) if runstate == 'running':
-              scancel -sint -f XYZ
-           else:
-              scancel XYZ
+        Log a warning if an error occurs, return nothing.
         """
         accounting_info = cls.get_accounting_info([jobhandle])
         if accounting_info:
@@ -482,25 +458,19 @@ def job_cancel(cls, jobhandle):
             job_info = accounting_info.get(jobhandle.job_id)
             if job_info and job_info['end_time']:
                 # Already finished, nothing to cancel.
                 return
-        logger.info('Holding to cancel slurm job id %s.', jobhandle.job_id)
-        cls._hold_jobs([jobhandle.job_id])
-        cur_state = None
-        accounting_info = cls.get_accounting_info([jobhandle])
-        if accounting_info:
-            job_info = accounting_info.get(jobhandle.job_id)
-            cur_state = None if job_info is None else job_info[cls.ACC_STATE]
-        if cur_state is None:
-            raise RuntimeError("Cannot determine job state after holding")
-        logger.info('Slurm job state after hold: %s.', cur_state)
-        cmd_lst = ["scancel"]
-        docker_has_launched_states = set(cls.RUNNING)
-        if cur_state in docker_has_launched_states:
-            # -sint: Send interrupt instead of kill to allow cleanup.
-            # -f: also send signal to child processes.
-            cmd_lst.extend(["-sint", "-f"])
-        cmd_lst.append("{}".format(jobhandle.job_id))
-        logger.info("Cancelling slurm job with '{}'".format(" ".join(cmd_lst)))
-        cls._call_to_outstr(cmd_lst)
+
+        logger.info('Cancelling Slurm job id %s.', jobhandle.job_id)
+        cmd_lst = ["scancel",
+                   "-f",  # Also send signal to child processes.
+ "{}".format(jobhandle.job_id)] + try: + sp.check_output(cmd_lst, stderr=sp.STDOUT) + except sp.CalledProcessError as ex: + logger.warn('scancel failed for job id %s', + jobhandle.job_id, + exc_info=True) + for line in ex.output.splitlines(False): + logger.info(line) @classmethod def slurm_is_alive(cls, skip_extras=False): @@ -550,10 +520,12 @@ def slurm_is_alive(cls, skip_extras=False): return is_alive @classmethod - def _call_to_outstr(cls, cmd_lst, num_retry=NUM_RETRY): + def _call_to_dict(cls, cmd_lst, splitchar=None, num_retry=NUM_RETRY): """ Helper routine: - Call a slurm command provided in cmd_lst and return the output string if it - is successful. + Call a slurm command provided in cmd_lst and parse the tabular output, returning + a list of dictionaries. + The first lines of the output should be the table headings, which are used + as the dictionary keys. """ logger.debug(" ".join(cmd_lst)) out_str = '' @@ -584,20 +556,6 @@ def _call_to_outstr(cls, cmd_lst, num_retry=NUM_RETRY): os.remove(stderr_path) except OSError: pass - return out_str - - @classmethod - def _call_to_dict(cls, cmd_lst, splitchar=None, num_retry=NUM_RETRY): - """ Helper routine: - Call a slurm command provided in cmd_lst and parse the tabular output, returning - a list of dictionaries. - The first lines of the output should be the table headings, which are used - as the dictionary keys. - """ - try: - out_str = cls._call_to_outstr(cmd_lst, num_retry=num_retry) - except (OSError, sp.CalledProcessError, IOError): - raise # NOTE: sinfo et al add an empty line to the end of its output. Remove that here. lns = [ln for ln in out_str.split('\n') if ln] @@ -749,15 +707,6 @@ def _do_squeue(cls, opts=None, job_id_iter=None): num_retry = NUM_RETRY return cls._call_to_dict(cmd_lst, splitchar=' ', num_retry=num_retry) - @classmethod - def _hold_jobs(cls, job_id_iter): - """Issue an 'scontrol hold jobids' command to slurm. - Any pending job will be held in its state, any running job will be unaffected. - """ - cmd_lst = ["scontrol", "hold"] - cmd_lst.append(",".join(job_id_iter)) - cls._call_to_outstr(cmd_lst) - @classmethod def get_accounting_info(cls, job_handle_iter=None): """ NOTE: sacct only provides information about jobs that are running or have completed.