天天看點

基于Python實作的人工智能作業小車問題

目錄

1 任務描述 2

2 環境配置 2

3 算法設計 2

3.1 離散版本𝐌𝐨𝐮𝐧𝐭𝐚𝐢𝐧𝐂𝐚𝐫 − 𝐯𝟎 2

(1)問題背景 2

(2)Q-learning 算法 2

(3)程式流程 3

3.2 連續版本 3

3.3 其他算法(選做) 4

(1)SARSA 4

(2)𝐒𝐀𝐑𝐒𝐀(𝛌) 4

(3)DQN 4

3.4 算法對比 5

4 參數調整及優化1 6

·回報 reward: 6

5 總結與反思 7

6 參考文獻和資料 7

1任務描述

·必做:使用強化學習算法,解決MountainCar − v0;

使用強化學習算法,解決MountainCarContinous − v0

·選做:使用其他強化學習算法解決上述問題

2環境配置

·python(3.6) + gym(0.15.4) + tensorflow(1.2.1) + keras(2.2.4)

3算法設計

3.1 離散版本𝐌𝐨𝐮𝐧𝐭𝐚𝐢𝐧𝐂𝐚𝐫 − 𝐯𝟎

(1)問題背景

現有一小車在兩座山峰之間的谷底,小車動力有限,無法直接登上右側山峰,需要借助 動能和勢能之間的轉化才能到達目的地。本文轉載自http://www.biyezuopin.vip/onews.asp?id=16767在離散版本的MountainCar中,小車的行為(action) 是離散的,有向左、向右、靜止三個選項,每個狀态(state)下小車的觀測值包含位置(position)和速度(velocity)兩個方面,小車從-0.4—-0.6 之間的任意位置開始運動,在一個 episode

(200 步)内抵達 0.5 處即為成功,每走一步獲得-1 的回報值。

本次設計為強化學習方面的程式設計,通過這次作業我加深了對 Q-learning 和 SARSA 兩類算法的了解,用實踐鞏固了理論知識,并從程式結果更清晰地體會到了不同算法之間的差異,新學習了 SARSA(λ)和函數逼近的 DQN 算法,也是對之前監督學習的一個回顧,同時, 我掌握并鍛煉了 python 語言的編寫,熟悉了 pycharm 平台的相關操作,有了上大作業的經驗,這次配置環境、安裝資料包等都熟練了很多。這是人工智能課程最後一個作業了,在這門課上真的收獲很多,感謝老師和助教本學期以來的幫助!

# DQN 離散

import gym
import numpy as np
import matplotlib.pyplot as plt

from keras import models
from keras.layers import Dense, Activation
from keras.optimizers import Adam
from collections import deque
import random
from tqdm import tqdm

env = gym.make("MountainCar-v0")
env.reset()

ACTION_SPACE_SIZE = env.action_space.n
REPLAY_MEMORY_SIZE = 50000
MIN_REPLAY_MEMORY_SIZE = 1000
MINIBATCH_SIZE = 64
UPDATE_TARGET_EVERY = 5
DISCOUNT = 0.99
EPISODES = 1000

# 參數設定
epsilon = 1  # 動态變化
EPSILON_DECAY = 0.995
MIN_EPSILON = 0.001

ep_rewards = []

AGGREGATE_STATS_EVERY = 50
MIN_EPSILON = 0.001

recorder = {"epsode": [], "epsilon": []}

for epsode in range(EPISODES):
    if epsilon > MIN_EPSILON:
        epsilon *= EPSILON_DECAY
        epsilon = max(MIN_EPSILON, epsilon)

    recorder["epsode"].append(epsode)
    recorder["epsilon"].append(epsilon)

plt.plot(recorder["epsode"], recorder["epsilon"])
plt.show()


# 建立網絡模型
def create_model():
    model = models.Sequential()

    model.add(Dense(16, input_shape=(env.observation_space.shape)))
    model.add(Activation('relu'))
    model.add(Dense(16))
    model.add(Activation('relu'))
    model.add(Dense(16))
    model.add(Activation('relu'))
    model.add(Dense(ACTION_SPACE_SIZE))
    model.add(Activation('linear'))

    print(model.summary())

    model.compile(loss='mse', optimizer=Adam(lr=0.001), metrics=['accuracy'])

    return model


# DQN智能體
class DQNAgent:

    def __init__(self):

        # 記憶池
        self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)

        # 建立Prediction Network
        self.model_prediction = create_model()

        # 建立Target Network
        self.model_target = create_model()
        self.model_target.set_weights(self.model_prediction.get_weights())

        # 計數,到一定次數更新Target Network
        self.target_update_counter = 0

    # 添加一組新資料到記憶池
    # (current_state, action, reward, next_state, done)
    def update_replay_memory(self, transition):
        self.replay_memory.append(transition)

    # 預測Q表
    def get_qs(self, state):
        return self.model_prediction.predict(np.array(state).reshape(-1, *state.shape))[0]

    # 訓練
    def train(self, terminal_state, step):
        # 如果記憶池未滿,退出
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return
        # 從記憶池中取一個batch
        minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)

        # 擷取batch中的s,送入Prediction Network獲得q表current_qs_list輸出
        current_states = np.array([transition[0] for transition in minibatch])
        current_qs_list = self.model_prediction.predict(current_states)

        # 擷取batch中的s',送入Target Network獲得q'表target_qs_list
        next_states = np.array([transition[3] for transition in minibatch])
        target_qs_list = self.model_target.predict(next_states)

        X = []
        y = []

        # 周遊batch中元素,對每一組資料
        for index, (current_state, action, reward, next_state, done) in enumerate(minibatch):

            # q=learning算法
            if not done:
                max_target_q = np.max(target_qs_list[index])
                new_q = reward + DISCOUNT * max_target_q
            else:
                new_q = reward

            # 更新q表
            current_qs = current_qs_list[index]
            current_qs[action] = new_q

            # 将狀态s和q表添加至網絡的輸入x和輸出y
            X.append(current_state)
            y.append(current_qs)

            # Fit on all samples as one batch, log only on terminal state
        self.model_prediction.fit(np.array(X), np.array(y), batch_size=MINIBATCH_SIZE, verbose=0,
                                  shuffle=False if terminal_state else None)

        # 計數,定期更新Target Network參數,使之與Prediction同步
        if terminal_state:
            self.target_update_counter += 1

        if self.target_update_counter > UPDATE_TARGET_EVERY:
            self.model_target.set_weights(self.model_prediction.get_weights())
            self.target_update_counter = 0


# 執行個體化
agent = DQNAgent()

aggr_ep_rewards = {'ep': [], 'avg': [], 'min': [], 'max': []}

for episode in tqdm(range(1, EPISODES + 1), ascii=True, unit='episodes'):

    # Restarting episode - reset episode reward and step number
    episode_reward = 0
    step = 1

    current_state = env.reset()

    done = False
    while not done:

        # epsilon-greedy政策
        if np.random.random() > epsilon:
            # 利用網絡訓練出的q表取最大
            action = np.argmax(agent.get_qs(current_state))
        else:
            # 随機
            action = np.random.randint(0, ACTION_SPACE_SIZE)

        next_state, reward, done, _ = env.step(action)

        episode_reward += reward

        # 每個episode更新記憶池,并訓練
        agent.update_replay_memory((current_state, action, reward, next_state, done))
        agent.train(done, step)

        current_state = next_state
        step += 1

    # 記錄資料
    ep_rewards.append(episode_reward)
    if not episode % AGGREGATE_STATS_EVERY or episode == 1:
        average_reward = sum(ep_rewards[-AGGREGATE_STATS_EVERY:]) / len(ep_rewards[-AGGREGATE_STATS_EVERY:])
        min_reward = min(ep_rewards[-AGGREGATE_STATS_EVERY:])
        max_reward = max(ep_rewards[-AGGREGATE_STATS_EVERY:])

        aggr_ep_rewards['ep'].append(episode)
        aggr_ep_rewards['avg'].append(average_reward)
        aggr_ep_rewards['min'].append(min_reward)
        aggr_ep_rewards['max'].append(max_reward)

    # 動态降低epsilon
    if epsilon > MIN_EPSILON:
        epsilon *= EPSILON_DECAY
        epsilon = max(MIN_EPSILON, epsilon)

env.close()

agent.model_prediction.save('dqn_1.model')

plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['avg'], label='avg')
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['min'], label='min')
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['max'], label='max')
plt.legend(loc='upper left')
plt.xlabel('Episodes')
plt.ylabel('Rewards')
plt.show()

           
基于Python實作的人工智能作業小車問題
基于Python實作的人工智能作業小車問題
基于Python實作的人工智能作業小車問題
基于Python實作的人工智能作業小車問題
基于Python實作的人工智能作業小車問題
基于Python實作的人工智能作業小車問題
基于Python實作的人工智能作業小車問題
基于Python實作的人工智能作業小車問題
基于Python實作的人工智能作業小車問題
基于Python實作的人工智能作業小車問題
基于Python實作的人工智能作業小車問題