Python Code
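# Volatility-forecasting pipeline: load SPY, VIX, and 10-year Treasury data,
# build a daily realized-volatility (RV) proxy, then train and compare three
# forecasters: HAR, Random Forest, and LSTM.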
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.linear_model import LinearRegression
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input
import matplotlib.pyplot as plt
import traceback
import warnings
import os
import time
from docx import Document
# Suppress warnings
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', message='.*Do not pass an `input_shape`.*')
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # Suppress TensorFlow messages
# Set seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)
# Configure font support for Chinese (if needed) and ensure minus sign works
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
# Directory to save results
RESULT_DIR = r'C:\Users\19016\Desktop\北京大学\03金融计量\report\result'
os.makedirs(RESULT_DIR, exist_ok=True)
# Garman-Klass volatility estimator
def calculate_garman_klass_volatility(data):
"""
Compute the Garman-Klass volatility estimate.
Formula: GK = ln(H/C) * ln(H/O) + ln(L/C) * ln(L/O)
where H=High, L=Low, O=Open, C=Close.
Annualized assuming 252 trading days.
"""
required_cols = ['Open', 'High', 'Low', 'Close']
for col in required_cols:
if col not in data.columns:
raise ValueError(f"Missing required column: {col}")
ln_hc = np.log(data['High'] / data['Close'])
ln_ho = np.log(data['High'] / data['Open'])
ln_lc = np.log(data['Low'] / data['Close'])
ln_lo = np.log(data['Low'] / data['Open'])
gk_var = ln_hc * ln_ho + ln_lc * ln_lo
gk_vol = np.sqrt(gk_var * 252)
return gk_vol
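# Illustrative check (hypothetical one-bar input): with Open=100, High=102,
# Low=99, Close=101, the estimator gives
#   sqrt((0.5*log(102/99)**2 - (2*log(2)-1)*log(101/100)**2) * 252) ≈ 0.32,
# i.e. roughly 32% annualized volatility for that single bar.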
def calculate_alternative_daily_rv(data):
"""
Alternative daily realized volatility proxy using close prices.
"""
returns = np.log(data['Close'] / data['Close'].shift(1))
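    # |log-return| scaled by sqrt(252): a standard, if noisy, daily volatility proxy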
daily_vol = np.abs(returns) * np.sqrt(252)
return daily_vol
# Improved HAR model
class HARModel:
"""
Heterogeneous Autoregressive (HAR) model for realized volatility prediction.
"""
def __init__(self):
self.model = None
self.coefficients = None
self.fitted_values = None
self.residuals = None
self.feature_names = None
def prepare_har_features(self, rv_series, add_lags=True):
"""
Prepare HAR features: daily, weekly, monthly, and optional extra lags.
"""
data = pd.DataFrame(index=rv_series.index)
data['RV'] = rv_series
data['RV_daily'] = data['RV'].shift(1)
data['RV_weekly'] = data['RV'].rolling(window=5).mean().shift(1)
data['RV_monthly'] = data['RV'].rolling(window=22).mean().shift(1)
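        # HAR-RV specification (Corsi, 2009):
        #   RV_t = b0 + b_d*RV_{t-1} + b_w*mean(RV_{t-5..t-1}) + b_m*mean(RV_{t-22..t-1}) + e_t
        # Each component is shifted one day so only past information enters the features.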
if add_lags:
data['RV_lag2'] = data['RV'].shift(2)
data['RV_lag3'] = data['RV'].shift(3)
data = data.dropna()
return data
def fit(self, rv_series, add_lags=True):
"""
Fit the HAR model using linear regression.
"""
print("Starting HAR model fitting...")
har_data = self.prepare_har_features(rv_series, add_lags)
if len(har_data) < 50:
raise ValueError(f"Not enough data to train HAR model: only {len(har_data)} samples (need >= 50).")
feature_cols = ['RV_daily', 'RV_weekly', 'RV_monthly']
if add_lags:
feature_cols += ['RV_lag2', 'RV_lag3']
X = har_data[feature_cols]
y = har_data['RV']
correlation_matrix = X.corr()
print("Feature correlation matrix:")
print(correlation_matrix)
self.model = LinearRegression()
self.model.fit(X, y)
self.coefficients = pd.Series(self.model.coef_, index=feature_cols)
self.coefficients['intercept'] = self.model.intercept_
self.feature_names = feature_cols
self.fitted_values = self.model.predict(X)
self.residuals = y - self.fitted_values
r2_score = self.model.score(X, y)
print("HAR model fitting complete!")
print(f"R² = {r2_score:.4f}")
print("Coefficients:")
for name, coef in self.coefficients.items():
print(f" {name}: {coef:.6f}")
return self
def predict(self, rv_series, steps=1):
"""
Generate HAR forecasts using the most recent data.
"""
if self.model is None:
raise ValueError("Model is not yet fitted. Call fit() first.")
har_data = self.prepare_har_features(rv_series, add_lags=len(self.feature_names) > 3)
if len(har_data) == 0:
raise ValueError("Not enough data to predict.")
X_pred = har_data[self.feature_names].tail(steps)
predictions = self.model.predict(X_pred)
return predictions
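# Minimal usage sketch (assuming `rv` is a pd.Series of daily RV values):
#     har = HARModel().fit(rv, add_lags=True)
#     next_day_forecast = har.predict(rv, steps=1)[0]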
# 1. Load and preprocess data
def load_and_preprocess_data(spy_file, vix_file, treasury_file):
print("\n[1. Loading data]")
print("="*50)
print("Loading SPY data...")
spy = pd.read_csv(spy_file, parse_dates=['Date'], index_col='Date')
print(f"SPY data loaded: {spy.shape[0]} rows")
print("Loading VIX data...")
vix = pd.read_csv(vix_file)
print(f"Raw VIX data: {vix.shape[0]} rows")
print("Processing VIX date format...")
vix['DATE'] = pd.to_datetime(vix['DATE'], format='%m/%d/%Y')
vix.set_index('DATE', inplace=True)
print("Loading Treasury data...")
treasury = pd.read_csv(treasury_file, parse_dates=['observation_date'], index_col='observation_date')
print(f"Treasury data loaded: {treasury.shape[0]} rows")
print(f"Raw data sizes: SPY={spy.shape}, VIX={vix.shape}, Treasury={treasury.shape}")
print(f"SPY missing values: {spy.isnull().sum().sum()}")
print(f"VIX missing values: {vix.isnull().sum().sum()}")
print(f"Treasury missing values: {treasury.isnull().sum().sum()}")
print("Renaming columns for consistency...")
spy_columns = spy.columns.tolist()
print(f"SPY original columns: {spy_columns}")
if 'Close/Last' in spy.columns:
spy = spy.rename(columns={'Close/Last': 'Close'})
price_columns = ['Open', 'High', 'Low', 'Close']
for col in price_columns:
if col in spy.columns and spy[col].dtype == 'object':
spy[col] = spy[col].astype(str).str.replace('$', '', regex=False)
spy[col] = pd.to_numeric(spy[col], errors='coerce')
vix = vix.rename(columns={'CLOSE': 'VIX_Close'})
print("Sorting data by date...")
if spy.index[0] > spy.index[-1]:
print("SPY data needs sorting (newest to oldest)")
spy = spy.sort_index()
if vix.index[0] > vix.index[-1]:
print("VIX data needs sorting")
vix = vix.sort_index()
if treasury.index[0] > treasury.index[-1]:
print("Treasury data needs sorting")
treasury = treasury.sort_index()
print("Date ranges after sorting:")
print(f"SPY: {spy.index.min()} to {spy.index.max()}")
print(f"VIX: {vix.index.min()} to {vix.index.max()}")
print(f"Treasury: {treasury.index.min()} to {treasury.index.max()}")
print("Handling missing values in Treasury data...")
    treasury = treasury.ffill()
print("Merging datasets on trading days...")
spy_data = spy[['Open', 'High', 'Low', 'Close']]
data = spy_data.join([vix['VIX_Close'], treasury['DGS10']], how='inner')
print(f"Merged data size: {data.shape}")
print(f"Merged date range: {data.index.min()} to {data.index.max()}")
print(f"Merged missing values: \n{data.isnull().sum()}")
print(f"Columns after merge: {data.columns.tolist()}")
print("Forward-filling and dropping any remaining missing values...")
    data = data.ffill().dropna()
print(f"Cleaned data size: {data.shape}")
print("Calculating log returns...")
data['Returns'] = np.log(data['Close'] / data['Close'].shift(1))
print("Calculating daily RV proxy using Garman-Klass...")
try:
data['Daily_RV_Proxy'] = calculate_garman_klass_volatility(data)
print("✅ Garman-Klass volatility calculated successfully")
except Exception as e:
print(f"⚠️ Garman-Klass calculation failed: {e}")
print("Using alternative daily RV proxy...")
data['Daily_RV_Proxy'] = calculate_alternative_daily_rv(data)
print("Calculating traditional 20-day realized volatility...")
data['RV_20d'] = np.sqrt((data['Returns']**2).rolling(window=20).sum() * 252 / 20)
print("Calculating rolling standard deviation volatility...")
data['StdVol'] = data['Returns'].rolling(window=20).std() * np.sqrt(252)
print("Calculating HAR features (using daily RV proxy)...")
data['RV_Daily'] = data['Daily_RV_Proxy'].shift(1)
data['RV_Weekly'] = data['Daily_RV_Proxy'].rolling(window=5).mean().shift(1)
data['RV_Monthly'] = data['Daily_RV_Proxy'].rolling(window=22).mean().shift(1)
print("Creating lag features for returns, VIX, treasury yields, and volatility...")
for lag in range(1, 6):
data[f'Returns_Lag{lag}'] = data['Returns'].shift(lag)
data[f'VIX_Lag{lag}'] = data['VIX_Close'].shift(lag)
data[f'DGS10_Lag{lag}'] = data['DGS10'].shift(lag)
data[f'StdVol_Lag{lag}'] = data['StdVol'].shift(lag)
data['VIX_Change'] = data['VIX_Close'] - data['VIX_Close'].shift(1)
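    # Note: VIX_Change uses the same-day VIX close, so it is contemporaneous with
    # the RV target rather than strictly predictive.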
before_drop = data.shape[0]
data = data.dropna()
print(f"Dropped missing values: from {before_drop} rows down to {data.shape[0]} rows")
print(f"Final data size: {data.shape}")
print(f"Date range: {data.index.min()} to {data.index.max()}")
print(f"Feature columns: {data.columns.tolist()}")
return data
# 2. Split dataset
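# Chronological 70/15/15 split (no shuffling), preserving time order to avoid look-ahead.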
def split_data(data, train_ratio=0.7, val_ratio=0.15):
n = len(data)
train_end = int(n * train_ratio)
val_end = int(n * (train_ratio + val_ratio))
train_data = data.iloc[:train_end]
val_data = data.iloc[train_end:val_end]
test_data = data.iloc[val_end:]
return train_data, val_data, test_data
# 3. Prepare ML features (Random Forest)
def prepare_ml_features(data):
features = [
'Returns_Lag1', 'Returns_Lag2', 'Returns_Lag3', 'Returns_Lag4', 'Returns_Lag5',
'VIX_Lag1', 'VIX_Lag2', 'VIX_Lag3', 'VIX_Lag4', 'VIX_Lag5',
'DGS10_Lag1', 'DGS10_Lag2', 'DGS10_Lag3', 'DGS10_Lag4', 'DGS10_Lag5',
'VIX_Change'
]
X = data[features]
y = data['Daily_RV_Proxy']
mask = ~y.isna()
X = X[mask]
y = y[mask]
print(f"Feature shape: {X.shape}, Target shape: {y.shape}")
print(f"NaNs in X: {X.isna().sum().sum()}, NaNs in y: {y.isna().sum()}")
return X, y
# 4. Prepare LSTM data (sequence)
def prepare_lstm_data(data, seq_length=20, scaler=None):
    features = ['Returns', 'VIX_Close', 'DGS10', 'Daily_RV_Proxy']
    if data[features].isna().sum().sum() > 0:
        print(f"NaNs found in LSTM features: {data[features].isna().sum()}")
        print("Forward-filling NaNs...")
        data_copy = data[features].ffill()
    else:
        data_copy = data[features].copy()
    X, y = [], []
    # Fit the scaler on training data only and reuse it for validation/test,
    # so all splits share one scale and no test information leaks in.
    if scaler is None:
        scaler = StandardScaler()
        scaled_data = scaler.fit_transform(data_copy)
    else:
        scaled_data = scaler.transform(data_copy)
    for i in range(len(data_copy) - seq_length):
        X.append(scaled_data[i:i+seq_length])
        y.append(scaled_data[i+seq_length, 3])  # column 3 = Daily_RV_Proxy
X = np.array(X)
y = np.array(y)
print(f"LSTM sequence data shapes: X={X.shape}, y={y.shape}")
if np.isnan(X).any() or np.isnan(y).any():
print("Warning: NaNs remain in LSTM data!")
mask = ~(np.isnan(X).any(axis=(1,2)) | np.isnan(y))
X = X[mask]
y = y[mask]
print(f"After removing NaN samples: X={X.shape}, y={y.shape}")
return X, y, scaler
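# Shape sketch: with n rows and seq_length=20 this yields X of shape (n-20, 20, 4)
# and y of shape (n-20,), where y is the scaled next-day Daily_RV_Proxy.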
# 5. Train and predict models
def train_and_predict_models(train_data, val_data, test_data):
results = {}
print("Preparing feature data...")
X_train, y_train = prepare_ml_features(train_data)
X_val, y_val = prepare_ml_features(val_data)
X_test, y_test = prepare_ml_features(test_data)
print(f"Train: X={X_train.shape}, y={y_train.shape}")
print(f"Validation: X={X_val.shape}, y={y_val.shape}")
print(f"Test: X={X_test.shape}, y={y_test.shape}")
if X_train.isna().sum().sum() > 0 or y_train.isna().sum() > 0:
print("Warning: NaNs found in training data, cleaning...")
mask = ~(X_train.isna().any(axis=1) | y_train.isna())
X_train = X_train[mask]
y_train = y_train[mask]
print(f"After cleaning: X_train={X_train.shape}, y_train={y_train.shape}")
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)
# 1. HAR model
print("Training HAR model...")
try:
har_model = HARModel()
train_rv = train_data['Daily_RV_Proxy'].dropna()
har_model.fit(train_rv, add_lags=True)
print("Predicting with HAR model...")
full_train_rv = pd.concat([train_data['Daily_RV_Proxy'], val_data['Daily_RV_Proxy']]).dropna()
n_test_har = len(test_data)
har_predictions = []
current_rv = full_train_rv.copy()
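        # Walk-forward one-step forecasting: after each prediction, append the
        # realized value (or the forecast itself if the actual is missing) so the
        # next step conditions on the most recent history.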
for i in range(n_test_har):
pred = har_model.predict(current_rv, steps=1)
har_predictions.append(pred[0])
            # i always indexes a valid test row, so no bounds check is needed
            actual_val = test_data['Daily_RV_Proxy'].iloc[i]
            new_index = test_data.index[i]
            if not pd.isna(actual_val):
                current_rv = pd.concat([current_rv, pd.Series([actual_val], index=[new_index])])
            else:
                # Fall back to the model's own forecast when the actual value is missing
                current_rv = pd.concat([current_rv, pd.Series([pred[0]], index=[new_index])])
har_pred = np.array(har_predictions)
test_rv_har = test_data['Daily_RV_Proxy'].values
min_length = min(len(har_pred), len(test_rv_har))
results['HAR'] = {'pred': har_pred[:min_length], 'true': test_rv_har[:min_length]}
print(f"HAR predictions complete: {len(har_pred)} points")
except Exception as e:
print(f"HAR model training failed: {str(e)}")
traceback_info = traceback.format_exc()
print(f"Traceback: {traceback_info}")
print("Skipping HAR model...")
# 2. Random Forest
print("Training Random Forest model...")
try:
model_rf = RandomForestRegressor(n_estimators=100, max_depth=5, random_state=42)
model_rf.fit(X_train_scaled, y_train)
print("Random Forest training complete, predicting...")
rf_pred = model_rf.predict(X_test_scaled)
if len(rf_pred) != len(y_test):
print(f"Warning: RF prediction length ({len(rf_pred)}) != test target length ({len(y_test)})")
min_len = min(len(rf_pred), len(y_test))
rf_pred = rf_pred[:min_len]
y_test_rf = y_test[:min_len]
results['RandomForest'] = {'pred': rf_pred, 'true': y_test_rf}
print(f"Adjusted RF: pred={len(rf_pred)}, true={len(y_test_rf)}")
else:
results['RandomForest'] = {'pred': rf_pred, 'true': y_test}
print(f"Random Forest predictions: {len(rf_pred)} points, true: {len(y_test)} points")
print("Random Forest prediction complete")
except Exception as e:
print(f"Random Forest training failed: {str(e)}")
traceback_info = traceback.format_exc()
print(f"Traceback: {traceback_info}")
print("Skipping Random Forest model...")
# 3. LSTM
print("Preparing LSTM sequence data...")
try:
        X_lstm_train, y_lstm_train, scaler_lstm = prepare_lstm_data(train_data)
        X_lstm_val, y_lstm_val, _ = prepare_lstm_data(val_data, scaler=scaler_lstm)
        X_lstm_test, y_lstm_test, _ = prepare_lstm_data(test_data, scaler=scaler_lstm)
print("Training LSTM model...")
        model_lstm = Sequential([
            Input(shape=(X_lstm_train.shape[1], X_lstm_train.shape[2])),
            LSTM(50, return_sequences=True),
            Dropout(0.2),
            LSTM(50),
            Dropout(0.2),
            Dense(1)
        ])
model_lstm.compile(optimizer='adam', loss='mse')
print(f"LSTM training data shape: {X_lstm_train.shape}")
print("Starting LSTM training for 20 epochs...")
model_lstm.fit(
X_lstm_train, y_lstm_train,
validation_data=(X_lstm_val, y_lstm_val),
epochs=20, batch_size=32, verbose=1
)
print("LSTM training complete, predicting...")
lstm_pred = model_lstm.predict(X_lstm_test, verbose=1)
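        # The scaler was fitted on 4 features, so to invert the scaling of the
        # single RV output we embed the predictions in a dummy array at column 3
        # (the Daily_RV_Proxy position) and inverse-transform the whole array.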
dummy_features = np.zeros((lstm_pred.shape[0], X_lstm_test.shape[2]))
dummy_features[:, 3] = lstm_pred.flatten()
lstm_pred_rescaled = scaler_lstm.inverse_transform(dummy_features)[:, 3]
n_test = len(y_test)
if len(lstm_pred_rescaled) != n_test:
print(f"Warning: LSTM prediction length ({len(lstm_pred_rescaled)}) != test length ({n_test})")
if len(lstm_pred_rescaled) < n_test:
print("Filling missing LSTM predictions with mean value")
fill_value = np.mean(lstm_pred_rescaled) if len(lstm_pred_rescaled) > 0 else train_data['Daily_RV_Proxy'].mean()
lstm_pred_rescaled = np.append(lstm_pred_rescaled, np.full(n_test - len(lstm_pred_rescaled), fill_value))
else:
lstm_pred_rescaled = lstm_pred_rescaled[:n_test]
dummy_true = np.zeros((y_lstm_test.shape[0], X_lstm_test.shape[2]))
dummy_true[:, 3] = y_lstm_test.flatten()
lstm_true = scaler_lstm.inverse_transform(dummy_true)[:, 3]
if len(lstm_pred_rescaled) != len(lstm_true):
print(f"Warning: LSTM pred length ({len(lstm_pred_rescaled)}) != true length ({len(lstm_true)})")
min_len = min(len(lstm_pred_rescaled), len(lstm_true))
results['LSTM'] = {'pred': lstm_pred_rescaled[:min_len], 'true': lstm_true[:min_len]}
else:
results['LSTM'] = {'pred': lstm_pred_rescaled, 'true': lstm_true}
print(f"LSTM predictions: {len(lstm_pred_rescaled)} points, true: {len(lstm_true)} points")
print("LSTM prediction complete")
except Exception as e:
print(f"LSTM training failed: {str(e)}")
traceback_info = traceback.format_exc()
print(f"Traceback: {traceback_info}")
print("Skipping LSTM model...")
if not results:
raise ValueError("All models failed!")
return results
# 6. Evaluate and visualize
def evaluate_and_plot(results, test_data, train_data):
metrics = {}
print("Evaluating model performance...")
for model_name, result in list(results.items()):
pred = result['pred']
true = result['true']
print(f"Model {model_name}: pred length={len(pred)}, true length={len(true)}")
if len(pred) == 0 or len(true) == 0:
print(f"Warning: {model_name} has empty predictions or true values; removing from results.")
del results[model_name]
continue
if len(pred) != len(true):
print(f"Warning: {model_name} pred vs true length mismatch ({len(pred)} vs {len(true)}), truncating")
min_len = min(len(pred), len(true))
if isinstance(pred, pd.Series):
pred = pred.values
if isinstance(true, pd.Series):
true = true.values
results[model_name]['pred'] = pred[:min_len]
results[model_name]['true'] = true[:min_len]
print(f"After truncation: pred length={len(results[model_name]['pred'])}, true length={len(results[model_name]['true'])}")
if not results:
raise ValueError("No valid model results to evaluate.")
for model_name, result in results.items():
pred = result['pred']
true = result['true']
try:
if isinstance(pred, pd.Series):
pred = pred.values
if isinstance(true, pd.Series):
true = true.values
if len(pred) != len(true):
print(f"Error: {model_name} pred/true length still mismatch ({len(pred)} vs {len(true)})")
continue
if np.isnan(pred).any() or np.isnan(true).any():
print(f"Error: {model_name} pred or true contains NaN")
mask = ~(np.isnan(pred) | np.isnan(true))
if mask.sum() > 0:
print(f"Removing NaNs: reducing from {len(pred)} to {mask.sum()} points")
pred = pred[mask]
true = true[mask]
else:
print("Cannot proceed: all points are NaN")
continue
print(f"{model_name} sample predictions: {pred[:5]}")
print(f"{model_name} sample true values: {true[:5]}")
mse = mean_squared_error(true, pred)
mae = mean_absolute_error(true, pred)
if len(pred) > 1:
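                # Direction accuracy: share of days on which the predicted and
                # actual one-day changes in RV have the same sign.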
direction_pred = np.sign(pred[1:] - pred[:-1])
direction_true = np.sign(true[1:] - true[:-1])
direction = np.mean((direction_pred == direction_true).astype(int))
else:
direction = np.nan
metrics[model_name] = {'MSE': mse, 'MAE': mae, 'Direction': direction}
print(f"{model_name} metrics computed successfully")
except Exception as e:
print(f"Warning: Failed to compute metrics for {model_name} - {str(e)}")
traceback_info = traceback.format_exc()
print(f"Traceback: {traceback_info}")
continue
print("\nModel Performance:")
for model_name, metric in metrics.items():
direction_str = f"{metric['Direction']:.4f}" if not np.isnan(metric['Direction']) else "N/A"
print(f"{model_name}: MSE={metric['MSE']:.6f}, MAE={metric['MAE']:.6f}, Direction={direction_str}")
if not metrics:
print("No metrics to visualize or export.")
return
# Export metrics to a Word document
doc = Document()
doc.add_heading('Model Performance Metrics', level=1)
table = doc.add_table(rows=1 + len(metrics), cols=4)
hdr_cells = table.rows[0].cells
hdr_cells[0].text = 'Model'
hdr_cells[1].text = 'MSE'
hdr_cells[2].text = 'MAE'
hdr_cells[3].text = 'Direction Accuracy'
row_idx = 1
for model_name, metric in metrics.items():
row_cells = table.rows[row_idx].cells
row_cells[0].text = model_name
row_cells[1].text = f"{metric['MSE']:.6f}"
row_cells[2].text = f"{metric['MAE']:.6f}"
direction_str = f"{metric['Direction']:.4f}" if not np.isnan(metric['Direction']) else "N/A"
row_cells[3].text = direction_str
row_idx += 1
word_path = os.path.join(RESULT_DIR, 'model_performance.docx')
doc.save(word_path)
print(f"Exported model performance metrics to {word_path}")
# Plot predictions
print("Plotting prediction results...")
try:
plt.figure(figsize=(12, 6))
        reference_model = next(iter(results.keys()))
        # Align each series with the tail of the test index so lengths always match,
        # even when models produce forecasts of different lengths (e.g. LSTM windows)
        ref_true = results[reference_model]['true']
        plt.plot(
            test_data.index[-len(ref_true):],
            ref_true,
            label='True Daily RV Proxy (Garman-Klass)',
            color='black',
            linewidth=2
        )
        colors = ['red', 'blue', 'green', 'orange', 'purple']
        for i, (model_name, result) in enumerate(results.items()):
            color = colors[i % len(colors)]
            plt.plot(
                test_data.index[-len(result['pred']):],
                result['pred'],
                label=f'{model_name} Prediction',
                color=color,
                alpha=0.8
            )
plt.title('Daily RV Proxy Forecast Comparison (Garman-Klass)', fontsize=14)
plt.xlabel('Date', fontsize=12)
plt.ylabel('Annualized Volatility', fontsize=12)
plt.legend(fontsize=10)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(os.path.join(RESULT_DIR, 'daily_rv_proxy_forecast.png'), dpi=300, bbox_inches='tight')
print("Saved prediction plot as daily_rv_proxy_forecast.png")
plt.show()
except Exception as e:
print(f"Failed to plot predictions: {str(e)}")
# Feature importance (Random Forest)
if 'RandomForest' in results:
try:
print("Plotting feature importance...")
model_rf = RandomForestRegressor(n_estimators=100, max_depth=5, random_state=42)
X_train, y_train = prepare_ml_features(train_data)
mask = ~(X_train.isna().any(axis=1) | y_train.isna())
X_train = X_train[mask]
y_train = y_train[mask]
if len(X_train) > 0 and len(y_train) > 0:
print(f"Training RF for feature importance, data shape: X={X_train.shape}, y={y_train.shape}")
model_rf.fit(X_train, y_train)
feature_names = X_train.columns.tolist()
feature_importance = pd.Series(model_rf.feature_importances_, index=feature_names)
plt.figure(figsize=(10, 6))
feature_importance.sort_values().plot(kind='barh')
plt.title('Feature Importance for Volatility Forecasting')
plt.xlabel('Relative Importance')
plt.tight_layout()
plt.savefig(os.path.join(RESULT_DIR, 'feature_importance.png'))
print("Saved feature importance plot as feature_importance.png")
print("\nFeature importance ranking:")
for i, (feature, importance) in enumerate(feature_importance.sort_values(ascending=False).items()):
print(f"{i+1}. {feature}: {importance:.4f}")
else:
print("Insufficient data to plot feature importance.")
except Exception as e:
print(f"Failed to plot feature importance: {str(e)}")
traceback_info = traceback.format_exc()
print(f"Traceback: {traceback_info}")
else:
print("RandomForest not available; skipping feature importance.")
# Main script
if __name__ == "__main__":
start_time = time.time()
spy_file = r'C:\Users\19016\Desktop\北京大学\03金融计量\report\data\HistoricalData_SPY.csv'
vix_file = r'C:\Users\19016\Desktop\北京大学\03金融计量\report\data\VIX_History.csv'
treasury_file = r'C:\Users\19016\Desktop\北京大学\03金融计量\report\data\DGS10.csv'
print("\n🔍 Volatility Forecasting Model Training & Evaluation 🔍")
print("="*70)
print(f"Using files:\nSPY: {spy_file}\nVIX: {vix_file}\nTreasury: {treasury_file}")
try:
print("\n[Step 1: Data Preprocessing]")
print("-"*50)
data = load_and_preprocess_data(spy_file, vix_file, treasury_file)
data.to_csv(os.path.join(RESULT_DIR, 'preprocessed_data.csv'))
print(f"✅ Preprocessed data saved to {os.path.join(RESULT_DIR, 'preprocessed_data.csv')}")
# Preliminary time series plots
plt.figure(figsize=(14, 6))
plt.plot(data.index, data['Close'], label='SPY Close', color='blue')
plt.title('SPY Closing Price Time Series')
plt.xlabel('Date')
plt.ylabel('Closing Price')
plt.tight_layout()
plt.savefig(os.path.join(RESULT_DIR, 'spy_close_timeseries.png'), dpi=300)
plt.show()
plt.figure(figsize=(14, 6))
plt.plot(data.index, data['VIX_Close'], label='VIX Close', color='orange')
plt.title('VIX Index Time Series')
plt.xlabel('Date')
plt.ylabel('VIX')
plt.tight_layout()
plt.savefig(os.path.join(RESULT_DIR, 'vix_timeseries.png'), dpi=300)
plt.show()
plt.figure(figsize=(14, 6))
plt.plot(data.index, data['DGS10'], label='10-Year Treasury Yield', color='green')
plt.title('10-Year Treasury Yield Time Series')
plt.xlabel('Date')
plt.ylabel('DGS10 (%)')
plt.tight_layout()
plt.savefig(os.path.join(RESULT_DIR, 'dgs10_timeseries.png'), dpi=300)
plt.show()
# Split datasets
train_data, val_data, test_data = split_data(data)
print(f"\nDataset split complete:")
print(f"* Train: {train_data.shape[0]} rows ({train_data.index.min():%Y-%m-%d} to {train_data.index.max():%Y-%m-%d})")
print(f"* Validation: {val_data.shape[0]} rows ({val_data.index.min():%Y-%m-%d} to {val_data.index.max():%Y-%m-%d})")
print(f"* Test: {test_data.shape[0]} rows ({test_data.index.min():%Y-%m-%d} to {test_data.index.max():%Y-%m-%d})")
print("\n[Step 2: Model Training & Prediction]")
print("-"*50)
results = train_and_predict_models(train_data, val_data, test_data)
print("\n[Step 3: Model Evaluation & Visualization]")
print("-"*50)
evaluate_and_plot(results, test_data, train_data)
total_time = time.time() - start_time
print(f"\n✅ All done! Total time: {total_time:.2f} seconds ({total_time/60:.2f} minutes)")
    except Exception as e:
        print(f"\n❌ Error during execution: {str(e)}")
        traceback.print_exc()