
Learn Neural Networks with Me 4: Recurrent Neural Networks

1. Keywords

BPTT

2. Introduction

People do not start thinking from scratch every time. For example, as you read this article you can understand the text because you have read similar text before. You do not learn from nothing; your knowledge accumulates gradually.

In a multilayer perceptron, the hidden layers are connected one after another. If these hidden layers are folded together, we obtain a recurrent network, as shown below:

[Figure: a recurrent network obtained by folding the hidden layers, shown unrolled through time]

Expressed in formulas:

$$s_t = \tanh(U x_t + W s_{t-1}), \qquad o_t = \mathrm{softmax}(V s_t)$$

where:

  • $x$ is the input;
  • $U$ is the input-to-hidden weight matrix;
  • $W$ is the weight matrix from the previous hidden state to the hidden layer;
  • $s$ is the hidden state;
  • $V$ is the hidden-to-output weight matrix;
  • $o$ is the output;
  • tanh and softmax are the activation functions.

RNNs are suited to problems where the outputs are interdependent, such as natural language processing (NLP): within a sentence, later words depend on the ones before them.
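
To make the recurrence concrete, here is a minimal NumPy sketch of a single forward step; the dimensions and random initialization are invented purely for illustration, and the full implementation follows in section 5.

import numpy as np

def rnn_forward_step(x_t, s_prev, U, W, V):
    # One recurrence step: s_t = tanh(U x_t + W s_{t-1}), o_t = softmax(V s_t)
    s_t = np.tanh(U @ x_t + W @ s_prev)
    z_t = V @ s_t
    o_t = np.exp(z_t - z_t.max()) / np.sum(np.exp(z_t - z_t.max()))  # softmax
    return s_t, o_t

# Toy dimensions, chosen only for illustration
word_dim, hidden_dim = 8, 4
rng = np.random.default_rng(0)
U = rng.normal(scale=0.1, size=(hidden_dim, word_dim))
W = rng.normal(scale=0.1, size=(hidden_dim, hidden_dim))
V = rng.normal(scale=0.1, size=(word_dim, hidden_dim))

s = np.zeros(hidden_dim)   # initial hidden state s_{-1} = 0
x = np.eye(word_dim)[3]    # one-hot vector for word index 3
s, o = rnn_forward_step(x, s, U, W, V)
print(o.sum())             # o is a probability distribution over the vocabulary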

3. Loss Function

The loss at time t is defined with cross entropy:

$$E_t(y_t, \hat y_t) = -y_t \log \hat y_t$$

where $y_t$ is the true value and $\hat y_t$ is the predicted value.

The total error is then:

$$E(y, \hat y) = \sum_t E_t(y_t, \hat y_t) = -\sum_t y_t \log \hat y_t$$
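
Since the targets are one-hot word vectors, the sum above reduces to picking out the predicted probability of the correct word at each time step. A small sketch with invented numbers (the calculate_total_loss method in section 5 computes the same quantity per sentence):

import numpy as np

def total_cross_entropy(o, y):
    # o: (T, vocab) predicted probabilities; y: length-T array of true word indices.
    # With one-hot targets, -sum_t y_t * log(o_t) reduces to -sum_t log o[t, y[t]].
    return -np.sum(np.log(o[np.arange(len(y)), y]))

o = np.array([[0.7, 0.2, 0.1],
              [0.1, 0.8, 0.1]])
y = np.array([0, 1])
print(total_cross_entropy(o, y))  # -(log 0.7 + log 0.8) ≈ 0.58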

4. Backpropagation

Based on the chain rule, backpropagation through time (BPTT) accumulates the weight gradient over all time steps:

$$\frac{\partial E}{\partial W} = \sum_t \frac{\partial E_t}{\partial W}$$

Because U and W influence every time step, take $E_3$ as an example; the error propagates as follows:

[Figure: backpropagation of the error $E_3$ through earlier time steps]

The gradient with respect to W:

$$\frac{\partial E_3}{\partial W} = \sum_{k=0}^{3} \frac{\partial E_3}{\partial \hat y_3}\,\frac{\partial \hat y_3}{\partial s_3}\left(\prod_{j=k+1}^{3}\frac{\partial s_j}{\partial s_{j-1}}\right)\frac{\partial s_k}{\partial W}$$

V does not involve other time steps; writing $z_3 = V s_3$ for the input to the softmax, its gradient is:

$$\frac{\partial E_3}{\partial V} = \frac{\partial E_3}{\partial \hat y_3}\frac{\partial \hat y_3}{\partial V} = \frac{\partial E_3}{\partial \hat y_3}\frac{\partial \hat y_3}{\partial z_3}\frac{\partial z_3}{\partial V} = (\hat y_3 - y_3)\otimes s_3$$
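
This last expression is the outer product that appears later in the bptt method as np.outer(delta_o[t], s[t].T). A tiny numeric sketch with invented values:

import numpy as np

y_hat3 = np.array([0.2, 0.5, 0.3])   # predicted distribution at t = 3
y3 = np.array([0.0, 1.0, 0.0])       # one-hot true word at t = 3
s3 = np.array([0.1, -0.4])           # hidden state at t = 3

dE3_dV = np.outer(y_hat3 - y3, s3)   # shape (vocab, hidden), the same shape as V
print(dE3_dV)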

When using the BPTT algorithm, a maximum propagation length must be set. In the figure above, if it is set to 4, then at time t=5 the error is propagated back only as far as t=1. This has an intuitive interpretation: the larger the time gap, the weaker the mutual influence, and propagating through all time steps would require a large amount of computation.
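
The truncation can be read directly off the inner loop of the bptt method in section 5; the small sketch below (illustrative only) just enumerates, for each output time t, the steps over which the error is propagated back:

def truncated_bptt_window(T, bptt_truncate):
    # For the output at time t, the error is propagated back over the steps
    # max(0, t - bptt_truncate) .. t, mirroring the loop bounds used in section 5.
    return {t: list(range(max(0, t - bptt_truncate), t + 1)) for t in range(T)}

print(truncated_bptt_window(T=6, bptt_truncate=4))
# With the truncation set to 4, at t = 5 the window reaches back only to t = 1,
# matching the example above.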

5. Example

Based on a known corpus, randomly generate a sentence. The code is based on reference 1 in the appendix, with its errors fixed, and is consolidated into a single executable script. The procedure:

  1. Download the data from Google BigQuery;
  2. Preprocess the data with NLTK: split it into sentences, prepend SENTENCE_START to each sentence as the input and append SENTENCE_END as the output, which establishes the order relationship between the words of a sentence;
  3. Count the frequency of every word and keep only the 8000 most frequent ones; replace all other words with UNKNOWN_TOKEN, number the words, and replace the words in each sentence with their indices;
  4. Create an RNN class with vocabulary size (input and output dimension) = 8000, hidden-layer neurons = 100, and maximum backpropagation span = 4; this determines the weight dimensions, U = 100x8000, V = 8000x100, W = 100x100, which are initialized randomly;
  5. Start iterating;
  6. Forward propagation, keeping all hidden-layer and output-layer values;
  7. Backpropagation, yielding the weight gradients;
  8. Update the weights and compute the loss;
  9. Iterate until convergence;
  10. Use the trained model to generate sentences.

The complete code is as follows:

import csv
import itertools
import operator
import nltk
import numpy as np
import datetime
import sys


def softmax(z):
    return np.exp(z) / sum(np.exp(z))


class RNNNumpy:

    def __init__(self, word_dim, hidden_dim=100, bptt_truncate=4):
        # Assign instance variables
        self.word_dim = word_dim
        self.hidden_dim = hidden_dim
        self.bptt_truncate = bptt_truncate
        # Randomly initialize the network parameters
        self.U = np.random.uniform(-np.sqrt(1. / word_dim), np.sqrt(1. / word_dim), (hidden_dim, word_dim))
        self.V = np.random.uniform(-np.sqrt(1. / hidden_dim), np.sqrt(1. / hidden_dim), (word_dim, hidden_dim))
        self.W = np.random.uniform(-np.sqrt(1. / hidden_dim), np.sqrt(1. / hidden_dim), (hidden_dim, hidden_dim))

    def forward_propagation(self, x):
        # The total number of time steps
        T = len(x)
        # During forward propagation we save all hidden states in s because need them later.
        # We add one additional element for the initial hidden, which we set to 0
        s = np.zeros((T + 1, self.hidden_dim))
        s[-1] = np.zeros(self.hidden_dim)
        # The outputs at each time step. Again, we save them for later.
        o = np.zeros((T, self.word_dim))
        # For each time step...
        for t in np.arange(T):
            # Note that we are indexing U by x[t]. This is the same as multiplying U with a one-hot vector.
            s[t] = np.tanh(self.U[:, x[t]] + self.W.dot(s[t - 1]))
            o[t] = softmax(self.V.dot(s[t]))
        return [o, s]

    def predict(self, x):
        # Perform forward propagation and return index of the highest score
        o, s = self.forward_propagation(x)
        return np.argmax(o, axis=1)

    def calculate_total_loss(self, x, y):
        L = 0
        # For each sentence...
        for i in np.arange(len(y)):
            o, s = self.forward_propagation(x[i])
            # We only care about our prediction of the "correct" words
            correct_word_predictions = o[np.arange(len(y[i])), y[i]]
            # Add to the loss based on how off we were
            L += -1 * np.sum(np.log(correct_word_predictions))
        return L

    def calculate_loss(self, x, y):
        # Divide the total loss by the number of training examples
        N = np.sum([len(y_i) for y_i in y])
        return self.calculate_total_loss(x, y) / N

    def bptt(self, x, y):
        T = len(y)
        # Perform forward propagation
        o, s = self.forward_propagation(x)
        # We accumulate the gradients in these variables
        dLdU = np.zeros(self.U.shape)
        dLdV = np.zeros(self.V.shape)
        dLdW = np.zeros(self.W.shape)
        delta_o = o
        delta_o[np.arange(len(y)), y] -= 1.
        # For each output backwards...
        for t in np.arange(T)[::-1]:
            dLdV += np.outer(delta_o[t], s[t].T)
            # Initial delta calculation
            delta_t = self.V.T.dot(delta_o[t]) * (1 - (s[t] ** 2))
            # Backpropagation through time (for at most self.bptt_truncate steps)
            for bptt_step in np.arange(max(0, t - self.bptt_truncate), t + 1)[::-1]:
                # print ("Backpropagation step t=%d bptt step=%d" % (t, bptt_step))
                dLdW += np.outer(delta_t, s[bptt_step - 1])
                dLdU[:, x[bptt_step]] += delta_t
                # Update delta for next step
                delta_t = self.W.T.dot(delta_t) * (1 - s[bptt_step - 1] ** 2)
        return [dLdU, dLdV, dLdW]

    # Performs one step of SGD.
    def sgd_step(self, x, y, learning_rate):
        # Calculate the gradients
        dLdU, dLdV, dLdW = self.bptt(x, y)
        # Change parameters according to gradients and learning rate
        self.U -= learning_rate * dLdU
        self.V -= learning_rate * dLdV
        self.W -= learning_rate * dLdW

    def gradient_check(self, x, y, h=0.001, error_threshold=0.01):
        # Calculate the gradients using backpropagation. We want to check if these are correct.
        bptt_gradients = self.bptt(x, y)
        # List of all parameters we want to check.
        model_parameters = ['U', 'V', 'W']
        # Gradient check for each parameter
        for pidx, pname in enumerate(model_parameters):
            # Get the actual parameter value from the model, e.g. model.W
            parameter = operator.attrgetter(pname)(self)
            print("Performing gradient check for parameter %s with size %d." % (pname, np.prod(parameter.shape)))
            # Iterate over each element of the parameter matrix, e.g. (0,0), (0,1), ...
            it = np.nditer(parameter, flags=['multi_index'], op_flags=['readwrite'])
            while not it.finished:
                ix = it.multi_index
                # Save the original value so we can reset it later
                original_value = parameter[ix]
                # Estimate the gradient using (f(x+h) - f(x-h))/(2*h)
                parameter[ix] = original_value + h
                gradplus = self.calculate_total_loss([x], [y])
                parameter[ix] = original_value - h
                gradminus = self.calculate_total_loss([x], [y])
                estimated_gradient = (gradplus - gradminus) / (2 * h)
                # Reset parameter to original value
                parameter[ix] = original_value
                # The gradient for this parameter calculated using backpropagation
                backprop_gradient = bptt_gradients[pidx][ix]
                # calculate The relative error: (|x - y|/(|x| + |y|))
                relative_error = np.abs(backprop_gradient - estimated_gradient) / (
                        np.abs(backprop_gradient) + np.abs(estimated_gradient))
                # If the error is too large, fail the gradient check
                if relative_error > error_threshold:
                    print("Gradient Check ERROR: parameter=%s ix=%s" % (pname, ix))
                    print("+h Loss: %f" % gradplus)
                    print("-h Loss: %f" % gradminus)
                    print("Estimated gradient: %f" % estimated_gradient)
                    print("Backpropagation gradient: %f" % backprop_gradient)
                    print("Relative Error: %f" % relative_error)
                    return
                it.iternext()
            print(" Gradient check  for parameter % s passed." % (pname))


# Outer SGD Loop
# - model: The RNN model instance
# - X_train: The training data set
# - y_train: The training data labels
# - learning_rate: Initial learning rate for SGD
# - nepoch: Number of times to iterate through the complete dataset
# - evaluate_loss_after: Evaluate the loss after this many epochs
def train_with_sgd(model, X_train, y_train, learning_rate=0.005, nepoch=100, evaluate_loss_after=5):
    # We keep track of the losses so we can plot them later
    losses = []
    num_examples_seen = 0
    for epoch in range(nepoch):
        # Optionally evaluate the loss
        if (epoch % evaluate_loss_after == 0):
            loss = model.calculate_loss(X_train, y_train)
            losses.append((num_examples_seen, loss))
            time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print(" % s: Loss after num_examples_seen = % d epoch = % d: % f " % (
                time, num_examples_seen, epoch, loss))
            # Adjust the learning rate if loss increases
            if (len(losses) > 1 and losses[-1][1] > losses[-2][1]):
                learning_rate = learning_rate * 0.5
                print("Setting learning rate to % f " % learning_rate)
            sys.stdout.flush()
        # For each training example...
        for i in range(len(y_train)):
            # One SGD step
            model.sgd_step(X_train[i], y_train[i], learning_rate)
            num_examples_seen += 1


def generate_sentence(model):
    # We start the sentence with the start token
    new_sentence = [word_to_index[sentence_start_token]]
    # Repeat until we get an end token
    while not new_sentence[-1] == word_to_index[sentence_end_token]:
        # forward_propagation returns [o, s]; keep only the output probabilities o
        next_word_probs, _ = model.forward_propagation(new_sentence)
        sampled_word = word_to_index[unknown_token]
        # We don't want to sample unknown words
        while sampled_word == word_to_index[unknown_token]:
            samples = np.random.multinomial(1, next_word_probs[-1])
            sampled_word = np.argmax(samples)
        new_sentence.append(sampled_word)
    sentence_str = [index_to_word[x] for x in new_sentence[1:-1]]
    return sentence_str


vocabulary_size = 8000
unknown_token = "UNKNOWN_TOKEN"
sentence_start_token = "SENTENCE_START"
sentence_end_token = "SENTENCE_END"

# Read the data and append SENTENCE_START and SENTENCE_END tokens
print("Reading CSV file... ")
with open('data/reddit-comments-2015-08.csv', encoding='utf-8', mode='r') as f:
    reader = csv.reader(f, skipinitialspace=True)
    next(reader)
    # Split full comments into sentences
    sentences = itertools.chain(*[nltk.sent_tokenize(x[0].lower()) for x in reader])
    # Append SENTENCE_START and SENTENCE_END
    sentences = [" % s % s % s " % (sentence_start_token, x, sentence_end_token)
                 for x in sentences]

print("Parsed % d sentences. " % (len(sentences)))

# Tokenize the sentences into words
tokenized_sentences = [nltk.word_tokenize(sent) for sent in sentences]

# Count the word frequencies
word_freq = nltk.FreqDist(itertools.chain(*tokenized_sentences))
print("Found % d unique words tokens. " % len(word_freq.items()))

# Get the most common words and build index_to_word and word_to_index vectors
vocab = word_freq.most_common(vocabulary_size - 1)
index_to_word = [x[0] for x in vocab]
index_to_word.append(unknown_token)
word_to_index = dict([(w, i) for i, w in enumerate(index_to_word)])

print("Using vocabulary size % d. " % vocabulary_size)
print("The least frequent word in our vocabulary is '%s' and appeared % d times. " % (vocab[-1][0], vocab[-1][1]))

# Replace all words not in our vocabulary with the unknown token
for i, sent in enumerate(tokenized_sentences):
    tokenized_sentences[i] = [w if w in word_to_index else unknown_token for w in sent]

print("\nExample sentence: '%s' " % sentences[0])
print("\nExample sentence after Pre - processing: '%s' " % tokenized_sentences[0])

# Create the training data (sentences have different lengths, hence dtype=object)
X_train = np.asarray([[word_to_index[w] for w in sent[:-1]] for sent in tokenized_sentences], dtype=object)
y_train = np.asarray([[word_to_index[w] for w in sent[1:]] for sent in tokenized_sentences], dtype=object)

np.random.seed(10)
# Train on a small subset of the data to see what happens
model = RNNNumpy(vocabulary_size)
losses = train_with_sgd(model, X_train[:100], y_train[:100], nepoch=10, evaluate_loss_after=1)

num_sentences = 10
senten_min_length = 7

for i in range(num_sentences):
    sent = []
    # We want long sentences, not sentences with one or two words
    while len(sent) < senten_min_length:
        sent = generate_sentence(model)
    print("".join(sent))

           

6. Appendix

6.1 English-Chinese Glossary

| English | Chinese | Abbreviation |
| --- | --- | --- |
| Backpropagation Through Time | 基于時間的反向傳播 | BPTT |
| Cross Entropy | 交叉熵 | |
| Long Short Term Memory Networks | 長短期記憶網絡 | LSTM |
| Natural Language Processing | 自然語言處理 | NLP |
| Recurrent Neural Networks | 循環(遞歸)神經網絡 | RNN |

6.2 Further Reading

  1. RNN Tutorial
