在使用各种预测方法之前，我们首先需要对数据进行预处理和准备。代码如下：
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import LSTM, Dense
# Convert the data into a time series: parse the 'date' column using the
# %Y%m%d format and make it the index (both steps modify `data` in place).
# NOTE(review): `data` is not defined in this excerpt — presumably a DataFrame
# loaded earlier with at least 'date' and 'close' columns; confirm.
data['date'] = pd.to_datetime(data['date'], format='%Y%m%d')
data.set_index('date', inplace=True)
# Split into training and test sets: the last 3 rows are held out as the
# test set, everything before them is the training set.
train_data = data.iloc[:-3]
test_data = data.iloc[-3:]
# SARIMA model
def sarima_model(train_data):
    """Fit a seasonal ARIMA model on the ``'close'`` column.

    The model is built with order (1, 0, 1) and seasonal order (1, 0, 1, 7)
    and fitted with ``disp=False``.

    Args:
        train_data: Object indexable by ``'close'`` that yields the series
            to model.

    Returns:
        The fitted results object returned by ``SARIMAX(...).fit``.
    """
    closing_prices = train_data['close']
    sarima = SARIMAX(
        closing_prices,
        order=(1, 0, 1),
        seasonal_order=(1, 0, 1, 7),
    )
    return sarima.fit(disp=False)
# Simple moving average
def simple_moving_average(train_data):
    """Return the most recent 7-row rolling mean of the ``'close'`` column.

    Args:
        train_data: DataFrame with a ``'close'`` column.

    Returns:
        The last value of the 7-row rolling mean; NaN when fewer than
        7 rows are available.
    """
    closes = train_data['close']
    rolling_means = closes.rolling(window=7).mean()
    return rolling_means.iloc[-1]
# Exponentially weighted moving average
def exponential_weighted_average(train_data):
    """Return the most recent exponentially weighted mean of ``'close'``.

    The smoothing uses ``span=7`` with ``adjust=False``.

    Args:
        train_data: DataFrame with a ``'close'`` column.

    Returns:
        The last value of the exponentially weighted mean.
    """
    smoothed = train_data['close'].ewm(span=7, adjust=False).mean()
    return smoothed.iloc[-1]
# Bollinger bands
def bollinger_bands(train_data):
    """Return the most recent upper and lower Bollinger bands of ``'close'``.

    The bands are the 7-row rolling mean plus/minus two 7-row rolling
    standard deviations.

    Args:
        train_data: DataFrame with a ``'close'`` column.

    Returns:
        Tuple ``(upper, lower)`` holding the last value of each band.
    """
    window = train_data['close'].rolling(window=7)
    middle = window.mean()
    half_width = 2 * window.std()
    upper = middle + half_width
    lower = middle - half_width
    return upper.iloc[-1], lower.iloc[-1]
# Relative strength index
def relative_strength_index(train_data):
    """Return the most recent 7-row relative strength index of ``'close'``.

    Row-to-row changes are split into gains (negative changes replaced by 0)
    and losses (positive changes replaced by 0, then made absolute). Each is
    averaged over a 7-row rolling window and combined as
    ``100 - 100 / (1 + avg_gain / avg_loss)``.

    Args:
        train_data: DataFrame with a ``'close'`` column.

    Returns:
        The last RSI value.
    """
    delta = train_data['close'].diff(1)
    gains = delta.mask(delta < 0, 0)
    losses = delta.mask(delta > 0, 0).abs()
    avg_gain = gains.rolling(window=7).mean()
    avg_loss = losses.rolling(window=7).mean()
    strength = avg_gain / avg_loss
    index = 100 - (100 / (1 + strength))
    return index.iloc[-1]
# Stochastic oscillator
def stochastic_oscillator(train_data):
    """Return the most recent K and D values of the stochastic oscillator.

    K is ``(close - lowest low) / (highest high - lowest low)`` over a
    7-row rolling window; D is the 3-row rolling mean of K. K is returned
    as a fraction (it is not multiplied by 100).

    Args:
        train_data: DataFrame with ``'high'``, ``'low'`` and ``'close'``
            columns.

    Returns:
        Tuple ``(K, D)`` holding the last value of each series.
    """
    highest = train_data['high'].rolling(window=7).max()
    lowest = train_data['low'].rolling(window=7).min()
    price_range = highest - lowest
    percent_k = (train_data['close'] - lowest) / price_range
    percent_d = percent_k.rolling(window=3).mean()
    return percent_k.iloc[-1], percent_d.iloc[-1]
# Linear regression
def linear_regression(train_data):
    """Fit a linear trend to ``'close'`` and predict the next time step.

    The regressor is the row position (0, 1, ..., n-1) and the target is the
    ``'close'`` value of that row.

    Args:
        train_data: DataFrame with a ``'close'`` column.

    Returns:
        The predicted ``'close'`` value at position ``n``, i.e. one step
        after the last training row.
    """
    n_obs = len(train_data)
    X = np.arange(n_obs).reshape(-1, 1)
    y = train_data['close'].values
    # Fit on every row: an unused hold-out split would only discard the most
    # recent observations, which matter most for the next-step forecast.
    model = LinearRegression()
    model.fit(X, y)
    # Training positions run 0..n_obs-1, so the first unseen step is n_obs.
    return model.predict([[n_obs]])[0]
# Random forest regression
def random_forest_regression(train_data):
    """Fit a random forest on row position vs. ``'close'`` and predict the next step.

    The forest uses 100 estimators and ``random_state=0``.

    Args:
        train_data: DataFrame with a ``'close'`` column.

    Returns:
        The predicted ``'close'`` value at position ``n``, i.e. one step
        after the last training row.
    """
    n_obs = len(train_data)
    X = np.arange(n_obs).reshape(-1, 1)
    y = train_data['close'].values
    model = RandomForestRegressor(n_estimators=100, random_state=0)
    model.fit(X, y)
    # Training positions run 0..n_obs-1, so the first unseen step is n_obs.
    # NOTE(review): tree ensembles generally cannot extrapolate a trend past
    # the training range — confirm a position-only feature is intended.
    return model.predict([[n_obs]])[0]
# Support vector regression
def support_vector_regression(train_data):
    """Fit a linear-kernel SVR on row position vs. ``'close'`` and predict the next step.

    Args:
        train_data: DataFrame with a ``'close'`` column.

    Returns:
        The predicted ``'close'`` value at position ``n``, i.e. one step
        after the last training row.
    """
    n_obs = len(train_data)
    X = np.arange(n_obs).reshape(-1, 1)
    y = train_data['close'].values
    model = SVR(kernel='linear')
    model.fit(X, y)
    # Training positions run 0..n_obs-1, so the first unseen step is n_obs.
    return model.predict([[n_obs]])[0]
# Autoregressive moving average
def autoregressive_moving_average(train_data):
    """Fit an ARMA(1, 1) model on ``'close'`` and return the 3-step-ahead forecast.

    The model is a SARIMAX with order (1, 0, 1) and no seasonal component.

    Args:
        train_data: DataFrame with a ``'close'`` column.

    Returns:
        The forecast value for the third step after the end of the
        training data.
    """
    model = SARIMAX(train_data['close'], order=(1, 0, 1))
    model_fit = model.fit(disp=False)
    forecast = model_fit.forecast(steps=3)
    # The forecast holds one value per step; convert to an array so that
    # positional indexing works regardless of the forecast's index labels,
    # then take the final (third) step.
    return np.asarray(forecast)[-1]
# Long short-term memory model
def lstm_model(train_data):
    """Train a two-layer LSTM on ``'close'`` and predict the next value.

    The ``'close'`` values are scaled to [0, 1], cut into sliding windows of
    7 consecutive values (inputs) paired with the value that follows each
    window (target), and used to train a network of two 50-unit LSTM layers
    and a single-unit dense output for 100 epochs. The prediction for the
    window made of the last 7 values is mapped back to the original scale.

    Args:
        train_data: DataFrame with a ``'close'`` column holding more than
            7 rows.

    Returns:
        The predicted next ``'close'`` value in the original scale.

    Raises:
        ValueError: If ``train_data`` has 7 rows or fewer, so that no
            training window can be built.
    """
    # Imported here because the file's import block does not bring
    # MinMaxScaler into scope.
    from sklearn.preprocessing import MinMaxScaler

    window = 7
    closes = train_data['close'].values.reshape(-1, 1)
    if len(closes) <= window:
        raise ValueError(
            f"lstm_model needs more than {window} rows, got {len(closes)}"
        )

    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_data = scaler.fit_transform(closes)

    # Each sample is `window` consecutive values; its target is the next one.
    n_samples = len(scaled_data) - window
    X_train = np.array([scaled_data[i:i + window, 0] for i in range(n_samples)])
    y_train = np.array([scaled_data[i + window, 0] for i in range(n_samples)])
    # Keras LSTM layers expect input shaped (samples, timesteps, features).
    X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], 1))

    model = Sequential()
    model.add(LSTM(units=50, return_sequences=True, input_shape=(X_train.shape[1], 1)))
    model.add(LSTM(units=50))
    model.add(Dense(units=1))
    model.compile(optimizer='adam', loss='mean_squared_error')
    model.fit(X_train, y_train, epochs=100, batch_size=32, verbose=0)

    # Predict from the most recent window, shaped as a single sample.
    X_test = np.reshape(scaled_data[-window:], (1, window, 1))
    predicted = model.predict(X_test)
    return scaler.inverse_transform(predicted)[0][0]
# Predict the prices for the next 3 days
# Walk-forward SARIMA forecast: fit on the data seen so far, predict one step
# ahead, then add the actual test row to the history before the next step.
sarima_result = []
# Start from the full training frame; sarima_model selects the 'close' column
# itself, so it must receive the DataFrame rather than the 'close' Series.
train = train_data
# Iterate over the rows actually held out (3, or fewer if the data is shorter).
for i in range(len(test_data)):
    model_fit = sarima_model(train)
    # forecast(steps=1) holds a single value; read it positionally.
    prediction = np.asarray(model_fit.forecast(steps=1))[0]
    sarima_result.append(prediction)
    # Extend the history with the i-th observed test row. pd.concat is used
    # because Series/DataFrame.append was removed in pandas 2.0.
    train = pd.concat([train, test_data.iloc[[i]]])
# Results of the remaining methods, each computed once on the training set.
# Single values are wrapped in one-element lists; bollinger_bands and
# stochastic_oscillator results are kept as the tuples those functions return.
sma_result = [simple_moving_average(train_data)]
ewa_result = [exponential_weighted_average(train_data)]
bb_result = bollinger_bands(train_data)  # (upper band, lower band)
rsi_result = [relative_strength_index(train_data)]
stochastic_result = stochastic_oscillator(train_data)  # (K, D)
lr_result = [linear_regression(train_data)]
rf_result = [random_forest_regression(train_data)]
svr_result = [support_vector_regression(train_data)]
arima_result = [autoregressive_moving_average(train_data)]
lstm_result = [lstm_model(train_data)]
# Compute the mean of each method's results.
# For the one-element lists the mean is simply that element; bb_mean averages
# the upper and lower band, and stochastic_mean averages K and D.
# NOTE(review): the RSI and stochastic values are indicator readings rather
# than prices — confirm that averaging/reporting them as "prices" is intended.
sarima_mean = np.mean(sarima_result)
sma_mean = np.mean(sma_result)
ewa_mean = np.mean(ewa_result)
bb_mean = np.mean(bb_result)
rsi_mean = np.mean(rsi_result)
stochastic_mean = np.mean(stochastic_result)
lr_mean = np.mean(lr_result)
rf_mean = np.mean(rf_result)
svr_mean = np.mean(svr_result)
arima_mean = np.mean(arima_result)
lstm_mean = np.mean(lstm_result)
# Print the raw result of every method
print("SARIMA预测结果:", sarima_result)
print("简单移动平均线预测结果:", sma_result)
print("指数加权移动平均线预测结果:", ewa_result)
print("Bollinger带预测结果:", bb_result)
print("相对强弱指标预测结果:", rsi_result)
print("随机指标预测结果:", stochastic_result)
print("线性回归预测结果:", lr_result)
print("随机森林回归预测结果:", rf_result)
print("支持向量回归法预测结果:", svr_result)
print("自回归移动平均法预测结果:", arima_result)
print("长短期记忆模型预测结果:", lstm_result)
# Print the per-method means followed by a fixed recommendation message
# (the message is a constant string; it is not derived from the results).
print("各种预测价格的均值:")
print("SARIMA均值:", sarima_mean)
print("简单移动平均线均值:", sma_mean)
print("指数加权移动平均线均值:", ewa_mean)
print("Bollinger带均值:", bb_mean)
print("相对强弱指标均值:", rsi_mean)
print("随机指标均值:", stochastic_mean)
print("线性回归均值:", lr_mean)
print("随机森林回归均值:", rf_mean)
print("支持向量回归法均值:", svr_mean)
print("自回归移动平均法均值:", arima_mean)
print("长短期记忆模型均值:", lstm_mean)
print("建议:根据预测结果,可以综合考虑SARIMA、Bollinger带和长短期记忆模型的预测结果,进行投资决策。然而,请注意市场风险,投资需谨慎。")
运行上述代码后，将输出各种方法的结果：SARIMA 给出未来3天的逐日预测价格，其余方法各输出一个结果（其中 Bollinger 带输出上轨和下轨，随机指标输出 K 值和 D 值）。
最后还会给出各组结果的均值和建议。
请注意，这只是一种量化分析方法的示例，其准确性取决于所使用的算法和数据质量。股票市场随时可能受到各种因素的影响，因此预测结果仅供参考，不构成投资建议。
civilpy:Python数据分析及可视化实例目录