加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
myStock.py 9.41 KB
一键复制 编辑 原始数据 按行查看 历史
fengsl 提交于 2021-06-03 20:45 . 使用净值训练
from torch import nn
import torch
import pandas
import numpy
import matplotlib.pyplot as plt
import os
import pickle
import time
import requests
import matplotlib
# Select the Tk-based backend; the backend name is matched case-insensitively.
matplotlib.use('TkAgg')
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],  # font used so Chinese labels render correctly
    'axes.unicode_minus': False,    # lets the minus sign render correctly
})
# Device the model is trained on: the first CUDA GPU when present, else the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda:0")
else:
    DEVICE = torch.device("cpu")
# Scale applied to the normalised prices (price * RATIO / first price).
RATIO = 1
class MyData(object):
    """Load a price history, append today's quote and build LSTM training windows.

    Attributes set by ``loadData``:
        df: DataFrame read from the CSV with today's row added.
        absData: float32 array of the raw ``close`` column.
        oneData: list of closes scaled as ``close * RATIO / first_close``.
        xSet: float32 array of sliding windows, ``days4train`` values each.
        ySet: float32 array holding the value that follows each window.
        startPoint: the first close, needed to undo the scaling.
        data: same list as ``oneData`` until ``trainData`` replaces it with
            ``[trainX, trainY]`` tensors.
    """

    def __init__(self, name='上证指数', days4train=30):
        """Remember the series name, the window length and the script directory."""
        self.path = os.path.dirname(os.path.abspath(__file__))
        self.name = name
        self.days4train = days4train

    def loadData(self, stock='SH000001'):
        """Read ``csv/<name>.csv``, add today's online quote and build the windows.

        Args:
            stock: symbol passed to the quote endpoint.

        Raises:
            requests.HTTPError: if the quote endpoint answers with an error status.
            requests.Timeout: if the endpoint does not answer in time.

        Side effects: rewrites the CSV file with today's row included.
        """
        print('尝试加载本地数据...')
        csvPath = os.path.join(self.path, 'csv', f'{self.name}.csv')
        data = pandas.read_csv(csvPath, index_col=0, header=0)

        # Strip surrounding whitespace: a trailing newline left by a text
        # editor is not a legal character inside an HTTP header value.
        with open(os.path.join(self.path, 'cookie.txt'), encoding='utf-8') as f:
            cookie = f.read().strip()

        # Row label for today, formatted as YYYY-MM-DD.
        date = time.strftime('%Y-%m-%d', time.localtime())
        infoUrl = f'https://stock.xueqiu.com/v5/stock/quote.json?symbol={stock}&extend=detail'
        headers = {
            'cookie': cookie,
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36 Edg/89.0.774.48'
        }
        print('尝试获取在线数据...')
        # Without a timeout a stalled connection would block the script forever.
        rq = requests.get(infoUrl, headers=headers, timeout=10)
        # Fail loudly on 4xx/5xx instead of with an obscure KeyError below.
        rq.raise_for_status()
        jsonData = rq.json()['data']['quote']
        # NOTE(review): values are written positionally, so this order must
        # match the CSV's column order — confirm against the CSV header.
        todayData = [
            jsonData['current'],
            jsonData['open'],
            jsonData['volume'],
            0.0,
            jsonData['high'],
            jsonData['amount'],
            jsonData['low'],
        ]
        data.loc[date] = todayData
        print('开始分析....')
        self.df = data
        self.df.to_csv(csvPath, encoding='utf_8_sig')

        closes = numpy.array(data.close, dtype='float32')
        startPoint = closes[0]
        # Scale every close relative to the first one; computed in one
        # vectorised pass, kept as a list of float32 scalars.
        oneData = list(closes * RATIO / startPoint)
        self.absData = closes
        self.data = oneData
        self.oneData = oneData

        # Each window of days4train consecutive values predicts the next value.
        windowCount = len(oneData) - self.days4train
        xSet = [oneData[i:i + self.days4train] for i in range(windowCount)]
        ySet = [oneData[i + self.days4train] for i in range(windowCount)]
        self.xSet = numpy.array(xSet, dtype='float32')
        self.ySet = numpy.array(ySet, dtype='float32')
        self.startPoint = startPoint

    def trainData(self):
        """Turn the first 90% of the windows into tensors stored in ``self.data``.

        ``self.data`` becomes ``[trainX, trainY]`` where ``trainX`` has shape
        ``(n, 1, days4train)`` and ``trainY`` has shape ``(n, 1, 1)``. Both are
        moved to the GPU when CUDA is available.
        """
        n = int(len(self.xSet) * 0.9)
        # The recurrent layer reads (seq_size, batch_size, feature_size); the
        # windows are laid out along the first axis with a batch size of 1.
        trainX = torch.from_numpy(self.xSet[:n].reshape(-1, 1, self.days4train))
        trainY = torch.from_numpy(self.ySet[:n].reshape(-1, 1, 1))
        if torch.cuda.is_available():
            trainX = trainX.cuda()
            trainY = trainY.cuda()
        self.data = [trainX, trainY]

    def run(self):
        """Load the data and prepare the training tensors."""
        self.loadData()
        self.trainData()
class LSTM(nn.Module):
    """
    Regression model: a stacked LSTM followed by a linear read-out layer.

    Args:
        - input_size: feature size of each input step
        - hidden_size: number of hidden units
        - output_size: number of outputs
        - num_layers: how many LSTM layers to stack
    """

    def __init__(self, input_size, hidden_size, output_size=1, num_layers=2):
        super(LSTM, self).__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, _x):
        """Map input (seq_len, batch, input_size) to (seq_len, batch, output_size)."""
        # The LSTM also returns its final state, which is not needed here.
        hidden_seq, _state = self.lstm(_x)
        seq_len, batch, hidden = hidden_seq.shape
        # Collapse the sequence and batch axes so the linear layer sees 2-D rows.
        flat = hidden_seq.view(seq_len * batch, hidden)
        projected = self.fc(flat)
        # Restore the (seq_len, batch, ...) layout.
        return projected.view(seq_len, batch, -1)
def train(data, epochs=10000, lr=1e-2):
    """Train the LSTM on ``data.data`` and save its weights to ``myRnn.pt``.

    Args:
        data: object exposing ``days4train`` and ``data`` (``[trainX, trainY]``
            tensors, as prepared by ``MyData.trainData``).
        epochs: number of full-batch optimisation steps.
        lr: learning rate for the Adam optimiser.

    Side effects: writes ``log<days4train>.txt`` and ``myRnn.pt`` in the
    current working directory, prints progress every 10 epochs and shows the
    loss curve.
    """
    start = time.time()
    model = LSTM(data.days4train, 8, output_size=1, num_layers=2)
    model.to(DEVICE)
    model.train()
    loss_function = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    # Make sure the tensors live on the same device as the model.
    trainX = data.data[0].to(DEVICE)
    trainY = data.data[1].to(DEVICE)
    # Collect log lines in a list; repeated string += is quadratic.
    logLines = []
    losses = []
    for i in range(epochs):
        optimizer.zero_grad()
        out = model(trainX)
        loss = loss_function(out, trainY)
        loss.backward()
        optimizer.step()
        # Read the scalar once so it is not fetched twice per epoch.
        lossValue = loss.item()
        losses.append(lossValue)
        if (i+1) % 10 == 0:
            s = 'Epoch: {}, Loss:{:.5f}\n'.format(i+1, lossValue)
            logLines.append(s)
            print(s)
    with open(f'log{data.days4train}.txt', 'w', encoding='utf-8') as f:
        f.write(''.join(logLines))
    torch.save(model.state_dict(), 'myRnn.pt')
    plt.plot(losses, 'g', label='loss')
    # The curve carries a label, so draw the legend that displays it.
    plt.legend()
    end = time.time()
    print(f'耗时{end-start}s')
    plt.show()
def test(data):
    """Run the saved model over every window and rescale the output to prices.

    Args:
        data: object exposing ``xSet``, ``absData``, ``startPoint`` and
            ``days4train`` (as filled in by ``MyData.loadData``).

    Returns:
        tuple ``(real, pred_test)``: the raw closes from index ``days4train``
        onwards and the model's predictions converted back to price units.
        No chart is drawn here; plot the returned arrays to compare them.
    """
    # The model below is evaluated on the CPU, so always map the weights there;
    # this also works for checkpoints that were saved from a GPU.
    rnnDict = torch.load('myRnn.pt', map_location=torch.device('cpu'))
    model = LSTM(data.days4train, 8, output_size=1, num_layers=2)
    model.load_state_dict(rnnDict)
    model.eval()  # switch to evaluation mode
    # The full window set is used; layout is (seq_size, batch_size, feature_size).
    dataset_x = torch.from_numpy(data.xSet.reshape(-1, 1, data.days4train))
    # Inference only: skip gradient bookkeeping.
    with torch.no_grad():
        pred_test = model(dataset_x).view(-1).numpy()
    # Undo the normalisation (price * RATIO / startPoint) applied at load time.
    pred_test = pred_test / RATIO * data.startPoint
    # The first days4train closes have no prediction, so drop them.
    real = data.absData[data.days4train:]
    return real, pred_test
def _previousRecord(checkData, date):
    """Return the record stored just before ``date`` in ``checkData``, or None."""
    dates = list(checkData)
    if date in checkData:
        # Re-run for a date that is already recorded: look only at earlier entries.
        dates = dates[:dates.index(date)]
    return checkData[dates[-1]] if dates else None


def pre(data):
    """Predict the next close, record the prediction and plot the fitted curve.

    Args:
        data: object exposing ``df``, ``xSet``, ``oneData``, ``absData``,
            ``startPoint``, ``days4train`` and ``name`` (as filled in by
            ``MyData.loadData``). It is not modified.

    Returns:
        dict mapping each date label to a record with keys ``pre``, ``real``,
        ``preRise``, ``realRise`` and ``isRight``; today's entry is included.

    Side effects: reads and rewrites ``check.dat`` next to this script, saves
    ``todayPre.png`` in the current working directory and shows the figure.
    """
    date = data.df.index[-1]
    startPoint = data.startPoint
    # The model is evaluated on the CPU, so always map the weights there.
    rnnDict = torch.load('myRnn.pt', map_location=torch.device('cpu'))
    real = data.absData[data.days4train:]
    model = LSTM(data.days4train, 8, output_size=1, num_layers=2)
    model.load_state_dict(rnnDict)
    model.eval()

    # Append the most recent days4train values as one extra window whose
    # output is the forecast for the next day. A local array is used so that
    # data.xSet is left untouched and repeated calls do not keep growing it.
    lastDays = numpy.array(data.oneData[-data.days4train:], dtype='float32')
    windows = numpy.append(data.xSet, lastDays)
    dataset_x = torch.from_numpy(windows.reshape(-1, 1, data.days4train))
    with torch.no_grad():
        pred = model(dataset_x).view(-1).numpy()
    # Undo the normalisation (price * RATIO / startPoint) applied at load time.
    pred = pred / RATIO * startPoint
    nextDay = pred[-1]
    today = data.absData[-1]

    checkPath = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'check.dat')
    try:
        # NOTE: pickle.load can execute arbitrary code; check.dat must only
        # ever be a file written by this script.
        with open(checkPath, 'rb') as f:
            checkData = pickle.load(f)
    except FileNotFoundError:
        checkData = {}

    preRise = nextDay/today-1
    # Score the previous forecast: it counts as right unless the predicted and
    # the actual change have opposite signs.
    yesterday = _previousRecord(checkData, date)
    if yesterday is None:
        realRise = 0
        isRight = 1
    else:
        realRise = today/yesterday['real']-1
        isRight = 0 if realRise*yesterday['preRise'] < 0 else 1
    checkData[date] = {
        'pre': nextDay,
        'real': today,
        'preRise': preRise,
        'realRise': realRise,
        'isRight': isRight,
    }
    # Persist before plotting so the record survives if the blocking figure
    # window is never closed cleanly.
    with open(checkPath, 'wb') as f:
        pickle.dump(checkData, f)

    plt.figure(dpi=300, figsize=(12, 9))
    plt.plot(real, 'g', label='真实值')
    plt.plot(pred, 'r', label='预测值')
    color = 'red' if nextDay > today else 'green'
    plt.title(f'{data.name}基于深度学习的预测曲线\n{date}日预测明日收盘价可能是{nextDay:.2f}',
              color=color, fontsize='large', fontweight='bold')
    # Draw the legend before saving so it also appears in the PNG.
    plt.legend()
    plt.savefig('todayPre.png')
    plt.show()
    return checkData
def showIsright(checkData):
    """Plot predicted vs. actual daily change and the share of correct calls.

    Args:
        checkData: dict mapping a date label to a record with the keys
            ``preRise``, ``realRise`` and ``isRight`` (as returned by ``pre``).

    Shows a figure with two stacked axes: the two change series on top and a
    pie chart of right/wrong predictions below. Does nothing but print a
    notice when ``checkData`` is empty.
    """
    if not checkData:
        # Nothing to chart; also avoids dividing by zero below.
        print('没有可显示的预测记录')
        return

    records = list(checkData.values())
    preRise = [record['preRise'] for record in records]
    realRise = [record['realRise'] for record in records]
    isRight = [record['isRight'] for record in records]
    x = list(checkData)
    right = sum(isRight)
    n = len(isRight)

    f, axes = plt.subplots(nrows=2, ncols=1, figsize=(8, 20))
    axes[0].plot(preRise, 'g', label='预测值')
    axes[0].plot(realRise, 'r', label='真实值')
    axes[0].set_title('预测涨幅与真实涨幅')
    axes[0].set_xticks(range(len(x)))  # one tick per record
    axes[0].set_xticklabels(x)  # label the ticks with the dates
    # The legend belongs to the line chart and must come after the plot calls.
    axes[0].legend()
    # Pass raw counts and let matplotlib normalise them, so float rounding can
    # never push the sum of the fractions above 1.
    axes[1].pie([right, n - right], explode=(0.1, 0), labels=['成功', '失败'],
                autopct='%1.1f%%', shadow=True, startangle=150, normalize=True)
    axes[1].set_title('正确率分布')
    # Figure-level title, so the pie chart keeps its own title.
    f.suptitle('深度学习预测上证指数正确率情况', fontsize='large', fontweight='bold')
    plt.show()
if __name__ == "__main__":
    # Load the CSV history plus today's quote and prepare the training tensors.
    stockData = MyData(days4train=30, name='上证指数')
    stockData.run()
    # Training and back-testing are opt-in; uncomment to refresh myRnn.pt first.
    # train(stockData)
    # test(stockData)
    # Forecast the next close, then chart how past forecasts performed.
    history = pre(stockData)
    print(history)
    showIsright(history)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化