Skip to content

Commit

Permalink
Revert changes to scancel as part of #726.
Browse files Browse the repository at this point in the history
This partially reverts commit 5a73c86
  • Loading branch information
donkirkby committed Feb 28, 2018
1 parent 8bb8fa0 commit 9623ff3
Showing 1 changed file with 19 additions and 70 deletions.
89 changes: 19 additions & 70 deletions kive/fleet/slurmlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,58 +449,28 @@ def submit_job(cls,
@classmethod
def job_cancel(cls, jobhandle):
"""Cancel a given job given its jobhandle.
Log a warning if an error occurs, return nothing.
NOTE: For the version of kive using a wrapper script to launch docker images,
special consideration to cancelling jobs must be made.
The way scancel works is different depending on whether the -s option is used or not
(see slurm docs for scancel). Without -s option: scancel talks to slurmctld, with -s option
scancel talks to slurmd on the compute node that the job is running on.
For us this means that:
If the slurm job is in the running state:
issuing 'scancel -sint' is required, because the wrapper script needs this to
clean up its docker volumes.
If the slurm job is in the pending state:
issuing 'scancel -sint' will hang until the jobs starts to run, so we must issue
scancel without the -s option.
In order to ensure that the job of interest does not change state between the time
that we determine its status and cancel it, we first issue an 'scontrol hold' command.
(a running job will stay running, a pending job will stay pending).
The steps therefore are:
1) scontrol hold XYZ
2) runstate := 'get state of job XYZ'
3) if runstate == 'running':
scancel -sint -f XYZ
else:
scancel XYZ
Log a warning if an error occurs, return nothing.
"""
accounting_info = cls.get_accounting_info([jobhandle])
if accounting_info:
job_info = accounting_info.get(jobhandle.job_id)
if job_info and job_info['end_time']:
# Already finished, nothing to cancel.
return
logger.info('Holding to cancel slurm job id %s.', jobhandle.job_id)
cls._hold_jobs([jobhandle.job_id])
cur_state = None
accounting_info = cls.get_accounting_info([jobhandle])
if accounting_info:
job_info = accounting_info.get(jobhandle.job_id)
cur_state = None if job_info is None else job_info[cls.ACC_STATE]
if cur_state is None:
raise RuntimeError("Cannot determine job state after holding")
logger.info('Slurm job state after hold: %s.', cur_state)
cmd_lst = ["scancel"]
docker_has_launched_states = set(cls.RUNNING)
if cur_state in docker_has_launched_states:
# -sint: Send interrupt instead of kill to allow cleanup.
# -f: also send signal to child processes.
cmd_lst.extend(["-sint", "-f"])
cmd_lst.append("{}".format(jobhandle.job_id))
logger.info("Cancelling slurm job with '{}'".format(" ".join(cmd_lst)))
cls._call_to_outstr(cmd_lst)

logger.info('Cancelling Slurm job id %s.', jobhandle.job_id)
cmd_lst = ["scancel",
"-f", # Also send signal to child processes.
"{}".format(jobhandle.job_id)]
try:
sp.check_output(cmd_lst, stderr=sp.STDOUT)
except sp.CalledProcessError as ex:
logger.warn('scancel failed for job id %s',
jobhandle.job_id,
exc_info=True)
for line in ex.output.splitlines(False):
logger.info(line)

@classmethod
def slurm_is_alive(cls, skip_extras=False):
Expand Down Expand Up @@ -550,10 +520,12 @@ def slurm_is_alive(cls, skip_extras=False):
return is_alive

@classmethod
def _call_to_outstr(cls, cmd_lst, num_retry=NUM_RETRY):
def _call_to_dict(cls, cmd_lst, splitchar=None, num_retry=NUM_RETRY):
""" Helper routine:
Call a slurm command provided in cmd_lst and return the output string if it
is successful.
Call a slurm command provided in cmd_lst and parse the tabular output, returning
a list of dictionaries.
The first lines of the output should be the table headings, which are used
as the dictionary keys.
"""
logger.debug(" ".join(cmd_lst))
out_str = ''
Expand Down Expand Up @@ -584,20 +556,6 @@ def _call_to_outstr(cls, cmd_lst, num_retry=NUM_RETRY):
os.remove(stderr_path)
except OSError:
pass
return out_str

@classmethod
def _call_to_dict(cls, cmd_lst, splitchar=None, num_retry=NUM_RETRY):
""" Helper routine:
Call a slurm command provided in cmd_lst and parse the tabular output, returning
a list of dictionaries.
The first lines of the output should be the table headings, which are used
as the dictionary keys.
"""
try:
out_str = cls._call_to_outstr(cmd_lst, num_retry=num_retry)
except (OSError, sp.CalledProcessError, IOError):
raise

# NOTE: sinfo et al add an empty line to the end of its output. Remove that here.
lns = [ln for ln in out_str.split('\n') if ln]
Expand Down Expand Up @@ -749,15 +707,6 @@ def _do_squeue(cls, opts=None, job_id_iter=None):
num_retry = NUM_RETRY
return cls._call_to_dict(cmd_lst, splitchar=' ', num_retry=num_retry)

@classmethod
def _hold_jobs(cls, job_id_iter):
"""Issue an 'scontrol hold jobids' command to slurm.
Any pending job will be held in its state, any running job will be unaffected.
"""
cmd_lst = ["scontrol", "hold"]
cmd_lst.append(",".join(job_id_iter))
cls._call_to_outstr(cmd_lst)

@classmethod
def get_accounting_info(cls, job_handle_iter=None):
""" NOTE: sacct only provides information about jobs that are running or have completed.
Expand Down

0 comments on commit 9623ff3

Please sign in to comment.