天天看點

深度強化學習之:PyTorch ppo算法實作月球登陸器

作者:甘道實研

強化學習算法種類

深度強化學習之:PyTorch ppo算法實作月球登陸器

On-policy vs Off-policy:

  • On-policy: 訓練資料由目前 agent 不斷與環境互動得到
  • Off-policy: 訓練的 agent 和與環境互動的 agent 不是同一個 agent, 即别人與環境互動為我提供訓練資料

PPO 算法

PPO (Proximal Policy Optimization) 即近端政策優化. PPO 是一種 on-policy 算法, 通過實作小批量更新, 解決了訓練過程中新舊政策的變化差異過大導緻不易學習的問題.

深度強化學習之:PyTorch ppo算法實作月球登陸器

Actor-Critic 算法

Actor-Critic 算法共分為兩部分. 第一部分為政策函數 Actor, 負責生成動作并與環境互動; 第二部分為價值函數, 負責評估 Actor 的表現.

深度強化學習之:PyTorch ppo算法實作月球登陸器

Gym

Gym 是一個強化學習會經常用到的包. Gym 裡收集了很多遊戲的環境. 下面我們就會用 LunarLander-v2 來實作一個自動版的 “阿波羅登月”.

深度強化學習之:PyTorch ppo算法實作月球登陸器

安裝:

pip install gym
           

如果遇到報錯:

AttributeError: module 'gym.envs.box2d' has no attribute 'LunarLander'
           

解決辦法:

pip install gym[box2d]
           

LunarLander-v2

LunarLander-v2 是一個月球登陸器. 着陸平台位于坐标 (0, 0). 坐标是狀态向量的前兩個數字, 從螢幕頂部移動到着陸台和零速度的獎勵大約是 100 到 140分. 如果着陸器墜毀或停止, 則回合結束, 獲得額外的 -100 或 +100點. 每腳接地為 +10, 點火主機每幀 -0.3分, 正解為200分.

深度強化學習之:PyTorch ppo算法實作月球登陸器

啟動登陸器

import gym

# 建立環境
env = gym.make("LunarLander-v2")

# 重置環境
env.reset()

# 啟動
for i in range(180):

  # 渲染環境
  env.render()

  # 随機移動
  observation, reward, done, info = env.step(env.action_space.sample())

  if i % 10 == 0:
      # 調試輸出
      print("觀察:", observation)
      print("得分:", reward)           

PPO 算法實作月球登入器

PPO

import torch
import torch.nn as nn
from torch.distributions import Categorical

# 是否使用GPU加速
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)


class Memory:
  def __init__(self):
      """初始化"""
      self.actions = []  # 行動(共4種)
      self.states = []  # 狀态, 由8個數字組成
      self.logprobs = []  # 機率
      self.rewards = []  # 獎勵
      self.is_terminals = []  # 遊戲是否結束

  def clear_memory(self):
      """清除memory"""
      del self.actions[:]
      del self.states[:]
      del self.logprobs[:]
      del self.rewards[:]
      del self.is_terminals[:]


class ActorCritic(nn.Module):
  def __init__(self, state_dim, action_dim, n_latent_var):
      super(ActorCritic, self).__init__()

      # 行動
      self.action_layer = nn.Sequential(
          # [b, 8] => [b, 64]
          nn.Linear(state_dim, n_latent_var),
          nn.Tanh(),  # 激活

          # [b, 64] => [b, 64]
          nn.Linear(n_latent_var, n_latent_var),
          nn.Tanh(),  # 激活

          # [b, 64] => [b, 4]
          nn.Linear(n_latent_var, action_dim),
          nn.Softmax(dim=-1)
      )

      # 評判
      self.value_layer = nn.Sequential(
          # [b, 8] => [8, 64]
          nn.Linear(state_dim, n_latent_var),
          nn.Tanh(),  # 激活

          # [b, 64] => [b, 64]
          nn.Linear(n_latent_var, n_latent_var),
          nn.Tanh(),

          # [b, 64] => [b, 1]
          nn.Linear(n_latent_var, 1)
      )

  def forward(self):
      """前向傳播, 由act替代"""

      raise NotImplementedError

  def act(self, state, memory):
      """計算行動"""

      # 轉成張量
      state = torch.from_numpy(state).float().to(device)

      # 計算4個方向機率
      action_probs = self.action_layer(state)

      # 通過最大機率計算最終行動方向
      dist = Categorical(action_probs)
      action = dist.sample()

      # 存入memory
      memory.states.append(state)
      memory.actions.append(action)
      memory.logprobs.append(dist.log_prob(action))

      # 傳回行動
      return action.item()

  def evaluate(self, state, action):
      """
      評估
      :param state: 狀态, 2000個一組, 形狀為 [2000, 8]
      :param action: 行動, 2000個一組, 形狀為 [2000]
      :return:
      """

      # 計算行動機率
      action_probs = self.action_layer(state)
      dist = Categorical(action_probs)  # 轉換成類别分布

      # 計算機率密度, log(機率)
      action_logprobs = dist.log_prob(action)

      # 計算熵
      dist_entropy = dist.entropy()

      # 評判
      state_value = self.value_layer(state)
      state_value = torch.squeeze(state_value)  # [2000, 1] => [2000]

      # 傳回行動機率密度, 評判值, 行動機率熵
      return action_logprobs, state_value, dist_entropy


class PPO:
  def __init__(self, state_dim, action_dim, n_latent_var, lr, betas, gamma, K_epochs, eps_clip):
      self.lr = lr  # 學習率
      self.betas = betas  # betas
      self.gamma = gamma  # gamma
      self.eps_clip = eps_clip  # 裁剪, 限制值範圍
      self.K_epochs = K_epochs  # 疊代次數

      # 初始化policy
      self.policy = ActorCritic(state_dim, action_dim, n_latent_var).to(device)
      self.policy_old = ActorCritic(state_dim, action_dim, n_latent_var).to(device)
      self.policy_old.load_state_dict(self.policy.state_dict())

      self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr, betas=betas)  # 優化器
      self.MseLoss = nn.MSELoss()  # 損失函數

  def update(self, memory):
      """更新梯度"""

      # 蒙特卡羅預測狀态回報
      rewards = []
      discounted_reward = 0
      for reward, is_terminal in zip(reversed(memory.rewards), reversed(memory.is_terminals)):
          # 回合結束
          if is_terminal:
              discounted_reward = 0

          # 更新削減獎勵(目前狀态獎勵 + 0.99*上一狀态獎勵
          discounted_reward = reward + (self.gamma * discounted_reward)

          # 首插入
          rewards.insert(0, discounted_reward)

      # 标準化獎勵
      rewards = torch.tensor(rewards, dtype=torch.float32).to(device)
      rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-5)

      # 張量轉換
      old_states = torch.stack(memory.states).to(device).detach()
      old_actions = torch.stack(memory.actions).to(device).detach()
      old_logprobs = torch.stack(memory.logprobs).to(device).detach()

      # 疊代優化 K 次:
      for _ in range(self.K_epochs):
          # 評估
          logprobs, state_values, dist_entropy = self.policy.evaluate(old_states, old_actions)

          # 計算ratios
          ratios = torch.exp(logprobs - old_logprobs.detach())

          # 計算損失
          advantages = rewards - state_values.detach()
          surr1 = ratios * advantages
          surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
          loss = -torch.min(surr1, surr2) + 0.5 * self.MseLoss(state_values, rewards) - 0.01 * dist_entropy

          # 梯度清零
          self.optimizer.zero_grad()

          # 反向傳播
          loss.mean().backward()

          # 更新梯度
          self.optimizer.step()

      # 将新的權重指派給舊policy
      self.policy_old.load_state_dict(self.policy.state_dict())           

main

import gym
import torch
from PPO import Memory, PPO

############## 超參數 ##############
env_name = "LunarLander-v2"  # 遊戲名字
env = gym.make(env_name)
state_dim = 8  # 狀态次元
action_dim = 4  # 行動次元
render = False  # 可視化
solved_reward = 230  # 停止循環條件 (獎勵 > 230)
log_interval = 20  # print avg reward in the interval
max_episodes = 50000  # 最大疊代次數
max_timesteps = 300  # 最大單次遊戲步數
n_latent_var = 64  # 全連接配接隐層次元
update_timestep = 2000  # 每2000步policy更新一次
lr = 0.002  # 學習率
betas = (0.9, 0.999)  # betas
gamma = 0.99  # gamma
K_epochs = 4  # policy疊代更新次數
eps_clip = 0.2  # PPO 限幅


#############################################

def main():
  # 執行個體化
  memory = Memory()
  ppo = PPO(state_dim, action_dim, n_latent_var, lr, betas, gamma, K_epochs, eps_clip)

  # 存放
  total_reward = 0
  total_length = 0
  timestep = 0

  # 訓練
  for i_episode in range(1, max_episodes + 1):

      # 環境初始化
      state = env.reset()  # 初始化(重新玩)

      # 疊代
      for t in range(max_timesteps):
          timestep += 1

          # 用舊policy得到行動
          action = ppo.policy_old.act(state, memory)

          # 行動
          state, reward, done, _ = env.step(action)  # 得到(新的狀态,獎勵,是否終止,額外的調試資訊)

          # 更新memory(獎勵/遊戲是否結束)
          memory.rewards.append(reward)
          memory.is_terminals.append(done)

          # 更新梯度
          if timestep % update_timestep == 0:
              ppo.update(memory)

              # memory清零
              memory.clear_memory()

              # 累計步數清零
              timestep = 0

          # 累加
          total_reward += reward

          # 可視化
          if render:
              env.render()

          # 如果遊戲結束, 退出
          if done:
              break

      # 遊戲步長
      total_length += t

      # 如果達到要求(230分), 退出循環
      if total_reward >= (log_interval * solved_reward):
          print("########## Solved! ##########")

          # 儲存模型
          torch.save(ppo.policy.state_dict(), './PPO_{}.pth'.format(env_name))

          # 退出循環
          break

      # 輸出log, 每20次疊代
      if i_episode % log_interval == 0:
          
          # 求20次疊代平均時長/收益
          avg_length = int(total_length / log_interval)
          running_reward = int(total_reward / log_interval)

          # 調試輸出
          print('Episode {} \t avg length: {} \t average_reward: {}'.format(i_episode, avg_length, running_reward))

          # 清零
          total_reward = 0
          total_length = 0

if __name__ == '__main__':
  main()           

繼續閱讀