I have built a hybrid model that combines a Vector Autoregressive (VAR) model and a Long Short-Term Memory (LSTM) network. The VAR model is used to capture linear dependencies between macroeconomic indicators, while the LSTM model is trained on the residuals of the VAR model to capture non-linear patterns.
Steps in My Hybrid Model:
VAR Model:
Fit a VAR model on the differenced time series and determine the optimal lag order. Obtain the fitted values and compute the residuals. Evaluate the VAR model using Mean Absolute Error (MAE). Forecast future values with the fitted VAR model.
LSTM Model:
Use the residuals from the VAR model as the input series for the LSTM. Normalize the residuals and split them into training and test sets. Train the LSTM with hyperparameter tuning (grid search) and early stopping. Forecast the residuals with the trained LSTM and add them to the VAR forecast to get the final prediction (a minimal sketch of this combination step is shown right after these steps).
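To make the combination step concrete, here is a minimal sketch with hypothetical numbers (the array names and values are placeholders, not my actual variables):

import numpy as np

# Sketch only: hypothetical values illustrating the additive hybrid.
var_forecast = np.array([100.1, 100.4, 100.9])           # linear part from the VAR (levels)
lstm_residual_forecast = np.array([0.3, -0.1, 0.2])      # non-linear part: LSTM forecast of VAR residuals
hybrid_forecast = var_forecast + lstm_residual_forecast  # final hybrid prediction per horizon step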
Now I am struggling with implementing out-of-sample forecasting for this hybrid model.
My code:
A. VECTOR AUTOREGRESSIVE
Optimal Lag Order
Find the optimal lag order for the VAR
for i in [1, 2, 3, 4, 5, 6]:
    model = VAR(data_diff)
    results = model.fit(i)
    print('Order =', i)
    print('AIC: ', results.aic)
    print()
x = model.select_order(maxlags=4)
x.summary()
Model
optimal_lag = x.aic
print(f"Optimal Lag Order: {optimal_lag}")
Fit the VAR model
fitted_model = model.fit(optimal_lag)
fitted_model.summary()
fitted_values = fitted_model.fittedvalues
fitted_values_df = fitted_values.cumsum().add(data.iloc[optimal_lag-1][['CCPI_index', 'Brent Crude', 'Exchange Rates', 'MS']])
residuals = data[optimal_lag:] - fitted_values_df
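(To make the differencing assumption explicit: the reconstruction above assumes data_diff = data.diff().dropna() and cumulates the fitted differences from a single anchor value. A one-step-ahead alternative, which adds each fitted difference to the previous actual level instead, is sketched below using 'CCPI_index' as an example column; the two reconstructions generally give different residual series.)

# Sketch only, assuming data_diff = data.diff().dropna(), so fitted_values shares data_diff's index
# and keeps the original column names.
# One-step-ahead fitted level_t = actual level_{t-1} + fitted difference_t
fitted_level_1step = data['CCPI_index'].shift(1).loc[fitted_values.index] + fitted_values['CCPI_index']
residual_1step = data['CCPI_index'].loc[fitted_values.index] - fitted_level_1step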
Evaluation & Visualization
valid_data = pd.DataFrame({
    'CCPI Actual': data['CCPI_index'][optimal_lag:],
    'CCPI Fitted': fitted_values_df['CCPI_index'],
    'CCPI Residual': residuals['CCPI_index'],
    'ER Actual': data['Exchange Rates'][optimal_lag:],
    'ER Fitted': fitted_values_df['Exchange Rates'],
    'ER Residual': residuals['Exchange Rates'],
    'Brent Actual': data['Brent Crude'][optimal_lag:],
    'Brent Fitted': fitted_values_df['Brent Crude'],
    'Brent Residual': residuals['Brent Crude'],
    'MS Actual': data['MS'][optimal_lag:],
    'MS Fitted': fitted_values_df['MS'],
    'MS Residual': residuals['MS']
}).dropna()
mae_ccpi_train = mean_absolute_error(valid_data['CCPI Actual'], valid_data['CCPI Fitted'])
mae_er_train = mean_absolute_error(valid_data['ER Actual'], valid_data['ER Fitted'])
mae_brent_train = mean_absolute_error(valid_data['Brent Actual'], valid_data['Brent Fitted'])
mae_ms_train = mean_absolute_error(valid_data['MS Actual'], valid_data['MS Fitted'])
Forecasting
Forecasting (linear)
forecast_steps = 12
forecast = fitted_model.forecast(data_diff.values[-fitted_model.k_ar:], steps=forecast_steps)
forecast_df = pd.DataFrame(forecast, columns=['CCPI Forecast', 'ER Forecast', 'Brent Forecast', 'MS Forecast'])
Convert differenced forecast back to original scale
last_values = data.iloc[-1]
forecast_df['CCPI Forecast'] = forecast_df['CCPI Forecast'].cumsum() + last_values['CCPI_index']
forecast_df['ER Forecast'] = forecast_df['ER Forecast'].cumsum() + last_values['Exchange Rates']
forecast_df['Brent Forecast'] = forecast_df['Brent Forecast'].cumsum() + last_values['Brent Crude']
forecast_df['MS Forecast'] = forecast_df['MS Forecast'].cumsum() + last_values['MS']
forecast_df.index = df_update.index
df_forecast = pd.merge(df_update, forecast_df, left_index=True, right_index=True)
df_forecast
y_true = df_forecast['CCPI_index']
y_pred = df_forecast['CCPI Forecast']
mae = mean_absolute_error(y_true, y_pred)
print(f"Mean Absolute Error (MAE) for VAR CCPI: {mae}")
y_true = df_forecast['Exchange Rates']
y_pred = df_forecast['ER Forecast']
mae = mean_absolute_error(y_true, y_pred)
print(f"Mean Absolute Error (MAE) for VAR ER: {mae}")
y_true = df_forecast['Brent Crude']
y_pred = df_forecast['Brent Forecast']
mae = mean_absolute_error(y_true, y_pred)
print(f"Mean Absolute Error (MAE) for VAR Brent: {mae}")
y_true = df_forecast['MS']
y_pred = df_forecast['MS Forecast']
mae = mean_absolute_error(y_true, y_pred)
print(f"Mean Absolute Error (MAE) for VAR MS: {mae}")
B. LONG SHORT-TERM MEMORY
Splitting & Scaling
Select only the 'CCPI Residual' column for input and output
df_residual = valid_data['CCPI Residual']
data2 = df_residual.copy()
data2
seed_value = 42
np.random.seed(seed_value)
tf.random.set_seed(seed_value)
scaler2 = MinMaxScaler(feature_range=(0, 1))
scaled_data2 = scaler2.fit_transform(data2.values.reshape(-1, 1))
def create_dataset(data, time_step=1):
    dataX, dataY = [], []
    for i in range(len(data) - time_step - 1):
        a = data[i:(i + time_step), 0]  # Use only the first column
        dataX.append(a)
        dataY.append(data[i + time_step, 0])  # Use only the first column
    return np.array(dataX), np.array(dataY)
Set the time step
time_step = 12
X2, y2 = create_dataset(scaled_data2, time_step)
X2 = X2.reshape(X2.shape[0], X2.shape[1], 1)
train_size2 = int(len(X2) * 0.8)
test_size2 = len(X2) - train_size2
X2_train, X2_test = X2[:train_size2], X2[train_size2:]
y2_train, y2_test = y2[:train_size2], y2[train_size2:]
Function to create model for LSTM
def create_model_lstm2(units1, units2, dropout_rate, learning_rate):
    model = Sequential()
    model.add(LSTM(units=units1, return_sequences=True, input_shape=(time_step, 1)))
    model.add(Dropout(rate=dropout_rate))
    model.add(LSTM(units=units2))
    model.add(Dropout(rate=dropout_rate))
    model.add(Dense(1, activation='linear'))  # Single output
    model.compile(optimizer=Adam(learning_rate=learning_rate), loss='mean_squared_error')
    return model
Define the hyperparameter grid for LSTM
param_grid_lstm2 = { 'units1': [50, 100], 'units2': [50, 100], 'dropout_rate': [0.2, 0.3], 'learning_rate': [0.01, 0.001], 'batch_size': [32], 'epochs': [50, 100] }
Convert param_grid to a list of parameter combinations
param_list_lstm2 = list(product(param_grid_lstm2['units1'],
                                param_grid_lstm2['units2'],
                                param_grid_lstm2['dropout_rate'],
                                param_grid_lstm2['learning_rate'],
                                param_grid_lstm2['batch_size'],
                                param_grid_lstm2['epochs']))
Initialize variables to track the best model and the best score for LSTM2
best_score_lstm2 = float('inf')
best_params_lstm2 = None
best_model_lstm2 = None
Perform manual grid search for LSTM
for params in param_list_lstm2:
    units1, units2, dropout_rate, learning_rate, batch_size, epochs = params
    print(f"Training LSTM model with parameters: units1={units1}, units2={units2}, dropout_rate={dropout_rate}, learning_rate={learning_rate}, batch_size={batch_size}, epochs={epochs}")
    model_lstm2 = create_model_lstm2(units1, units2, dropout_rate, learning_rate)
    early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
    history_lstm2 = model_lstm2.fit(X2_train, y2_train, batch_size=batch_size, epochs=epochs, validation_data=(X2_test, y2_test), callbacks=[early_stopping], verbose=0)
    val_loss = min(history_lstm2.history['val_loss'])
    if val_loss < best_score_lstm2:
        best_score_lstm2 = val_loss
        best_params_lstm2 = {
            'units1': units1,
            'units2': units2,
            'dropout_rate': dropout_rate,
            'learning_rate': learning_rate,
            'batch_size': batch_size,
            'epochs': epochs
        }
        best_model_lstm2 = model_lstm2

print(f"Best LSTM score: {best_score_lstm2} with parameters: {best_params_lstm2}")
manual_units1_lstm2 = best_params_lstm2['units1']
manual_units2_lstm2 = best_params_lstm2['units2']
manual_dropout_rate_lstm2 = best_params_lstm2['dropout_rate']
manual_learning_rate_lstm2 = best_params_lstm2['learning_rate']
manual_batch_size_lstm2 = best_params_lstm2['batch_size']
manual_epochs_lstm2 = best_params_lstm2['epochs']
model_manual_lstm2 = create_model_lstm2(manual_units1_lstm2, manual_units2_lstm2, manual_dropout_rate_lstm2, manual_learning_rate_lstm2)
early_stopping = EarlyStopping(monitor='val_loss', patience=50, restore_best_weights=True)
history_manual_lstm2 = model_manual_lstm2.fit(X2_train, y2_train,
                                              batch_size=manual_batch_size_lstm2,
                                              epochs=manual_epochs_lstm2,
                                              validation_data=(X2_test, y2_test),
                                              callbacks=[early_stopping],
                                              verbose=1)
model_manual_lstm2.summary()
train_predict_lstm2 = model_manual_lstm2.predict(X2_train)
test_predict_lstm2 = model_manual_lstm2.predict(X2_test)
Inverse transform the predictions for LSTM
train_predict_lstm2 = scaler2.inverse_transform(train_predict_lstm2)
test_predict_lstm2 = scaler2.inverse_transform(test_predict_lstm2)
y2_train = scaler2.inverse_transform(y2_train.reshape(-1, 1))
y2_test = scaler2.inverse_transform(y2_test.reshape(-1, 1))
Evaluation & Visualization
train_mae_ccpi_lstm2 = mean_absolute_error(y2_train, train_predict_lstm2)
test_mae_ccpi_lstm2 = mean_absolute_error(y2_test, test_predict_lstm2)
print(f'Training MAE CCPI LSTM: {train_mae_ccpi_lstm2:.4f}')
print(f'Testing MAE CCPI LSTM: {test_mae_ccpi_lstm2:.4f}')
Forecasting
Forecasting for the next steps
predictions_lstm2 = []
Prepare the input data for forecasting
input_data_lstm2 = scaled_data2[-time_step:].reshape(1, time_step, 1)
Loop to predict the next steps
for _ in range(12):
    pred = model_manual_lstm2.predict(input_data_lstm2)
    predictions_lstm2.append(pred[0])
    input_data_lstm2 = np.append(input_data_lstm2[:, 1:, :], [[pred[0]]], axis=1)
Inverse transform the predictions to original scale
predictions_lstm2 = scaler2.inverse_transform(predictions_lstm2)
df_future_predictions_lstm2 = pd.DataFrame(predictions_lstm2, columns=['Predicted Residual ccpi'])
df_future_predictions_lstm2.index = df_update.index
C. HYBRID VAR-LSTM
Model
df_linear = df_forecast[['CCPI Forecast']]
df_linear
df_nonlinear = df_future_predictions_lstm2.copy()
df_nonlinear
df_hybrid = pd.DataFrame({
    'Hybrid CCPI': df_linear['CCPI Forecast'] + df_nonlinear['Predicted Residual ccpi'],
})
df_hybrid
df_hybrid_final = pd.merge(df_update['CCPI_index'], df_hybrid, left_index=True, right_index=True)
Evaluation & Visualization
y_true = df_hybrid_final['CCPI_index']
y_pred = df_hybrid_final['Hybrid CCPI']
Calculate MAE for 'CCPI' predictions
mae = mean_absolute_error(y_true, y_pred)
print(f"Mean Absolute Error (MAE) for Hybrid CCPI: {mae}")
This is what I did for the out-of-sample part, but I don't think it is correct, because the forecasted numbers fall well outside the expected range:
future_predictions_lstm = []
input_data_lstm = scaled_data2[-time_step:].reshape(1, time_step, 1)
for _ in range(forecast_steps):
    pred = model_manual_lstm2.predict(input_data_lstm)
    future_predictions_lstm.append(pred[0])
    input_data_lstm = np.append(input_data_lstm[:, 1:, :], [[pred[0]]], axis=1)
future_predictions_lstm = scaler2.inverse_transform(future_predictions_lstm)
df_future_lstm = pd.DataFrame(future_predictions_lstm, columns=['Predicted Residual CCPI'])
df_future_lstm.index = pd.date_range(start=data.index[-1] + pd.DateOffset(months=1), periods=forecast_steps, freq='M')
df_hybrid_forecast = pd.DataFrame({
    'Hybrid CCPI': var_forecast_df['CCPI Forecast'] + df_future_lstm['Predicted Residual CCPI'],
})
df_hybrid_forecast.index = var_forecast_df.index
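I suspect part of the issue is that the VAR forecast and the LSTM residual forecast end up with different indexes, so the pandas addition does not match them step by step. Below is a minimal sketch of how I understand the aligned combination should look (assuming forecast_df from section A holds the back-transformed VAR level forecast, reusing the residual-LSTM objects from section B, and assuming a monthly index; the common future_index and the positional re-indexing are my assumptions, not validated code):

import numpy as np
import pandas as pd

# Sketch only. Assumptions: forecast_df holds the back-transformed VAR level forecasts from
# section A, scaled_data2 / scaler2 / time_step / model_manual_lstm2 are the residual-LSTM
# objects from section B, and data has a monthly DatetimeIndex.

# 1) One common future index for both components.
future_index = pd.date_range(start=data.index[-1] + pd.DateOffset(months=1),
                             periods=forecast_steps, freq='M')

# 2) Recursive LSTM forecast of the CCPI residuals (same mechanism as above).
window = scaled_data2[-time_step:].reshape(1, time_step, 1)
resid_preds_scaled = []
for _ in range(forecast_steps):
    step_pred = model_manual_lstm2.predict(window, verbose=0)
    resid_preds_scaled.append(step_pred[0, 0])
    window = np.append(window[:, 1:, :], step_pred.reshape(1, 1, 1), axis=1)
resid_preds = scaler2.inverse_transform(np.array(resid_preds_scaled).reshape(-1, 1)).ravel()

# 3) Put both components on the same future index before adding, so the sum is matched
#    step by step instead of being NaN-filled by index alignment.
var_part = pd.Series(forecast_df['CCPI Forecast'].values, index=future_index)
lstm_part = pd.Series(resid_preds, index=future_index)
hybrid_ccpi_forecast = var_part + lstm_part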