900字范文,内容丰富有趣,生活中的好帮手!
900字范文 > 深度强化学习笔记——DDPG原理及实现(pytorch)

深度强化学习笔记——DDPG原理及实现(pytorch)

时间:2021-12-15 08:30:36

相关推荐

深度强化学习笔记——DDPG原理及实现(pytorch)

概要

本文是对深度强化学习中的DDPG(Deep Deterministic Policy Gradient)算法相关原理和代码实现的介绍。

有任何不足之处,望指正!

DDPG算法原理(Deep Deterministic Policy Gradient)

DDPG算法是基于DPG算法所提出的,属于无模型中的actor-critic方法中的off-policy算法(因为动作不是直接在交互的过程中更新的),之后学者又在此基础上提出了适合于多智能体环境的MADDPG (Multi Agent DDPG)算法。

可以说DDPG是在DQN算法的基础之上进行改进的,DQN存在的问题就在于它只能解决含有离散和低维度的动作空间的问题。而一般的物理问题或控制问题中,动作空间是连续的,比如控制一个倒立摆问题(gym里对应的是"Pendulum-v0")。学者在借鉴DQN算法之后提出了DDPG算法,该算法的元素有以下几个:

actor和critic这两个部分分别由训练的网络和目标网络构成,相当于是总共含有4个网络。与DQN中一样,DDPG中也引入了experience buffer的机制,用于存储agent与环境交互的数据( ( s t , a t , r t , s t + 1 ) (s_{t},a_{t},r_{t}, s_{t+1}) (st​,at​,rt​,st+1​))与DQN目标网络之间延迟复制现有网络不同的是,DDPG中采用soft update, 也就是缓慢地更新两个目标网络中的参数在神经网络中加入batch normalization的技巧(以上几点说明了算法模型如何学习的过程,除此之外,在强化学习中,还必须要包括智能体如何进行探索的方法)DDPG算法采用向动作网络的输出中添加随机噪声的方式实现exploration。

下图是DDPG的伪代码示意:

首先是定义actor和critic的这两个网络结构并初始化网络中的参数(网络模型均一致),之后定义经验池的存放和采样过程(ER buffer),最后是将完整的DDPG算法过程放到一个大的类中(面向对象的定义方法比较方便)。在伪代码中最需要关注的就是这两种网络分别是怎么更新参数的。对于actor网络来说,它的更新方法是基于梯度上升的。该网络的损失函数就是从critic网络中获取的Q值的平均值,在实现的过程中,需要加入负号,即最小化损失函数,来与深度学习框架保持一致。用数学公式表示其损失函数就是:

J ( θ μ ) = E [ Q ( s , a ∣ θ Q ) ∣ s = s t , a = μ ( s t ∣ θ μ ) ] J(\theta^{\mu})= \mathbb{E}[Q(s,a|\theta^{Q})|_{s=s_t, a=\mu(s_t|\theta^{\mu})}] J(θμ)=E[Q(s,a∣θQ)∣s=st​,a=μ(st​∣θμ)​]

在实际编程的过程中也是用的该损失函数,而不是之间将伪代码中的该函数的梯度进行实现。

而critic网络的参数更新方式是与DQN算法一样的,就是通过最小化目标网络与现有网络之间的均方误差来更新现有网络的参数,只不过唯一不同的地方在于目标网络的参数在DDPG算法中是缓慢更新的,而不是在DQN中每隔N步将现有网络的参数直接复制过来。

DDPG算法实现

本节对DDPG的算法进行实现,简化了一下此repo中的DDPG代码。此代码是用gym环境中的倒立摆环境,让agent学会如何让摆静止在倒挂的状态。该程序中我已经加入了必要的注释,目前训练结果不是很好,还需要继续调参和调试。

存在问题的地方:

应该先将智能体与环境交互的数据存储到储存器,且等到储存器的数据量被填充完之后再进行训练,第一个版本的代码我认为问题就在这,所以导致训练结果非常差。最新的代码参考了这篇文章,如下:

import torchimport torch.nn as nnimport torch.nn.functional as Fimport numpy as npimport gymimport time##################### hyper parameters ####################EPISODES = 200EP_STEPS = 200LR_ACTOR = 0.001LR_CRITIC = 0.002GAMMA = 0.9TAU = 0.01MEMORY_CAPACITY = 10000BATCH_SIZE = 32RENDER = FalseENV_NAME = 'Pendulum-v0'########################## DDPG Framework ######################class ActorNet(nn.Module): # define the network structure for actor and criticdef __init__(self, s_dim, a_dim):super(ActorNet, self).__init__()self.fc1 = nn.Linear(s_dim, 30)self.fc1.weight.data.normal_(0, 0.1) # initialization of FC1self.out = nn.Linear(30, a_dim)self.out.weight.data.normal_(0, 0.1) # initilizaiton of OUTdef forward(self, x):x = self.fc1(x)x = F.relu(x)x = self.out(x)x = torch.tanh(x)actions = x * 2 # for the game "Pendulum-v0", action range is [-2, 2]return actionsclass CriticNet(nn.Module):def __init__(self, s_dim, a_dim):super(CriticNet, self).__init__()self.fcs = nn.Linear(s_dim, 30)self.fcs.weight.data.normal_(0, 0.1)self.fca = nn.Linear(a_dim, 30)self.fca.weight.data.normal_(0, 0.1)self.out = nn.Linear(30, 1)self.out.weight.data.normal_(0, 0.1)def forward(self, s, a):x = self.fcs(s)y = self.fca(a)actions_value = self.out(F.relu(x+y))return actions_valueclass DDPG(object):def __init__(self, a_dim, s_dim, a_bound):self.a_dim, self.s_dim, self.a_bound = a_dim, s_dim, a_boundself.memory = np.zeros((MEMORY_CAPACITY, s_dim * 2 + a_dim + 1), dtype=np.float32)self.pointer = 0 # serves as updating the memory data # Create the 4 network objectsself.actor_eval = ActorNet(s_dim, a_dim)self.actor_target = ActorNet(s_dim, a_dim)self.critic_eval = CriticNet(s_dim, a_dim)self.critic_target = CriticNet(s_dim, a_dim)# create 2 optimizers for actor and criticself.actor_optimizer = torch.optim.Adam(self.actor_eval.parameters(), lr=LR_ACTOR)self.critic_optimizer = torch.optim.Adam(self.critic_eval.parameters(), lr=LR_CRITIC)# Define the loss function for critic network updateself.loss_func = nn.MSELoss()def store_transition(self, s, a, r, s_): # how to store the episodic data to buffertransition = np.hstack((s, a, [r], s_))index = self.pointer % MEMORY_CAPACITY # replace the old data with new data self.memory[index, :] = transitionself.pointer += 1def choose_action(self, s):# print(s)s = torch.unsqueeze(torch.FloatTensor(s), 0)return self.actor_eval(s)[0].detach()def learn(self):# softly update the target networksfor x in self.actor_target.state_dict().keys():eval('self.actor_target.' + x + '.data.mul_((1-TAU))')eval('self.actor_target.' + x + '.data.add_(TAU*self.actor_eval.' + x + '.data)')for x in self.critic_target.state_dict().keys():eval('self.critic_target.' + x + '.data.mul_((1-TAU))')eval('self.critic_target.' + x + '.data.add_(TAU*self.critic_eval.' + x + '.data)') # sample from buffer a mini-batch dataindices = np.random.choice(MEMORY_CAPACITY, size=BATCH_SIZE)batch_trans = self.memory[indices, :]# extract data from mini-batch of transitions including s, a, r, s_batch_s = torch.FloatTensor(batch_trans[:, :self.s_dim])batch_a = torch.FloatTensor(batch_trans[:, self.s_dim:self.s_dim + self.a_dim])batch_r = torch.FloatTensor(batch_trans[:, -self.s_dim - 1: -self.s_dim])batch_s_ = torch.FloatTensor(batch_trans[:, -self.s_dim:])# make action and evaluate its action valuesa = self.actor_eval(batch_s)q = self.critic_eval(batch_s, a)actor_loss = -torch.mean(q)# optimize the loss of actor networkself.actor_optimizer.zero_grad()actor_loss.backward()self.actor_optimizer.step()# compute the target Q value using the information of next statea_target = self.actor_target(batch_s_)q_tmp = self.critic_target(batch_s_, a_target)q_target = batch_r + GAMMA * q_tmp# compute the current q value and the lossq_eval = self.critic_eval(batch_s, batch_a)td_error = self.loss_func(q_target, q_eval)# optimize the loss of critic networkself.critic_optimizer.zero_grad()td_error.backward()self.critic_optimizer.step()############################### Training ####################################### Define the env in gymenv = gym.make(ENV_NAME)env = env.unwrappedenv.seed(1)s_dim = env.observation_space.shape[0]a_dim = env.action_space.shape[0]a_bound = env.action_space.higha_low_bound = env.action_space.lowddpg = DDPG(a_dim, s_dim, a_bound)var = 3 # the controller of exploration which will decay during training processt1 = time.time()for i in range(EPISODES):s = env.reset()ep_r = 0for j in range(EP_STEPS):if RENDER: env.render()# add explorative noise to actiona = ddpg.choose_action(s)a = np.clip(np.random.normal(a, var), a_low_bound, a_bound)s_, r, done, info = env.step(a)ddpg.store_transition(s, a, r / 10, s_) # store the transition to memoryif ddpg.pointer > MEMORY_CAPACITY:var *= 0.9995 # decay the exploration controller factorddpg.learn()s = s_ep_r += rif j == EP_STEPS - 1:print('Episode: ', i, ' Reward: %i' % (ep_r), 'Explore: %.2f' % var)if ep_r > -300 : RENDER = Truebreakprint('Running time: ', time.time() - t1)

以下内容请跳过,这里只是记录一下训练结果不好的代码

import torchimport torch.nn as nn import torch.nn.functional as Fimport torch.optim as optim# from tensorboardX import SummaryWriterimport gymimport numpy as npimport randomfrom torch.distributions import Normalfrom itertools import count# Define hyperparametersENV_NAME = "Pendulum-v0" # gym envBATCH_SIZE = 100 # mini-batch size when sampled from bufferMEM_CAPACTIY = 10000 # Replay buffer sizeEPISODES = 200STEPS = 200GAMMA = 0.9 # discount factorLEARNING_RATE = 1e-3 # learning rate of optimizerTAU = 0.01 # update the target net parameter smoothlyRANDOM_SEED = 9527 # fix the random seed# SAMPLE_FREQ = 2000 NOISE_VAR = 0.1RENDER = Falsedevice = 'cuda' if torch.cuda.is_available() else 'cpu' # use GPU to trainprint(device)env = gym.make(ENV_NAME) ACTION_DIM = env.action_space.shape[0] #dim=1STATE_DIM = env.observation_space.shape[0] # dim=3ACTION_BOUND = env.action_space.high[0] # action interval [-2,2]np.random.seed(RANDOM_SEED) # fix the random seeddirectory = '.\\exp\\'class ReplayBuffer():def __init__(self, max_size=MEM_CAPACTIY):self.storage = [] #empty listself.max_size = max_sizeself.pointer= 0def store_transition(self, transition):if len(self.storage) == self.max_size: # replace the old dataself.storage[self.pointer] = transitionself.pointer = (self.pointer + 1) % self.max_size # point to next positionelse:self.storage.append(transition)def sample(self, batch_size):# Define the array of indices for random sampling from storage# the size of this array equals to batch_sizeind_array = np.random.randint(0, len(self.storage),size=batch_size)s, a, r, s_, d = [], [], [], [], []for i in ind_array:S, A, R, S_, D = self.storage[i]s.append(np.array(S, copy=False))a.append(np.array(A, copy=False))r.append(np.array(R, copy=False))s_.append(np.array(S_, copy=False))d.append(np.array(D, copy=False))return np.array(s), np.array(a), np.array(r).reshape(-1, 1), np.array(s_), np.array(d).reshape(-1, 1)class Actor(nn.Module):def __init__(self, state_dim, action_dim, max_action):super(Actor, self).__init__()self.l1 = nn.Linear(state_dim, 30)self.l1.weight.data.normal_(0, 0.3) # initializationself.l2 = nn.Linear(30, action_dim)self.l2.weight.data.normal_(0, 0.3) # initializationself.max_action = max_actiondef forward(self, x):x = F.relu(self.l1(x))x = self.max_action * torch.tanh(self.l2(x)) # the range of tanh is [-1, 1]return xclass Critic(nn.Module):def __init__(self, state_dim, action_dim):super(Critic,self).__init__()self.l1 = nn.Linear(state_dim + action_dim, 30)self.l1.weight.data.normal_(0, 0.3) # initializationself.l2 = nn.Linear(30, 1)self.l2.weight.data.normal_(0, 0.3) # initializationdef forward(self, x, a):x = F.relu(self.l1(torch.cat([x, a], 1)))x = self.l2(x)return xclass DDPG(object):def __init__(self, state_dim, action_dim, max_action):# network, optimizer for actorself.actor = Actor(state_dim, action_dim, max_action).to(device)self.actor_target = Actor(state_dim, action_dim, max_action).to(device)self.actor_target.load_state_dict(self.actor.state_dict())self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=LEARNING_RATE)# network, optimizer for criticself.critic = Critic(state_dim, action_dim).to(device)self.critic_target = Critic(state_dim, action_dim).to(device)self.critic_target.load_state_dict(self.critic.state_dict())self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=LEARNING_RATE)# create replay buffer objectself.replay_buffer = ReplayBuffer()def select_action(self, state):# select action based on actor network and add some noise on it for explorationstate = torch.FloatTensor(state.reshape(1, -1)).to(device)action = self.actor(state).cpu().data.numpy().flatten()noise = np.random.normal(0, NOISE_VAR, size=ACTION_DIM).clip(env.action_space.low, env.action_space.high)action = action + noisereturn actiondef update(self):for i in range(EPISODES):s, a, r, s_, d = self.replay_buffer.sample(BATCH_SIZE)# transfer these tensors to GPUstate = torch.FloatTensor(s).to(device)action = torch.FloatTensor(a).to(device)reward = torch.FloatTensor(r).to(device)next_state = torch.FloatTensor(s_).to(device)done = torch.FloatTensor(d).to(device)# compute the target Q valuetarget_Q = self.critic_target(next_state, self.actor_target(next_state))target_Q = reward + (done * GAMMA * target_Q).detach()# Get the current Q valuecurrent_Q = self.critic(state, action)# compute critic loss by MSEcritic_loss = F.mse_loss(current_Q, target_Q)# use optimizer to update the critic networkself.critic_optimizer.zero_grad()critic_loss.backward()self.critic_optimizer.step()# compute the actor loss and its gradient to update the parametersactor_loss = -self.critic(state,self.actor(state)).mean()self.actor_optimizer.zero_grad()actor_loss.backward()self.actor_optimizer.step()# update the target network of actor and critic# zip() constructs tuple from iterable objectfor param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):target_param.data.copy_(TAU * param.data + (1-TAU) * target_param.data)for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):target_param.data.copy_(TAU * param.data + (1-TAU) * target_param.data)def save(self):torch.save(self.actor.state_dict(), directory + 'actor.pth')torch.save(self.critic.state_dict(), directory + 'critic.pth')def load(self):self.actor.load_state_dict(torch.load(directory + 'actor.pth'))self.critic.load_state_dict(torch.load(directory + 'critic.pth'))def main():agent = DDPG(STATE_DIM, ACTION_DIM, ACTION_BOUND)ep_r = 0total_step = 0for i in range(EPISODES):total_reward = 0step = 0state = env.reset()for t in count():if RENDER == True and i > 100: env.render() # Render is unnecessaryaction = agent.select_action(state)# get the next transition by using current actionnext_state, reward, done, info = env.step(action)# store the transition to the bufferagent.replay_buffer.store_transition((state, action, reward / 10, next_state, np.float(done)))state = next_stateif done:breakstep += 1total_reward += rewardtotal_step += step+1print("Total T:{} Episode: \t{} Total Reward: \t{:0.2f}".format(total_step, i, total_reward))agent.update()#NOISE_VAR *= 0.99env.close()if __name__ == '__main__':main()

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。