
Sending large Dask graphs causing major slowdown before computation


I am encountering a warning from Dask, and the computation takes a long time to start (roughly 5 minutes) because of my large task graph. Here is the full warning:

UserWarning: Sending large graph of size 29.88 MiB.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.

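For context, this warning fires whenever the serialized task graph that has to be shipped to the scheduler exceeds a threshold (10 MB by default), which usually happens when concrete data ends up embedded in the graph instead of instructions for loading it. A minimal sketch of the difference, with made-up array shapes:

import numpy as np
import dask.array as da

# Embeds ~76 MiB of concrete data in the graph: the NumPy array itself is
# serialized and shipped along with the tasks, triggering the warning above.
big = np.random.random((10_000, 1_000))
embedded = da.from_array(big, chunks=(1_000, 1_000))

# Keeps the graph small: each task holds only the recipe for producing its
# chunk, and the data is materialized on the workers.
lazy = da.random.random((10_000, 1_000), chunks=(1_000, 1_000))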
Here is my code, for replication purposes. It uses xee, an Earth Engine extension for xarray, to pull data from Google Earth Engine into an xarray data structure. The dataset is chunked to what xee considers a "safe" chunk size (one that does not exceed Earth Engine's request size limit).

import ee
import json
import xarray as xr
from dask.distributed import performance_report
import dask

# Although we authenticated Google Earth Engine on our Dask workers, we also need to
# authenticate Google Earth Engine on our local machine!
json_key = "service_account_key.json"  # placeholder: path to your service-account JSON key
with open(json_key, 'r') as file:
    data = json.load(file)
credentials = ee.ServiceAccountCredentials(data["client_email"], json_key)
# The high-volume endpoint is what xee's documentation recommends here
ee.Initialize(credentials=credentials, opt_url='https://earthengine-highvolume.googleapis.com')


# Study-area boundaries (California is loaded but not used below)
WSDemo = ee.FeatureCollection("projects/robust-raster/assets/boundaries/WSDemoSHP_Albers")
California = ee.FeatureCollection("projects/robust-raster/assets/boundaries/California")

#ic = ee.ImageCollection('LANDSAT/LC08/C02/T1_L2').filterDate('2014-01-01', '2014-12-31')
ic = ee.ImageCollection('LANDSAT/LC08/C02/T1_L2').filterDate('2020-05-01', '2020-08-31')

xarray_data = xr.open_dataset(ic,
                              engine='ee',
                              crs="EPSG:3310",
                              scale=30,
                              geometry=WSDemo.geometry())

xarray_data = xarray_data.chunk({"time": 48, "X": 512, "Y": 256})

def compute_ndvi(df):
    # NDVI = (NIR - Red) / (NIR + Red); for Landsat 8, SR_B5 is NIR and SR_B4 is red
    df['ndvi'] = (df['SR_B5'] - df['SR_B4']) / (df['SR_B5'] + df['SR_B4'])
    return df


def user_function_wrapper(ds, user_func, *args, **kwargs):
    # Round-trip each block through pandas so user functions can be written
    # against a plain DataFrame, then restore the original dimensions.
    df_input = ds.to_dataframe().reset_index()
    df_output = user_func(df_input, *args, **kwargs)
    df_output = df_output.set_index(list(ds.dims))
    ds_output = df_output.to_xarray()
    return ds_output


test = xr.map_blocks(user_function_wrapper, 
                    xarray_data, 
                    args=(compute_ndvi,))

# Create a Dask report of the single chunked run
with performance_report(filename="dask_report.html"):
    test.compute()
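For what it's worth, xr.map_blocks also accepts a template argument; when it is omitted, xarray first executes the function on dummy data to infer the output structure, which adds startup cost on top of graph serialization. A sketch of supplying it explicitly, assuming the output is just the input plus an ndvi variable:

# Hypothetical: describe the expected output (input variables plus "ndvi")
# so xr.map_blocks can skip its dummy-data inference pass.
template = xarray_data.copy()
template["ndvi"] = xarray_data["SR_B5"]  # same dims/chunks; values are placeholders

test = xr.map_blocks(user_function_wrapper,
                    xarray_data,
                    args=(compute_ndvi,),
                    template=template)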

You can disregard the code that converts chunks into a data frame format; I am working on a way for users to write their own functions against pandas data frames. Is 29.88 MiB really too large for a graph? I'll be looking into futures and delayed objects next, but I wanted to ask here in case there is a clear mistake in my code that is causing this warning (and the long startup).
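For reference, the futures pattern the warning alludes to usually looks like the sketch below: a large local object is shipped to the cluster once with Client.scatter, and the lightweight future is passed into tasks in its place (the client setup and the lookup array here are placeholders, not from the code above):

from dask.distributed import Client
import numpy as np

client = Client()  # placeholder: assumes a running Dask cluster

# A large local object that would otherwise be embedded in every task
lookup = np.random.random((2_000, 2_000))  # placeholder data

# Ship it to the workers once; the graph then references it by key
lookup_future = client.scatter(lookup, broadcast=True)

def scale_by_lookup(chunk, table):
    # On the worker, the future has already been resolved to the scattered array
    return chunk * table.mean()

result = client.submit(scale_by_lookup, np.ones(10), lookup_future).result()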
