强化学习算法种类
On-policy vs Off-policy:
- On-policy: 训练数据由当前 agent 不断与环境交互得到
- Off-policy: 训练的 agent 和与环境交互的 agent 不是同一个 agent, 即别人与环境交互为我提供训练数据
PPO 算法
PPO (Proximal Policy Optimization) 即近端策略优化. PPO 是一种 on-policy 算法, 通过实现小批量更新, 解决了训练过程中新旧策略的变化差异过大导致不易学习的问题.
Actor-Critic 算法
Actor-Critic 算法共分为两部分. 第一部分为策略函数 Actor, 负责生成动作并与环境交互; 第二部分为价值函数 Critic, 负责评估 Actor 的表现.
Gym
Gym 是一个强化学习会经常用到的包. Gym 里收集了很多游戏的环境. 下面我们就会用 LunarLander-v2 来实现一个自动版的 “阿波罗登月”.
安装:
pip install gym
如果遇到报错:
AttributeError: module 'gym.envs.box2d' has no attribute 'LunarLander'
解决办法:
pip install gym[box2d]
LunarLander-v2
LunarLander-v2 是一个月球登陆器环境. 着陆平台位于坐标 (0, 0), 坐标值是状态向量的前两个数字. 从屏幕顶部移动到着陆台并达到零速度, 奖励大约是 100 到 140 分. 如果着陆器坠毁或成功停稳, 回合结束, 并分别获得额外的 -100 或 +100 分. 每条腿接地 +10 分, 点火主引擎每帧 -0.3 分. 达到 200 分即视为解决 (solved).
启动登陆器
import gym

# Create the LunarLander environment.
env = gym.make("LunarLander-v2")
# Reset the environment to obtain the initial observation.
env.reset()

# Drive the lander with random actions for 180 frames.
for i in range(180):
    # Render the current frame.
    env.render()
    # Sample a random action and advance one step.
    observation, reward, done, info = env.step(env.action_space.sample())
    # Restart the episode once it terminates: stepping a finished
    # environment is undefined behavior in gym.
    if done:
        env.reset()
    if i % 10 == 0:
        # Debug output every 10 frames.
        print("观察:", observation)
        print("得分:", reward)

# Release the render window and environment resources.
env.close()
PPO 算法实现月球登陆器
PPO
import torch
import torch.nn as nn
from torch.distributions import Categorical
# 是否使用GPU加速
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
class Memory:
    """Rollout buffer: stores one batch of transitions between PPO updates."""

    def __init__(self):
        """Create the empty per-field lists."""
        self.actions = []       # actions taken (4 discrete choices)
        self.states = []        # state vectors (8 floats each)
        self.logprobs = []      # log-probabilities of the taken actions
        self.rewards = []       # per-step rewards
        self.is_terminals = []  # episode-finished flags

    def clear_memory(self):
        """Empty every list in place (keeps the same list objects alive)."""
        self.actions.clear()
        self.states.clear()
        self.logprobs.clear()
        self.rewards.clear()
        self.is_terminals.clear()
class ActorCritic(nn.Module):
    """Actor-critic network: a policy head (actor) and a value head (critic)."""

    def __init__(self, state_dim, action_dim, n_latent_var):
        super(ActorCritic, self).__init__()
        # Actor: maps a state to a probability distribution over actions.
        # [b, state_dim] => [b, n_latent_var] => [b, n_latent_var] => [b, action_dim]
        self.action_layer = nn.Sequential(
            nn.Linear(state_dim, n_latent_var),
            nn.Tanh(),
            nn.Linear(n_latent_var, n_latent_var),
            nn.Tanh(),
            nn.Linear(n_latent_var, action_dim),
            nn.Softmax(dim=-1),
        )
        # Critic: maps a state to a scalar value estimate.
        # [b, state_dim] => [b, n_latent_var] => [b, n_latent_var] => [b, 1]
        self.value_layer = nn.Sequential(
            nn.Linear(state_dim, n_latent_var),
            nn.Tanh(),
            nn.Linear(n_latent_var, n_latent_var),
            nn.Tanh(),
            nn.Linear(n_latent_var, 1),
        )

    def forward(self):
        """Not used directly; `act` and `evaluate` are the entry points."""
        raise NotImplementedError

    def act(self, state, memory):
        """Sample an action for `state` and record the transition in `memory`.

        :param state: numpy state vector
        :param memory: Memory buffer collecting states/actions/logprobs
        :return: the sampled action as a Python int
        """
        state_tensor = torch.from_numpy(state).float().to(device)
        # Action probabilities from the policy head.
        probs = self.action_layer(state_tensor)
        # Sample from the categorical distribution rather than taking the
        # argmax, which keeps exploration alive during training.
        dist = Categorical(probs)
        chosen = dist.sample()
        # Stash the transition for the next PPO update.
        memory.states.append(state_tensor)
        memory.actions.append(chosen)
        memory.logprobs.append(dist.log_prob(chosen))
        return chosen.item()

    def evaluate(self, state, action):
        """Score a batch of (state, action) pairs under the current policy.

        :param state: batch of states, shape [batch, state_dim]
        :param action: batch of actions, shape [batch]
        :return: (action log-probabilities, state values, entropies), each [batch]
        """
        probs = self.action_layer(state)
        dist = Categorical(probs)
        logprobs = dist.log_prob(action)
        entropy = dist.entropy()
        # Critic value, squeezed from [batch, 1] down to [batch].
        values = torch.squeeze(self.value_layer(state))
        return logprobs, values, entropy
class PPO:
    """Proximal Policy Optimization with a clipped surrogate objective."""

    def __init__(self, state_dim, action_dim, n_latent_var, lr, betas, gamma, K_epochs, eps_clip):
        self.lr = lr                # learning rate
        self.betas = betas          # Adam betas
        self.gamma = gamma          # discount factor
        self.eps_clip = eps_clip    # clipping range for the probability ratio
        self.K_epochs = K_epochs    # optimization epochs per update
        # The policy being optimized, and a frozen copy used for rollouts.
        self.policy = ActorCritic(state_dim, action_dim, n_latent_var).to(device)
        self.policy_old = ActorCritic(state_dim, action_dim, n_latent_var).to(device)
        self.policy_old.load_state_dict(self.policy.state_dict())
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr, betas=betas)
        self.MseLoss = nn.MSELoss()  # critic (value) loss

    def update(self, memory):
        """Run K epochs of clipped-PPO optimization on the collected rollout."""
        # Monte Carlo estimate of the discounted return for every step,
        # walking the rollout backwards and resetting at episode boundaries.
        returns = []
        running_return = 0
        for reward, terminal in zip(reversed(memory.rewards), reversed(memory.is_terminals)):
            if terminal:
                running_return = 0
            # return_t = r_t + gamma * return_{t+1}
            running_return = reward + (self.gamma * running_return)
            returns.append(running_return)
        returns.reverse()
        # Normalize the returns to stabilize training.
        rewards = torch.tensor(returns, dtype=torch.float32).to(device)
        rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-5)
        # Detach the rollout tensors: they were produced by the old policy.
        old_states = torch.stack(memory.states).to(device).detach()
        old_actions = torch.stack(memory.actions).to(device).detach()
        old_logprobs = torch.stack(memory.logprobs).to(device).detach()
        for _ in range(self.K_epochs):
            # Re-score the old transitions under the current policy.
            logprobs, state_values, dist_entropy = self.policy.evaluate(old_states, old_actions)
            # Importance ratio pi_new / pi_old, computed in log space.
            ratios = torch.exp(logprobs - old_logprobs.detach())
            # Clipped surrogate objective + value loss - entropy bonus.
            advantages = rewards - state_values.detach()
            surr1 = ratios * advantages
            surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
            loss = -torch.min(surr1, surr2) + 0.5 * self.MseLoss(state_values, rewards) - 0.01 * dist_entropy
            # Standard optimizer step on the mean loss.
            self.optimizer.zero_grad()
            loss.mean().backward()
            self.optimizer.step()
        # Sync the rollout policy with the freshly optimized weights.
        self.policy_old.load_state_dict(self.policy.state_dict())
main
import gym
import torch
from PPO import Memory, PPO
############## Hyperparameters ##############
env_name = "LunarLander-v2"  # environment id
env = gym.make(env_name)
state_dim = 8                # size of the observation vector
action_dim = 4               # number of discrete actions
render = False               # draw frames while training
solved_reward = 230          # stop once interval-average reward exceeds this
log_interval = 20            # print avg reward in the interval
max_episodes = 50000         # training episode cap
max_timesteps = 300          # step cap per episode
n_latent_var = 64            # hidden layer width
update_timestep = 2000       # run a PPO update every this many steps
lr = 0.002                   # learning rate
betas = (0.9, 0.999)         # Adam betas
gamma = 0.99                 # discount factor
K_epochs = 4                 # policy update epochs per PPO update
eps_clip = 0.2               # PPO clipping range
#############################################
def main():
    """Train a PPO agent on LunarLander-v2 until solved or max episodes."""
    # Rollout buffer and PPO agent.
    memory = Memory()
    ppo = PPO(state_dim, action_dim, n_latent_var, lr, betas, gamma, K_epochs, eps_clip)
    # Running statistics accumulated over one log interval.
    total_reward = 0
    total_length = 0
    timestep = 0
    for i_episode in range(1, max_episodes + 1):
        # Start a fresh episode.
        state = env.reset()
        for t in range(max_timesteps):
            timestep += 1
            # Act with the frozen rollout policy (transition is recorded in memory).
            action = ppo.policy_old.act(state, memory)
            # Advance the environment: (next state, reward, finished flag, debug info).
            state, reward, done, _ = env.step(action)
            # Record the outcome for the next update.
            memory.rewards.append(reward)
            memory.is_terminals.append(done)
            # Periodic PPO update; then drop the consumed rollout and
            # restart the step counter.
            if timestep % update_timestep == 0:
                ppo.update(memory)
                memory.clear_memory()
                timestep = 0
            total_reward += reward
            if render:
                env.render()
            # Episode finished early.
            if done:
                break
        total_length += t
        # Stop once the interval total clears the solve threshold
        # (i.e. the interval average exceeds solved_reward).
        if total_reward >= (log_interval * solved_reward):
            print("########## Solved! ##########")
            # Persist the trained policy weights.
            torch.save(ppo.policy.state_dict(), './PPO_{}.pth'.format(env_name))
            break
        # Progress report every log_interval episodes.
        if i_episode % log_interval == 0:
            avg_length = int(total_length / log_interval)
            running_reward = int(total_reward / log_interval)
            print('Episode {} \t avg length: {} \t average_reward: {}'.format(i_episode, avg_length, running_reward))
            # Reset the interval statistics.
            total_reward = 0
            total_length = 0


if __name__ == '__main__':
    main()