
Deep Reinforcement Learning: Implementing a Lunar Lander with PPO in PyTorch

Author: 甘道实研

Types of Reinforcement Learning Algorithms


On-policy vs Off-policy:

  • On-policy: the training data is generated by the current agent as it continuously interacts with the environment
  • Off-policy: the agent being trained and the agent interacting with the environment are not the same agent; in other words, another agent interacts with the environment to supply the training data

The PPO Algorithm

PPO stands for Proximal Policy Optimization. PPO is an on-policy algorithm that performs mini-batch updates, which mitigates the problem that an overly large difference between the new and old policies during training makes learning unstable.
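
At the core of PPO is a clipped surrogate objective. As a reference, the standard formulation (stated here for context, not taken from the code below) is

L^{CLIP}(\theta) = \mathbb{E}_t\big[\min\big(r_t(\theta)\,A_t,\ \mathrm{clip}(r_t(\theta),\,1-\epsilon,\,1+\epsilon)\,A_t\big)\big], \qquad r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_\mathrm{old}}(a_t \mid s_t)}

where r_t(\theta) is the probability ratio between the new and old policies, A_t is the advantage estimate, and \epsilon is the clipping range (eps_clip in the code below). The -torch.min(surr1, surr2) term in the update() method later in this article corresponds to this objective.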


The Actor-Critic Algorithm

An Actor-Critic algorithm consists of two parts. The first part is the policy function, the Actor, which generates actions and interacts with the environment; the second part is the value function, the Critic, which evaluates the Actor's performance.
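
The Critic's value estimate serves as a baseline for the Actor: actions whose return exceeds the Critic's prediction are reinforced. As a reference (the standard formulation, matching how the code below computes it), the advantage of an action is

A_t = R_t - V(s_t)

where R_t is the discounted return from time step t and V(s_t) is the Critic's value estimate; this corresponds to the advantages = rewards - state_values.detach() line in the PPO implementation below.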


Gym

Gym is a package that is frequently used in reinforcement learning. It collects environments for many games. Below we will use LunarLander-v2 to build an automated version of the "Apollo moon landing".


Installation:

pip install gym
           

If you run into the error:

AttributeError: module 'gym.envs.box2d' has no attribute 'LunarLander'
           

Fix:

pip install gym[box2d]
           

LunarLander-v2

LunarLander-v2 is a lunar lander environment. The landing pad is always at coordinates (0, 0), and those coordinates are the first two numbers in the state vector. The reward for moving from the top of the screen to the landing pad with zero speed is roughly 100 to 140 points. The episode ends if the lander crashes or comes to rest, receiving an additional -100 or +100 points respectively. Each leg touching the ground is worth +10 points, firing the main engine costs -0.3 points per frame, and the environment is considered solved at 200 points.
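
As a quick sanity check, you can inspect the environment's state and action spaces; this is where the 8-dimensional state and the 4 discrete actions used later come from. A minimal sketch (assuming gym and the box2d extra are installed):

import gym

env = gym.make("LunarLander-v2")

# 8-dimensional observation: x, y, vx, vy, angle, angular velocity, left leg contact, right leg contact
print(env.observation_space)  # a Box space with shape (8,)

# 4 discrete actions: do nothing, fire left engine, fire main engine, fire right engine
print(env.action_space)  # Discrete(4)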


Launching the Lander

import gym

# Create the environment
env = gym.make("LunarLander-v2")

# Reset the environment
env.reset()

# Run the lander for a few steps
for i in range(180):

  # Render the environment
  env.render()

  # Take a random action
  observation, reward, done, info = env.step(env.action_space.sample())

  if i % 10 == 0:
      # Debug output
      print("observation:", observation)
      print("reward:", reward)

# Close the render window
env.close()

Implementing the Lunar Lander with PPO

PPO.py

import torch
import torch.nn as nn
from torch.distributions import Categorical

# Use GPU acceleration if available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)


class Memory:
  def __init__(self):
      """Initialize the buffers"""
      self.actions = []  # Actions (4 possible)
      self.states = []  # States, each made up of 8 numbers
      self.logprobs = []  # Log probabilities of the chosen actions
      self.rewards = []  # Rewards
      self.is_terminals = []  # Whether the episode has ended

  def clear_memory(self):
      """Clear the memory"""
      del self.actions[:]
      del self.states[:]
      del self.logprobs[:]
      del self.rewards[:]
      del self.is_terminals[:]


class ActorCritic(nn.Module):
  def __init__(self, state_dim, action_dim, n_latent_var):
      super(ActorCritic, self).__init__()

      # Actor
      self.action_layer = nn.Sequential(
          # [b, 8] => [b, 64]
          nn.Linear(state_dim, n_latent_var),
          nn.Tanh(),  # Activation

          # [b, 64] => [b, 64]
          nn.Linear(n_latent_var, n_latent_var),
          nn.Tanh(),  # Activation

          # [b, 64] => [b, 4]
          nn.Linear(n_latent_var, action_dim),
          nn.Softmax(dim=-1)
      )

      # Critic
      self.value_layer = nn.Sequential(
          # [b, 8] => [b, 64]
          nn.Linear(state_dim, n_latent_var),
          nn.Tanh(),  # Activation

          # [b, 64] => [b, 64]
          nn.Linear(n_latent_var, n_latent_var),
          nn.Tanh(),

          # [b, 64] => [b, 1]
          nn.Linear(n_latent_var, 1)
      )

  def forward(self):
      """Forward pass; act() and evaluate() are used instead"""

      raise NotImplementedError

  def act(self, state, memory):
      """Choose an action"""

      # Convert to a tensor
      state = torch.from_numpy(state).float().to(device)

      # Compute the probabilities of the 4 actions
      action_probs = self.action_layer(state)

      # Sample the action from the categorical distribution
      dist = Categorical(action_probs)
      action = dist.sample()

      # Store in memory
      memory.states.append(state)
      memory.actions.append(action)
      memory.logprobs.append(dist.log_prob(action))

      # Return the action
      return action.item()

  def evaluate(self, state, action):
      """
      Evaluate a batch of state-action pairs
      :param state: states, collected 2000 at a time, shape [2000, 8]
      :param action: actions, collected 2000 at a time, shape [2000]
      :return:
      """

      # Compute the action probabilities
      action_probs = self.action_layer(state)
      dist = Categorical(action_probs)  # Convert to a categorical distribution

      # Log probability of each chosen action, log(prob)
      action_logprobs = dist.log_prob(action)

      # Entropy of the distribution
      dist_entropy = dist.entropy()

      # Critic value
      state_value = self.value_layer(state)
      state_value = torch.squeeze(state_value)  # [2000, 1] => [2000]

      # Return the action log probabilities, state values and entropy
      return action_logprobs, state_value, dist_entropy


class PPO:
  def __init__(self, state_dim, action_dim, n_latent_var, lr, betas, gamma, K_epochs, eps_clip):
      self.lr = lr  # Learning rate
      self.betas = betas  # Adam betas
      self.gamma = gamma  # Discount factor
      self.eps_clip = eps_clip  # Clipping range for the probability ratio
      self.K_epochs = K_epochs  # Number of optimization epochs per update

      # Initialize the policies
      self.policy = ActorCritic(state_dim, action_dim, n_latent_var).to(device)
      self.policy_old = ActorCritic(state_dim, action_dim, n_latent_var).to(device)
      self.policy_old.load_state_dict(self.policy.state_dict())

      self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr, betas=betas)  # Optimizer
      self.MseLoss = nn.MSELoss()  # Loss function for the critic

  def update(self, memory):
      """Update the policy"""

      # Monte Carlo estimate of the state returns
      rewards = []
      discounted_reward = 0
      for reward, is_terminal in zip(reversed(memory.rewards), reversed(memory.is_terminals)):
          # Episode ended
          if is_terminal:
              discounted_reward = 0

          # Update the discounted reward (current reward + gamma * discounted reward of later steps)
          discounted_reward = reward + (self.gamma * discounted_reward)

          # Insert at the front
          rewards.insert(0, discounted_reward)

      # Normalize the rewards
      rewards = torch.tensor(rewards, dtype=torch.float32).to(device)
      rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-5)

      # Convert the lists to tensors
      old_states = torch.stack(memory.states).to(device).detach()
      old_actions = torch.stack(memory.actions).to(device).detach()
      old_logprobs = torch.stack(memory.logprobs).to(device).detach()

      # Optimize the policy for K epochs:
      for _ in range(self.K_epochs):
          # Evaluate the old actions under the current policy
          logprobs, state_values, dist_entropy = self.policy.evaluate(old_states, old_actions)

          # Compute the probability ratios
          ratios = torch.exp(logprobs - old_logprobs.detach())

          # Compute the loss
          advantages = rewards - state_values.detach()
          surr1 = ratios * advantages
          surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
          loss = -torch.min(surr1, surr2) + 0.5 * self.MseLoss(state_values, rewards) - 0.01 * dist_entropy

          # Zero the gradients
          self.optimizer.zero_grad()

          # Backpropagate
          loss.mean().backward()

          # Update the weights
          self.optimizer.step()

      # Copy the new weights into the old policy
      self.policy_old.load_state_dict(self.policy.state_dict())

main.py

import gym
import torch
from PPO import Memory, PPO

############## Hyperparameters ##############
env_name = "LunarLander-v2"  # Environment name
env = gym.make(env_name)
state_dim = 8  # State dimension
action_dim = 4  # Action dimension
render = False  # Render the environment
solved_reward = 230  # Stop training once the average reward exceeds 230
log_interval = 20  # Print the average reward every 20 episodes
max_episodes = 50000  # Maximum number of episodes
max_timesteps = 300  # Maximum number of steps per episode
n_latent_var = 64  # Hidden layer dimension of the fully connected layers
update_timestep = 2000  # Update the policy every 2000 timesteps
lr = 0.002  # Learning rate
betas = (0.9, 0.999)  # Adam betas
gamma = 0.99  # Discount factor
K_epochs = 4  # Number of policy update epochs
eps_clip = 0.2  # PPO clipping range


#############################################

def main():
  # Instantiate the memory and the PPO agent
  memory = Memory()
  ppo = PPO(state_dim, action_dim, n_latent_var, lr, betas, gamma, K_epochs, eps_clip)

  # Logging variables
  total_reward = 0
  total_length = 0
  timestep = 0

  # Training loop
  for i_episode in range(1, max_episodes + 1):

      # Initialize the environment
      state = env.reset()  # Reset (start a new episode)

      # Step through the episode
      for t in range(max_timesteps):
          timestep += 1

          # Choose an action with the old policy
          action = ppo.policy_old.act(state, memory)

          # Take the action
          state, reward, done, _ = env.step(action)  # Returns (new state, reward, done flag, extra debug info)

          # Store the reward and the done flag in memory
          memory.rewards.append(reward)
          memory.is_terminals.append(done)

          # Update the policy
          if timestep % update_timestep == 0:
              ppo.update(memory)

              # Clear the memory
              memory.clear_memory()

              # Reset the step counter
              timestep = 0

          # Accumulate the reward
          total_reward += reward

          # Render
          if render:
              env.render()

          # Exit once the episode is over
          if done:
              break

      # Accumulate the episode length
      total_length += t

      # Stop training once the target (average reward of 230) is reached
      if total_reward >= (log_interval * solved_reward):
          print("########## Solved! ##########")

          # Save the model
          torch.save(ppo.policy.state_dict(), './PPO_{}.pth'.format(env_name))

          # Exit the training loop
          break

      # Log every 20 episodes
      if i_episode % log_interval == 0:

          # Average episode length / reward over the last 20 episodes
          avg_length = int(total_length / log_interval)
          running_reward = int(total_reward / log_interval)

          # Debug output
          print('Episode {} \t avg length: {} \t average_reward: {}'.format(i_episode, avg_length, running_reward))

          # Reset
          total_reward = 0
          total_length = 0

if __name__ == '__main__':
  main()
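
Once training reports "Solved!" and saves the weights, the trained lander can be watched with a short evaluation loop. The following is a minimal sketch, assuming the PPO code above is saved as PPO.py and that training produced ./PPO_LunarLander-v2.pth:

import gym
import torch
from PPO import Memory, PPO

env = gym.make("LunarLander-v2")
memory = Memory()  # act() expects a Memory object, even though its contents are not used here
ppo = PPO(state_dim=8, action_dim=4, n_latent_var=64, lr=0.002, betas=(0.9, 0.999),
          gamma=0.99, K_epochs=4, eps_clip=0.2)

# Load the trained weights into the acting policy
ppo.policy_old.load_state_dict(torch.load('./PPO_LunarLander-v2.pth'))

state = env.reset()
done = False
episode_reward = 0

while not done:
    env.render()
    action = ppo.policy_old.act(state, memory)
    state, reward, done, _ = env.step(action)
    episode_reward += reward

print("episode reward:", episode_reward)
env.close()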
