Implementing a Simple Stock Prediction with TensorFlow 2.0

1. Environment

Python 3.7.4

PyCharm 2019.1.3 (Professional Edition)

Windows 10

TensorFlow 2.0 + CUDA 10.0
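
A quick optional sanity check, not part of the original code, to confirm that the TensorFlow 2.0 installation can see the CUDA 10.0 GPU:

import tensorflow as tf

print(tf.__version__)                                        # should start with '2.'
print(tf.config.experimental.list_physical_devices('GPU'))   # non-empty list if the GPU is visible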

2. Dataset and Preprocessing

  • The dataset is one-dimensional: each row holds one day's value. Every eleven consecutive days form a group; the first ten days are the input and the eleventh day provides the label, which is 1 if the eleventh day's value is greater than the tenth day's and 0 otherwise. The problem is therefore a binary classification task: predict whether the eleventh day rises or falls from the trend of the previous ten days.
  • Preprocessing turns every group of eleven days into one sample: the first ten days are the features and the eleventh day gives the label. The first 119702 rows of the CSV file are used (exactly divisible by 11), i.e. 119702 / 11 = 10882 groups; 80% (8705 groups) become the training set and 20% (2177 groups) the test set. The resulting shapes are training samples [8705, 10, 1], training labels [8705, 1], test samples [2177, 10, 1], and test labels [2177, 1]. The full procedure is in the code below; a compact NumPy sketch of the same windowing also follows this list.

3. Source Code

import os
import tensorflow as tf
import numpy as np
import csv
import copy
from tensorflow import keras
from tensorflow.keras import layers

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')

def preprocessing():
    data = csv.reader(open('股票資料.csv', encoding='ANSI'))   # 'ANSI' resolves to the Windows system code page
    count = 0    # row counter, used to split the data 80% train / 20% test
    group = 11   # every eleven rows form one group
    train_x = []
    train_y = []
    test_x = []
    test_y = []
    ten = 0          # holds the tenth day's value of the current group
    group_list = []  # the ten-day window currently being built
    line_list = []   # temporary one-element wrapper for each day's value
    for line in data:
        if group == 1:   # the eleventh day of the group provides the label
            if float(line[0]) > ten:   # day 11 greater than day 10 -> positive class (1)
                label = [1]
                if count > 95755:   # 8705 groups * 11 rows = 95755 rows belong to the training portion
                    test_y.append(copy.deepcopy(label))
                else:
                    train_y.append(copy.deepcopy(label))
            else:
                label = [0]
                if count > 95755:
                    test_y.append(copy.deepcopy(label))
                else:
                    train_y.append(copy.deepcopy(label))
            group = 11
            if count > 95755:
                test_x.append(copy.deepcopy(group_list))      # deep copy, since group_list is cleared below
            else:
                train_x.append(copy.deepcopy(group_list))
            group_list.clear()
        else:
            if group == 2:   # remember the tenth day's value for comparison with the eleventh
                ten = float(line[0])
            line_list.append(float(line[0]))
            #group_list.append(float(line[0]))
            group_list.append(copy.deepcopy(line_list))   # each day stored as a one-element list -> group shape [10, 1]
            line_list.clear()
            group = group - 1
        count = count + 1

    print(tf.constant(train_x))   #[8705,10,1]
    print(tf.constant(train_y))   #[8705,1]
    print(tf.constant(test_x))    #[2177,10,1]
    print(tf.constant(test_y))    #[2177,1]
    return tf.constant(train_x),tf.constant(train_y),tf.constant(test_x),tf.constant(test_y)

class MyRNN(keras.Model):

    def __init__(self, units):
        super(MyRNN, self).__init__()

        # two stacked LSTM layers; the first returns the full sequence so the second can consume it
        self.rnn = keras.Sequential([
            layers.LSTM(units, dropout=0.5, return_sequences=True),
            layers.LSTM(units, dropout=0.5)
        ])

        # single output unit; a sigmoid is applied in call() to turn it into a probability
        self.outlayer = layers.Dense(1)

    def call(self, inputs, training=None):
        x = inputs
        x = self.rnn(x, training=training)   # forward the training flag so dropout is only active during training
        x = self.outlayer(x)
        prob = tf.sigmoid(x)                 # probability that the eleventh day rises
        return prob


def main():
    train_x,train_y,test_x,test_y = preprocessing()
    batchsz = 128

    db_train = tf.data.Dataset.from_tensor_slices((train_x, train_y))
    db_train = db_train.shuffle(1000).batch(batchsz, drop_remainder=True)   # drop the last batch if it is smaller than batchsz
    db_test = tf.data.Dataset.from_tensor_slices((test_x, test_y))
    db_test = db_test.batch(batchsz, drop_remainder=True)

    print(db_train)
    print(db_test)

    units = 64
    epochs = 40

    model = MyRNN(units)
    model.compile(optimizer = keras.optimizers.Adam(0.001),
                  loss = tf.losses.BinaryCrossentropy(),
                  metrics=['accuracy'])
    model.fit(db_train, epochs=epochs, validation_data=db_test)
    model.evaluate(db_test)

if __name__ == '__main__':
    main()
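
Once trained, the model maps a ten-day window to the probability that the eleventh day closes higher than the tenth. A minimal inference sketch, assuming `model` is the instance trained in main() and using hypothetical placeholder values:

last_ten_days = [10.2, 10.4, 10.1, 10.3, 10.5, 10.6, 10.4, 10.7, 10.8, 10.9]   # hypothetical values
window = tf.reshape(tf.constant(last_ten_days, dtype=tf.float32), [1, 10, 1])  # batch of one 10-day sequence
prob_up = model(window, training=False)                                        # shape [1, 1]
print('probability of rising:', float(prob_up[0, 0]))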