A Top Competitor's Open-Sourced Solution for the Kaggle Optiver Competition
1 Introduction
This started on August 26, when I came across a WeChat article: a top-ranked (top-50) competitor in the Kaggle Optiver competition had actually made their solution public.
Partly to see what the fuss was about and partly to learn, let's take a look at this generously open-sourced solution.
2 Competition Overview
Optiver is a top-tier proprietary trading firm and market maker. The competition asks participants to predict the volatility of high-frequency stock data. Why predict volatility? First, volatility is easier to forecast than the price itself; second, accurate volatility forecasts feed directly into applications such as option pricing.
In finance, volatility can be measured in several ways: the simplest is the standard deviation of stock returns, but there are others, such as the realized volatility used in high-frequency settings and implied volatility.
The task is a regression problem: given 10 minutes of order book data, we need to predict the realized volatility over the next 10 minutes,

$$\sigma = \sqrt{\sum_{t} r_{t-1,t}^{2}},$$

where $r$ is the stock's log return, i.e. the first difference of the log price. The evaluation metric is RMSPE (root mean square percentage error):

$$\text{RMSPE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(\frac{y_i - \hat{y}_i}{y_i}\right)^{2}}$$
3 The Code
Let's now walk through the code.
3.1 Import Packages
As usual, the first part imports the required packages.
from IPython.core.display import display, HTML
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import glob
import os
import gc
from joblib import Parallel, delayed
from sklearn import preprocessing, model_selection
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import QuantileTransformer
from sklearn.metrics import r2_score
import matplotlib.pyplot as plt
import seaborn as sns
import numpy.matlib
path_submissions = '/'
target_name = 'target'
scores_folds = {}
3.2 Feature Engineering
The second part constructs a large set of features. The competition specifies that the reference stock price is computed as the WAP (weighted average price):

$$\text{WAP} = \frac{\text{BidPrice}_1 \cdot \text{AskSize}_1 + \text{AskPrice}_1 \cdot \text{BidSize}_1}{\text{BidSize}_1 + \text{AskSize}_1}$$

This definition is natural for order book data with best bid/ask quotes, since it takes both the prices and the quoted sizes into account.
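As a quick sanity check with toy numbers (not from the dataset): with a best bid of 99.98 for 100 shares and a best ask of 100.02 for 300 shares,

$$\text{WAP} = \frac{99.98 \times 300 + 100.02 \times 100}{100 + 300} = 99.99,$$

so the heavier resting size on the ask side pulls the price toward the bid, matching the intuition that strong selling interest lowers the fair value.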
3.2.1 Order Book Features
The order book features (built in the book_preprocessor function) include realized volatilities of log returns computed from four WAP variants: calc_wap1 and calc_wap2 apply the cross-side weighting above to the first and second book levels, while calc_wap3 and calc_wap4 weight each price by its own side's size. On top of these come top-of-book features such as relative price spreads, bid/ask spreads, total volume and volume imbalance. The realized-volatility features are then recomputed over progressively shorter trailing windows of each 10-minute bucket (rows with seconds_in_bucket >= 100, 200, ..., 500).
# data directory
data_dir = '../input/optiver-realized-volatility-prediction/'
# Function to calculate first WAP
def calc_wap1(df):
wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
return wap
# Function to calculate second WAP
def calc_wap2(df):
wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])
return wap
# Function to calculate third WAP (level-1 prices weighted by their own side's size)
def calc_wap3(df):
wap = (df['bid_price1'] * df['bid_size1'] + df['ask_price1'] * df['ask_size1']) / (df['bid_size1'] + df['ask_size1'])
return wap
# Function to calculate fourth WAP (level-2 prices weighted by their own side's size)
def calc_wap4(df):
wap = (df['bid_price2'] * df['bid_size2'] + df['ask_price2'] * df['ask_size2']) / (df['bid_size2'] + df['ask_size2'])
return wap
# Function to calculate the log of the return
# Remember that logb(x / y) = logb(x) - logb(y)
def log_return(series):
return np.log(series).diff()
# Calculate the realized volatility
def realized_volatility(series):
return np.sqrt(np.sum(series**2))
# Function to count unique elements of a series
def count_unique(series):
return len(np.unique(series))
# Function to read our base train and test set
def read_train_test():
train = pd.read_csv('../input/optiver-realized-volatility-prediction/train.csv')
test = pd.read_csv('../input/optiver-realized-volatility-prediction/test.csv')
# Create a key to merge with book and trade data
train['row_id'] = train['stock_id'].astype(str) + '-' + train['time_id'].astype(str)
test['row_id'] = test['stock_id'].astype(str) + '-' + test['time_id'].astype(str)
print(f'Our training set has {train.shape[0]} rows')
return train, test
# Function to preprocess book data (for each stock id)
def book_preprocessor(file_path):
df = pd.read_parquet(file_path)
# Calculate Wap
df['wap1'] = calc_wap1(df)
df['wap2'] = calc_wap2(df)
df['wap3'] = calc_wap3(df)
df['wap4'] = calc_wap4(df)
# Calculate log returns
df['log_return1'] = df.groupby(['time_id'])['wap1'].apply(log_return)
df['log_return2'] = df.groupby(['time_id'])['wap2'].apply(log_return)
df['log_return3'] = df.groupby(['time_id'])['wap3'].apply(log_return)
df['log_return4'] = df.groupby(['time_id'])['wap4'].apply(log_return)
# Calculate wap balance
df['wap_balance'] = abs(df['wap1'] - df['wap2'])
# Calculate spread
df['price_spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1']) / 2)
df['price_spread2'] = (df['ask_price2'] - df['bid_price2']) / ((df['ask_price2'] + df['bid_price2']) / 2)
df['bid_spread'] = df['bid_price1'] - df['bid_price2']
df['ask_spread'] = df['ask_price1'] - df['ask_price2']
df["bid_ask_spread"] = abs(df['bid_spread'] - df['ask_spread'])
df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + (df['bid_size1'] + df['bid_size2'])
df['volume_imbalance'] = abs((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))
# Dict for aggregations
create_feature_dict = {
'wap1': [np.sum, np.std],
'wap2': [np.sum, np.std],
'wap3': [np.sum, np.std],
'wap4': [np.sum, np.std],
'log_return1': [realized_volatility],
'log_return2': [realized_volatility],
'log_return3': [realized_volatility],
'log_return4': [realized_volatility],
'wap_balance': [np.sum, np.max],
'price_spread':[np.sum, np.max],
'price_spread2':[np.sum, np.max],
'bid_spread':[np.sum, np.max],
'ask_spread':[np.sum, np.max],
'total_volume':[np.sum, np.max],
'volume_imbalance':[np.sum, np.max],
"bid_ask_spread":[np.sum, np.max],
}
create_feature_dict_time = {
'log_return1': [realized_volatility],
'log_return2': [realized_volatility],
'log_return3': [realized_volatility],
'log_return4': [realized_volatility],
}
# Function to get group stats for different windows (seconds in bucket)
def get_stats_window(fe_dict,seconds_in_bucket, add_suffix = False):
# Group by the window
df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(fe_dict).reset_index()
# Rename columns joining suffix
df_feature.columns = ['_'.join(col) for col in df_feature.columns]
# Add a suffix to differentiate windows
if add_suffix:
df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
return df_feature
# Get the stats for different windows
df_feature = get_stats_window(create_feature_dict,seconds_in_bucket = 0, add_suffix = False)
df_feature_500 = get_stats_window(create_feature_dict_time,seconds_in_bucket = 500, add_suffix = True)
df_feature_400 = get_stats_window(create_feature_dict_time,seconds_in_bucket = 400, add_suffix = True)
df_feature_300 = get_stats_window(create_feature_dict_time,seconds_in_bucket = 300, add_suffix = True)
df_feature_200 = get_stats_window(create_feature_dict_time,seconds_in_bucket = 200, add_suffix = True)
df_feature_100 = get_stats_window(create_feature_dict_time,seconds_in_bucket = 100, add_suffix = True)
# Merge all
df_feature = df_feature.merge(df_feature_500, how = 'left', left_on = 'time_id_', right_on = 'time_id__500')
df_feature = df_feature.merge(df_feature_400, how = 'left', left_on = 'time_id_', right_on = 'time_id__400')
df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
df_feature = df_feature.merge(df_feature_200, how = 'left', left_on = 'time_id_', right_on = 'time_id__200')
df_feature = df_feature.merge(df_feature_100, how = 'left', left_on = 'time_id_', right_on = 'time_id__100')
    # Drop unnecessary time_id columns
df_feature.drop(['time_id__500','time_id__400', 'time_id__300', 'time_id__200','time_id__100'], axis = 1, inplace = True)
# Create row_id so we can merge
stock_id = file_path.split('=')[1]
df_feature['row_id'] = df_feature['time_id_'].apply(lambda x: f'{stock_id}-{x}')
df_feature.drop(['time_id_'], axis = 1, inplace = True)
return df_feature
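To get a feel for what book_preprocessor returns, here is a minimal usage sketch (illustrative only; it assumes the Kaggle directory layout with one parquet folder per stock_id, as used elsewhere in the notebook). The output has one row per time_id, a row_id key of the form '<stock_id>-<time_id>', and one column per aggregated feature, with the windowed variants carrying suffixes such as _500.

```python
# Illustrative usage, not part of the original notebook
book_path = data_dir + 'book_train.parquet/stock_id=0'
book_features = book_preprocessor(book_path)

print(book_features.shape)                 # one row per time_id of stock 0
print(book_features['row_id'].head())      # keys like '0-<time_id>'
print([c for c in book_features.columns if c.endswith('_500')][:5])  # trailing-window features
```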
3.2.2 Trade Features
These features (built in trade_preprocessor) are derived from the executed trades: the realized volatility of trade-price log returns, statistics of traded size, order count and traded amount (sums, maxima, minima), windowed versions over the same seconds_in_bucket cutoffs, plus a set of per-bucket descriptors of the price and size series such as the volume-weighted tendency, counts above/below the mean, median absolute deviation, energy and interquartile range.
# Function to preprocess trade data (for each stock id)
def trade_preprocessor(file_path):
df = pd.read_parquet(file_path)
df['log_return'] = df.groupby('time_id')['price'].apply(log_return)
df['amount']=df['price']*df['size']
# Dict for aggregations
create_feature_dict = {
'log_return':[realized_volatility],
'seconds_in_bucket':[count_unique],
'size':[np.sum, np.max, np.min],
'order_count':[np.sum,np.max],
'amount':[np.sum,np.max,np.min],
}
create_feature_dict_time = {
'log_return':[realized_volatility],
'seconds_in_bucket':[count_unique],
'size':[np.sum],
'order_count':[np.sum],
}
# Function to get group stats for different windows (seconds in bucket)
def get_stats_window(fe_dict,seconds_in_bucket, add_suffix = False):
# Group by the window
df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(fe_dict).reset_index()
# Rename columns joining suffix
df_feature.columns = ['_'.join(col) for col in df_feature.columns]
# Add a suffix to differentiate windows
if add_suffix:
df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
return df_feature
# Get the stats for different windows
df_feature = get_stats_window(create_feature_dict,seconds_in_bucket = 0, add_suffix = False)
df_feature_500 = get_stats_window(create_feature_dict_time,seconds_in_bucket = 500, add_suffix = True)
df_feature_400 = get_stats_window(create_feature_dict_time,seconds_in_bucket = 400, add_suffix = True)
df_feature_300 = get_stats_window(create_feature_dict_time,seconds_in_bucket = 300, add_suffix = True)
df_feature_200 = get_stats_window(create_feature_dict_time,seconds_in_bucket = 200, add_suffix = True)
df_feature_100 = get_stats_window(create_feature_dict_time,seconds_in_bucket = 100, add_suffix = True)
    # Volume-weighted sum of percentage price changes within a bucket (price drift weighted by trade size)
    def tendency(price, vol):
df_diff = np.diff(price)
val = (df_diff/price[1:])*100
power = np.sum(val*vol[1:])
return(power)
lis = []
for n_time_id in df['time_id'].unique():
df_id = df[df['time_id'] == n_time_id]
tendencyV = tendency(df_id['price'].values, df_id['size'].values)
f_max = np.sum(df_id['price'].values > np.mean(df_id['price'].values))
f_min = np.sum(df_id['price'].values < np.mean(df_id['price'].values))
df_max = np.sum(np.diff(df_id['price'].values) > 0)
df_min = np.sum(np.diff(df_id['price'].values) < 0)
# new
abs_diff = np.median(np.abs( df_id['price'].values - np.mean(df_id['price'].values)))
energy = np.mean(df_id['price'].values**2)
iqr_p = np.percentile(df_id['price'].values,75) - np.percentile(df_id['price'].values,25)
# vol vars
abs_diff_v = np.median(np.abs( df_id['size'].values - np.mean(df_id['size'].values)))
energy_v = np.sum(df_id['size'].values**2)
iqr_p_v = np.percentile(df_id['size'].values,75) - np.percentile(df_id['size'].values,25)
lis.append({'time_id':n_time_id,'tendency':tendencyV,'f_max':f_max,'f_min':f_min,'df_max':df_max,'df_min':df_min,
'abs_diff':abs_diff,'energy':energy,'iqr_p':iqr_p,'abs_diff_v':abs_diff_v,'energy_v':energy_v,'iqr_p_v':iqr_p_v})
df_lr = pd.DataFrame(lis)
df_feature = df_feature.merge(df_lr, how = 'left', left_on = 'time_id_', right_on = 'time_id')
# Merge all
df_feature = df_feature.merge(df_feature_500, how = 'left', left_on = 'time_id_', right_on = 'time_id__500')
df_feature = df_feature.merge(df_feature_400, how = 'left', left_on = 'time_id_', right_on = 'time_id__400')
df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
df_feature = df_feature.merge(df_feature_200, how = 'left', left_on = 'time_id_', right_on = 'time_id__200')
df_feature = df_feature.merge(df_feature_100, how = 'left', left_on = 'time_id_', right_on = 'time_id__100')
    # Drop unnecessary time_id columns
df_feature.drop(['time_id__500','time_id__400', 'time_id__300', 'time_id__200','time_id','time_id__100'], axis = 1, inplace = True)
df_feature = df_feature.add_prefix('trade_')
stock_id = file_path.split('=')[1]
df_feature['row_id'] = df_feature['trade_time_id_'].apply(lambda x:f'{stock_id}-{x}')
df_feature.drop(['trade_time_id_'], axis = 1, inplace = True)
return df_feature
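# --- Illustration (added, not part of the original notebook) ---
# The 'tendency' feature above is the sum of percentage price changes weighted by
# the traded size, i.e. a volume-weighted measure of intra-bucket price drift.
# Toy example: price = [100.0, 100.5, 100.2], size = [10, 30, 20]
#   percentage changes = [0.5/100.5*100, -0.3/100.2*100] ≈ [+0.4975, -0.2994]
#   tendency = 30 * 0.4975 + 20 * (-0.2994) ≈ 8.94
# A large positive value means prices rose on heavy volume within the bucket.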
# Function to get group stats for the stock_id and time_id
def get_time_stock(df):
vol_cols = ['log_return1_realized_volatility', 'log_return2_realized_volatility', 'log_return1_realized_volatility_400', 'log_return2_realized_volatility_400',
'log_return1_realized_volatility_300', 'log_return2_realized_volatility_300', 'log_return1_realized_volatility_200', 'log_return2_realized_volatility_200',
'trade_log_return_realized_volatility', 'trade_log_return_realized_volatility_400', 'trade_log_return_realized_volatility_300', 'trade_log_return_realized_volatility_200']
# Group by the stock id
df_stock_id = df.groupby(['stock_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()
# Rename columns joining suffix
df_stock_id.columns = ['_'.join(col) for col in df_stock_id.columns]
df_stock_id = df_stock_id.add_suffix('_' + 'stock')
    # Group by the time id
df_time_id = df.groupby(['time_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()
# Rename columns joining suffix
df_time_id.columns = ['_'.join(col) for col in df_time_id.columns]
df_time_id = df_time_id.add_suffix('_' + 'time')
# Merge with original dataframe
df = df.merge(df_stock_id, how = 'left', left_on = ['stock_id'], right_on = ['stock_id__stock'])
df = df.merge(df_time_id, how = 'left', left_on = ['time_id'], right_on = ['time_id__time'])
df.drop(['stock_id__stock', 'time_id__time'], axis = 1, inplace = True)
return df
3.2.3 Aggregated Features
This part clusters similar stocks with KMeans (7 clusters, based on the correlation matrix of their targets across time_ids) and averages the features above within each cluster to create new cluster-level features. Note that this snippet uses the train and test dataframes with the merged features, which are only built in section 3.3, so in the notebook it runs after that step.
from sklearn.cluster import KMeans
# making agg features
train_p = pd.read_csv('../input/optiver-realized-volatility-prediction/train.csv')
train_p = train_p.pivot(index='time_id', columns='stock_id', values='target')
corr = train_p.corr()
ids = corr.index
kmeans = KMeans(n_clusters=7, random_state=0).fit(corr.values)
print(kmeans.labels_)
l = []
for n in range(7):
    # Recover the stock_ids assigned to cluster n; the +1/-1 shift keeps stock_id 0 from being filtered out
    l.append([(x - 1) for x in ((ids + 1) * (kmeans.labels_ == n)) if x > 0])
mat = []
matTest = []
n = 0
# For each cluster, average the member stocks' features over time_id (for both train and test)
for ind in l:
print(ind)
newDf = train.loc[train['stock_id'].isin(ind) ]
newDf = newDf.groupby(['time_id']).agg(np.nanmean)
newDf.loc[:,'stock_id'] = str(n)+'c1'
mat.append ( newDf )
newDf = test.loc[test['stock_id'].isin(ind) ]
newDf = newDf.groupby(['time_id']).agg(np.nanmean)
newDf.loc[:,'stock_id'] = str(n)+'c1'
matTest.append ( newDf )
n+=1
mat1 = pd.concat(mat).reset_index()
mat1.drop(columns=['target'],inplace=True)
mat2 = pd.concat(matTest).reset_index()
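The snippet above stops at mat1 (train) and mat2 (test), the cluster-level averages keyed by time_id and a synthetic stock_id label such as '0c1'. These still have to be merged back into the main dataframes; the sketch below shows one way to do that (a hedged illustration under my own assumptions, not necessarily the author's exact merge code, with the column subset chosen purely for illustration).

```python
# Hedged sketch: fold a few cluster-level averages back into train/test by time_id.
cluster_cols = ['time_id', 'stock_id', 'log_return1_realized_volatility']  # illustrative subset

def add_cluster_features(base_df, cluster_df):
    # Pivot so each (feature, cluster label) pair becomes its own column
    wide = cluster_df[cluster_cols].pivot(index='time_id', columns='stock_id')
    wide.columns = [f'{feat}_cluster_{label}' for feat, label in wide.columns]
    return base_df.merge(wide.reset_index(), on='time_id', how='left')

train = add_cluster_features(train, mat1)
test = add_cluster_features(test, mat2)
```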
3.3 Parallelization and the Loss Function
Feature construction is parallelized across stocks: each stock's book and trade files are preprocessed independently with joblib and the results are concatenated. This section also defines the RMSPE metric and the evaluation wrapper used for LightGBM early stopping.
# Function to run the preprocessing in parallel (one job per stock id)
def preprocessor(list_stock_ids, is_train = True):
    # Parallel for loop
def for_joblib(stock_id):
# Train
if is_train:
file_path_book = data_dir + "book_train.parquet/stock_id=" + str(stock_id)
file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)
# Test
else:
file_path_book = data_dir + "book_test.parquet/stock_id=" + str(stock_id)
file_path_trade = data_dir + "trade_test.parquet/stock_id=" + str(stock_id)
# Preprocess book and trade data and merge them
df_tmp = pd.merge(book_preprocessor(file_path_book), trade_preprocessor(file_path_trade), on = 'row_id', how = 'left')
        # Return the merged dataframe
return df_tmp
    # Use the joblib Parallel API to run the per-stock loop in parallel
df = Parallel(n_jobs = -1, verbose = 1)(delayed(for_joblib)(stock_id) for stock_id in list_stock_ids)
    # Concatenate all the dataframes returned from Parallel
df = pd.concat(df, ignore_index = True)
return df
# Function to calculate the root mean squared percentage error
def rmspe(y_true, y_pred):
return np.sqrt(np.mean(np.square((y_true - y_pred) / y_true)))
# Function to early stop with root mean squared percentage error
def feval_rmspe(y_pred, lgb_train):
y_true = lgb_train.get_label()
return 'RMSPE', rmspe(y_true, y_pred), False
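# Quick sanity check of the metric (illustrative, not part of the original notebook):
# with y_true = [1.0, 2.0] and y_pred = [1.1, 1.8] the percentage errors are -10% and +10%,
# so the RMSPE should be exactly 0.1.
assert np.isclose(rmspe(np.array([1.0, 2.0]), np.array([1.1, 1.8])), 0.1)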
# Read train and test
train, test = read_train_test()
# Get unique stock ids
train_stock_ids = train['stock_id'].unique()
# Preprocess them using Parallel and our single stock id functions
train_ = preprocessor(train_stock_ids, is_train = True)
train = train.merge(train_, on = ['row_id'], how = 'left')
# Get unique stock ids
test_stock_ids = test['stock_id'].unique()
# Preprocess them using Parallel and our single stock id functions
test_ = preprocessor(test_stock_ids, is_train = False)
test = test.merge(test_, on = ['row_id'], how = 'left')
# Get group stats of time_id and stock_id
train = get_time_stock(train)
test = get_time_stock(test)
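At this point train and test hold one row per (stock_id, time_id) pair with a couple of hundred engineered columns. A quick illustrative inspection (added here, not in the original notebook):

```python
# Illustrative checks on the merged feature frame
print(train.shape)                                              # rows = (stock_id, time_id) pairs
print(train.filter(like='realized_volatility').shape[1], 'volatility-related columns')
print(train.isna().mean().sort_values(ascending=False).head())  # missing values, e.g. from sparse trailing windows
```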
3.4 Model Training
For the models, the solution ensembles a LightGBM model with a neural network. There is nothing particularly unusual in this part, so interested readers can check the original notebook for the full training code.
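Since the training code is not reproduced in the article, here is a minimal sketch of how the LightGBM part can be wired up with the feval_rmspe wrapper defined earlier. This is an illustration under my own assumptions (the feature selection, the parameters and the 1/y^2 sample weights are common choices for RMSPE-style targets), not the author's exact setup.

```python
import lightgbm as lgb
from sklearn.model_selection import KFold

# Illustrative feature list: everything except identifiers and the target
features = [c for c in train.columns if c not in ['row_id', 'time_id', 'target']]
X, y = train[features], train['target']

# Assumed parameters, not the author's tuned configuration; 'metric': 'None'
# disables built-in metrics so early stopping uses the custom RMSPE only.
params = {'objective': 'rmse', 'learning_rate': 0.1, 'metric': 'None', 'verbose': -1}
oof = np.zeros(len(train))

for trn_idx, val_idx in KFold(n_splits=5, shuffle=True, random_state=42).split(X):
    # Weighting samples by 1/y^2 makes the squared-error objective approximate RMSPE
    trn_set = lgb.Dataset(X.iloc[trn_idx], y.iloc[trn_idx], weight=1 / np.square(y.iloc[trn_idx]))
    val_set = lgb.Dataset(X.iloc[val_idx], y.iloc[val_idx], weight=1 / np.square(y.iloc[val_idx]))
    model = lgb.train(params, trn_set, num_boost_round=1000, valid_sets=[val_set],
                      feval=feval_rmspe, callbacks=[lgb.early_stopping(50), lgb.log_evaluation(100)])
    oof[val_idx] = model.predict(X.iloc[val_idx])

print('Out-of-fold RMSPE:', rmspe(y.values, oof))
```

Per-fold test predictions would then be averaged before the blend with the NN in section 3.5.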
3.5 Ensembling and Submission
The ensemble is simply the average of the LightGBM and NN predictions, written out as submission.csv.
test_nn["row_id"] = test_nn["stock_id"].astype(str) + "-" + test_nn["time_id"].astype(str)
test_nn[target_name] = (test_predictions_nn+predictions_lgb)/2
score = round(rmspe(y_true = train_nn[target_name].values, y_pred = train_nn[pred_name].values),5)
print('RMSPE {}: {} - Folds: {}'.format(model_name, score, scores_folds[model_name]))
display(test_nn[['row_id', target_name]].head(3))
test_nn[['row_id', target_name]].to_csv('submission.csv',index = False)
4 Summary
As you can see, this top competitor invested heavily in feature engineering, and a solid, well-engineered pipeline is the foundation of a good result. The models themselves are not fancy, yet the score is still strong; there is a lot here worth learning from.