378 test queue parameter #380
base: main
Changes from 10 commits
@@ -0,0 +1,129 @@
import os
import random

import logging
from pathlib import Path
from typing import List

import pytest

from psij import Job, JobSpec, JobAttributes, JobExecutor, ResourceSpecV1
from tempfile import TemporaryDirectory

from executor_test_params import ExecutorTestParams
from _test_tools import _get_executor_instance, _get_timeout, assert_completed, _make_test_dir


SCHEDULER_COMMANDS = {
    "slurm": {
        "get_queues": "mdiag -c",
        "get_user_jobs": "squeue -u $(whoami)",
        "kill_command": "scancel"
    },
    "lsf": {
        "get_queues": "bqueues -u $(whoami)",
        "get_user_jobs": "bjobs",
        "kill_command": "bkill"
    }
}


def get_slurm_queues() -> List[str]:
    out = os.popen("mdiag -c").read().split("\n")
    return [line.split("=")[-1] for line in out if "PartitionName" in line]


def get_lsf_queues() -> List[str]:
    valid_queues = []
    out = os.popen("bqueues -u $(whoami) -o 'QUEUE_NAME NJOBS PEND RUN SUSP STATUS'").read().split("\n")
    out = [l for l in out if len(l) != 0]
    # split on any whitespace so the space-padded bqueues columns don't yield empty fields
    out = [l.split() for l in out]

    if len(out) == 0:
        return []
    out.pop(0)  # popping headers

    for queue_info in out:
        name = queue_info[0]
        njobs = int(queue_info[1])
        pend = int(queue_info[2])
        run = int(queue_info[3])
        susp = int(queue_info[4])
        status = str(queue_info[5])

        if "active" not in status.lower():
            continue

        if (njobs + pend + run + susp) > 10:
            valid_queues.append(name)

    return valid_queues


def get_queue_info(executor: str) -> List[str]:
    res = []
    command = SCHEDULER_COMMANDS[executor]["get_user_jobs"]
    res.extend(os.popen(command).read().split("\n"))
    return res


def kill_job(scheduler: str, job: Job) -> None:
    command = f"{SCHEDULER_COMMANDS[scheduler]['kill_command']} {job._native_id}"
    print("Kill command:", command)
    os.system(command)


def make_job(queue: str) -> Job:
    return Job(
        JobSpec(
            executable="/bin/date",
            attributes=JobAttributes(
                queue_name=queue,
            ),
        )
    )


def test_queue(execparams: ExecutorTestParams) -> None:
    scheduler = ""
    queues = []
    slurm_queues = get_slurm_queues()
    lsf_queues = get_lsf_queues()

    queues.extend(slurm_queues)
    queues.extend(lsf_queues)

Comment on lines +91 to +95:
This is kind of a general note, but we know what scheduler we have from […]. That's not the point I wanted to make, though. The idea of running all possible […]

    if len(slurm_queues) != 0:
        scheduler = "slurm"
    elif len(lsf_queues) != 0:
        scheduler = "lsf"

Comment on lines +97 to +100:
I see. So if you ignore […]. Instead, we should run this test on only one of the launchers (the launcher doesn't matter because we don't actually care about launching a job in this test) and using all executors. So something like […]

Reply: Can you take a look at my implementation? I kept the way I was detecting it, but am now only running the test when […]

Reply: It's not about detecting the LRM on the system but the fact that we test multiple executors on that system. So even if you restrict it to the single launcher, it will still be repeated for the local, batch-test, and whatever PSI/J detected to be the scheduler. You could remove execparams, but then you risk not having access to other necessary parameters that might be set by the users that set up the tests. By the way, you may want to use execparams.custom_attributes, since some systems require setting various things, like an account or project.
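
A minimal sketch of what the suggestion above could look like. The launcher field name and the "single" launcher value are assumptions, as is the placeholder queue name; custom_attributes is taken from the comment itself:

def test_queue(execparams: ExecutorTestParams) -> None:
    # Only exercise one launcher; queue selection does not depend on how the
    # job is launched (the "launcher" field name is an assumption here).
    if execparams.launcher != "single":
        pytest.skip("queue test only runs with the single launcher")

    # Forward site-specific settings (account, project, ...) that the people
    # who set up the tests may have configured, as suggested in the comment.
    attributes = JobAttributes(queue_name="debug",  # placeholder queue name
                               custom_attributes=execparams.custom_attributes)
    job = Job(JobSpec(executable="/bin/date", attributes=attributes))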

    if len(queues) < 2:
        pytest.raises(Exception("Need at least two queues to perform this test"))

Comment: I think you are looking for

def test_that_division_by_zero_correctly_raises_exception():
    with pytest.raises(ZeroDivisionError):
        1 / 0

In other words, you use it to check that your test throws an exception. If you had […]
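
Since the code above wants to bail out when fewer than two queues are available rather than assert that an exception is raised, a skip is probably closer to the intent. A sketch (my suggestion, not part of the review thread):

    if len(queues) < 2:
        pytest.skip("Need at least two queues to perform this test")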
        return

    print("available queues:", queues)
    test_queues = random.sample(queues, 2)
    print("test queues:", test_queues)

    executor = JobExecutor.get_instance(scheduler)

    job1 = make_job(test_queues[0])
    executor.submit(job1)
    qstat = get_queue_info(scheduler)

Comment: Most of these commands accept a job id as an argument to only return info about a specific job.

Reply: Done, updated the […]
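
For example, the per-user queries could be replaced by job-id-scoped ones (a sketch; the "get_job" key and the helper below are illustrations, not the PR's final code):

SCHEDULER_COMMANDS = {
    "slurm": {
        # squeue -j restricts the output to the given job id
        "get_job": "squeue -j {job_id}",
        "kill_command": "scancel",
    },
    "lsf": {
        # bjobs accepts a job id argument directly
        "get_job": "bjobs {job_id}",
        "kill_command": "bkill",
    },
}


def get_job_info(scheduler: str, job: Job) -> List[str]:
    command = SCHEDULER_COMMANDS[scheduler]["get_job"].format(job_id=job._native_id)
    return os.popen(command).read().split("\n")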

    job1_qstat_entry = [l for l in qstat if job1._native_id in l][0]
    assert test_queues[0] in job1_qstat_entry

Comment: We might want to go a bit further than just checking if the queue name is somewhere in the qstat output. It's quite possible that we might have a queue named "test" and the word "test" appearing in an unrelated place in the qstat output.

Reply: Done, I applied some custom formatting on the […]. Ex: […]
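
One way to make the check exact is to ask the scheduler for only the queue column of the submitted job and compare it verbatim. A sketch using standard Slurm/LSF output-format options; this is not necessarily the formatting the author applied:

def get_job_queue(scheduler: str, job: Job) -> str:
    if scheduler == "slurm":
        # -h drops the header, %P prints only the partition of this job
        command = f"squeue -j {job._native_id} -h -o %P"
    else:
        # -noheader and -o restrict the output to the queue column
        command = f"bjobs -noheader -o 'queue' {job._native_id}"
    return os.popen(command).read().strip()


# exact comparison instead of a substring match over the whole qstat line
assert get_job_queue(scheduler, job1) == test_queues[0]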

    job2 = make_job(test_queues[1])
    executor.submit(job2)
    qstat = get_queue_info(scheduler)
    job2_qstat_entry = [l for l in qstat if job2._native_id in l][0]
    assert test_queues[1] in job2_qstat_entry

    qstat = get_queue_info(scheduler)
    print("qstat = ", "\n".join(qstat))

    kill_job(scheduler, job1)
    kill_job(scheduler, job2)

    qstat = get_queue_info(scheduler)
    print("qstat = ", "\n".join(qstat))

Comment (on get_lsf_queues): bqueues appears both here and in SCHEDULER_COMMANDS. Is the latter not used?

Reply: Done, now using the dictionary as intended.
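
A sketch of what "using the dictionary" could look like in get_lsf_queues, simplified to just return the queue names (an illustration, not the author's follow-up commit):

def get_lsf_queues() -> List[str]:
    # Build the query from the shared SCHEDULER_COMMANDS table instead of
    # hard-coding "bqueues" a second time.
    base = SCHEDULER_COMMANDS["lsf"]["get_queues"]
    command = base + " -o 'QUEUE_NAME NJOBS PEND RUN SUSP STATUS'"
    lines = [l for l in os.popen(command).read().split("\n") if l]
    # Drop the header row and return the queue names (first column).
    return [line.split()[0] for line in lines[1:]]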