
python - Time Series Forecasting Model with XGBoost and Dask Large Datasets Crashing - Stack Overflow


I'm building a time series forecasting model in Python to predict hourly kWh loads for different customer types at a utility company. The dataset contains ~81 million rows, with hourly load data for ~2,300 customers over 2-4 years. Customer type is represented by binary columns: EV, HP, Solar, and TOU. The dataset has the following variables:

  - read_date: datetime64[us]
  - meter: string
  - kwh: float64
  - city: string
  - temperature: float64
  - ev: int64
  - solar: int64
  - hp: int64
  - tou: int64
  - hour: int32
  - day: int32
  - month: int32
  - year: Int64
  - day_of_week: int32
  - season: string
  - customer_type: string
  - hour_sin: float64
  - hour_cos: float64
  - month_sin: float64
  - month_cos: float64
  - day_of_week_sin: float64
  - day_of_week_cos: float64
  - day_sin: float64
  - day_cos: float64
  - is_holiday: int64
  - city_reading: int64
  - city_lynnfield: int64
  - city_northreading: int64
  - city_wilmington: int64
  - season_winter: int64
  - season_spring: int64
  - season_summer: int64
  - season_fall: int64

After cleaning the data, I dropped the following features from both the training and test datasets: meter, customer_type, season, read_date, city, day, month, hour, day_of_week. My target variable is hourly kWh load.

I attempted to build an XGBoost model using Dask for distribution, but it keeps crashing with errors like:

AssertionError: error
2025-03-31 14:12:26,995 - distributed.nanny - WARNING - Restarting worker

I’m working on a local machine with 128GB of RAM and an Intel i7-14700K 3.40 GHz processor. I'm looking for guidance on how to handle time series forecasting with this large dataset and how to avoid crashes while using Dask for distribution. Here is my sample code:

# Import necessary libraries
import numpy as np
import dask.dataframe as dd
import dask.array as da
import xgboost as xgb
from dask.distributed import Client
from dask.diagnostics import ProgressBar 
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import warnings
import matplotlib.pyplot as plt
from tqdm import tqdm

# Load the data using Dask (efficient for large Parquet files)
some_feats_dd = dd.read_parquet("pre_ml_some_features.parquet")

# Rename DataFrame
df_processed = some_feats_dd

# Filter the data based on the read_date for training and testing
df_train = df_processed[df_processed["year"] < 2025]  # Keep rows before 2025
df_test = df_processed[df_processed["year"] == 2025]  # Keep rows from 2025

# Exclude columns and prepare features and target variables for training
exclude_cols = ["kwh", "meter", "customer_type", "season", "read_date", "city", 
                "day", "month", "hour", "day_of_week"]

# Prepare training features (X) and target variable (y)
X_train = df_train.drop(columns=exclude_cols)
y_train = df_train["kwh"]

# Compute total lengths and ensure exact 3 chunks
train_size = len(y_train.compute())
test_size = len(df_test)  # No need to compute, Dask can infer

# Convert y_train and y_test to Dask arrays with forced 3 chunks
y_train = da.from_array(y_train.compute(), chunks=(train_size // 3,))
y_test = da.from_array(df_test["kwh"].compute(), chunks=(test_size // 2,))

# Ensure partitions match for X_train and X_test
X_train = X_train.repartition(npartitions=3)
X_test = X_test.repartition(npartitions=3)

# Start Dask client for parallel processing
client = Client()

# Print the Dask dashboard URL
print(f"Dask dashboard is available at: {client.dashboard_link}")

# Use DaskDMatrix from xgboost.dask
dask_train_data = xgb.dask.DaskDMatrix(client, X_train, y_train)

# Set up parameters for XGBoost
params = {
    'objective': 'reg:squarederror',  # Regression task
    'eval_metric': 'rmse',
    'tree_method': 'hist',  # Use histogram-based method for faster training
    'verbosity': 1,  # Enables basic logging
}

# Initialize Dask-XGBoost model
dask_gbr = xgb.dask.DaskXGBRegressor(**params)

# Train the model using Dask (this will automatically parallelize)
with ProgressBar():  # Shows progress during training
    dask_gbr.fit(dask_train_data)


1 Answer


I obviously couldn't try your code, since I don't have access to the dataset, but here is some advice and a proposed code update:

  • You should first read only the columns you need, even though dask-expr might already optimize this for you.

  • I'm not sure why you want exactly 3 chunks, but without a specific reason this is a bad idea: it will probably produce very large chunks, which may be why your workers are failing (the stack trace is not detailed). See the sketch after this list for one way to size partitions and worker memory explicitly.

  • You don't need to compute to know the length; in fact, you usually never want to call compute at all.

  • y_train = da.from_array(y_train.compute(), chunks=(train_size // 3,)) is a really bad operation: it loads all of the y_train data into the client's memory and then resends it to the workers.
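
For the partition-size and worker-memory points above, here is a minimal sketch of explicit cluster sizing plus size-based repartitioning. It is only an illustration: the worker count, threads per worker, per-worker memory limit, and target partition size are assumptions for a 128GB machine, not tuned values.

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# Cap memory per worker explicitly so one oversized partition cannot exhaust
# the whole machine; 8 workers x 14GB stays under the 128GB total (assumed sizing).
cluster = LocalCluster(n_workers=8, threads_per_worker=2, memory_limit="14GB")
client = Client(cluster)

# Let Dask pick the number of partitions from a target partition size
# instead of forcing a fixed (and therefore very large) partition count.
df = dd.read_parquet("pre_ml_some_features.parquet")
df = df.repartition(partition_size="256MB")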

So you might want to test code that looks like this:

# Import necessary libraries
import numpy as np
import dask.dataframe as dd
import dask.array as da
import xgboost as xgb
from dask.distributed import Client
from dask.diagnostics import ProgressBar 
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import warnings
import matplotlib.pyplot as plt
from tqdm import tqdm

# Include only the columns you actually need
include_cols = ["...", "..."]

# Load the data using Dask (efficient for large Parquet files)
df_processed = dd.read_parquet("pre_ml_some_features.parquet", columns=include_cols)


# Filter the data based on the read_date for training and testing
df_train = df_processed[df_processed["year"] < 2025]  # Keep rows before 2025
df_test = df_processed[df_processed["year"] == 2025]  # Keep rows from 2025


# Prepare training features (X) and target variable (y);
# drop "kwh" from the features so the target does not leak in
X_train = df_train.drop(columns=["kwh"])
y_train = df_train["kwh"]

# X_test was not defined in your snippet
X_test = df_test.drop(columns=["kwh"])
y_test = df_test["kwh"]

# If you need arrays, you might want to use df.to_dask_array()

# Start Dask client for parallel processing
client = Client()

# Print the Dask dashboard URL
print(f"Dask dashboard is available at: {client.dashboard_link}")

# Use DaskDMatrix from xgboost.dask
dask_train_data = xgb.dask.DaskDMatrix(client, X_train, y_train)

# Set up parameters for XGBoost
params = {
    'objective': 'reg:squarederror',  # Regression task
    'eval_metric': 'rmse',
    'tree_method': 'hist',  # Use histogram-based method for faster training
    'verbosity': 1,  # Enables basic logging
}

# Train the model using Dask (this will automatically parallelize).
# The functional xgb.dask.train API accepts the DaskDMatrix built above;
# the scikit-learn style DaskXGBRegressor.fit expects X and y instead.
# dask.diagnostics.ProgressBar only works with local schedulers, so use the
# Dask dashboard printed above to monitor progress.
output = xgb.dask.train(client, params, dask_train_data)
booster = output["booster"]
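
The original snippet imports the scikit-learn metrics but never evaluates the model. As a hedged sketch only (not part of the original answer), evaluation could look like the following, assuming the booster trained above and the X_test / y_test splits defined earlier; the metrics simply reuse the imports already present.

# Predictions come back as a lazy Dask collection
y_pred = xgb.dask.predict(client, booster, X_test)

# Materialize only for the metric step (the 2025 slice is far smaller than
# the full training set, so it is assumed to fit in local memory)
y_pred_local = y_pred.compute()
y_test_local = y_test.compute()

mae = mean_absolute_error(y_test_local, y_pred_local)
rmse = np.sqrt(mean_squared_error(y_test_local, y_pred_local))
r2 = r2_score(y_test_local, y_pred_local)
print(f"MAE: {mae:.3f}  RMSE: {rmse:.3f}  R^2: {r2:.3f}")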