ddpg.py
__author__ = 'zhenhang.sun@gmail.com'
__version__ = '1.0.0'
import gym
import math
import random
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
class Actor(nn.Module):
    # Deterministic policy network: maps a state to a tanh-bounded action.
    def __init__(self, input_size, hidden_size, output_size):
        super(Actor, self).__init__()
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, output_size)

    def forward(self, s):
        x = F.relu(self.linear1(s))
        x = F.relu(self.linear2(x))
        x = torch.tanh(self.linear3(x))   # bound the action to [-1, 1]
        return x
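# Example (a sketch, not part of the original file): with a 3-dimensional state and a
# 1-dimensional action (e.g. Pendulum), the actor maps a state batch to bounded actions:
#   actor = Actor(3, 256, 1)
#   a = actor(torch.randn(32, 3))   # shape (32, 1), each entry in (-1, 1)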
class Critic(nn.Module):
    # Q-network: estimates Q(s, a) from a concatenated state-action input.
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, output_size)

    def forward(self, s, a):
        x = torch.cat([s, a], 1)   # concatenate state and action along the feature dimension
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        x = self.linear3(x)
        return x
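# Example (a sketch, not part of the original file): with the sizes the Agent below uses
# (input_size = s_dim + a_dim, output_size = a_dim), the critic scores state-action pairs:
#   critic = Critic(3 + 1, 256, 1)
#   q = critic(torch.randn(32, 3), torch.randn(32, 1))   # shape (32, 1)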
class Agent(object):
    def __init__(self, **kwargs):
        # Expected kwargs: env, gamma, actor_lr, critic_lr, tau, capacity, batch_size.
        for key, value in kwargs.items():
            setattr(self, key, value)

        s_dim = self.env.observation_space.shape[0]
        a_dim = self.env.action_space.shape[0]
        print(s_dim, a_dim)

        self.actor = Actor(s_dim, 256, a_dim)
        self.actor_target = Actor(s_dim, 256, a_dim)
        self.critic = Critic(s_dim + a_dim, 256, a_dim)
        self.critic_target = Critic(s_dim + a_dim, 256, a_dim)
        self.actor_optim = optim.Adam(self.actor.parameters(), lr=self.actor_lr)
        self.critic_optim = optim.Adam(self.critic.parameters(), lr=self.critic_lr)
        self.buffer = []

        # Start the target networks from the same weights as the online networks.
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.learn_steps = 0
    def act(self, s0):
        # Greedy action from the current deterministic policy (no exploration noise here).
        s0 = torch.tensor(s0, dtype=torch.float).unsqueeze(0)
        a0 = self.actor(s0).squeeze(0).detach().numpy()
        return a0
    def put(self, *transition):
        # Store a (s0, a0, r1, s1) transition; evict the oldest one when the buffer is full.
        if len(self.buffer) == self.capacity:
            self.buffer.pop(0)
        self.buffer.append(transition)
    def learn(self):
        self.learn_steps += 1
        if len(self.buffer) < self.batch_size:
            return

        samples = random.sample(self.buffer, self.batch_size)
        s0, a0, r1, s1 = zip(*samples)
        s0 = torch.tensor(s0, dtype=torch.float)
        a0 = torch.tensor(a0, dtype=torch.float)
        r1 = torch.tensor(r1, dtype=torch.float).view(self.batch_size, -1)
        s1 = torch.tensor(s1, dtype=torch.float)

        def critic_learn():
            # Bellman target: y = r + gamma * Q_target(s1, actor_target(s1)),
            # with gradients blocked through the target networks.
            a1 = self.actor_target(s1).detach()
            y_true = r1 + self.gamma * self.critic_target(s1, a1).detach()
            y_pred = self.critic(s0, a0)

            loss_fn = nn.MSELoss()
            loss = loss_fn(y_pred, y_true)
            self.critic_optim.zero_grad()
            loss.backward()
            self.critic_optim.step()

        def actor_learn():
            # Deterministic policy gradient: maximize Q(s, actor(s)),
            # i.e. minimize its negative mean over the batch.
            loss = -torch.mean(self.critic(s0, self.actor(s0)))
            self.actor_optim.zero_grad()
            loss.backward()
            self.actor_optim.step()

        def soft_update(net_target, net, tau):
            # Polyak averaging: target <- (1 - tau) * target + tau * online.
            for target_param, param in zip(net_target.parameters(), net.parameters()):
                target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau)

        critic_learn()
        actor_learn()
        soft_update(self.critic_target, self.critic, self.tau)
        soft_update(self.actor_target, self.actor, self.tau)
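

# A minimal training-loop sketch, not part of the original file. The environment name,
# the hyperparameter values, and the classic Gym step API (obs, reward, done, info) are
# assumptions; adjust them for your setup (e.g. Gymnasium's step() returns a 5-tuple).
# Note that the actor's tanh output lies in [-1, 1] while Pendulum accepts torques in
# [-2, 2], so the action may need scaling.
if __name__ == '__main__':
    env = gym.make('Pendulum-v1')
    agent = Agent(
        env=env,
        gamma=0.99,       # discount factor
        actor_lr=1e-4,    # actor learning rate
        critic_lr=1e-3,   # critic learning rate
        tau=0.02,         # soft-update coefficient
        capacity=10000,   # replay buffer size
        batch_size=32,
    )

    for episode in range(100):
        s0 = env.reset()
        episode_reward = 0.0
        for step in range(200):
            a0 = agent.act(s0)
            s1, r1, done, _ = env.step(a0)
            agent.put(s0, a0, r1, s1)
            episode_reward += r1
            s0 = s1
            agent.learn()
            if done:
                break
        print(episode, ':', episode_reward)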