CNN.py
import torch
import torch.nn as nn
from sklearn import preprocessing
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from function_relate import *  # local helper module: DataDivi, mse, getErrIndex, insert_file, read_file, PreShow, OriginalShow
import os
class CNNnetwork(nn.Module):
    '''
    params: [Conv_output, window_size, Linear_output]
    '''
    def __init__(self, Conv_output, window_size, Linear_output):
        super().__init__()
        self.Conv_output = Conv_output
        self.window_size = window_size
        self.Linear_output = Linear_output
        # 1: input channels; Conv_output: output channels after the convolution (user-configurable)
        # kernel_size: size of the convolution kernel
        self.conv1d = nn.Conv1d(1, Conv_output, kernel_size=2)
        self.relu = nn.ReLU(inplace=True)
        # Fully connected layer: Conv_output * (window_size - 1) input features
        # (e.g. 64 * 11 = 704), Linear_output output features (e.g. 50)
        self.fc1 = nn.Linear(Conv_output * (window_size - 1), Linear_output)
        self.fc2 = nn.Linear(Linear_output, 1)
    def forward(self, x):
        # Network structure: Conv1d -> ReLU -> Flatten -> fully connected layer 1 -> fully connected layer 2
        x = self.conv1d(x)
        x = self.relu(x)
        x = x.view(-1)  # flatten to a 1-D tensor (assumes a batch size of 1)
        x = self.fc1(x)
        x = self.fc2(x)
        return x
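# Added sketch (not part of the original file): a quick, hypothetical shape check for CNNnetwork,
# using the same sizes as the params in __main__ below (Conv_output=100, window_size=12, Linear_output=50):
#   model = CNNnetwork(100, 12, 50)
#   dummy = torch.randn(1, 1, 12)   # (batch size, channel, series length)
#   out = model(dummy)              # -> tensor of shape (1,), a single next-step prediction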
def CNN_model(train, test, params):
"""
:param train: 训练集
:param test: 测试集
:param params: [Conv_output,window_size,Linear_output,lr]
:return:
"""
Conv_output = params[0]
window_size = params[1]
Linear_output = params[2]
lr = params[3]
    # First apply min-max normalization to the data
    min_max_scaler = preprocessing.MinMaxScaler()
    Train = np.array(train).reshape(-1, 1)
    train_scaled = min_max_scaler.fit_transform(Train)
    train_scaled = torch.FloatTensor(train_scaled).view(-1)  # convert to a 1-D tensor
    # Build sliding windows over the series
train_X, train_Y = [], []
for i in range(0, len(train_scaled) - window_size):
train_X.append(train_scaled[i:i + window_size])
train_Y.append(train_scaled[i + window_size])
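    # Added illustration (not in the original): with window_size = 3 and a scaled series
    # [0.1, 0.2, 0.3, 0.4, 0.5], the loop above produces
    #   train_X = [[0.1, 0.2, 0.3], [0.2, 0.3, 0.4]]   and   train_Y = [0.4, 0.5]
    # i.e. each window of window_size past values is paired with the next value as its target.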
    # Set the random seed for reproducibility
    torch.manual_seed(101)
    model = CNNnetwork(Conv_output, window_size, Linear_output)
    criterion = nn.MSELoss()  # mean squared error loss
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)  # Adam optimizer with learning rate lr
    epochs = 100  # train for 100 epochs
    model.train()  # put the model into training mode
    start_time = time.time()  # record the current time to measure how long training takes
    for epoch in range(epochs):
        for i in range(len(train_X)):
            # Zero the gradients before every parameter update
            optimizer.zero_grad()
            # Note: the sample must be reshaped to Conv1d's input size (batch size, channel, series length)
            y_pred = model(train_X[i].reshape(1, 1, -1))
            loss = criterion(y_pred, train_Y[i])
            loss.backward()
            optimizer.step()
        # print(f'Epoch: {epoch + 1:2} Loss: {loss.item():10.8f}')
    print(f'\nDuration: {time.time() - start_time:.0f} seconds')
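    # Added note (not in the original): the reshape in the training loop above converts a window of
    # shape (window_size,) into the (batch=1, channel=1, length=window_size) layout expected by nn.Conv1d,
    # e.g. torch.arange(12.).reshape(1, 1, -1).shape == torch.Size([1, 1, 12]).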
test_extended = train[-1 * window_size:] + test
    # Flatten elements that may be wrapped in single-item sequences
    test_data = []
    for i in test_extended:
        try:
            test_data.append(i[0])
        except (TypeError, IndexError):
            test_data.append(i)
test_data = np.array(test_data).reshape(-1, 1)
min_max_scaler = preprocessing.MinMaxScaler()
test_scaled = min_max_scaler.fit_transform(test_data)
test_scaled = torch.FloatTensor(test_scaled).view(-1)
test_X, test_Y = [], []
for i in range(0, len(test_scaled) - window_size):
test_X.append(test_scaled[i:i + window_size])
test_Y.append(test_scaled[i + window_size])
    # Switch the model to eval mode
    model.eval()
    predictions = []
    # Each iteration slides the window one step forward along the time series
for i in range(len(test_X)):
with torch.no_grad():
predictions.append(model(test_X[i].reshape(1, 1, -1)).item())
predictions_rescaled = min_max_scaler.inverse_transform(np.array(predictions).reshape(-1, 1))
return predictions_rescaled
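# Added sketch (not in the original file): a hypothetical call, assuming train/test are plain lists
# of raw values such as those produced by DataDivi:
#   preds = CNN_model(train, test, params=[100, 12, 50, 0.001]).reshape(-1)
#   # preds then holds one rescaled prediction per test point and can be compared against test with mse()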
# Parameter selection
def CnnParamSelect(dataset, train_len):
    '''
    :param dataset: the full dataset
    :param train_len: length of the training set
    :return:
    '''
    train, test = DataDivi(dataset, train_len)
    min_value = float('inf')  # initialise the minimum MSE to infinity
    Conv_output = 100
    window_size = len(dataset) - train_len  # upper bound for the window size (the test-set length)
    Linear_output = 100
    lr = 0.001
    for i in range(2, Conv_output, 2):
        for j in range(2, window_size):
            for z in range(2, Linear_output, 2):
                params = [i, j, z, lr]
                CnnPredicted = CNN_model(train, test, params).reshape(-1)
                # Compute MAE, MSE, MAPE
                mse_value = mse(test, CnnPredicted)
                if mse_value < min_value:
                    min_value = mse_value
                    print('Conv_output=', i, 'window_size=', j, 'Linear_output=', z, 'lr=', lr)
                    getErrIndex('CNN', test, CnnPredicted)
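# Added note (not in the original): this is an exhaustive grid search that retrains one CNN per
# (Conv_output, window_size, Linear_output) combination, so it can take a long time. A hypothetical
# call mirroring the commented-out line in __main__ below:
#   CnnParamSelect(series, train_len=221)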
def CnnComProcess(datasetName, dataset, train_len, params):
    """
    Driver function for the CNN workflow
    """
    # Split into train and test
    train, test = dataset[:train_len], dataset[train_len:]
    # Persist the test set
    insert_file(test, datasetName, 'test')
    # First check whether the file CNN.txt already exists
    if os.path.isfile(f'data/{datasetName}/CNN.txt'):
        predicted = read_file(datasetName, 'CNN')
        min_value = mse(test, predicted)  # if CNN.txt exists, take the stored predictions' MSE as the minimum
    else:
        min_value = float('inf')  # otherwise, initialise the minimum to infinity
    CnnPredicted = CNN_model(train, test, params).reshape(-1)
    insert_file(CnnPredicted, datasetName, 'CNN')
    # Plot the CNN predictions against the test set
    CnnPredicted = read_file(datasetName, 'CNN')
    PreShow(test, CnnPredicted)
if __name__ == '__main__':
    file = pd.read_csv('data/SunspotData.csv')
    Sunspot = file['SUNACTIVITY']
    series = Sunspot.tolist()
    # Plot the raw data
    # OriginalShow(series)
    train_len = 221
    # Loop over candidate parameters to find the best combination
    # CnnParamSelect(series, train_len)
    params = [100, 12, 50, 0.001]  # [Conv_output, window_size, Linear_output, lr]
    # train, test = DataDivi(series, train_len)
    # CNN_model(train, test, params)
    CnnComProcess('SunspotDataResult', series, train_len, params)