Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task events #46

Merged
merged 5 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
omit =
*/tests/*
cubed/array_api/*
cubed/extensions/*
13 changes: 12 additions & 1 deletion cubed/core/array.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from dataclasses import dataclass
from operator import mul
from typing import Optional

from dask.array.core import normalize_chunks
from toolz import map, reduce
Expand Down Expand Up @@ -121,5 +123,14 @@ def on_compute_start(self, arr):
def on_compute_end(self, arr):
pass # pragma: no cover

def on_task_end(self, name=None):
def on_task_end(self, event):
pass # pragma: no cover


@dataclass
class TaskEndEvent:
    """Information about a completed task, passed to ``Callback.on_task_end``.

    All timestamps are in seconds since the epoch (``time.time()`` style).
    Executors that don't track a given timestamp leave it as ``None``.
    """

    # Name of the array whose task has completed.
    array_name: str
    # When the task was created/submitted by the host — TODO confirm exact
    # semantics per executor (lithops maps this from host_job_create_tstamp).
    task_create_tstamp: Optional[float] = None
    # When the task's function started executing on the worker.
    function_start_tstamp: Optional[float] = None
    # When the task's function finished executing on the worker.
    function_end_tstamp: Optional[float] = None
    # When the task's result was received back on the host.
    task_result_tstamp: Optional[float] = None
90 changes: 90 additions & 0 deletions cubed/extensions/timeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import os
import time
from dataclasses import asdict

import matplotlib.patches as mpatches
import numpy as np
import pandas as pd
import pylab
import seaborn as sns

from cubed.core.array import Callback

sns.set_style("whitegrid")
pylab.switch_backend("Agg")


class TimelineVisualizationCallback(Callback):
    """Collects per-task timestamps during a computation and renders them
    as a timeline plot (via ``create_timeline``) when the computation ends."""

    def on_compute_start(self, arr):
        # Reset state for this computation run.
        self.start_tstamp = time.time()
        self.stats = []

    def on_task_end(self, event):
        # Store the event's fields as a plain dict; a DataFrame is built
        # from the accumulated dicts when plotting.
        self.stats.append(asdict(event))

    def on_compute_end(self, arr):
        create_timeline(self.stats, self.start_tstamp, time.time())


# copy of lithops function of the same name, and modified for different field names
def create_timeline(stats, start_tstamp, end_tstamp, dst=None):
    """Plot a scatter timeline of task lifecycle timestamps and save it as a PNG.

    :param stats: list of dicts with ``task_create_tstamp``,
        ``function_start_tstamp``, ``function_end_tstamp`` and
        ``task_result_tstamp`` keys (epoch seconds), one per task.
    :param start_tstamp: computation start time (epoch seconds); used as the
        x-axis origin.
    :param end_tstamp: computation end time (epoch seconds); used to size the
        x-axis.
    :param dst: optional destination path prefix; ``_timeline.png`` is
        appended. If ``None``, the plot is written to
        ``./plots/<epoch>_timeline.png``.
    """
    stats_df = pd.DataFrame(stats)

    total_calls = len(stats_df)

    palette = sns.color_palette("deep", 6)

    fig = pylab.figure(figsize=(10, 6))
    ax = fig.add_subplot(1, 1, 1)

    # One row per task on the y-axis.
    y = np.arange(total_calls)
    point_size = 10

    # Each field becomes one scatter series, plotted relative to the
    # computation start so the x-axis reads as elapsed seconds.
    fields = [
        ("task create", stats_df.task_create_tstamp - start_tstamp),
        ("function start", stats_df.function_start_tstamp - start_tstamp),
        ("function end", stats_df.function_end_tstamp - start_tstamp),
        ("task result", stats_df.task_result_tstamp - start_tstamp),
    ]

    patches = []
    for f_i, (field_name, val) in enumerate(fields):
        ax.scatter(val, y, c=[palette[f_i]], edgecolor="none", s=point_size, alpha=0.8)
        patches.append(mpatches.Patch(color=palette[f_i], label=field_name))

    ax.set_xlabel("Execution Time (sec)")
    ax.set_ylabel("Function Call")

    legend = pylab.legend(handles=patches, loc="upper right", frameon=True)
    legend.get_frame().set_facecolor("#FFFFFF")

    # Thin out y-axis ticks to at most ~20 labels regardless of task count.
    yplot_step = int(np.max([1, total_calls / 20]))
    y_ticks = np.arange(total_calls // yplot_step + 2) * yplot_step
    ax.set_yticks(y_ticks)
    ax.set_ylim(-0.02 * total_calls, total_calls * 1.02)
    for y in y_ticks:
        ax.axhline(y, c="k", alpha=0.1, linewidth=1)

    # Pad the x-axis 25% beyond the total elapsed time.
    max_seconds = np.max(end_tstamp - start_tstamp) * 1.25
    xplot_step = max(int(max_seconds / 8), 1)
    x_ticks = np.arange(max_seconds // xplot_step + 2) * xplot_step
    ax.set_xlim(0, max_seconds)

    ax.set_xticks(x_ticks)
    for x in x_ticks:
        ax.axvline(x, c="k", alpha=0.2, linewidth=0.8)

    ax.grid(False)
    fig.tight_layout()

    if dst is None:
        # Default: write to ./plots/<epoch>_timeline.png, creating the dir.
        os.makedirs("plots", exist_ok=True)
        dst = os.path.join(
            os.getcwd(), "plots", "{}_{}".format(int(time.time()), "timeline.png")
        )
    else:
        dst = os.path.expanduser(dst) if "~" in dst else dst
        dst = "{}_{}".format(os.path.realpath(dst), "timeline.png")

    fig.savefig(dst)
4 changes: 2 additions & 2 deletions cubed/extensions/tqdm.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def on_compute_end(self, arr):
for pbar in self.pbars.values():
pbar.close()

def on_task_end(self, name):
self.pbars[name].update()
def on_task_end(self, event):
self.pbars[event.array_name].update()


@contextlib.contextmanager
Expand Down
26 changes: 22 additions & 4 deletions cubed/runtime/executors/lithops.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from rechunker.types import ParallelPipelines, PipelineExecutor
from six import reraise

from cubed.core.array import TaskEndEvent
from cubed.runtime.backup import should_launch_backup
from cubed.runtime.pipeline import already_computed
from cubed.runtime.types import DagExecutor
Expand Down Expand Up @@ -54,6 +55,7 @@ def map_unordered(
include_modules=[],
max_failures=3,
use_backups=False,
return_stats=False,
):
"""
Apply a function to items of an input list, yielding results as they are completed
Expand All @@ -67,8 +69,9 @@ def map_unordered(
:param include_modules: Modules to include.
:param max_failures: The number of task failures to allow before raising an exception.
:param use_backups: Whether to launch backup tasks to mitigate against slow-running tasks.
:param return_stats: Whether to return lithops stats.

:return: Function values as they are completed, not necessarily in the input order.
:return: Function values (and optionally stats) as they are completed, not necessarily in the input order.
"""
failures = 0
return_when = ALWAYS if use_backups else ANY_COMPLETED
Expand Down Expand Up @@ -106,7 +109,10 @@ def map_unordered(
failed.append(future)
else:
end_times[future] = time.monotonic()
yield future.result()
if return_stats:
yield future.result(), future.stats
else: # pragma: no cover
yield future.result()

if use_backups:
# remove backups
Expand Down Expand Up @@ -153,22 +159,34 @@ def map_unordered(
time.sleep(1)


def lithops_stats_to_task_end_event(name, stats):
    """Build a ``TaskEndEvent`` for array *name* from a lithops future's
    ``stats`` dict, mapping lithops timestamp keys to event fields."""
    timestamps = {
        "task_create_tstamp": stats["host_job_create_tstamp"],
        "function_start_tstamp": stats["worker_func_start_tstamp"],
        "function_end_tstamp": stats["worker_func_end_tstamp"],
        "task_result_tstamp": stats["host_status_done_tstamp"],
    }
    return TaskEndEvent(array_name=name, **timestamps)


def build_stage_mappable_func(
    stage, config, name=None, callbacks=None, use_backups=False
):
    """Build a function that executes a pipeline *stage* with lithops.

    :param stage: the stage whose ``function`` is mapped over ``stage.mappable``.
    :param config: pipeline config passed through to the stage function.
    :param name: name of the array being computed; attached to task-end events.
    :param callbacks: optional iterable of ``Callback`` objects whose
        ``on_task_end`` is invoked once per completed task.
    :param use_backups: whether to launch backup tasks for slow tasks.
    :return: a function taking a lithops function executor.
    """

    def sf(mappable):
        return stage.function(mappable, config=config)

    def stage_func(lithops_function_executor):
        # Results are discarded; we only consume the per-task stats so
        # that callbacks can be notified as tasks complete.
        for _, stats in map_unordered(
            lithops_function_executor,
            sf,
            list(stage.mappable),
            include_modules=["cubed"],
            use_backups=use_backups,
            return_stats=True,
        ):
            if callbacks is not None:
                event = lithops_stats_to_task_end_event(name, stats)
                # Plain loop (not a list comprehension) since on_task_end
                # is called purely for its side effects.
                for callback in callbacks:
                    callback.on_task_end(event)

    return stage_func

Expand Down
7 changes: 5 additions & 2 deletions cubed/runtime/executors/python.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import networkx as nx
from tenacity import retry, stop_after_attempt

from cubed.core.array import TaskEndEvent
from cubed.runtime.pipeline import already_computed
from cubed.runtime.types import DagExecutor

Expand All @@ -22,8 +23,10 @@ def execute_dag(self, dag, callbacks=None, **kwargs):
for m in stage.mappable:
exec_stage_func(stage.function, m, config=pipeline.config)
if callbacks is not None:
[callback.on_task_end(node) for callback in callbacks]
event = TaskEndEvent(array_name=node)
[callback.on_task_end(event) for callback in callbacks]
else:
exec_stage_func(stage.function, config=pipeline.config)
if callbacks is not None:
[callback.on_task_end(node) for callback in callbacks]
event = TaskEndEvent(array_name=node)
[callback.on_task_end(event) for callback in callbacks]
10 changes: 9 additions & 1 deletion cubed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,15 @@ class TaskCounter(Callback):
def on_compute_start(self, arr):
self.value = 0

def on_task_end(self, name=None):
def on_task_end(self, event):
if event.task_create_tstamp is not None:
assert (
event.task_result_tstamp
>= event.function_end_tstamp
>= event.function_start_tstamp
>= event.task_create_tstamp
> 0
)
self.value += 1


Expand Down
4 changes: 3 additions & 1 deletion examples/lithops-add-random.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import cubed
import cubed.array_api as xp
import cubed.random
from cubed.extensions.timeline import TimelineVisualizationCallback
from cubed.extensions.tqdm import TqdmProgressBar
from cubed.runtime.executors.lithops import LithopsDagExecutor

Expand All @@ -30,10 +31,11 @@
c = xp.add(a, b)
with logging_redirect_tqdm():
progress = TqdmProgressBar()
timeline_viz = TimelineVisualizationCallback()
c.compute(
return_stored=False,
executor=executor,
callbacks=[progress],
callbacks=[progress, timeline_viz],
runtime=runtime,
runtime_memory=2000,
)