I want to build custom Dask task graphs that consist of operations on Dask DataFrames.
I see two interfaces for this. I prefer the one that constructs a `Delayed` object from a dictionary, because I hope to request the computation of multiple keys at once and have Dask execute them efficiently, e.g. by computing common dependencies only once.
After calling `.compute()` on the `Delayed` object, the results are still lazy Dask DataFrames rather than materialised pandas DataFrames. I can materialise them with a second `.compute()`, but is there a way to avoid this and get all requested keys fully computed in the first step?
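```python
import dask.dataframe as dd
import pandas as pd
from dask.delayed import Delayed
from functools import partial
from IPython.display import display  # provided automatically in Jupyter

person = pd.DataFrame({
    'name': ['John', 'Jane'],
    'age': [30, 25]
})

# Hand-written task graph: each value is a (callable, *args) task
task_graph = {
    'df': (
        partial(dd.from_pandas, npartitions=1),
        person
    ),
    'person_old': (
        lambda x: x.assign(age=x.age*3),
        'df'
    ),
}

# Request both keys at once
request = Delayed(['df', 'person_old'], task_graph)
result = request.compute()
display(result[0])            # still a lazy Dask DataFrame
display(result[0].compute())  # materialised pandas DataFrame
```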
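A likely explanation is that the scheduler treats whatever a task returns as an opaque value: the tasks above return Dask DataFrames (`dd.from_pandas` and `.assign` on a Dask DataFrame are lazy), and, as far as I understand, Dask does not recursively compute collections that a task returns. A sketch under that assumption keeps the hand-written graph but makes every task operate on plain pandas objects, so one scheduler call yields materialised results. The helper names `load_person` and `triple_age` are hypothetical; `dask.threaded.get` is Dask's threaded scheduler, which accepts a graph and a list of keys:

```python
import pandas as pd
from dask.threaded import get


def load_person() -> pd.DataFrame:
    # Hypothetical loader task: returns a concrete pandas DataFrame.
    return pd.DataFrame({"name": ["John", "Jane"], "age": [30, 25]})


def triple_age(frame: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical transform task: works on pandas, so its result is materialised.
    return frame.assign(age=frame.age * 3)


# Raw task graph whose tasks return pandas objects, not Dask collections.
task_graph = {
    "df": (load_person,),
    "person_old": (triple_age, "df"),
}

# Requesting a list of keys returns the results in the same order;
# "df" is computed once even though "person_old" depends on it.
df_result, person_old_result = get(task_graph, ["df", "person_old"])
```

With this design, parallelism comes only from independent tasks in the graph, not from partitions inside a Dask DataFrame, so it suits graphs of many moderately sized pandas steps.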