Parallel evaluation of tasks #1020

Merged
merged 10 commits on Feb 18, 2021
Black fix + removal of untested unit test
Neeratyoy committed Feb 1, 2021
commit c72c9828d40850ffbfc5cdf706253f820947a22a
12 changes: 12 additions & 0 deletions openml/config.py
@@ -89,6 +89,7 @@ def set_file_log_level(file_output_level: int):
"avoid_duplicate_runs": "True",
"connection_n_retries": 10,
"max_retries": 20,
"n_jobs": 4,
}

config_file = os.path.expanduser(os.path.join("~", ".openml", "config"))
@@ -118,6 +119,7 @@ def get_server_base_url() -> str:
# Number of retries if the connection breaks
connection_n_retries = _defaults["connection_n_retries"]
max_retries = _defaults["max_retries"]
n_jobs = _defaults["n_jobs"]


class ConfigurationForExamples:
@@ -170,6 +172,12 @@ def stop_using_configuration_for_example(cls):
apikey = cls._last_used_key
cls._start_last_called = False

@classmethod
def set_n_jobs_for_parallel_runs(cls, n=4):
""" Set the number of workers to be used while running a flow/model on a task. """
global n_jobs
n_jobs = n


def _setup():
"""Setup openml package. Called on first import.
@@ -186,6 +194,7 @@ def _setup():
global avoid_duplicate_runs
global connection_n_retries
global max_retries
global n_jobs

# read config file, create cache directory
try:
@@ -288,12 +297,15 @@ def set_cache_directory(cachedir):
ConfigurationForExamples.start_using_configuration_for_example
)
stop_using_configuration_for_example = ConfigurationForExamples.stop_using_configuration_for_example
set_n_jobs_for_parallel_runs = ConfigurationForExamples.set_n_jobs_for_parallel_runs


__all__ = [
"get_cache_directory",
"set_cache_directory",
"start_using_configuration_for_example",
"stop_using_configuration_for_example",
"set_n_jobs_for_parallel_runs",
]

_setup()
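For orientation (not part of the diff), a minimal usage sketch of the new setting, assuming the usual run_model_on_task entry point; the task id and classifier below are illustrative:

import openml
from sklearn.tree import DecisionTreeClassifier

# Use 2 joblib workers for the per-fold evaluations instead of the new default of 4.
openml.config.set_n_jobs_for_parallel_runs(2)
# Equivalently, assign the module-level setting added above directly:
# openml.config.n_jobs = 2

task = openml.tasks.get_task(31)  # illustrative task id
clf = DecisionTreeClassifier(random_state=1)
run = openml.runs.run_model_on_task(clf, task, avoid_duplicate_runs=False)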
129 changes: 86 additions & 43 deletions openml/runs/functions.py
@@ -12,6 +12,7 @@
import xmltodict
import numpy as np
import pandas as pd
from joblib.parallel import Parallel, delayed

import openml
import openml.utils
@@ -447,55 +448,33 @@ def _run_task_get_arffcontent(
# methods, less maintenance, less confusion)
num_reps, num_folds, num_samples = task.get_split_dimensions()

jobs = []
for n_fit, (rep_no, fold_no, sample_no) in enumerate(
itertools.product(range(num_reps), range(num_folds), range(num_samples),), start=1
):

train_indices, test_indices = task.get_train_test_split_indices(
repeat=rep_no, fold=fold_no, sample=sample_no
)
if isinstance(task, OpenMLSupervisedTask):
x, y = task.get_X_and_y(dataset_format=dataset_format)
if dataset_format == "dataframe":
train_x = x.iloc[train_indices]
train_y = y.iloc[train_indices]
test_x = x.iloc[test_indices]
test_y = y.iloc[test_indices]
else:
train_x = x[train_indices]
train_y = y[train_indices]
test_x = x[test_indices]
test_y = y[test_indices]
elif isinstance(task, OpenMLClusteringTask):
x = task.get_X(dataset_format=dataset_format)
if dataset_format == "dataframe":
train_x = x.iloc[train_indices]
else:
train_x = x[train_indices]
train_y = None
test_x = None
test_y = None
else:
raise NotImplementedError(task.task_type)

config.logger.info(
"Going to execute flow '%s' on task %d for repeat %d fold %d sample %d.",
flow.name,
task.task_id,
rep_no,
fold_no,
sample_no,
)

jobs.append((n_fit, rep_no, fold_no, sample_no))

# Execute the runs in parallel.
# Assuming at least as many workers (n_jobs) as jobs, the total compute time for this
# statement is roughly that of the slowest single run.
job_rvals = Parallel(verbose=0, n_jobs=openml.config.n_jobs)(
delayed(_run_task_get_arffcontent_parallel_helper)(
extension=extension,
flow=flow,
fold_no=fold_no,
model=model,
rep_no=rep_no,
sample_no=sample_no,
task=task,
dataset_format=dataset_format,
)
for n_fit, rep_no, fold_no, sample_no in jobs
)  # job_rvals contains the output of every run, in one-to-one correspondence with `jobs`

for n_fit, rep_no, fold_no, sample_no in jobs:
pred_y, proba_y, test_indices, test_y, trace, user_defined_measures_fold = job_rvals[
n_fit - 1
]
if trace is not None:
traces.append(trace)

@@ -615,6 +594,70 @@ def _calculate_local_measure(sklearn_fn, openml_name):
)


def _run_task_get_arffcontent_parallel_helper(
extension: "Extension",
flow: OpenMLFlow,
fold_no: int,
model: Any,
rep_no: int,
sample_no: int,
task: OpenMLTask,
dataset_format: str,
) -> Tuple[
np.ndarray,
Optional[pd.DataFrame],
np.ndarray,
Optional[pd.DataFrame],
Optional[OpenMLRunTrace],
"OrderedDict[str, float]",
]:
train_indices, test_indices = task.get_train_test_split_indices(
repeat=rep_no, fold=fold_no, sample=sample_no
)

if isinstance(task, OpenMLSupervisedTask):
x, y = task.get_X_and_y(dataset_format=dataset_format)
if dataset_format == "dataframe":
train_x = x.iloc[train_indices]
train_y = y.iloc[train_indices]
test_x = x.iloc[test_indices]
test_y = y.iloc[test_indices]
else:
train_x = x[train_indices]
train_y = y[train_indices]
test_x = x[test_indices]
test_y = y[test_indices]
elif isinstance(task, OpenMLClusteringTask):
x = task.get_X(dataset_format=dataset_format)
if dataset_format == "dataframe":
train_x = x.iloc[train_indices]
else:
train_x = x[train_indices]
train_y = None
test_x = None
test_y = None
else:
raise NotImplementedError(task.task_type)
config.logger.info(
"Going to execute flow '%s' on task %d for repeat %d fold %d sample %d.",
flow.name,
task.task_id,
rep_no,
fold_no,
sample_no,
)
(pred_y, proba_y, user_defined_measures_fold, trace,) = extension._run_model_on_fold(
model=model,
task=task,
X_train=train_x,
y_train=train_y,
rep_no=rep_no,
fold_no=fold_no,
X_test=test_x,
)
return pred_y, proba_y, test_indices, test_y, trace, user_defined_measures_fold


def get_runs(run_ids):
"""Gets all runs in run_ids list.

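An aside on the joblib pattern this refactor relies on (a self-contained sketch, not from the PR): Parallel returns results in the submission order of the delayed calls, regardless of which worker finishes first, which is what makes the job_rvals[n_fit - 1] lookup above correct. The helper name evaluate_fold is made up for illustration:

import itertools
from joblib import Parallel, delayed

def evaluate_fold(rep_no, fold_no, sample_no):
    # Stand-in for _run_task_get_arffcontent_parallel_helper: just echoes its inputs.
    return rep_no, fold_no, sample_no

# Enumerate jobs the same way the refactored loop does.
jobs = [
    (n_fit, rep_no, fold_no, sample_no)
    for n_fit, (rep_no, fold_no, sample_no) in enumerate(
        itertools.product(range(2), range(5), range(1)), start=1
    )
]

# Results come back in submission order, so n_fit - 1 indexes the matching job.
results = Parallel(n_jobs=2)(
    delayed(evaluate_fold)(rep_no, fold_no, sample_no)
    for _, rep_no, fold_no, sample_no in jobs
)
for n_fit, rep_no, fold_no, sample_no in jobs:
    assert results[n_fit - 1] == (rep_no, fold_no, sample_no)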
50 changes: 50 additions & 0 deletions tests/test_runs/test_run_functions.py
@@ -10,6 +10,7 @@
import unittest.mock

import numpy as np
from joblib import parallel_backend

import openml
import openml.exceptions
@@ -1575,3 +1576,52 @@ def test_format_prediction_task_regression(self):
ignored_input = [0] * 5
res = format_prediction(regression, *ignored_input)
self.assertListEqual(res, [0] * 5)

@unittest.mock.patch("openml.runs.functions._run_task_get_arffcontent_parallel_helper")
def test__run_task_get_arffcontent_2(self, mock):
# Unit test style test
def side_effect(*args, **kwargs):
return (
np.array([0, 1]),
np.array([[0.8, 0.2], [0.2, 0.8]]),
np.array([1, 2]),
np.array([1, 1]),
None,
{},
)

mock.side_effect = side_effect

task = openml.tasks.get_task(7)

flow = unittest.mock.Mock()
flow.name = "dummy"
clf = SGDClassifier(loss="log", random_state=1)

# The mocked helper is not visible inside subprocess workers, so the loky and
# multiprocessing backends cannot be exercised in this unit test
for n_jobs, backend, call_count in (
(1, "sequential", 10),
(1, "threading", 20),
(-1, "threading", 30),
(2, "threading", 40),
(None, "threading", 50),
):
with parallel_backend(backend, n_jobs=n_jobs):
res = openml.runs.functions._run_task_get_arffcontent(
flow=flow,
extension=self.extension,
model=clf,
task=task,
add_local_measures=True,
dataset_format="dataframe",
)
assert len(res) == 4, len(res) # General function interface
assert len(res[0]) == 20 # 10 folds x 2 predictions returned
assert res[1] is None # No trace
assert len(res[2]) == 1
assert len(res[2]["predictive_accuracy"]) == 1
assert len(res[2]["predictive_accuracy"][0]) == 10
assert len(res[3]) == 1
assert len(res[3]["predictive_accuracy"]) == 1
assert len(res[3]["predictive_accuracy"][0]) == 10
assert mock.call_count == call_count, (mock.call_count, call_count)
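
For context (not part of the diff), a sketch of how a caller might combine joblib's parallel_backend context manager with the new parallel evaluation: the context manager selects the backend (threads here) used by the internal Parallel call, while the worker count is primarily taken from openml.config.n_jobs because it is passed explicitly; the task id and model below are illustrative assumptions:

import openml
from joblib import parallel_backend
from sklearn.linear_model import SGDClassifier

openml.config.set_n_jobs_for_parallel_runs(2)  # two workers for per-fold evaluation
task = openml.tasks.get_task(7)                # illustrative task id
clf = SGDClassifier(loss="log", random_state=1)

# Force a thread-based backend for the internal Parallel call (useful when the
# model releases the GIL or process start-up cost is a concern).
with parallel_backend("threading"):
    run = openml.runs.run_model_on_task(clf, task, avoid_duplicate_runs=False)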