
Parallel evaluation of tasks #1020

Merged (10 commits, Feb 18, 2021)

Commit: Black fix
Neeratyoy committed Feb 3, 2021
commit 06b1fa4dae47170e7b0d18f47168f7d0bd7a7a14
43 changes: 34 additions & 9 deletions openml/config.py
@@ -179,7 +179,7 @@ def set_n_jobs_for_parallel_runs(cls, n=4):
         n_jobs = n
 
 
-def _setup():
+def _setup(config=None):
     """Setup openml package. Called on first import.
 
     Reads the config file and sets up apikey, server, cache appropriately.
@@ -203,23 +203,35 @@ def _setup():
         # For other errors, we want to propagate the error as openml does not work without cache
         pass
 
-    config = _parse_config()
-    apikey = config.get("FAKE_SECTION", "apikey")
-    server = config.get("FAKE_SECTION", "server")
+    if config is None:
+        config = _parse_config()
 
-    short_cache_dir = config.get("FAKE_SECTION", "cachedir")
-    cache_directory = os.path.expanduser(short_cache_dir)
+        def _get(config, key):
+            return config.get("FAKE_SECTION", key)
 
+        avoid_duplicate_runs = config.getboolean("FAKE_SECTION", "avoid_duplicate_runs")
+    else:
+
+        def _get(config, key):
+            return config.get(key)
+
+        avoid_duplicate_runs = config.get("avoid_duplicate_runs")
+
+    apikey = _get(config, "apikey")
+    server = _get(config, "server")
+    short_cache_dir = _get(config, "cachedir")
+    connection_n_retries = _get(config, "connection_n_retries")
+    max_retries = _get(config, "max_retries")
+    n_jobs = _get(config, "n_jobs")
+
+    cache_directory = os.path.expanduser(short_cache_dir)
     # create the cache subdirectory
     try:
         os.mkdir(cache_directory)
     except FileExistsError:
         # For other errors, we want to propagate the error as openml does not work without cache
         pass
 
-    avoid_duplicate_runs = config.getboolean("FAKE_SECTION", "avoid_duplicate_runs")
-    connection_n_retries = config.get("FAKE_SECTION", "connection_n_retries")
-    max_retries = config.get("FAKE_SECTION", "max_retries")
     if connection_n_retries > max_retries:
         raise ValueError(
             "A higher number of retries than {} is not allowed to keep the "
@@ -255,6 +267,18 @@ def _parse_config():
     return config
 
 
+def get_config_as_dict():
+    config = dict()
+    config["apikey"] = apikey
+    config["server"] = server
+    config["cachedir"] = cache_directory
+    config["avoid_duplicate_runs"] = avoid_duplicate_runs
+    config["connection_n_retries"] = connection_n_retries
+    config["max_retries"] = max_retries
+    config["n_jobs"] = n_jobs
+    return config
+
+
 def get_cache_directory():
     """Get the current cache directory.
 
@@ -306,6 +330,7 @@ def set_cache_directory(cachedir):
     "start_using_configuration_for_example",
     "stop_using_configuration_for_example",
     "set_n_jobs_for_parallel_runs",
+    "get_config_as_dict",
 ]
 
 _setup()
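
For context, a minimal sketch of the round-trip these two pieces enable (not part of the diff; it uses only the functions introduced above): get_config_as_dict() snapshots the module-level state as a plain, picklable dict, and _setup(config) re-applies such a dict.

import openml.config as config

# Snapshot the parent process's state as a plain dict...
snapshot = config.get_config_as_dict()
assert snapshot["n_jobs"] == config.n_jobs

# ...and re-apply it wherever module globals were not inherited,
# e.g. in a freshly spawned worker process:
config._setup(snapshot)
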
11 changes: 10 additions & 1 deletion openml/runs/functions.py
@@ -454,10 +454,13 @@ def _run_task_get_arffcontent(
     ):
         jobs.append((n_fit, rep_no, fold_no, sample_no))
 
+    # The forked child process may not copy the configuration state of OpenML from the parent.
+    # The current configuration therefore needs to be copied and passed to the child processes.
+    _config = config.get_config_as_dict()
     # Execute runs in parallel
     # assuming the same number of tasks as workers (n_jobs), the total compute time for this
     # statement will be similar to the slowest run
-    job_rvals = Parallel(verbose=0, n_jobs=openml.config.n_jobs)(
+    job_rvals = Parallel(verbose=0, n_jobs=config.n_jobs)(
         delayed(_run_task_get_arffcontent_parallel_helper)(
             extension=extension,
             flow=flow,
@@ -467,6 +470,7 @@ def _run_task_get_arffcontent(
             sample_no=sample_no,
             task=task,
             dataset_format=dataset_format,
+            configuration=_config,
         )
         for n_fit, rep_no, fold_no, sample_no in jobs
     )  # job_rvals contain the output of all the runs with one-to-one correspondence with `jobs`
@@ -603,6 +607,7 @@ def _run_task_get_arffcontent_parallel_helper(
     sample_no: int,
     task: OpenMLTask,
     dataset_format: str,
+    configuration: Dict = None,
 ) -> Tuple[
     np.ndarray,
     Optional[pd.DataFrame],
@@ -611,6 +616,10 @@ def _run_task_get_arffcontent_parallel_helper(
     Optional[OpenMLRunTrace],
     "OrderedDict[str, float]",
 ]:
+    # Set up the OpenML configuration in the child process to match the parent's;
+    # if configuration is None, the defaults are loaded from the config file.
+    config._setup(configuration)
+
     train_indices, test_indices = task.get_train_test_split_indices(
         repeat=rep_no, fold=fold_no, sample=sample_no
     )
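
The helper change encodes a general joblib pattern: module-level state is snapshotted in the parent and re-applied as the first step of every worker, since loky/multiprocessing workers do not share the parent's module globals. A self-contained sketch of the pattern (illustrative only; STATE, _apply_state, and _worker are stand-ins, not OpenML code):

from joblib import Parallel, delayed

# Stand-in for openml.config's module-level globals:
STATE = {"server": "default", "n_jobs": 1}

def _apply_state(state):
    STATE.update(state)  # stand-in for openml.config._setup(configuration)

def _worker(i, state):
    _apply_state(state)          # first thing each worker does
    return (i, STATE["server"])  # the worker now sees the parent's setting

if __name__ == "__main__":
    STATE["server"] = "changed-in-parent"  # parent mutates its config at runtime
    snapshot = dict(STATE)                 # stand-in for get_config_as_dict()
    results = Parallel(n_jobs=2, backend="loky")(
        delayed(_worker)(i, snapshot) for i in range(4)
    )
    print(results)  # every worker reports "changed-in-parent"
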
28 changes: 28 additions & 0 deletions tests/test_openml/test_config.py
@@ -11,6 +11,34 @@ def test_config_loading(self):
         self.assertTrue(os.path.exists(openml.config.config_file))
         self.assertTrue(os.path.isdir(os.path.expanduser("~/.openml")))
 
+    def test_get_config_as_dict(self):
+        """ Checks if the current configuration is returned accurately as a dict. """
+        config = openml.config.get_config_as_dict()
+        self.assertIsInstance(config, dict)
+        self.assertEqual(len(config), 7)
+        self.assertEqual(config.get("server"), "https://test.openml.org/api/v1/xml")
+        self.assertEqual(config.get("apikey"), "610344db6388d9ba34f6db45a3cf71de")
+        self.assertEqual(config.get("cachedir"), self.workdir)
+        self.assertEqual(config.get("avoid_duplicate_runs"), False)
+        self.assertEqual(config.get("max_retries"), 20)
+        self.assertEqual(config.get("n_jobs"), 4)
+
+    def test_setup_with_config(self):
+        """ Checks if the OpenML configuration can be updated using _setup(). """
+        _config = dict()
+        _config["apikey"] = "610344db6388d9ba34f6db45a3cf71de"
+        _config["server"] = "https://www.openml.org/api/v1/xml"
+        _config["cachedir"] = self.workdir
+        _config["avoid_duplicate_runs"] = True
+        _config["connection_n_retries"] = 100
+        _config["max_retries"] = 1000
+        _config["n_jobs"] = 64
+        orig_config = openml.config.get_config_as_dict()
+        openml.config._setup(_config)
+        updated_config = openml.config.get_config_as_dict()
+        openml.config._setup(orig_config)  # important to not affect other unit tests
+        self.assertDictEqual(_config, updated_config)
+
 
 class TestConfigurationForExamples(openml.testing.TestBase):
     def test_switch_to_example_configuration(self):
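
Since test_setup_with_config has to save and restore the global configuration by hand, the same two calls could be wrapped in a reusable guard. A hypothetical helper, not part of the PR (preserve_openml_config is an illustrative name):

import contextlib

import openml


@contextlib.contextmanager
def preserve_openml_config():
    saved = openml.config.get_config_as_dict()
    try:
        yield
    finally:
        openml.config._setup(saved)  # restore, so other tests are unaffected

# usage inside a test:
# with preserve_openml_config():
#     openml.config._setup(some_config_dict)  # mutate freely inside the block
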
65 changes: 20 additions & 45 deletions tests/test_runs/test_run_functions.py
@@ -1577,51 +1577,26 @@ def test_format_prediction_task_regression(self):
         res = format_prediction(regression, *ignored_input)
         self.assertListEqual(res, [0] * 5)
 
-    @unittest.mock.patch("openml.runs.functions._run_task_get_arffcontent_parallel_helper")
-    def test__run_task_get_arffcontent_2(self, mock):
-        # Unit test style test
-        def side_effect(*args, **kwargs):
-            return (
-                np.array([0, 1]),
-                np.array([[0.8, 0.2], [0.2, 0.8]]),
-                np.array([1, 2]),
-                np.array([1, 1]),
-                None,
-                {},
-            )
-
-        mock.side_effect = side_effect
-
-        task = openml.tasks.get_task(7)
-
+    def test__run_task_get_arffcontent_2(self):
+        """ Tests if a run executed in parallel is collated correctly. """
+        task = openml.tasks.get_task(7)  # Supervised Classification on kr-vs-kp
+        x, y = task.get_X_and_y(dataset_format="dataframe")
+        num_instances = x.shape[0]
+        line_length = 6 + len(task.class_labels)
         flow = unittest.mock.Mock()
         flow.name = "dummy"
         clf = SGDClassifier(loss="log", random_state=1)
-
-        # Unit test doesn't work with loky and multiprocessing backend
-        for n_jobs, backend, call_count in (
-            (1, "sequential", 10),
-            (1, "threading", 20),
-            (-1, "threading", 30),
-            (2, "threading", 40),
-            (None, "threading", 50),
-        ):
-            with parallel_backend(backend, n_jobs=n_jobs):
-                res = openml.runs.functions._run_task_get_arffcontent(
-                    flow=flow,
-                    extension=self.extension,
-                    model=clf,
-                    task=task,
-                    add_local_measures=True,
-                    dataset_format="dataframe",
-                )
-            assert len(res) == 4, len(res)  # General function interface
-            assert len(res[0]) == 20  # 10 folds x 2 predictions returned
-            assert res[1] is None  # No trace
-            assert len(res[2]) == 1
-            assert len(res[2]["predictive_accuracy"]) == 1
-            assert len(res[2]["predictive_accuracy"][0]) == 10
-            assert len(res[3]) == 1
-            assert len(res[3]["predictive_accuracy"]) == 1
-            assert len(res[3]["predictive_accuracy"][0]) == 10
-            assert mock.call_count == call_count, (mock.call_count, call_count)
+        with parallel_backend("loky", n_jobs=2):
+            res = openml.runs.functions._run_task_get_arffcontent(
+                flow=flow,
+                extension=self.extension,
+                model=clf,
+                task=task,
+                add_local_measures=True,
+                dataset_format="array",
+            )
+        self.assertEqual(type(res[0]), list)
+        self.assertEqual(len(res[0]), num_instances)
+        self.assertEqual(len(res[0][0]), line_length)
+        self.assertEqual(len(res[2]), 7)
+        self.assertEqual(len(res[3]), 7)
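
For completeness, a sketch of exercising the feature end-to-end from user code (assumes a configured OpenML server and API key; the task and classifier mirror the test above, and whether the folds are actually evaluated in worker processes rests on the plumbing added in this PR):

from joblib import parallel_backend
from sklearn.linear_model import SGDClassifier

import openml

clf = SGDClassifier(loss="log", random_state=1)
task = openml.tasks.get_task(7)  # Supervised Classification on kr-vs-kp

# joblib's context manager selects the backend and worker count used for the
# per-fold evaluation inside run_model_on_task:
with parallel_backend("loky", n_jobs=2):
    run = openml.runs.run_model_on_task(clf, task, avoid_duplicate_runs=False)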