diff --git a/cubed/runtime/executors/lithops.py b/cubed/runtime/executors/lithops.py index fea7311a..a4d4528a 100644 --- a/cubed/runtime/executors/lithops.py +++ b/cubed/runtime/executors/lithops.py @@ -27,12 +27,17 @@ ) from cubed.runtime.pipeline import visit_node_generations, visit_nodes from cubed.runtime.types import Callback, DagExecutor -from cubed.runtime.utils import handle_callbacks, handle_operation_start_callbacks +from cubed.runtime.utils import ( + handle_callbacks, + handle_operation_start_callbacks, + profile_memray, +) from cubed.spec import Spec logger = logging.getLogger(__name__) +@profile_memray def run_func(input, func=None, config=None, name=None, compute_id=None): result = func(input, config=config) return result @@ -171,6 +176,7 @@ def execute_dag( ) -> None: use_backups = kwargs.pop("use_backups", True) wait_dur_sec = kwargs.pop("wait_dur_sec", None) + compute_id = kwargs.pop("compute_id") allowed_mem = spec.allowed_mem if spec is not None else None function_executor = FunctionExecutor(**kwargs) runtime_memory_mb = function_executor.config[function_executor.backend].get( @@ -199,6 +205,7 @@ def execute_dag( func=pipeline.function, config=pipeline.config, name=name, + compute_id=compute_id, ): handle_callbacks(callbacks, stats) else: @@ -224,7 +231,8 @@ def execute_dag( use_backups=use_backups, return_stats=True, wait_dur_sec=wait_dur_sec, - # TODO: kwargs + # TODO: other kwargs (func, config, name) + compute_id=compute_id, ): handle_callbacks(callbacks, stats) diff --git a/cubed/runtime/executors/local.py b/cubed/runtime/executors/local.py index 588cdd71..2eeddc0d 100644 --- a/cubed/runtime/executors/local.py +++ b/cubed/runtime/executors/local.py @@ -19,6 +19,7 @@ execution_stats, handle_callbacks, handle_operation_start_callbacks, + profile_memray, ) from cubed.spec import Spec @@ -59,6 +60,7 @@ def execute_dag( [callback.on_task_end(event) for callback in callbacks] +@profile_memray @execution_stats def run_func(input, func=None, config=None, name=None, compute_id=None): return func(input, config=config) diff --git a/cubed/runtime/utils.py b/cubed/runtime/utils.py index 894e6ba7..a3bbbf98 100644 --- a/cubed/runtime/utils.py +++ b/cubed/runtime/utils.py @@ -1,10 +1,17 @@ import time +from contextlib import nullcontext from functools import partial from itertools import islice +from pathlib import Path from cubed.runtime.types import OperationStartEvent, TaskEndEvent from cubed.utils import peak_measured_mem +try: + import memray +except ImportError: + memray = None + sym_counter = 0 @@ -39,6 +46,32 @@ def execution_stats(func): return partial(execute_with_stats, func) +def execute_with_memray(function, input, **kwargs): + # only run memray if installed, and only for first input (for operations that run on block locations) + if ( + memray is not None + and "compute_id" in kwargs + and isinstance(input, list) + and all(isinstance(i, int) for i in input) + and sum(input) == 0 + ): + compute_id = kwargs["compute_id"] + name = kwargs["name"] + memray_dir = Path(f"history/{compute_id}/memray") + memray_dir.mkdir(parents=True, exist_ok=True) + cm = memray.Tracker(memray_dir / f"{name}.bin") + else: + cm = nullcontext() + with cm: + result = result = function(input, **kwargs) + return result + + +def profile_memray(func): + """Decorator to profile a function call with memray.""" + return partial(execute_with_memray, func) + + def handle_operation_start_callbacks(callbacks, name): if callbacks is not None: event = OperationStartEvent(name) diff --git a/docs/images/memray-add.png b/docs/images/memray-add.png new file mode 100644 index 00000000..5749b4c9 Binary files /dev/null and b/docs/images/memray-add.png differ diff --git a/docs/user-guide/diagnostics.md b/docs/user-guide/diagnostics.md index 278a6f1a..22a8e82b 100644 --- a/docs/user-guide/diagnostics.md +++ b/docs/user-guide/diagnostics.md @@ -92,3 +92,41 @@ The timeline callback will write a graphic `timeline.svg` to a directory with th ### Examples in use See the [examples](/~https://github.com/cubed-dev/cubed/blob/main/examples/README.md) for more information about how to use them. + +## Memray + +[Memray](/~https://github.com/bloomberg/memray), a memory profiler for Python, can be used to track and view memory allocations when running a single task in a Cubed computation. + +This is not usually needed when using Cubed, but for developers writing new operations, improving projected memory sizes, or for debugging a memory issue, it can be very useful to understand how memory is actually allocated in Cubed. + +To enable Memray memory profiling in Cubed, simply install memray (`pip install memray`). Then use a local executor that runs tasks in separate processes, such as `processes` (Python 3.11 or later) or `lithops`. When you run a computation, Cubed will enable Memray for the first task in each operation (so if an array has 100 chunks it will only produce one Memray trace). + +Here is an example of a simple addition operation, with 200MB chunks. (It is adapted from [test_mem_utilization.py](/~https://github.com/cubed-dev/cubed/blob/main/cubed/tests/test_mem_utilization.py) in Cubed's test suite.) + +```python +import cubed.array_api as xp +import cubed.random + +a = cubed.random.random( + (10000, 10000), chunks=(5000, 5000), spec=spec +) # 200MB chunks +b = cubed.random.random( + (10000, 10000), chunks=(5000, 5000), spec=spec +) # 200MB chunks +c = xp.add(a, b) +c.compute(optimize_graph=False) +``` + +The optimizer is turned off so that generation of the random arrays is not fused with the add operation. This way we can see the memory allocations for that operation alone. + +After the computation is complete there will be a collection of `.bin` files in the `history/compute-{id}/memray` directory - with one for each operation. To view them we convert them to HTML flame graphs as follows: + +```shell +(cd $(ls -d history/compute-* | tail -1)/memray; for f in $(ls *.bin); do echo $f; python -m memray flamegraph --temporal -f -o $f.html $f; done) +``` + +Here is the flame graph for the add operation: + +![Memray temporal view of an 'add' operation](../images/memray-add.png) + +Annotations have been added to explain what is going on in this example. Note that reading a chunk from Zarr requires twice the chunk memory (400MB) since there is a buffer for the compressed Zarr block (200MB), as well as the resulting array (200MB). After the first chunk has been loaded the memory dips back to 200MB since the compressed buffer is no longer retained. diff --git a/setup.cfg b/setup.cfg index e59de308..0a761bc2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,6 +40,8 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-IPython.*] ignore_missing_imports = True +[mypy-memray.*] +ignore_missing_imports = True [mypy-modal.*] ignore_missing_imports = True [mypy-matplotlib.*]