使用TensorFlow2.0實作一個簡單的股票預測
1、環境
Python3.7.4
PyCharm 2019.1.3 (Professional Edition)
Windows 10
Tensorflow2.0+cuda10.0
2、資料集及預處理
- 資料集為一維資料,每行代表一天的資料。將資料集每十一天劃分為一組:前十天為輸入資料(訓練樣本),第十一天用來產生標籤——若第十一天的數值大於第十天的數值,標籤為 1,否則為 0。因此這是一個二分類任務:根據前十天的走勢判斷第十一天是上漲還是下跌。
- 預處理時將資料集每十一天作為一組,其中前十天是樣本,第十一天用於產生標籤。資料集取 csv 檔案中前 119702 行資料(剛好被 11 整除),119702/11 即 10882 組資料;取百分之八十作為訓練集(8705 組),百分之二十作為測試集(2177 組)。資料格式為:訓練集樣本 [8705,10,1]、訓練集標籤 [8705,1]、測試集樣本 [2177,10,1]、測試集標籤 [2177,1]。具體過程見代碼。
3、源代碼
import os
import tensorflow as tf
import numpy as np
import csv
import copy
from tensorflow import keras
from tensorflow.keras import layers
# Set TensorFlow's native (C++) log threshold to '2', which is commonly
# documented as hiding INFO and WARNING messages.
# NOTE(review): TensorFlow is already imported above this line, so anything
# logged during the import itself may still appear -- confirm.
os.environ.update({'TF_CPP_MIN_LOG_LEVEL': '2'})

# The rest of the script uses the TensorFlow 2.x API; stop early otherwise.
assert tf.__version__[:2] == '2.'
def preprocessing(csv_path='股票資料.csv', encoding='ANSI', train_groups=8705):
    """Read the price CSV and build training/test tensors for the classifier.

    Rows are consumed in file order, eleven at a time. Column 0 of the first
    ten rows of a group forms one sample (each value wrapped in its own
    single-element list), and the eleventh row yields the label: ``[1]`` when
    its value is strictly greater than the tenth row's value, ``[0]``
    otherwise. Rows left over at the end of the file that do not fill a whole
    group are ignored.

    Args:
        csv_path: Path of the CSV file; only column 0 of each row is read.
        encoding: Text encoding passed to ``open``.
            NOTE(review): 'ANSI' appears to resolve only on Windows builds of
            Python -- confirm before running elsewhere.
        train_groups: Number of leading groups assigned to the training
            split; every later group goes to the test split. The default
            reproduces the original hard-coded row threshold of 95755
            (8705 groups * 11 rows).

    Returns:
        A tuple ``(train_x, train_y, test_x, test_y)`` of tensors created
        with ``tf.constant``. Each tensor is also printed.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a first-column value cannot be parsed as a float.
        IndexError: If a row has no columns (for example a blank line).
    """
    days_per_group = 11  # ten input days followed by one label day
    train_x, train_y, test_x, test_y = [], [], [], []
    pending = []  # column-0 values of the group currently being filled
    groups_done = 0

    # The context manager guarantees the file is closed even when a row fails
    # to parse; newline='' is what the csv module asks for on text files.
    with open(csv_path, encoding=encoding, newline='') as handle:
        for row in csv.reader(handle):
            pending.append(float(row[0]))
            if len(pending) < days_per_group:
                continue

            *inputs, target = pending
            sample = [[value] for value in inputs]
            # Day eleven is compared against day ten only.
            label = [1] if target > inputs[-1] else [0]

            if groups_done < train_groups:
                train_x.append(sample)
                train_y.append(label)
            else:
                test_x.append(sample)
                test_y.append(label)

            groups_done += 1
            pending = []

    tensors = tuple(
        tf.constant(part) for part in (train_x, train_y, test_x, test_y)
    )
    # Expected shapes per the original notes (not checked here):
    # [8705,10,1], [8705,1], [2177,10,1], [2177,1].
    for tensor in tensors:
        print(tensor)
    return tensors
class MyRNN(keras.Model):
    """Two stacked LSTM layers followed by a one-unit sigmoid output.

    Args:
        units: Number of units used by each of the two LSTM layers.
    """

    def __init__(self, units):
        super().__init__()
        # The first LSTM emits its full output sequence so that the second
        # LSTM can consume it; the second keeps only its final output.
        self.rnn = keras.Sequential([
            layers.LSTM(units, dropout=0.5, return_sequences=True),
            layers.LSTM(units, dropout=0.5),
        ])
        self.outlayer = layers.Dense(1)

    def call(self, inputs, training=None):
        """Run the forward pass.

        Args:
            inputs: Batch of input sequences.
                Presumably shaped [batch, 10, 1] as produced by the
                preprocessing step -- confirm against the caller.
            training: Keras training flag. It is forwarded explicitly to the
                LSTM stack so its dropout does not depend on implicit
                propagation of the flag.

        Returns:
            The sigmoid of the dense layer's single output per sample.
        """
        features = self.rnn(inputs, training=training)
        logits = self.outlayer(features)
        return tf.sigmoid(logits)
def main():
    """Train the LSTM classifier on the preprocessed data and evaluate it.

    Loads the four tensors from ``preprocessing()``, wraps them in batched
    ``tf.data`` pipelines, compiles a ``MyRNN`` model with Adam and binary
    cross-entropy, trains it, and finally evaluates it on the test pipeline.
    """
    train_x, train_y, test_x, test_y = preprocessing()

    batchsz = 128
    units = 64
    epochs = 40

    # Training batches: shuffle with a 1000-element buffer and drop the final
    # partial batch so every training step sees exactly `batchsz` samples.
    db_train = (
        tf.data.Dataset.from_tensor_slices((train_x, train_y))
        .shuffle(1000)
        .batch(batchsz, drop_remainder=True)
    )

    # Test batches: keep the final partial batch. Dropping it would silently
    # exclude the trailing samples from validation and evaluation metrics.
    db_test = tf.data.Dataset.from_tensor_slices((test_x, test_y)).batch(batchsz)

    print(db_train)
    print(db_test)

    model = MyRNN(units)
    model.compile(
        optimizer=keras.optimizers.Adam(0.001),
        loss=tf.losses.BinaryCrossentropy(),
        metrics=['accuracy'],
    )
    # The test pipeline doubles as the validation data during training.
    model.fit(db_train, epochs=epochs, validation_data=db_test)
    model.evaluate(db_test)
# Run the training pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()