Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

378 test queue parameter #380

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
129 changes: 129 additions & 0 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import os
import random

import logging
from pathlib import Path
from typing import List

import pytest

from psij import Job, JobSpec, JobAttributes, JobExecutor, ResourceSpecV1
from tempfile import TemporaryDirectory

from executor_test_params import ExecutorTestParams
from _test_tools import _get_executor_instance, _get_timeout, assert_completed, _make_test_dir


SCHEDULER_COMMANDS = {
"slurm": {
"get_queues": "mdiag -c",
"get_user_jobs": "squeue -u $(whoami)",
"kill_command": "scancel"
},
"lsf": {
"get_queues": "bqueues -u $(whoami)",
"get_user_jobs": "bjobs",
"kill_command": "bkill"
}
}


def get_slurm_queues() -> List[str]:
out = os.popen("mdiag -c").read().split("\n")
return [line.split("=")[-1] for line in out if "PartitionName" in line]


def get_lsf_queues() -> List[str]:
valid_queues = []
out = "".join(os.popen("bqueues -u $(whoami) -o 'QUEUE_NAME NJOBS PEND RUN SUSP STATUS'").read()).split("\n")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bqueues appears both here and in SCHEDULER_COMMANDS. Is the latter not used?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, now using the dictionary as intended

out = [l for l in out if len(l) != 0]
out = [l.split(" ") for l in out]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out seems to be both List[str] and List[List[str]] here. It might be best if we didn't overload it like that.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if len(out) == 0:
return []
out.pop(0) # popping headers

for queue_info in out:
name = queue_info[0]
njobs = int(queue_info[1])
pend = int(queue_info[2])
run = int(queue_info[3])
susp = int(queue_info[4])
status = str(queue_info[5])

if "active" not in status.lower():
continue

if (njobs + pend + run + susp) > 10:
valid_queues.append(name)

return valid_queues


def get_queue_info(executor: str) -> List[str]:
res = []
command = SCHEDULER_COMMANDS[executor]["get_user_jobs"]
res.extend(os.popen(command).read().split("\n"))
return res


def kill_job(scheduler: str, job: Job) -> None:
command = f"{SCHEDULER_COMMANDS[scheduler]['kill_command']} {job._native_id}"
print("Kill command:", command)
os.system(command)


def make_job(queue:str) -> Job:
return Job(
JobSpec(
executable="/bin/date",
attributes=JobAttributes(
queue_name=queue,
),
)
)


def test_queue(execparams: ExecutorTestParams) -> None:
scheduler = ""
queues = []
slurm_queues = get_slurm_queues()
lsf_queues = get_lsf_queues()

queues.extend(slurm_queues)
queues.extend(lsf_queues)
Comment on lines +91 to +95
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of a general note, but we know what scheduler we have from execparams.

That's not the point I wanted to make though. The idea of running all possible get_*_qeues() and merging the results with the assumption that at most one of them will return non-empty results probably works. But it does so in an unnecessarily twisted way and it does rely on an assumption that isn't necessary to make or reason through.


if len(slurm_queues) != 0:
scheduler = "slurm"
elif len(lsf_queues) != 0:
scheduler = "lsf"
Comment on lines +97 to +100
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

So execparams is there to parametrize executors when multiple executors are available on a system. For example, on a SLURM system, a test with an execparams parameter will be invoked multiple times for all combinations of executor in ["local", "batch-test", "slurm"] \crossproduct launcher in ["single", "multiple", "mpirun", "srun"}.

If you ignore execparams and detect what's installed the way it's done here, it will work, but it will run the same test multiple times for no good reason.

Instead, we should run this test on only one of the launchers (the launcher doesn't matter because we don't actually care about launching a job in this test) and using all executors. So something like if execparams.launcher == 'single' then do what we need to do with the assumption that our scheduler is execparams.executor.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you take a look at my implementation? I kept the way I was detecting it, but am now only running the test when execparams.launcher == "single"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not about detecting the LRM on the system but the fact that we test multiple executors on that system. So even if you restrict it to the single launcher, it will still be repeated for the local, batch-test, and whatever PSI/J detected to be the scheduler.

You could remove execparams, but then you risk not having access to other necessary parameters that might be set by the users that set up the tests. By the way, you may want to use execparams.custom_attributes, since some systems require setting various things, like an account or project.


if len(queues) < 2:
pytest.raises(Exception("Need at least two queues to perform this test"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are looking for pytest.skip instead.
pytest.raises is used to test that a block of code throws a specific exception. For example,

def test_that_division_by_zero_correctly_raises_exception():
    with pytest.raises(ZeroDivisionError):
        1 / 0

In other words you use it to check that you test throws an exception. If you had 1 / 1 instead of 1 / 0, pytest.raises would actually cause the test to fail because it did not throw the exception that was expected.

return

print("available queues:", queues)
test_queues = random.sample(queues, 2)
print("test queues:", test_queues)

executor = JobExecutor.get_instance(scheduler)

job1 = make_job(test_queues[0])
executor.submit(job1)
qstat = get_queue_info(scheduler)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of these commands accept a job id as an argument to only return info about a specific job.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, updated the qstat equivalents to query just for the job.

job1_qstat_entry = [l for l in qstat if job1._native_id in l][0]
assert test_queues[0] in job1_qstat_entry
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to go a bit further than just checking if the queue name is somewhere in the qstat output. It's quite possible that we might have a queue named "test" and the word "test" appearing in an unrelated place in the qstat output.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, I applied some custom formatting on the qstat equivalents, so they now only report the queue name. This allows me to keep the existing assert logic

Ex:

$ bjobs -o "queue" 4775749
QUEUE
pbatch


job2 = make_job(test_queues[1])
executor.submit(job2)
qstat = get_queue_info(scheduler)
job2_qstat_entry = [l for l in qstat if job2._native_id in l][0]
assert test_queues[1] in job2_qstat_entry

qstat = get_queue_info(scheduler)
print("qstat = ", "\n".join(qstat))

kill_job(scheduler, job1)
kill_job(scheduler, job2)

qstat = get_queue_info(scheduler)
print("qstat = ", "\n".join(qstat))