[Finance][PyTorch] Using Deep Learning to Predict Futures Closing-Price Direction — Data Processing

- Read the data
- Split the data into 3-year + 3-month segments
- Extract the data and do initial preprocessing
- Compute the futures technical indicators
- Build the dataset
Read the data

The pandas_techinal_indicators module is taken from jmartinezheras/reproduce-stock-market-direction-random-forests.
import torch
from torch.autograd import Variable
import torch.nn as nn
import pandas as pd
from pandas import DataFrame
import matplotlib.pyplot as plt
import numpy as np
import random
import pandas_techinal_indicators as ta
np.random.seed(42)
random.seed(42)
# %matplotlib inline
# %config InlineBackend.figure_format = 'svg'
df1 = pd.read_csv(r'xxxxxxx/hengSheng_0404.csv')
# df1 = pd.read_csv(r'D:/MINE_FILE/dataSet/market_index_data/hengSheng_0404.csv')
df1.head()
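The rest of the walkthrough assumes the CSV provides at least Open/High/Low/Close/Volume plus year and month columns (these names are inferred from how the frame is used below, not stated in the original). A quick sanity check:

# Minimal column check, assuming these columns exist (inferred from later use).
expected = {'Open', 'High', 'Low', 'Close', 'Volume', 'year', 'month'}
print('missing columns:', expected - set(df1.columns))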

Split the data into 3-year + 3-month segments

Each window uses 3 years of data as the training set and the following 3 months as the test set; the next window is shifted forward by 3 months. Within each training set, 80% is used for training and 20% for validation. Reference: M’Ng J, Mehralizadeh M. Forecasting East Asian Indices Futures via a Novel Hybrid of Wavelet-PCA Denoising and Artificial Neural Network Models[J]. PLOS ONE, 2016, 11.
train_ptr = []
test_ptr = []
end_ptr = []
date_flag = [2, 3*12+2, 3*12+5]  # with num counted in months from 2006-01, these are 2006-02, 2009-02 and 2009-05
for i in range(0, len(df1)):
    num = (df1.iloc[i]['year'] - 2006) * 12 + df1.iloc[i]['month']
    if num == date_flag[0]:   # start of a training window
        train_ptr.append(i)
        date_flag[0] += 3
    if num == date_flag[1]:   # start of the corresponding test window (3 years later)
        test_ptr.append(i)
        date_flag[1] += 3
    if num == date_flag[2]:   # end of the test window (3 months after its start)
        end_ptr.append(i)
        date_flag[2] += 3
print(len(end_ptr))
print(train_ptr)
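As a usage sketch (my assumption, not shown in the original: index k in each pointer list describes the k-th rolling window, which holds because all three markers advance in 3-month steps), one window can be sliced like this:

# Slice the k-th rolling window; assumes train_ptr[k] < test_ptr[k] < end_ptr[k].
k = 0
train_df = df1.iloc[train_ptr[k]:test_ptr[k]]  # 3 years of training rows
test_df = df1.iloc[test_ptr[k]:end_ptr[k]]     # the following 3 months of test rows
print(len(train_df), len(test_df))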
Extract the data and do initial preprocessing
aapl = df1[['Open', 'High', 'Low', 'Close', 'Volume']]
aapl.head()
def get_exp_preprocessing(df, alpha=0.9):
    edata = df.ewm(alpha=alpha).mean()
    return edata
saapl = get_exp_preprocessing(aapl)
saapl.head() #saapl stands for smoothed aapl
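For reference, `ewm(alpha=alpha).mean()` with the pandas default `adjust=True` returns at each step a weighted average of all observations so far, with the weight decaying by a factor of (1 - alpha) per step back in time. A tiny check on toy data (my own example, easy to verify by hand):

# Exponential smoothing sanity check (toy series, pandas defaults).
toy = pd.Series([1.0, 2.0, 3.0])
print(toy.ewm(alpha=0.9).mean())
# expected: 1.0, (2 + 0.1*1)/1.1 ≈ 1.909, (3 + 0.1*2 + 0.01*1)/1.11 ≈ 2.892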
Compute the futures technical indicators
def feature_extraction(data):
    for x in [5, 14, 26, 44, 66]:
    # for x in [14]:
        data = ta.relative_strength_index(data, n=x)
        data = ta.stochastic_oscillator_d(data, n=x)
        data = ta.accumulation_distribution(data, n=x)
        data = ta.average_true_range(data, n=x)
        data = ta.momentum(data, n=x)
        data = ta.money_flow_index(data, n=x)
        data = ta.rate_of_change(data, n=x)
        data = ta.on_balance_volume(data, n=x)
        data = ta.commodity_channel_index(data, n=x)
        data = ta.ease_of_movement(data, n=x)
        data = ta.trix(data, n=x)
        data = ta.vortex_indicator(data, n=x)
    # ratio of the close to its exponential moving average (note: the bare number is ewm's com parameter, not span)
    data['ema50'] = data['Close'] / data['Close'].ewm(50).mean()
    data['ema21'] = data['Close'] / data['Close'].ewm(21).mean()
    data['ema14'] = data['Close'] / data['Close'].ewm(14).mean()
    data['ema5'] = data['Close'] / data['Close'].ewm(5).mean()
    # Williams %R is missing
    data = ta.macd(data, n_fast=12, n_slow=26)
    # del(data['Open'])
    # del(data['High'])
    # del(data['Low'])
    # del(data['Volume'])
    return data
def compute_prediction_int(df, n):
    pred = (df.shift(-n)['Close'] >= df['Close'])
    pred = pred.iloc[:-n]
    return pred.astype(int)
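# A small sanity check (hypothetical toy data): the label at day t is 1 when the
# close n days later is at least today's close; the last n rows have no label.
# compute_prediction_int(pd.DataFrame({'Close': [10.0, 11.0, 9.0, 12.0, 8.0]}), n=2)
# -> 0, 1, 0   (9 >= 10? no; 12 >= 11? yes; 8 >= 9? no)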
def prepare_data(df, horizon):
    data = feature_extraction(df).dropna().iloc[:-horizon]
    data['pred'] = compute_prediction_int(data, n=horizon)
    # del(data['Close'])
    return data.dropna()
# label: does the closing price rise 10 days later?
data = prepare_data(saapl, 10)
y = data['pred']
#remove the output from the input
features = [x for x in data.columns if x not in ['gain', 'pred']]
X = data[features]
print(list(X.columns))
Build the dataset
miData = X.values
scalarX = np.max(miData, axis=0) - np.min(miData, axis=0)
miData = (miData - np.min(miData, axis=0)) / scalarX
yData = y.values
print(yData)
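One caveat with this min-max normalization: any constant feature column makes scalarX zero and turns the division into NaN/inf. A defensive variant of the two normalization lines above (my addition, not in the original):

# Guard against zero-range columns before normalizing.
col_min = np.min(miData, axis=0)
col_range = np.max(miData, axis=0) - col_min
col_range[col_range == 0] = 1.0   # constant columns map to 0 instead of NaN
miData = (miData - col_min) / col_range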
# Build samples and labels: dataset is the feature matrix, look_back is the number of consecutive rows used as one sequence
def create_dataset(dataset, label, look_back):
    data_x = []
    data_y = []
    batch_size = 50
    ind = list(range(len(dataset) - look_back))
    random.shuffle(ind)
    # print(len(dataset), len(ind), int(len(ind) / batch_size))
    for i in range(int(len(ind) / batch_size)):
        # TODO: given how the LSTM works, decide whether the target should be the
        # next day's label at every step or only the label of the last day
        ptr = ind[i * batch_size]
        x_item = dataset[ptr:ptr+look_back, :]
        y_item = label[ptr:ptr+look_back]
        # TODO: batch_size was temporarily set to 1 here
        x_item = torch.from_numpy(x_item.astype(np.float32))
        y_item = torch.from_numpy(y_item.astype(np.float32))
        x_item = torch.reshape(x_item, (look_back, 1, dataset.shape[1]))
        y_item = torch.reshape(y_item, (look_back, 1, 1))
        for j in range(1, batch_size):
            ptr = ind[i * batch_size + j]
            x_temp = dataset[ptr:ptr+look_back, :]
            y_temp = label[ptr:ptr+look_back]
            x_temp = torch.from_numpy(x_temp.astype(np.float32))
            y_temp = torch.from_numpy(y_temp.astype(np.float32))
            x_temp = torch.reshape(x_temp, (look_back, 1, dataset.shape[1]))
            y_temp = torch.reshape(y_temp, (look_back, 1, 1))
            x_item = torch.cat([x_item, x_temp], dim=1)
            y_item = torch.cat([y_item, y_temp], dim=1)
        y_item = y_item.long()
        data_x.append(x_item)
        data_y.append(y_item)
    # return np.asarray(data_x), np.asarray(data_y)  # convert to ndarray
    return data_x, data_y
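# Shape check (hypothetical look_back): each list item is one shuffled mini-batch shaped
# (seq_len, batch, features) for inputs and (seq_len, batch, 1) for labels,
# matching PyTorch's default (non batch_first) LSTM layout.
# dataX, dataY = create_dataset(miData, yData, look_back=30)
# dataX[0].shape -> torch.Size([30, 50, miData.shape[1]]); dataY[0].shape -> torch.Size([30, 50, 1])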
def create_Test_dataset(dataset, label, look_back):
    data_x = []
    data_y = []
    x_item = torch.tensor([])
    y_item = torch.tensor([])
    # concatenate every sliding window into a single large batch for testing
    for i in range(len(dataset) - look_back):
        x_temp = dataset[i:i+look_back, :]
        y_temp = label[i:i+look_back]
        x_temp = torch.from_numpy(x_temp.astype(np.float32))
        y_temp = torch.from_numpy(y_temp.astype(np.float32))
        x_temp = torch.reshape(x_temp, (look_back, 1, dataset.shape[1]))
        y_temp = torch.reshape(y_temp, (look_back, 1, 1))
        x_item = torch.cat([x_item, x_temp], dim=1)
        y_item = torch.cat([y_item, y_temp], dim=1)
    y_item = y_item.long()
    data_x.append(x_item)
    data_y.append(y_item)
    return data_x, data_y
def trainSet_split(dataX, dataY):
    '''
    80% of the batches go to the training set, 20% to the validation set
    '''
    train_size = int(len(dataX) * 0.8)
    # TODO: at this point the data order has not yet been shuffled
    ind = list(range(len(dataX)))
    random.shuffle(ind)
    trainLoaderX = []
    trainLoaderY = []
    for i in ind[:train_size]:
        trainLoaderX.append(dataX[i])
        trainLoaderY.append(dataY[i])
    validateLoaderX = []
    validateLoaderY = []
    for i in ind[train_size:]:
        validateLoaderX.append(dataX[i])
        validateLoaderY.append(dataY[i])
    return trainLoaderX, trainLoaderY, validateLoaderX, validateLoaderY
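Putting the pieces together, a minimal end-to-end sketch (look_back = 30 is my own hypothetical choice; the actual value is set when the model is trained):

# Build shuffled mini-batches from the normalized features, then split them 80/20.
look_back = 30                                   # hypothetical window length
dataX, dataY = create_dataset(miData, yData, look_back)
trainX, trainY, valX, valY = trainSet_split(dataX, dataY)
print(len(trainX), len(valX))                    # number of mini-batches per split
print(trainX[0].shape, trainY[0].shape)          # (look_back, 50, n_features), (look_back, 50, 1)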