天天看點

Deep Q-Network 學習筆記(三)—— 改進①:nature dqn

由于 Q 值與 next Q 使用同一個網絡時,是在一邊更新一邊學習,會不穩定。

是以,這個算法其實就是将神經網絡拆分成 2 個,一個 Q 網絡,用于同步更新 Q 值,另一個是 target 網絡,用于計算目标 Q 值,并且每隔一段時間,自動将最新的 Q 網絡的權值同步給 target 網絡即可。

其實也就是在上一篇的基礎上做以下修改即可:

1.增加一個 target 網絡。

2.在記憶回放的時候,取 max Q 的值時将原本使用的 Q 網絡修改成使用 target 網絡。

3.在訓練一定次數後,同步權值。

以下是源碼:

import tensorflow as tf
import numpy as np
from collections import deque
import random


class DeepQNetwork:
    r = np.array([[-1, -1, -1, -1, 0, -1],
                  [-1, -1, -1, 0, -1, 100.0],
                  [-1, -1, -1, 0, -1, -1],
                  [-1, 0, 0, -1, 0, -1],
                  [0, -1, -1, 1, -1, 100],
                  [-1, 0, -1, -1, 0, 100],
                  ])

    # 執行步數。
    step_index = 0

    # 狀态數。
    STATE_NUM = 6

    # 動作數。
    ACTION_NUM = 6

    # 訓練之前觀察多少步。
    OBSERVE = 1000.

    # 選取的小批量訓練樣本數。
    BATCH = 20

    # epsilon 的最小值,當 epsilon 小于該值時,将不在随機選擇行為。
    FINAL_EPSILON = 0.0001

    # epsilon 的初始值,epsilon 逐漸減小。
    INITIAL_EPSILON = 0.1

    # epsilon 衰減的總步數。
    EXPLORE = 3000000.

    # 探索模式計數。
    epsilon = 0

    # 訓練步數統計。
    learn_step_counter = 0

    # 學習率。
    learning_rate = 0.001

    # γ經驗折損率。
    gamma = 0.9

    # 記憶上限。
    memory_size = 5000

    # 目前記憶數。
    memory_counter = 0

    # 儲存觀察到的執行過的行動的存儲器,即:曾經經曆過的記憶。
    replay_memory_store = deque()

    # 生成一個狀态矩陣(6 X 6),每一行代表一個狀态。
    state_list = None

    # 生成一個動作矩陣。
    action_list = None

    # q_eval 網絡狀态輸入參數。
    q_eval_input = None

    # q_eval 網絡動作輸入參數。
    q_action_input = None

    # q_eval 網絡中 q_target 的輸入參數。
    q_eval_target = None

    # q_eval 網絡輸出結果。
    q_eval_output = None

    # q_eval 網絡輸出的結果中的最優得分。
    q_predict = None

    # q_eval 網絡輸出的結果中目前選擇的動作得分。
    reward_action = None

    # q_eval 網絡損失函數。
    loss = None

    # q_eval 網絡訓練。
    train_op = None

    # q_target 網絡狀态輸入參數。
    q_target_input = None

    # q_target 網絡輸出結果。
    q_target_output = None

    # 更換 target_net 的步數。
    replace_target_stepper = None

    # loss 值的集合。
    cost_list = None

    # 輸出圖表顯示 Q 值走向。
    q_list = None
    running_q = 0

    # tensorflow 會話。
    session = None

    def __init__(self, learning_rate=0.001, gamma=0.9, memory_size=5000, replace_target_stepper=300):
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.memory_size = memory_size
        self.replace_target_stepper = replace_target_stepper

        # 初始化成一個 6 X 6 的狀态矩陣。
        self.state_list = np.identity(self.STATE_NUM)

        # 初始化成一個 6 X 6 的動作矩陣。
        self.action_list = np.identity(self.ACTION_NUM)

        # 建立神經網絡。
        self.create_network()

        # 初始化 tensorflow 會話。
        self.session = tf.InteractiveSession()

        # 初始化 tensorflow 參數。
        self.session.run(tf.initialize_all_variables())

        # 記錄所有 loss 變化。
        self.cost_list = []

        # 記錄 q 值的變化。
        self.q_list = []

    def create_network(self):
        """
        建立神經網絡。
        :return:
        """
        neuro_layer_1 = 3
        w_init = tf.random_normal_initializer(0, 0.3)
        b_init = tf.constant_initializer(0.1)

        # -------------- 建立 eval 神經網絡, 及時提升參數 -------------- #
        self.q_eval_input = tf.placeholder(shape=[None, self.STATE_NUM], dtype=tf.float32, name="q_eval_input")
        self.q_action_input = tf.placeholder(shape=[None, self.ACTION_NUM], dtype=tf.float32)
        self.q_eval_target = tf.placeholder(shape=[None], dtype=tf.float32, name="q_target")

        with tf.variable_scope("eval_net"):
            q_name = ['eval_net_params', tf.GraphKeys.GLOBAL_VARIABLES]

            with tf.variable_scope('l1'):
                w1 = tf.get_variable('w1', [self.STATE_NUM, neuro_layer_1], initializer=w_init, collections=q_name)
                b1 = tf.get_variable('b1', [1, neuro_layer_1], initializer=b_init, collections=q_name)
                l1 = tf.nn.relu(tf.matmul(self.q_eval_input, w1) + b1)

            with tf.variable_scope('l2'):
                w2 = tf.get_variable('w2', [neuro_layer_1, self.ACTION_NUM], initializer=w_init, collections=q_name)
                b2 = tf.get_variable('b2', [1, self.ACTION_NUM], initializer=b_init, collections=q_name)
                self.q_eval_output = tf.matmul(l1, w2) + b2
                self.q_predict = tf.argmax(self.q_eval_output, 1)

        with tf.variable_scope('loss'):
            # 取出目前動作的得分。
            self.reward_action = tf.reduce_sum(tf.multiply(self.q_eval_output, self.q_action_input), reduction_indices=1)
            self.loss = tf.reduce_mean(tf.square((self.q_eval_target - self.reward_action)))
        
        with tf.variable_scope('train'):
            self.train_op = tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.loss)
        
        # -------------- 建立 target 神經網絡, 及時提升參數 -------------- #
        self.q_target_input = tf.placeholder(shape=[None, self.STATE_NUM], dtype=tf.float32, name="q_target_input")

        with tf.variable_scope("target_net"):
            t_name = ['target_net_params', tf.GraphKeys.GLOBAL_VARIABLES]

            with tf.variable_scope('l1'):
                w1 = tf.get_variable('w1', [self.STATE_NUM, neuro_layer_1], initializer=w_init, collections=t_name)
                b1 = tf.get_variable('b1', [1, neuro_layer_1], initializer=b_init, collections=t_name)
                l1 = tf.nn.relu(tf.matmul(self.q_target_input, w1) + b1)

            with tf.variable_scope('l2'):
                w2 = tf.get_variable('w2', [neuro_layer_1, self.ACTION_NUM], initializer=w_init, collections=t_name)
                b2 = tf.get_variable('b2', [1, self.ACTION_NUM], initializer=b_init, collections=t_name)
                self.q_target_output = tf.matmul(l1, w2) + b2

    def _replace_target_params(self):
        # 使用 Tensorflow 中的 assign 功能替換 target_net 所有參數
        t_params = tf.get_collection('target_net_params')                       # 提取 target_net 的參數
        e_params = tf.get_collection('eval_net_params')                         # 提取  eval_net 的參數
        self.session.run([tf.assign(t, e) for t, e in zip(t_params, e_params)])    # 更新 target_net 參數

    def select_action(self, state_index):
        """
        根據政策選擇動作。
        :param state_index: 目前狀态。
        :return:
        """
        current_state = self.state_list[state_index:state_index + 1]
        actions_value = self.session.run(self.q_eval_output, feed_dict={self.q_eval_input: current_state})
        action = np.argmax(actions_value)
        current_action_index = action

        # 輸出圖表。
        self.running_q = self.running_q * 0.99 + 0.01 * np.max(actions_value)
        self.q_list.append(self.running_q)

        if np.random.uniform() < self.epsilon:
            current_action_index = np.random.randint(0, self.ACTION_NUM)

        # 開始訓練後,在 epsilon 小于一定的值之前,将逐漸減小 epsilon。
        if self.step_index > self.OBSERVE and self.epsilon > self.FINAL_EPSILON:
            self.epsilon -= (self.INITIAL_EPSILON - self.FINAL_EPSILON) / self.EXPLORE

        return current_action_index

    def save_store(self, current_state_index, current_action_index, current_reward, next_state_index, done):
        """
        儲存記憶。
        :param current_state_index: 目前狀态 index。
        :param current_action_index: 動作 index。
        :param current_reward: 獎勵。
        :param next_state_index: 下一個狀态 index。
        :param done: 是否結束。
        :return:
        """
        current_state = self.state_list[current_state_index:current_state_index + 1]
        current_action = self.action_list[current_action_index:current_action_index + 1]
        next_state = self.state_list[next_state_index:next_state_index + 1]
        # 記憶動作(目前狀态, 目前執行的動作, 目前動作的得分,下一個狀态)。
        self.replay_memory_store.append((
            current_state,
            current_action,
            current_reward,
            next_state,
            done))

        # 如果超過記憶的容量,則将最久遠的記憶移除。
        if len(self.replay_memory_store) > self.memory_size:
            self.replay_memory_store.popleft()

        self.memory_counter += 1

    def step(self, state, action):
        """
        執行動作。
        :param state: 目前狀态。
        :param action: 執行的動作。
        :return:
        """
        reward = self.r[state][action]

        next_state = action

        done = False

        if action == 5:
            done = True

        return next_state, reward, done

    def experience_replay(self):
        """
        記憶回放。
        :return:
        """
        # 檢查是否替換 target_net 參數
        if self.learn_step_counter % self.replace_target_stepper == 0:
            self._replace_target_params()

        # 随機選擇一小批記憶樣本。
        batch = self.BATCH if self.memory_counter > self.BATCH else self.memory_counter
        minibatch = random.sample(self.replay_memory_store, batch)

        batch_state = None
        batch_action = None
        batch_reward = None
        batch_next_state = None
        batch_done = None

        for index in range(len(minibatch)):
            if batch_state is None:
                batch_state = minibatch[index][0]
            elif batch_state is not None:
                batch_state = np.vstack((batch_state, minibatch[index][0]))

            if batch_action is None:
                batch_action = minibatch[index][1]
            elif batch_action is not None:
                batch_action = np.vstack((batch_action, minibatch[index][1]))

            if batch_reward is None:
                batch_reward = minibatch[index][2]
            elif batch_reward is not None:
                batch_reward = np.vstack((batch_reward, minibatch[index][2]))

            if batch_next_state is None:
                batch_next_state = minibatch[index][3]
            elif batch_next_state is not None:
                batch_next_state = np.vstack((batch_next_state, minibatch[index][3]))

            if batch_done is None:
                batch_done = minibatch[index][4]
            elif batch_done is not None:
                batch_done = np.vstack((batch_done, minibatch[index][4]))

        # -------------- 改進部分  -------------- #
        # 獲得 q_next 使用另一個神經網絡 target。
        # q_next:下一個狀态的 Q 值。
        q_next = self.session.run([self.q_target_output], feed_dict={self.q_target_input: batch_next_state})
        # -------------- 改進部分  -------------- #

        q_target = []
        for i in range(len(minibatch)):
            # 目前即時得分。
            current_reward = batch_reward[i][0]

            # # 遊戲是否結束。
            # current_done = batch_done[i][0]

            # 更新 Q 值。
            q_value = current_reward + self.gamma * np.max(q_next[0][i])

            # 當得分小于 -1 時,表示走了不可走的位置。
            if current_reward <= -1:
                q_target.append(current_reward)
            else:
                q_target.append(q_value)

        _, cost, reward = self.session.run([self.train_op, self.loss, self.reward_action],
                                           feed_dict={self.q_eval_input: batch_state,
                                                      self.q_action_input: batch_action,
                                                      self.q_eval_target: q_target})

        self.cost_list.append(cost)

        # if self.step_index % 1000 == 0:
        #     print("loss:", cost)

        self.learn_step_counter += 1

    def train(self):
        """
        訓練。
        :return:
        """
        # 初始化目前狀态。
        current_state = np.random.randint(0, self.ACTION_NUM - 1)
        self.epsilon = self.INITIAL_EPSILON

        while True:
            # 選擇動作。
            action = self.select_action(current_state)

            # 執行動作,得到:下一個狀态,執行動作的得分,是否結束。
            next_state, reward, done = self.step(current_state, action)

            # 儲存記憶。
            self.save_store(current_state, action, reward, next_state, done)

            # 先觀察一段時間累積足夠的記憶在進行訓練。
            if self.step_index > self.OBSERVE:
                self.experience_replay()

            if self.step_index - self.OBSERVE > 15000:
                break

            if done:
                current_state = np.random.randint(0, self.ACTION_NUM - 1)
            else:
                current_state = next_state

            self.step_index += 1

    def pay(self):
        """
        運作并測試。
        :return:
        """
        self.train()

        # 顯示 R 矩陣。
        print(self.r)

        for index in range(5):

            start_room = index

            print("#############################", "Agent 在", start_room, "開始行動", "#############################")

            current_state = start_room

            step = 0

            target_state = 5

            while current_state != target_state:
                out_result = self.session.run(self.q_eval_output, feed_dict={
                    self.q_eval_input: self.state_list[current_state:current_state + 1]})

                next_state = np.argmax(out_result[0])

                print("Agent 由", current_state, "号房間移動到了", next_state, "号房間")

                current_state = next_state

                step += 1

            print("Agent 在", start_room, "号房間開始移動了", step, "步到達了目标房間 5")

            print("#############################", "Agent 在", 5, "結束行動", "#############################")
        
    def show_plt(self):
        import matplotlib.pyplot as plt
        plt.plot(np.array(self.q_list), c='r', label='natural')
        # plt.plot(np.array(q_double), c='b', label='double')
        plt.legend(loc='best')
        plt.ylabel('Q eval')
        plt.xlabel('training steps')
        plt.grid()
        plt.show()

if __name__ == "__main__":
    q_network = DeepQNetwork()
    q_network.pay()
    q_network.show_plt()      

轉載于:https://www.cnblogs.com/cjnmy36723/p/7063092.html