I want to apply a function to each 2D slice of a 4D NumPy array using Dask. The function returns a single value per 2D slice, so the output should be a 2D matrix. I would like to do this in parallel.

Problem: I'm not sure I correctly understand how Dask performs the parallel computation. Currently, I always run out of RAM. My naive understanding is that only the entire input array plus the currently processed chunks must fit in RAM (plus some headroom for all other applications to work). But it seems that each chunk gets a copy of the entire input array?
Here's some example code (data will be roughly 20 GiB). I know there are more efficient ways to compute the sum over the last two dimensions; the sum is just an example function to illustrate the problem (i.e. the question is not about computing the sum over the last two dimensions of a 4D array).
import numpy as np
import dask.array as da
# set seed
np.random.seed(42)
# Create example data array
array_shape = (1000,300,50,200)
data = np.random.random(array_shape) # Create a large 4D NumPy array
# how big is whole array and how big is each chunk?
array_gib = data.nbytes / (1024 ** 3)
chunk_gib = data[0,0,:,:].nbytes / (1024 ** 3)
print(f"Memory occupied by array: {array_gib} GiB, Memory occupied by chunk: {chunk_gib} GiB")
# Define an example function that reduces each chunk of shape (1, 1, 50, 200)
# to a single value, returned as a (1, 1) block
def sum_of_2d_slice(chunk):
    return chunk.sum(axis=(2, 3))
# Define dask array with chunks. We want to iterate over the first two dimensions
# so each chunk is a 2D matrix
data = da.from_array(data, chunks=(1,1,data.shape[2],data.shape[3]))
# Map function to each chunk; drop_axis removes the last two axes from the output
result = data.map_blocks(sum_of_2d_slice, drop_axis=[2, 3])
# Compute the final result on a local cluster of 5 worker processes,
# each limited to 1 GB of RAM
from dask.distributed import Client
client = Client(n_workers=5, memory_limit='1GB')
final_result = result.compute()
# Print the result
print(final_result)
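For reference, here is a downsized sketch of the same pipeline (same per-slice chunking, but a small array and the default threaded scheduler, so it fits comfortably in RAM). It confirms that `map_blocks` with `drop_axis=[2, 3]` produces the expected 2D result; the small shape `(10, 3, 50, 200)` is just an illustrative stand-in for the full `(1000, 300, 50, 200)` array:

```python
import numpy as np
import dask.array as da

def sum_of_2d_slice(chunk):
    # each block has shape (1, 1, 50, 200); reduce the last two axes
    return chunk.sum(axis=(2, 3))

small = np.random.random((10, 3, 50, 200))  # ~2.3 MiB instead of ~22 GiB
darr = da.from_array(small, chunks=(1, 1, 50, 200))
result = darr.map_blocks(sum_of_2d_slice, drop_axis=[2, 3]).compute()
print(result.shape)                                 # (10, 3)
print(np.allclose(result, small.sum(axis=(2, 3))))  # True
```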