Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor lithops to be a bit more like modal. #102

Merged
merged 1 commit into from
Aug 22, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 39 additions & 23 deletions cubed/runtime/executors/lithops.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ def execute_plan(self, plan: Task, **kwargs):
plan(executor)


def run_func(input, func=None, config=None, name=None):
    """Apply ``func`` to one input item, forwarding ``config`` as a keyword.

    ``name`` is accepted but is not used inside this function.
    """
    return func(input, config=config)


def map_unordered(
lithops_function_executor,
map_function,
Expand All @@ -58,6 +63,7 @@ def map_unordered(
max_failures=3,
use_backups=False,
return_stats=False,
**kwargs,
):
"""
Apply a function to items of an input list, yielding results as they are completed
Expand Down Expand Up @@ -85,10 +91,12 @@ def map_unordered(
backups = {}
pending = []

# can't use functools.partial here as we get an error in lithops
# also, lithops extra_args doesn't work for this case
partial_map_function = lambda x: map_function(x, **kwargs)

futures = lithops_function_executor.map(
map_function,
inputs,
include_modules=include_modules,
partial_map_function, inputs, include_modules=include_modules
)
tasks.update({k: v for (k, v) in zip(futures, inputs)})
start_times.update({k: time.monotonic() for k in futures})
Expand Down Expand Up @@ -130,7 +138,7 @@ def map_unordered(
inputs = [v for (fut, v) in tasks.items() if fut in failed]
# TODO: de-duplicate code from above
futures = lithops_function_executor.map(
map_function,
partial_map_function,
inputs,
include_modules=include_modules,
)
Expand All @@ -147,7 +155,7 @@ def map_unordered(
inputs = [v for (fut, v) in tasks.items() if fut == future]
logger.info("Running backup task for %s", inputs)
futures = lithops_function_executor.map(
map_function,
partial_map_function,
inputs,
include_modules=include_modules,
)
Expand All @@ -161,6 +169,31 @@ def map_unordered(
time.sleep(1)


def execute_dag(dag, callbacks=None, **kwargs):
    """Execute every pipeline stage of ``dag`` using a lithops ``FunctionExecutor``.

    ``use_backups`` is popped from ``kwargs`` (default ``False``) and passed to
    ``map_unordered``; all remaining keyword arguments are forwarded to the
    ``FunctionExecutor`` constructor.

    For each ``(name, node)`` yielded by ``visit_nodes(dag)``, the stages of
    ``node["pipeline"]`` are run in order. A stage with a mappable is run by
    mapping ``run_func`` over ``list(stage.mappable)``. If ``callbacks`` is not
    ``None``, each callback's ``on_task_end`` is called with an event built
    from the stats of every completed task.

    Raises:
        NotImplementedError: if a stage has no mappable (``stage.mappable`` is
            ``None``).
    """
    use_backups = kwargs.pop("use_backups", False)
    with FunctionExecutor(**kwargs) as executor:
        for name, node in visit_nodes(dag):
            pipeline = node["pipeline"]
            for stage in pipeline.stages:
                if stage.mappable is None:
                    raise NotImplementedError(
                        "stages without a mappable are not supported"
                    )
                # Stats are requested so task-end events can be reported;
                # the first element of each yielded pair is not needed here.
                for _, stats in map_unordered(
                    executor,
                    run_func,
                    list(stage.mappable),
                    func=stage.function,
                    config=pipeline.config,
                    name=name,
                    include_modules=["cubed"],
                    use_backups=use_backups,
                    return_stats=True,
                ):
                    if callbacks is not None:
                        event = lithops_stats_to_task_end_event(name, stats)
                        # Plain loop: the calls are made for their side
                        # effects, so no result list is built.
                        for callback in callbacks:
                            callback.on_task_end(event)


def lithops_stats_to_task_end_event(name, stats):
return TaskEndEvent(
array_name=name,
Expand Down Expand Up @@ -221,21 +254,4 @@ def __init__(self, **kwargs):

def execute_dag(self, dag, callbacks=None, **kwargs):
merged_kwargs = {**self.kwargs, **kwargs}
use_backups = merged_kwargs.pop("use_backups", False)
with FunctionExecutor(**merged_kwargs) as executor:
for name, node in visit_nodes(dag):
pipeline = node["pipeline"]
for stage in pipeline.stages:
if stage.mappable is not None:
stage_func = build_stage_mappable_func(
stage,
pipeline.config,
name=name,
callbacks=callbacks,
use_backups=use_backups,
)
else:
stage_func = build_stage_func(stage, pipeline.config)

# execute each stage in series
stage_func(executor)
execute_dag(dag, callbacks=callbacks, **merged_kwargs)