代码拉取完成,页面将自动刷新
同步操作将从 天昊/时间序列分析 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
import torch
import torch.nn as nn
from sklearn import preprocessing
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from function_relate import *
import os
class CNNnetwork(nn.Module):
'''
params: [Conv_output,window_size,Linear_output]
'''
def __init__(self, Conv_output, window_size, Linear_output):
super().__init__()
self.Conv_output = Conv_output
self.window_size = window_size
self.Linear_output = Linear_output
self.conv1d = nn.Conv1d(1, Conv_output, kernel_size=2) # 1:输入维度 64:表示卷积后输出 这个可以自己设置
# kernel_size:卷积核的大小
self.relu = nn.ReLU(inplace=True)
self.fc1 = nn.Linear(Conv_output * (window_size - 1), Linear_output) # 全连接层 (64*11) 704 是输入特征数 (50)是输出特征数
self.fc2 = nn.Linear(Linear_output, 1)
def forward(self, x):
# 该模型的网络结构为 一维卷积层 -> Relu层 -> Flatten -> 全连接层1 -> 全连接层2
x = self.conv1d(x)
x = self.relu(x)
x = x.view(-1)
x = self.fc1(x)
x = self.fc2(x)
return x
def CNN_model(train, test, params):
"""
:param train: 训练集
:param test: 测试集
:param params: [Conv_output,window_size,Linear_output,lr]
:return:
"""
Conv_output = params[0]
window_size = params[1]
Linear_output = params[2]
lr = params[3]
# 首先对数据进行最大最小归一化归一化
min_max_scaler = preprocessing.MinMaxScaler()
Train = np.array(train).reshape(-1, 1)
train_scaled = min_max_scaler.fit_transform(Train)
train_scaled = torch.FloatTensor(train_scaled).view(-1) # 变成tensor类型
# 进行窗口处理
train_X, train_Y = [], []
for i in range(0, len(train_scaled) - window_size):
train_X.append(train_scaled[i:i + window_size])
train_Y.append(train_scaled[i + window_size])
# 设置随机种子
torch.manual_seed(101)
model = CNNnetwork(Conv_output, window_size, Linear_output)
criterion = nn.MSELoss() # 均方差损失函数
optimizer = torch.optim.Adam(model.parameters(), lr=lr) # 优化器为adam 学习率为0.001
epochs = 100 # 训练批次为100次
model.train() # 用于将模型设置为训练模式
start_time = time.time() # 记录当前时间,用于计算训练过程的耗时
for epoch in range(epochs):
for i in range(len(train_X)):
# 每次更新参数前都梯度归零和初始化
optimizer.zero_grad()
# 注意这里要对样本进行reshape,转换成conv1d的input size(batch size, channel, series length)
y_pred = model(train_X[i].reshape(1, 1, -1))
loss = criterion(y_pred, train_Y[i])
loss.backward()
optimizer.step()
# print(f'Epoch: {epoch + 1:2} Loss: {loss.item():10.8f}')
print(f'\nDuration: {time.time() - start_time:.0f} seconds')
test_extended = train[-1 * window_size:] + test
test_data = []
for i in test_extended:
try:
test_data.append(i[0])
except:
test_data.append(i)
test_data = np.array(test_data).reshape(-1, 1)
min_max_scaler = preprocessing.MinMaxScaler()
test_scaled = min_max_scaler.fit_transform(test_data)
test_scaled = torch.FloatTensor(test_scaled).view(-1)
test_X, test_Y = [], []
for i in range(0, len(test_scaled) - window_size):
test_X.append(test_scaled[i:i + window_size])
test_Y.append(test_scaled[i + window_size])
# 设置成eval模式
model.eval()
predictions = []
# 循环的每一步表示向时间序列向后滑动一格
for i in range(len(test_X)):
with torch.no_grad():
predictions.append(model(test_X[i].reshape(1, 1, -1)).item())
predictions_rescaled = min_max_scaler.inverse_transform(np.array(predictions).reshape(-1, 1))
return predictions_rescaled
# 参数选择
def CnnParamSelect(dataset,train_len):
'''
:param dataset: 数据集
:param train_len: 训练集的长度
:return:
'''
train,test = DataDivi(dataset,train_len)
min_value = float('inf') # 设置最小值设置为无穷大
Conv_output = 100
window_size = len(series) - train_len
Linear_output = 100
lr = 0.001
for i in range(2,Conv_output,2):
for j in range(2,window_size):
for z in range(2,Linear_output,2):
params = [i,j,z,lr]
CnnPredicted = CNN_model(train,test,params).reshape(-1)
# 计算 MAE、MSE、MAPE
mse_value = mse(test,CnnPredicted)
if mse_value < min_value:
min_value = mse_value
print('Conv_output=',i,'window_size=',j,'Linear_output=',z,'lr=',lr)
getErrIndex('CNN',test,CnnPredicted)
def CnnComProcess(datasetName,dataset,train_len,params):
"""
实现Cnn的功能函数
"""
# 划分train,test
train,test = dataset[:train_len],dataset[train_len:]
# 存入测试集
insert_file(test,datasetName,'test')
# 首先需要判断是否有文件LSTM.txt
if os.path.isfile(f'data/{datasetName}/CNN.txt'):
predicted = read_file(datasetName,'CNN')
min_value = mse(test,predicted) # 如果存在CNN.txt文件,则MSE最小值设置为 文件中的
else:
min_value = float('inf') # 否则,最小值设置为无穷大
CnnPredicted = CNN_model(train,test,params).reshape(-1)
insert_file(CnnPredicted, datasetName, 'CNN')
# 绘制Cnn的测试集预测图
CnnPredicted = read_file(datasetName,'CNN')
PreShow(test,CnnPredicted)
if __name__ == '__main__':
file = pd.read_csv('data/SunspotData.csv')
Sunspot = file['SUNACTIVITY']
series = Sunspot.tolist()
# 绘制原始数据图
# OriginalShow(series)
train_len = 221
# 循环获得最好参数
# CnnParamSelect(series,train_len)
params = [100,12,50,0.001] # [Conv_output,window_size,Linear_output,lr]
# train,test = DataDivi(series,train_len)
# CNN_model(train,test,params)
CnnComProcess('SunspotDataResult',series,train_len,params)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。