Don't pass the massive xr.Dataset between processes! #22
After chatting with @jacobbieker .... it turns out I was wrong! Evidence:

```python
from multiprocessing.pool import Pool
from time import sleep

import numpy as np


# task executed in a worker process
def task(identifier: int):
    # generate a 160 MByte array of random values:
    rng = np.random.default_rng(seed=identifier)
    arr = rng.random((1000, 1000, 20))
    sleep_time_secs = arr[0, 0, 0] * 4
    print(f'Task {identifier} sleeping for {sleep_time_secs:.3f} secs...', flush=True)
    sleep(sleep_time_secs)
    print(f'Task {identifier} DONE!', flush=True)


if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        pool.imap(task, range(50))
        # shut down the process pool
        pool.close()
        # wait for all issued tasks to complete
        pool.join()
```

Produces this output:
Two solutions spring to mind:

```python
# Run the processes!
with multiprocessing.Pool() as pool:
    for netcdf_filename in pool.imap(convert_grib_files_to_netcdf, tasks):
        append_netcdf_to_zarr(netcdf_filename, destination_zarr_path)
```
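For context, a hypothetical version of the worker side of that loop might look like the sketch below. This is only an illustration (the `task` attributes and the file naming are assumptions, not the repo's actual code); the key point is that only a tiny `Path` is ever returned to the main process, never the big `xr.Dataset`:

```python
from pathlib import Path

import xarray as xr


def convert_grib_files_to_netcdf(task) -> Path:
    """Hypothetical worker: load the GRIB files for one init time, dump them to
    a temporary NetCDF file, and return only the (tiny) filename, so no large
    xr.Dataset is ever pickled back to the main process."""
    dataset = xr.open_mfdataset(task.grib_filenames, engine="cfgrib")
    netcdf_filename = Path(task.temp_dir) / f"{task.init_time:%Y%m%dT%H%M}.nc"
    dataset.to_netcdf(netcdf_filename)
    return netcdf_filename
```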
Good: Option 1 (from the comment above) sounds viable. The xarray docs suggest that we can write to the Zarr in arbitrary order and in parallel if we first create the relevant zarr metadata. Some relevant quotes from the xarray docs:
But, before making this change, I'll run some experiments with the code as-is, to get a feel for whether this is even a problem!
Converting two NWP init times (using Wholesale1 & Wholesale2) takes 54 seconds on my NUC, and very nearly runs out of RAM. Downcasting the dataset to
Not passing anything back to the main process (and hence not writing anything to disk) takes 32 seconds.
I've done some experiments using xarray's support for writing to a pre-defined region of a Zarr. In the main process, before launching the pool of workers, we write the Zarr metadata up front (and, if we have to create new metadata or update existing metadata, that also happens in the main process). When we actually write data to disk from a worker process, we can write the actual chunks like this:

```python
# The drop_vars is necessary otherwise Zarr will try to
# overwrite variable, step, y, and x coord arrays.
dataset.drop_vars(['variable', 'step', 'y', 'x']).to_zarr(
    "test_regions.zarr",
    region={"init_time": slice(10, 20)},  # integer index slice.
)
```
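And a minimal sketch of the "write the metadata first" step in the main process, assuming a lazy, dask-backed Dataset (here called `full_dataset`) that already spans every `init_time` we intend to fill in later. The filenames and chunking below are assumptions, not the repo's actual code:

```python
import xarray as xr

# Hypothetical: open (or construct) a lazy Dataset covering the full init_time
# range, chunked so that each worker owns a contiguous slice of init_time.
full_dataset = xr.open_mfdataset("nwp_*.nc", chunks={"init_time": 1})

# Write only the metadata and coordinate arrays; no chunk data is computed or
# written yet. Workers can then fill their own region={"init_time": ...} slices.
full_dataset.to_zarr("test_regions.zarr", mode="w", compute=False)
```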
On second thoughts... this isn't a priority for me, especially if I downsample the NWPs in the worker process before passing them to the Zarr-writing process. The next task I plan to work on is downsampling the NWPs, ready for the National PV forecasting experiments.
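Purely as an illustration, the in-worker downsampling could look something like this (the coarsening factor and the float16 downcast are assumptions, not decisions that have been made):

```python
import numpy as np
import xarray as xr


def downsample_nwp(dataset: xr.Dataset) -> xr.Dataset:
    """Hypothetical in-worker downsampling, applied before the dataset is
    handed to whichever process writes the Zarr."""
    return (
        dataset
        .coarsen(x=4, y=4, boundary="trim").mean()  # reduce spatial resolution
        .astype(np.float16)  # downcast to shrink the memory footprint
    )
```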
Describe the bug
The code works at the moment. But it does something that's slow, CPU-intensive, and memory-intensive: it passes massive `xr.Dataset`s between processes. The main process does this:
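Roughly, the pattern is something like this sketch (the worker function name and the exact `to_zarr` call here are my assumptions, not the repo's actual code):

```python
import multiprocessing

# load_and_process_nwp, tasks and destination_zarr_path are assumed to be
# defined elsewhere; this is only a paraphrase of the pattern.
with multiprocessing.Pool() as pool:
    # Each worker loads & processes one init_time and returns the whole
    # xr.Dataset, which must be pickled and piped back to the main process:
    for ds in pool.imap(load_and_process_nwp, tasks):
        ds.to_zarr(destination_zarr_path, append_dim='init_time')
```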
Which requires every `ds` to be pickled and copied from the worker process to the main process through a pipe (which is very slow - multiple seconds - for large objects).

Experiment
This minimal example takes 6 seconds to run, and uses a trivially small amount of RAM and CPU. Each worker process creates a 160 MB array. But, crucially, each worker process doesn't pass that array back to the main process:
(this code is adapted from here)
If we just append the line `return arr` at the end of the `task` function (so each worker process pickles the array and attempts to send it to the main process) then the script runs for 30 seconds using max CPU, and then consumes all the RAM on my desktop before crashing!

Expected behavior
I think the fix is simple: we just tell each worker process to save the dataset. I'm as sure as I can be that `imap` will still guarantee that the processes run in order, even if the processes take different amounts of time to complete.

UPDATE: I was wrong! `imap` only guarantees the order in which results are returned to the main process; the tasks themselves run (and finish) in arbitrary order, so we can't rely on the workers saving to the Zarr in order.

Additional context
The code used to use a "chain of locks"... but that proved unreliable, so the "chain of locks" was replaced with `imap` in commit 33330bf. Replacing the "chain of locks" with `imap` was definitely the right thing to do (much simpler code; much more stable!). We just need to make sure we don't pass massive datasets between processes 🙂