本文是对深度强化学习中的DDPG(Deep Deterministic Policy Gradient)算法相关原理和代码实现的介绍。


DDPG算法原理(Deep Deterministic Policy Gradient)

DDPG算法是基于DPG算法所提出的,属于无模型(model-free)的actor-critic方法中的off-policy算法(因为与环境交互时使用的是加入了探索噪声的行为策略,而训练所用的数据是从经验池中采样得到的,并不是由当前正在更新的策略直接产生的),之后学者又在此基础上提出了适合于多智能体环境的MADDPG (Multi Agent DDPG)算法。


actor和critic这两个部分分别由训练网络和目标网络构成,因此总共含有4个网络。与DQN中一样,DDPG中也引入了experience buffer的机制,用于存储agent与环境交互的数据 $(s_t, a_t, r_t, s_{t+1})$。与DQN中每隔一段时间把现有网络的参数直接复制到目标网络不同的是,DDPG中采用soft update,也就是缓慢地更新两个目标网络中的参数:$\theta' \leftarrow \tau\theta + (1-\tau)\theta'$,其中 $\tau \ll 1$。此外,还在神经网络中加入了batch normalization的技巧。(以上几点说明了算法模型如何学习的过程,除此之外,在强化学习中,还必须要包括智能体如何进行探索的方法。)DDPG算法采用向动作网络的输出中添加随机噪声的方式实现exploration。


首先是定义actor和critic这两个网络结构并初始化网络中的参数(训练网络与其对应的目标网络结构一致),之后定义经验池的存放和采样过程(ER buffer),最后是将完整的DDPG算法过程放到一个大的类中(面向对象的定义方法比较方便)。在伪代码中最需要关注的就是这两种网络分别是怎么更新参数的。对于actor网络来说,它的更新方法是基于梯度上升的:该网络的优化目标就是从critic网络中获取的Q值的平均值,希望它越大越好。在实现的过程中,需要加入负号,把它变成需要最小化的损失函数,来与深度学习框架(默认执行梯度下降)保持一致。用数学公式表示其优化目标就是:

$$J(\theta^{\mu}) = \mathbb{E}\left[\, Q(s, a \mid \theta^{Q}) \,\big|_{\,s=s_t,\; a=\mu(s_t \mid \theta^{\mu})} \right]$$







import time

import gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

##################### hyper parameters ####################
EPISODES = 200
EP_STEPS = 200
LR_ACTOR = 0.001
LR_CRITIC = 0.002
GAMMA = 0.9
TAU = 0.01
MEMORY_CAPACITY = 10000
BATCH_SIZE = 32
RENDER = False
ENV_NAME = 'Pendulum-v0'


########################## DDPG Framework ######################
class ActorNet(nn.Module):
    """Actor network: maps a batch of states to a batch of actions."""

    def __init__(self, s_dim, a_dim):
        super(ActorNet, self).__init__()
        self.fc1 = nn.Linear(s_dim, 30)
        self.fc1.weight.data.normal_(0, 0.1)  # initialization of FC1
        self.out = nn.Linear(30, a_dim)
        self.out.weight.data.normal_(0, 0.1)  # initialization of OUT

    def forward(self, x):
        """Return tanh-squashed actions scaled by 2."""
        x = F.relu(self.fc1(x))
        x = torch.tanh(self.out(x))
        # for the game "Pendulum-v0", action range is [-2, 2]
        return x * 2


class CriticNet(nn.Module):
    """Critic network: estimates the action value for (state, action)."""

    def __init__(self, s_dim, a_dim):
        super(CriticNet, self).__init__()
        self.fcs = nn.Linear(s_dim, 30)
        self.fcs.weight.data.normal_(0, 0.1)
        self.fca = nn.Linear(a_dim, 30)
        self.fca.weight.data.normal_(0, 0.1)
        self.out = nn.Linear(30, 1)
        self.out.weight.data.normal_(0, 0.1)

    def forward(self, s, a):
        """Return one value per row of ``s`` / ``a``."""
        # State and action are embedded separately and summed before the
        # non-linearity.
        hidden = F.relu(self.fcs(s) + self.fca(a))
        return self.out(hidden)


class DDPG(object):
    """DDPG agent holding four networks and an array-backed replay memory.

    Each memory row is laid out as ``[s, a, r, s_]`` with widths
    ``s_dim, a_dim, 1, s_dim``.
    """

    def __init__(self, a_dim, s_dim, a_bound):
        self.a_dim, self.s_dim, self.a_bound = a_dim, s_dim, a_bound
        self.memory = np.zeros(
            (MEMORY_CAPACITY, s_dim * 2 + a_dim + 1), dtype=np.float32)
        self.pointer = 0  # total number of transitions stored so far

        # Create the 4 network objects
        self.actor_eval = ActorNet(s_dim, a_dim)
        self.actor_target = ActorNet(s_dim, a_dim)
        self.critic_eval = CriticNet(s_dim, a_dim)
        self.critic_target = CriticNet(s_dim, a_dim)
        # Start the target networks from the same weights as the online
        # networks; otherwise the first TD targets come from unrelated
        # random networks.
        self.actor_target.load_state_dict(self.actor_eval.state_dict())
        self.critic_target.load_state_dict(self.critic_eval.state_dict())

        # create 2 optimizers for actor and critic
        self.actor_optimizer = torch.optim.Adam(
            self.actor_eval.parameters(), lr=LR_ACTOR)
        self.critic_optimizer = torch.optim.Adam(
            self.critic_eval.parameters(), lr=LR_CRITIC)
        # Define the loss function for critic network update
        self.loss_func = nn.MSELoss()

    def store_transition(self, s, a, r, s_):
        """Write one transition into the memory, overwriting the oldest."""
        transition = np.hstack((s, a, [r], s_))
        index = self.pointer % MEMORY_CAPACITY  # ring-buffer position
        self.memory[index, :] = transition
        self.pointer += 1

    def choose_action(self, s):
        """Return the actor's action for a single state as a detached tensor."""
        s = torch.unsqueeze(torch.FloatTensor(s), 0)
        return self.actor_eval(s)[0].detach()

    @staticmethod
    def _soft_update(target_net, eval_net):
        """Blend ``eval_net`` weights into ``target_net`` with factor TAU."""
        for target_param, eval_param in zip(target_net.parameters(),
                                            eval_net.parameters()):
            target_param.data.mul_(1 - TAU)
            target_param.data.add_(TAU * eval_param.data)

    def learn(self):
        """Run one soft target update followed by one actor and critic step."""
        # softly update the target networks
        self._soft_update(self.actor_target, self.actor_eval)
        self._soft_update(self.critic_target, self.critic_eval)

        # Sample a mini-batch only from rows that have actually been
        # written, so an early call never trains on all-zero rows.
        filled = min(self.pointer, MEMORY_CAPACITY)
        if filled == 0:
            return
        indices = np.random.choice(filled, size=BATCH_SIZE)
        batch_trans = self.memory[indices, :]

        # extract s, a, r, s_ from the mini-batch of transitions
        s_end = self.s_dim
        a_end = s_end + self.a_dim
        batch_s = torch.FloatTensor(batch_trans[:, :s_end])
        batch_a = torch.FloatTensor(batch_trans[:, s_end:a_end])
        batch_r = torch.FloatTensor(batch_trans[:, a_end:a_end + 1])
        batch_s_ = torch.FloatTensor(batch_trans[:, -self.s_dim:])

        # actor step: maximise the critic's value of the actor's actions,
        # implemented as minimising the negated mean
        a = self.actor_eval(batch_s)
        q = self.critic_eval(batch_s, a)
        actor_loss = -torch.mean(q)
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # compute the target Q value using the information of next state;
        # detached so the critic loss does not back-propagate into the
        # target networks
        a_target = self.actor_target(batch_s_)
        q_tmp = self.critic_target(batch_s_, a_target)
        q_target = (batch_r + GAMMA * q_tmp).detach()

        # critic step: regress the current Q value onto the TD target.
        # zero_grad() also clears the gradients left on the critic by the
        # actor's backward pass above.
        q_eval = self.critic_eval(batch_s, batch_a)
        td_error = self.loss_func(q_eval, q_target)
        self.critic_optimizer.zero_grad()
        td_error.backward()
        self.critic_optimizer.step()


############################### Training ######################################
# Define the env in gym
env = gym.make(ENV_NAME)
env = env.unwrapped
env.seed(1)
s_dim = env.observation_space.shape[0]
a_dim = env.action_space.shape[0]
a_bound = env.action_space.high
a_low_bound = env.action_space.low

ddpg = DDPG(a_dim, s_dim, a_bound)
var = 3  # the controller of exploration which will decay during training process
t1 = time.time()
for i in range(EPISODES):
    s = env.reset()
    ep_r = 0
    for j in range(EP_STEPS):
        if RENDER:
            env.render()
        # add explorative noise to action, then clip to the action bounds
        a = ddpg.choose_action(s).numpy()
        a = np.clip(np.random.normal(a, var), a_low_bound, a_bound)
        s_, r, done, info = env.step(a)
        ddpg.store_transition(s, a, r / 10, s_)  # store the transition to memory
        # learning starts only once the memory has been filled
        if ddpg.pointer > MEMORY_CAPACITY:
            var *= 0.9995  # decay the exploration controller factor
            ddpg.learn()
        s = s_
        ep_r += r
        if j == EP_STEPS - 1:
            print('Episode: ', i, ' Reward: %i' % (ep_r), 'Explore: %.2f' % var)
            if ep_r > -300:
                RENDER = True
            break
print('Running time: ', time.time() - t1)


import random
from itertools import count

import gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal
# from tensorboardX import SummaryWriter

# Define hyperparameters
ENV_NAME = "Pendulum-v0"  # gym env
BATCH_SIZE = 100  # mini-batch size when sampled from buffer
MEM_CAPACTIY = 10000  # Replay buffer size
EPISODES = 200
STEPS = 200
GAMMA = 0.9  # discount factor
LEARNING_RATE = 1e-3  # learning rate of optimizer
TAU = 0.01  # update the target net parameter smoothly
RANDOM_SEED = 9527  # fix the random seed
# SAMPLE_FREQ = 2000
NOISE_VAR = 0.1
RENDER = False
device = 'cuda' if torch.cuda.is_available() else 'cpu'  # use GPU to train
print(device)

env = gym.make(ENV_NAME)
ACTION_DIM = env.action_space.shape[0]  # dim=1
STATE_DIM = env.observation_space.shape[0]  # dim=3
ACTION_BOUND = env.action_space.high[0]  # action interval [-2,2]
np.random.seed(RANDOM_SEED)  # fix the random seed
# Network initialisation draws from torch's RNG, so seed it as well.
torch.manual_seed(RANDOM_SEED)
directory = '.\\exp\\'


class ReplayBuffer():
    """Fixed-capacity list of transitions, overwritten oldest-first when full."""

    def __init__(self, max_size=MEM_CAPACTIY):
        self.storage = []  # empty list
        self.max_size = max_size
        self.pointer = 0

    def store_transition(self, transition):
        """Append ``transition``, or overwrite the oldest entry once full."""
        if len(self.storage) == self.max_size:  # replace the old data
            self.storage[self.pointer] = transition
            # point to next position
            self.pointer = (self.pointer + 1) % self.max_size
        else:
            self.storage.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` transitions uniformly with replacement.

        Returns:
            Tuple ``(s, a, r, s_, d)`` of arrays; ``r`` and ``d`` are
            reshaped to column vectors.
        """
        # random indices into storage, one per sampled transition
        ind_array = np.random.randint(0, len(self.storage), size=batch_size)
        s, a, r, s_, d = zip(*(self.storage[i] for i in ind_array))
        # np.asarray instead of np.array(..., copy=False): the latter raises
        # on NumPy 2.x whenever a copy cannot be avoided (e.g. Python floats).
        return (np.asarray(s), np.asarray(a), np.asarray(r).reshape(-1, 1),
                np.asarray(s_), np.asarray(d).reshape(-1, 1))


class Actor(nn.Module):
    """Actor network: maps states to actions scaled by ``max_action``."""

    def __init__(self, state_dim, action_dim, max_action):
        super(Actor, self).__init__()
        self.l1 = nn.Linear(state_dim, 30)
        self.l1.weight.data.normal_(0, 0.3)  # initialization
        self.l2 = nn.Linear(30, action_dim)
        self.l2.weight.data.normal_(0, 0.3)  # initialization
        self.max_action = max_action

    def forward(self, x):
        x = F.relu(self.l1(x))
        # the range of tanh is [-1, 1]
        return self.max_action * torch.tanh(self.l2(x))


class Critic(nn.Module):
    """Critic network: estimates the value of concatenated (state, action)."""

    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        self.l1 = nn.Linear(state_dim + action_dim, 30)
        self.l1.weight.data.normal_(0, 0.3)  # initialization
        self.l2 = nn.Linear(30, 1)
        self.l2.weight.data.normal_(0, 0.3)  # initialization

    def forward(self, x, a):
        x = F.relu(self.l1(torch.cat([x, a], 1)))
        return self.l2(x)


class DDPG(object):
    """DDPG agent: actor/critic with target copies and a replay buffer."""

    def __init__(self, state_dim, action_dim, max_action):
        # network, optimizer for actor
        self.actor = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(
            self.actor.parameters(), lr=LEARNING_RATE)

        # network, optimizer for critic
        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(
            self.critic.parameters(), lr=LEARNING_RATE)

        # create replay buffer object
        self.replay_buffer = ReplayBuffer()

    def select_action(self, state):
        """Return the actor's action plus Gaussian noise, clipped to bounds."""
        state = torch.FloatTensor(state.reshape(1, -1)).to(device)
        action = self.actor(state).cpu().data.numpy().flatten()
        noise = np.random.normal(0, NOISE_VAR, size=ACTION_DIM)
        # Clip the noisy action, not the noise, so the value handed to the
        # environment always lies inside the action space.
        return np.clip(action + noise,
                       env.action_space.low, env.action_space.high)

    def update(self, iterations=EPISODES):
        """Run ``iterations`` gradient updates on sampled mini-batches.

        The default keeps the original behaviour of using ``EPISODES`` as
        the number of updates per call.
        """
        for _ in range(iterations):
            s, a, r, s_, d = self.replay_buffer.sample(BATCH_SIZE)
            # move the batch to the training device
            state = torch.FloatTensor(s).to(device)
            action = torch.FloatTensor(a).to(device)
            reward = torch.FloatTensor(r).to(device)
            next_state = torch.FloatTensor(s_).to(device)
            done = torch.FloatTensor(d).to(device)

            # compute the target Q value; `done` is 1.0 on the final
            # transition, so (1 - done) removes the bootstrap term there
            # and keeps it everywhere else
            target_Q = self.critic_target(
                next_state, self.actor_target(next_state))
            target_Q = reward + ((1 - done) * GAMMA * target_Q).detach()

            # Get the current Q value
            current_Q = self.critic(state, action)

            # compute critic loss by MSE and update the critic network
            critic_loss = F.mse_loss(current_Q, target_Q)
            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()

            # compute the actor loss and its gradient to update the parameters
            actor_loss = -self.critic(state, self.actor(state)).mean()
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

            # softly update the target networks of critic and actor
            for param, target_param in zip(self.critic.parameters(),
                                           self.critic_target.parameters()):
                target_param.data.copy_(
                    TAU * param.data + (1 - TAU) * target_param.data)
            for param, target_param in zip(self.actor.parameters(),
                                           self.actor_target.parameters()):
                target_param.data.copy_(
                    TAU * param.data + (1 - TAU) * target_param.data)

    def save(self):
        """Write actor and critic weights under ``directory``."""
        torch.save(self.actor.state_dict(), directory + 'actor.pth')
        torch.save(self.critic.state_dict(), directory + 'critic.pth')

    def load(self):
        """Load actor and critic weights from ``directory``."""
        self.actor.load_state_dict(torch.load(directory + 'actor.pth'))
        self.critic.load_state_dict(torch.load(directory + 'critic.pth'))


def main():
    """Train the agent for EPISODES episodes, updating after each one."""
    agent = DDPG(STATE_DIM, ACTION_DIM, ACTION_BOUND)
    total_step = 0
    for i in range(EPISODES):
        total_reward = 0
        state = env.reset()
        for t in count():
            if RENDER and i > 100:
                env.render()  # Render is unnecessary
            action = agent.select_action(state)

            # get the next transition by using current action
            next_state, reward, done, info = env.step(action)

            # store the transition to the buffer; builtin float() replaces
            # np.float, which was removed in NumPy 1.24
            agent.replay_buffer.store_transition(
                (state, action, reward / 10, next_state, float(done)))

            # accumulate before the termination check so the final step's
            # reward is included in the episode total
            total_reward += reward
            state = next_state
            if done:
                break
        total_step += t + 1
        print("Total T:{} Episode: \t{} Total Reward: \t{:0.2f}".format(total_step, i, total_reward))
        agent.update()
        # NOISE_VAR *= 0.99
    env.close()


if __name__ == '__main__':
    main()
