I am encountering a warning from Dask, and because of my large task graphs it takes a long time (roughly 5 minutes) for a computation to start. Here is the full warning:
UserWarning: Sending large graph of size 29.88 MiB.
This may cause some slowdown.
Consider loading the data with Dask directly
or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
Here is my code for replication purposes. It uses xee, an xarray extension that pulls data from Google Earth Engine into an xarray data structure. The dataset is chunked to what xee considers a "safe" chunk size (one that does not exceed the request limit of Earth Engine queries).
import ee
import json
import xarray as xr
from dask.distributed import performance_report
import dask
# Although we authenticated Google Earth Engine on our Dask workers, we also need to
# authenticate Google Earth Engine on our local machine!
json_key = "service_account_key.json"  # placeholder: path to your service-account key file
with open(json_key, 'r') as file:
    data = json.load(file)
credentials = ee.ServiceAccountCredentials(data["client_email"], json_key)
ee.Initialize(credentials=credentials, opt_url='https://earthengine-highvolume.googleapis.com')  # high-volume endpoint recommended by xee
WSDemo = ee.FeatureCollection("projects/robust-raster/assets/boundaries/WSDemoSHP_Albers")
California = ee.FeatureCollection("projects/robust-raster/assets/boundaries/California")
#ic = ee.ImageCollection('LANDSAT/LC08/C02/T1_L2').filterDate('2014-01-01', '2014-12-31')
ic = ee.ImageCollection('LANDSAT/LC08/C02/T1_L2').filterDate('2020-05-01', '2020-08-31')
xarray_data = xr.open_dataset(ic,
                              engine='ee',
                              crs="EPSG:3310",
                              scale=30,
                              geometry=WSDemo.geometry())
xarray_data = xarray_data.chunk({"time": 48, "X": 512, "Y": 256})
def compute_ndvi(df):
    # Perform your calculations
    df['ndvi'] = (df['SR_B5'] - df['SR_B4']) / (df['SR_B5'] + df['SR_B4'])
    return df
def user_function_wrapper(ds, user_func, *args, **kwargs):
    # Convert the chunk to a pandas DataFrame, apply the user's function,
    # then restore the chunk's dimensions as the index and convert back to xarray
    df_input = ds.to_dataframe().reset_index()
    df_output = user_func(df_input, *args, **kwargs)
    df_output = df_output.set_index(list(ds.dims))
    ds_output = df_output.to_xarray()
    return ds_output
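# For comparison only (not used below): the pure-xarray equivalent of the
# wrapped NDVI computation stays lazy and avoids the dataframe round trip.
ndvi_direct = (xarray_data['SR_B5'] - xarray_data['SR_B4']) / (xarray_data['SR_B5'] + xarray_data['SR_B4'])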
test = xr.map_blocks(user_function_wrapper,
                     xarray_data,
                     args=(compute_ndvi,))
# Create a Dask report of the single chunked run
with performance_report(filename="dask_report.html"):
    test.compute()
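As a quick diagnostic, my understanding is that the number of tasks in the graph can be counted before computing; xarray objects backed by Dask arrays implement Dask's collection protocol, so something like this should work (a sketch, not part of my pipeline):
# Count the tasks Dask will send to the scheduler; a very large count (or
# large objects embedded per task) would explain the warning and slow startup.
graph = test.__dask_graph__()
print(f"tasks in graph: {len(graph)}")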
You can disregard the code that converts chunks into a dataframe format; I am working on a way for users to write their own functions against pandas DataFrames. Is 29.88 MiB really too large a graph? I'll be looking into futures and delayed objects next, but I wanted to ask here in case there is a clear mistake in my code that is causing this warning (and the long startup).
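To frame answers: my understanding of the "futures" suggestion in the warning is something like the sketch below, where a large object is scattered to the workers once and then referenced by a future, instead of a copy being embedded in every task. The names here (big_lookup, use_lookup) are illustrative, not from my actual pipeline.

from dask.distributed import Client

client = Client()  # connect to (or start) a cluster

# Scatter the large object to the workers once; tasks that take the
# resulting future as an argument reference the data by key instead of
# embedding a copy in the serialized graph.
big_lookup = {"example": "some large object"}
big_lookup_future = client.scatter(big_lookup)

def use_lookup(x, lookup):
    # The worker resolves the future to the already-scattered object
    return x

result = client.submit(use_lookup, 1, big_lookup_future).result()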