代码拉取完成,页面将自动刷新
from torch.utils.data import Dataset, DataLoader, random_split
from torch import Tensor, nn, optim
import torch
import csv
import datetime
class BaiduDataset(Dataset):
def __init__(self, sequence_length = 6):
self.baidu_data_origin = []
with open('data.csv', 'r') as fp:
csv_reader = csv.DictReader(fp)
for row in csv_reader:
self.baidu_data_origin.append(float(row['y']))
# 格式化原始数据
self.formatted_baidu_data = []
baidu_left_data_y = 0.05 # 2018.01, 5%
baidu_left_data_x = datetime.date(2018, 1, 1) # 2018.01
baidu_right_data_y = 0.052 # 2023.06, 5.2%
baidu_right_data_x = datetime.date(2023, 6, 1) # 2023.06
k = (baidu_right_data_y - baidu_left_data_y) / (self.baidu_data_origin[-1] - self.baidu_data_origin[0])
b = baidu_right_data_y - k * self.baidu_data_origin[-1]
for idx, y in enumerate(self.baidu_data_origin):
year = int(idx / 12)
month = idx % 12
date_value = baidu_left_data_x.replace(year=baidu_left_data_x.year+year, month=baidu_left_data_x.month+month)
self.formatted_baidu_data.append([date_value, k * y + b])
if self.formatted_baidu_data[-1][0] != baidu_right_data_x:
raise ValueError('日期的值和定义的不一致')
self.sequence_length = sequence_length
def __len__(self):
return len(self.formatted_baidu_data) - self.sequence_length
def __getitem__(self, idx):
seq_in = self.formatted_baidu_data[idx:(idx + self.sequence_length)]
seq_out = self.formatted_baidu_data[idx + self.sequence_length]
network_input = [[seq[1]] for seq in seq_in]
network_output = [seq_out[1]]
return Tensor(network_input), Tensor(network_output)
class NeuralNetwork(nn.Module):
def __init__(self, input_item_size = 1, output_item_size = 1):
super().__init__()
self.input_item_size = input_item_size
self.output_item_size = output_item_size
# LSTM Layer:
num_layers = 2
self.lstm = nn.LSTM(input_item_size, output_item_size, num_layers, batch_first=True)
self.linear = nn.Linear(output_item_size, output_item_size)
def forward(self, input_data):
lstm_out, _ = self.lstm(input_data)
return self.linear(lstm_out[:, -1, :])
def show_dataset():
batch_size = 1
dataset_object = BaiduDataset()
data_loader = DataLoader(dataset_object, batch_size=batch_size)
for x, y in data_loader:
print('x=')
print(x[0])
print('y=')
print(y[0])
def train(dataloader, model, loss_fn, optimizer):
size = len(dataloader.dataset)
model.train()
for batch, (X, y) in enumerate(dataloader):
pred = model(X)
loss = loss_fn(pred, y)
loss.backward()
optimizer.step()
optimizer.zero_grad()
if batch % 6 == 0 or (batch) * dataloader.batch_size + len(X) == size:
loss, current = loss.item(), (batch) * dataloader.batch_size + len(X)
print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
def test(dataloader, model, loss_fn):
model.eval()
size = len(dataloader.dataset)
num_batches = len(dataloader)
test_loss = 0
with torch.no_grad():
for X, y in dataloader:
pred = model(X)
test_loss += loss_fn(pred, y).item()
test_loss /= num_batches
print(f"Test Avg loss: {test_loss:>8f} \n")
def run_train():
dataset_object = BaiduDataset()
#train_data_size = int(len(dataset_object) * 0.90)
#test_data_size = len(dataset_object) - train_data_size
#train_data, test_data = random_split(dataset_object, [train_data_size, test_data_size])
train_data = test_data = dataset_object
batch_size = 100
train_data_loader = DataLoader(train_data, batch_size=batch_size)
test_data_loader = DataLoader(test_data, batch_size=batch_size)
model = NeuralNetwork()
loss_fn = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
epochs = 2000
for t in range(epochs):
print(f"Epoch {t+1}\n-------------------------------")
train(train_data_loader, model, loss_fn, optimizer)
test(test_data_loader, model, loss_fn)
print("Done!")
# 下面的代码用来根据测试集的一个样本接着循环预测后面,不需要可以return退出
# return
model.eval()
output_series = []
with torch.no_grad():
# 使用测试集最后一个样本输入
for x, y in test_data_loader:
out = model(Tensor(x[-1]).view(1, -1, len(x[-1][-1])))
break
for i in range(len(x[-1][1:])):
output_series.append(x[-1][i + 1].item())
output_series.append(out[-1][-1].item())
for _ in range(100):
input_data = Tensor(output_series).view(1, -1, 1)
out = model(input_data)
output_series.append(out[-1][-1].item())
with open('test.csv', 'w') as fp:
fp.write("时间序列\n")
for o in output_series:
fp.write(str(o) + "\n")
if __name__ == '__main__':
run_train()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。