Skip to content

Commit

Permalink
Merge pull request #152 from madeline-scyphers/feature/async-opt
Browse files Browse the repository at this point in the history
Feature/async opt
  • Loading branch information
madeline-scyphers authored Sep 9, 2023
2 parents c67cf30 + 21d1afb commit 563cff8
Show file tree
Hide file tree
Showing 13 changed files with 502 additions and 39 deletions.
11 changes: 5 additions & 6 deletions boa/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from boa.config import BOAScriptOptions
from boa.controller import Controller
from boa.storage import scheduler_from_json_file
from boa.wrappers.script_wrapper import ScriptWrapper
from boa.wrappers.wrapper_utils import cd_and_cd_back, load_jsonlike

Expand Down Expand Up @@ -52,7 +53,8 @@
help="Modify/add to the config file a temporary directory as the experiment_dir that will get deleted after running"
" (useful for testing)."
" This requires your Wrapper to have the ability to take experiment_dir as an argument"
" to ``load_config``. The default ``load_config`` does support this.",
" to ``load_config``. The default ``load_config`` does support this."
" This is also only done for initial run, not for reloading from scheduler json file.",
)
@click.option(
"--rel-to-config/--rel-to-here", # more cli friendly name for config option of rel_to_launch
Expand Down Expand Up @@ -141,11 +143,8 @@ def run(config_path, scheduler_path, rel_to_config, wrapper_path=None, wrapper_n

with cd_and_cd_back(options["working_dir"]):
if scheduler_path:
print(options["working_dir"])
options = dict(
scheduler_path=scheduler_path, working_dir=options["working_dir"], wrapper_path=options["wrapper_path"]
)
controller = Controller.from_scheduler_path(**options)
scheduler = scheduler_from_json_file(filepath=scheduler_path, wrapper_path=options["wrapper_path"])
controller = Controller.from_scheduler(scheduler=scheduler, **options)
else:
if options["wrapper_path"] and Path(options["wrapper_path"]).exists():
options["wrapper"] = options["wrapper_path"]
Expand Down
241 changes: 241 additions & 0 deletions boa/async_opt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
# weird file name with dash in it because CLI conventions
import dataclasses
import os
import sys
import tempfile
from pathlib import Path

import click
import pandas as pd
from attrs import fields_dict
from ax import Data
from ax.storage.json_store.decoder import object_from_json

from boa.config import BOAConfig, BOAScriptOptions, MetricType
from boa.controller import Controller
from boa.storage import scheduler_from_json_file
from boa.utils import check_min_package_version
from boa.wrappers.synthetic_wrapper import SyntheticWrapper
from boa.wrappers.wrapper_utils import load_jsonlike


@click.command()
@click.option(
    "-c",
    "--config-path",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    help="Path to configuration YAML file.",
)
@click.option(
    "-sp",
    "--scheduler-path",
    type=click.Path(),
    default="",
    help="Path to scheduler json file.",
)
@click.option(
    "-n",
    "--num-trials",
    type=int,
    help="Number of trials to run. Overrides trials in config file.",
)
@click.option(
    "-td",
    "--temporary-dir",
    is_flag=True,
    show_default=True,
    default=False,
    help="Modify/add to the config file a temporary directory as the experiment_dir that will get deleted after running"
    " (useful for testing)."
    " This requires your Wrapper to have the ability to take experiment_dir as an argument"
    " to ``load_config``. The default ``load_config`` does support this."
    " This is also only done for initial run, not for reloading from scheduler json file.",
)
def main(config_path, scheduler_path, num_trials, temporary_dir):
    """Asynchronous optimization script. Asynchronously run your optimization.

    With this script, you can pass in a configuration file that specifies your
    optimization parameters and objective, and BOA will output an
    optimization.csv file with your parameters.

    Parameters
    ----------
    config_path
        Path to configuration YAML file.
    scheduler_path
        Path to scheduler json file.
    num_trials
        Number of trials to run. Overrides trials in config file.
    temporary_dir
        If set, a temporary directory is used as the experiment_dir and is
        deleted after running.

    Returns
    -------
    Scheduler
    """
    # NOTE: click renders this docstring as the command's ``--help`` text, so
    # it must stay free of placeholder wording.
    if temporary_dir:
        # ``run`` must finish inside the ``with`` block: the directory is
        # removed as soon as the context manager exits.
        with tempfile.TemporaryDirectory() as temp_dir:
            experiment_dir = Path(temp_dir)
            return run(
                config_path=config_path,
                scheduler_path=scheduler_path,
                num_trials=num_trials,
                experiment_dir=experiment_dir,
            )
    return run(
        config_path=config_path,
        scheduler_path=scheduler_path,
        num_trials=num_trials,
    )


def run(config_path, scheduler_path, num_trials, experiment_dir=None):
    """Generate the next batch of trials and write them to the optimization CSV.

    Either builds a new scheduler from ``config_path`` or reloads one from
    ``scheduler_path``, folds any results found in the optimization CSV back
    into the experiment, generates new trials, marks them as running and saves
    the scheduler data.

    Parameters
    ----------
    config_path
        Path to the configuration file. May be empty when ``scheduler_path``
        is given; the config is then read from the scheduler json file.
    scheduler_path
        Path to a scheduler json file to resume from. May be empty for a
        fresh run.
    num_trials
        Number of trials to run. When truthy, overrides the trial settings of
        the config (and of a reloaded scheduler).
    experiment_dir
        Optional directory to set as ``script_options.experiment_dir``.

    Returns
    -------
    Scheduler
        The scheduler after new trials were generated and data was saved.

    Raises
    ------
    ValueError
        If neither ``config_path`` nor ``scheduler_path`` is provided.
    """
    import logging  # local import: only needed for the fallback logger below

    if experiment_dir:
        experiment_dir = Path(experiment_dir).resolve()
    # set num_trials before loading config because scheduler options is frozen
    config_kw = (
        dict(
            n_trials=num_trials,
            scheduler=dict(total_trials=None, n_trials=None),
        )
        if num_trials
        else {}
    )

    config = None
    if config_path:
        config = BOAConfig.from_jsonlike(config_path, **config_kw)
    if scheduler_path:
        scheduler_path = Path(scheduler_path).resolve()
        if not config:
            # No config file given: recover the config stored with the scheduler.
            sch_jsn = load_jsonlike(scheduler_path)
            config = BOAConfig(**{**object_from_json(sch_jsn["wrapper"]["config"]), **config_kw})
    if config is None:
        # Previously this fell through to an AttributeError on ``None``.
        raise ValueError("Either config_path or scheduler_path must be provided.")

    # Lift the parallelism limits on the generation strategy.
    if "steps" in config.generation_strategy:
        for step in config.generation_strategy["steps"]:
            step.max_parallelism = None
    else:
        config.generation_strategy["max_parallelism_override"] = -1
    # Every metric becomes a passthrough metric for this run.
    for metric in config.objective.metrics:
        metric.metric = "passthrough"
        metric.metric_type = MetricType.PASSTHROUGH
    if experiment_dir:
        config.script_options.experiment_dir = experiment_dir

    # ``controller`` only exists for a fresh run; keep it bound either way so
    # later code can never hit a NameError when resuming from a scheduler file.
    controller = None
    if scheduler_path:
        scheduler = scheduler_from_json_file(filepath=scheduler_path)
        if num_trials:
            scheduler.wrapper.config.scheduler = dataclasses.replace(
                scheduler.wrapper.config.scheduler, total_trials=num_trials
            )
            scheduler.wrapper.config.n_trials = num_trials
            scheduler.options = dataclasses.replace(scheduler.options, total_trials=num_trials)
    else:
        controller = Controller(config_path=config_path, wrapper=SyntheticWrapper(config=config))
        controller.initialize_scheduler()
        scheduler = controller.scheduler

    if not scheduler.opt_csv.exists() and scheduler.experiment.trials:
        # Fall back to a module logger when there is no controller (resume path).
        logger = controller.logger if controller is not None else logging.getLogger(__name__)
        logger.warning(
            "No optimization CSV found, but previous trials exist. "
            "\nLikely cause was a previous run was moved with out the CSV."
        )

    if scheduler.opt_csv.exists():
        exp_attach_data_from_opt_csv(list(config.objective.metric_names), scheduler)

    generator_runs = scheduler.generation_strategy._gen_multiple(
        experiment=scheduler.experiment, num_generator_runs=scheduler.wrapper.config.trials
    )

    for generator_run in generator_runs:
        trial = scheduler.experiment.new_trial(
            generator_run=generator_run,
        )
        trial.runner = scheduler.runner
        trial.mark_running()

    if scheduler.experiment.fetch_data().df.empty:
        # No data at all yet: attach placeholder rows (mean=None) for every
        # metric/trial pair before the data is saved.
        trials = scheduler.experiment.trials
        metrics = scheduler.experiment.metrics
        for metric in metrics.keys():
            scheduler.experiment.attach_data(
                Data(
                    df=pd.DataFrame.from_records(
                        dict(
                            trial_index=list(trials.keys()),
                            arm_name=[f"{i}_0" for i in trials.keys()],
                            metric_name=metric,
                            mean=None,
                            sem=0.0,
                        )
                    )
                )
            )

    scheduler.save_data(metrics_to_end=True, ax_kwargs=dict(always_include_field_columns=True))
    return scheduler


def exp_attach_data_from_opt_csv(metric_names, scheduler):
    """Attach metric values from the optimization CSV to the experiment.

    Reads ``scheduler.opt_csv`` and, for every trial whose experiment data
    currently has a NaN ``mean``, attaches the values found in the CSV's
    metric columns. Does nothing when the CSV lacks a column for any of
    ``metric_names`` or when no CSV row matches a trial with missing data.

    Parameters
    ----------
    metric_names
        List of metric names; each must be a column of the CSV.
    scheduler
        Scheduler providing ``opt_csv`` and ``experiment``.
    """
    df = pd.read_csv(scheduler.opt_csv)
    # Only proceed when every requested metric has a column in the CSV.
    if df.columns.isin(metric_names).sum() != len(metric_names):
        return

    exp_df = scheduler.experiment.fetch_data().df
    nan_trials = exp_df.loc[exp_df["mean"].isna(), "trial_index"].unique()
    new_data = df.loc[df["trial_index"].isin(nan_trials)]
    if new_data.empty:
        return

    # Use the CSV's ``trial_index`` column rather than the DataFrame row
    # labels: the two only coincide when row position equals trial index.
    trial_indices = [int(idx) for idx in new_data["trial_index"]]
    records = [
        dict(
            trial_index=idx,
            arm_name=f"{idx}_0",
            metric_name=metric,
            mean=value,
            sem=0.0,
        )
        for metric in metric_names
        for idx, value in zip(trial_indices, new_data[metric])
    ]

    # The keyword for replacing existing data differs between ax-platform versions.
    if check_min_package_version("ax-platform", "0.3.3"):
        kw = dict(combine_with_last_data=True)
    else:
        kw = dict(overwrite_existing_data=True)
    scheduler.experiment.attach_data(
        Data(df=pd.DataFrame.from_records(records)),
        **kw,
    )


def get_config_options(script_options: dict = None):
    """Resolve ``append_timestamp`` and ``working_dir`` from script options.

    Values missing from ``script_options`` fall back to the field defaults
    declared on ``BOAScriptOptions``. A relative ``working_dir`` is anchored
    at the current working directory, and a non-empty ``working_dir`` is
    appended to ``sys.path``.

    Returns a dict with the keys ``append_timestamp`` and ``working_dir``.
    """
    opts = {} if script_options is None else script_options
    option_fields = fields_dict(BOAScriptOptions)

    # An explicit ``None`` counts as "not provided" for append_timestamp.
    append_timestamp = opts.get("append_timestamp", None)
    if append_timestamp is None:
        append_timestamp = option_fields["append_timestamp"].default

    raw_working_dir = opts.get("working_dir", option_fields["working_dir"].default)
    working_dir = _prepend_rel_path(os.getcwd(), raw_working_dir)

    if working_dir:
        sys.path.append(str(working_dir))

    return {
        "append_timestamp": append_timestamp,
        "working_dir": working_dir,
    }


def _prepend_rel_path(rel_path, path):
if not path:
return path
path = Path(path)
if not path.is_absolute():
path = rel_path / path
return path.resolve()


# Script entry point: when this module is executed directly, invoke the
# ``main`` click command defined above.
if __name__ == "__main__":
    main()
35 changes: 31 additions & 4 deletions boa/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dataclasses import asdict as dc_asdict
from dataclasses import is_dataclass
from enum import Enum
from typing import ClassVar, Optional, Union
from typing import TYPE_CHECKING, ClassVar, Optional, Union

import attr
import ruamel.yaml
Expand All @@ -29,6 +29,9 @@
from boa.utils import StrEnum, deprecation
from boa.wrappers.wrapper_utils import load_jsonlike

if TYPE_CHECKING:
from boa.metrics.modular_metric import ModularMetric

__all__ = [
"BOAConfig",
"BOAObjective",
Expand Down Expand Up @@ -106,11 +109,12 @@ class MetricType(StrEnum):
SKLEARN_METRIC = "sklearn_metric"
SYNTHETIC_METRIC = "synthetic_metric"
PASSTHROUGH = "pass_through"
INSTANTIATED = "instantiated"


@define(kw_only=True)
class BOAMetric(_Utils):
metric: Optional[str] = field(
metric: Optional[str | ModularMetric] = field(
default=None,
metadata={
"doc": """metrics to be used for optimization. You can use list any metric in built into BOA.
Expand Down Expand Up @@ -211,6 +215,15 @@ def __init__(self, *args, lower_is_better: Optional[bool] = None, **kwargs):
def __attrs_post_init__(self):
if not self.metric and not self.name:
raise TypeError("Must specify at least metric name or metric")
from boa.metrics.modular_metric import ModularMetric

if isinstance(self.metric, ModularMetric):
self.metric_type = MetricType.INSTANTIATED
# if a passed in name, override the name of the metric
if self.name:
self.metric._name = self.name
else:
self.name = self.metric.name
if self.name is None:
self.name = self.metric
elif self.metric is None:
Expand Down Expand Up @@ -274,6 +287,10 @@ def __init__(self, **config):

self.__attrs_init__(**config)

@property
def metric_names(self):
    """Yield the ``name`` of each metric in ``self.metrics``.

    The result is a generator expression, so it can be consumed only once;
    wrap it in ``list(...)`` when the names are needed more than once.
    """
    return (metric.name for metric in self.metrics)


@define
class BOAScriptOptions(_Utils):
Expand Down Expand Up @@ -579,7 +596,8 @@ def __init__(self, **config):
scheduler = config.get("scheduler", {})
n_trials = config.get("n_trials", None)
if isinstance(scheduler, dict):
n_trials = scheduler.pop("n_trials", n_trials) # n_trials is not a valid scheduler option so we pop it
sch_n_trials = scheduler.pop("n_trials", None)
n_trials = sch_n_trials or n_trials # n_trials is not a valid scheduler option so we pop it
total_trials = scheduler.get("total_trials", None)
else:
total_trials = scheduler.total_trials
Expand Down Expand Up @@ -609,7 +627,7 @@ def __init__(self, **config):
self.__attrs_init__(**config)

@classmethod
def from_jsonlike(cls, file, rel_to_config: Optional[bool] = None, template_kw: Optional[dict] = None):
def from_jsonlike(cls, file, rel_to_config: Optional[bool] = None, template_kw: Optional[dict] = None, **kwargs):
config_path = pathlib.Path(file).resolve()
config = load_jsonlike(config_path, template_kw=template_kw)

Expand All @@ -627,6 +645,7 @@ def from_jsonlike(cls, file, rel_to_config: Optional[bool] = None, template_kw:
config["script_options"]["rel_to_launch"] = False
config["script_options"]["base_path"] = config_path.parent

update_dict(config, kwargs)
return cls(**config, config_path=file)

# @classmethod
Expand Down Expand Up @@ -882,6 +901,14 @@ def add_comment_recurse(
return d


def update_dict(original: dict, param: dict):
    """Recursively merge ``param`` into ``original`` in place.

    For each key in ``param``: when both the new value and the existing value
    are dicts, the merge recurses so that sibling keys already present in
    ``original`` are kept. In every other case (key missing, or either side
    is not a dict) the value from ``param`` replaces the existing one.

    Parameters
    ----------
    original
        Dictionary to update; mutated in place.
    param
        Dictionary with the overriding values.

    Returns
    -------
    None
    """
    for key, value in param.items():
        # Only recurse when the existing value can actually be merged into;
        # recursing into ``None`` or a scalar would raise a TypeError.
        if isinstance(value, dict) and isinstance(original.get(key), dict):
            update_dict(original[key], value)
        else:
            original[key] = value


if __name__ == "__main__": # pragma: no cover
from tests.conftest import TEST_CONFIG_DIR

Expand Down
Loading

0 comments on commit 563cff8

Please sign in to comment.