1. Keywords
BPTT
2. Introduction
People do not start thinking from scratch every time. As you read this article, you understand its words because you have seen similar text before; you do not learn from nothing, your knowledge accumulates gradually.
In a multilayer perceptron, the hidden layers are connected one after another. Folding the hidden layers back onto themselves yields a recurrent network, as shown in the figure below:
Expressed as formulas:

$$s_t=\tanh(Ux_t+Ws_{t-1})$$
$$o_t=\mathrm{softmax}(Vs_t)$$
where:
- $x$ is the input layer;
- $U$ is the input-to-hidden weight matrix;
- $W$ is the weight matrix from the previous time step's hidden state to the hidden layer;
- $s$ is the hidden layer;
- $V$ is the hidden-to-output weight matrix;
- $o$ is the output layer;
- $\tanh$ and softmax are the activation functions.
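To make the recurrence concrete, here is a minimal NumPy sketch of a single forward step; the dimensions (vocabulary size 8, hidden size 4) and the random initialization are toy assumptions for illustration only:

```python
import numpy as np

np.random.seed(0)
word_dim, hidden_dim = 8, 4  # toy sizes, for illustration only

U = np.random.randn(hidden_dim, word_dim) * 0.1    # input -> hidden
W = np.random.randn(hidden_dim, hidden_dim) * 0.1  # previous hidden -> hidden
V = np.random.randn(word_dim, hidden_dim) * 0.1    # hidden -> output

s_prev = np.zeros(hidden_dim)  # initial hidden state s_{t-1}
x_t = np.zeros(word_dim)
x_t[3] = 1.0                   # one-hot encoding of the current input word

s_t = np.tanh(U.dot(x_t) + W.dot(s_prev))  # s_t = tanh(U x_t + W s_{t-1})
z_t = V.dot(s_t)
o_t = np.exp(z_t) / np.sum(np.exp(z_t))    # o_t = softmax(V s_t)
print(o_t, o_t.sum())  # a probability distribution over the vocabulary
```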
RNNs suit problems whose outputs are interrelated, such as natural language processing (NLP): within a sentence, later words depend on earlier ones.
3. Loss Function
Using cross entropy, the loss at time t is defined as:
$$E_t(y_t,\hat y_t)=-y_t\log\hat y_t$$
where $y_t$ is the true value and $\hat y_t$ is the predicted value.
The total error is then:
$$E(y,\hat y)=\sum_t E_t(y_t,\hat y_t)=-\sum_t y_t\log\hat y_t$$
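As a quick check of the formula, here is a minimal sketch that computes this loss for one short sequence; the probabilities o and labels y below are made-up toy values:

```python
import numpy as np

# Toy values: 3 time steps, vocabulary of 5 words.
o = np.array([[0.1, 0.6, 0.1, 0.1, 0.1],
              [0.2, 0.2, 0.4, 0.1, 0.1],
              [0.3, 0.1, 0.1, 0.4, 0.1]])  # o[t]: predicted distribution at step t
y = np.array([1, 2, 3])                    # true word index at each step

# With one-hot targets, the cross entropy reduces to E = -sum_t log o[t, y[t]]
E = -np.sum(np.log(o[np.arange(len(y)), y]))
print(E)  # total loss; dividing by len(y) gives the mean per-word loss
```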
4. Backpropagation
Based on the chain rule, backpropagation through time (BPTT) accumulates each weight's gradient over all time steps:
$$\frac{\partial E}{\partial W}=\sum_t\frac{\partial E_t}{\partial W}$$
Because U and W affect every time step, take $E_3$ as an example; the error propagates back through time as follows:
The gradient of W:
$$\frac{\partial E_3}{\partial W}=\sum_{k=0}^{3}\frac{\partial E_3}{\partial\hat y_3}\frac{\partial\hat y_3}{\partial s_3}\left(\prod_{j=k+1}^{3}\frac{\partial s_j}{\partial s_{j-1}}\right)\frac{\partial s_k}{\partial W}$$
V does not depend on other time steps, so its gradient is simply:
$$\frac{\partial E_3}{\partial V}=\frac{\partial E_3}{\partial\hat y_3}\frac{\partial\hat y_3}{\partial V}=\frac{\partial E_3}{\partial\hat y_3}\frac{\partial\hat y_3}{\partial z_3}\frac{\partial z_3}{\partial V}=(\hat y_3-y_3)\otimes s_3$$
where $z_3=Vs_3$ and $\otimes$ denotes the outer product.
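The last equality is the standard result for softmax combined with cross entropy; for completeness, a short derivation (with one-hot $y$):

$$\frac{\partial E}{\partial z_k}=-\sum_i y_i\frac{\partial\log\hat y_i}{\partial z_k}=-\sum_i y_i(\delta_{ik}-\hat y_k)=\hat y_k\sum_i y_i-y_k=\hat y_k-y_k$$

since $\hat y_i=e^{z_i}/\sum_j e^{z_j}$ gives $\partial\log\hat y_i/\partial z_k=\delta_{ik}-\hat y_k$, and $\sum_i y_i=1$.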
When using the BPTT algorithm, a maximum propagation time must be set. In the figure above, with the truncation set to 4, the error at t=5 propagates back only as far as t=1. This also has a physical interpretation: the larger the time gap, the weaker the mutual influence, and propagating through the entire history would require a large amount of computation.
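Here is a minimal sketch of how this truncation bounds the inner loop of BPTT (the same index range appears in the bptt method of the script below); the sequence length T = 6 is a toy value:

```python
import numpy as np

bptt_truncate = 4  # maximum number of steps the error travels back
T = 6              # toy sequence length, time steps t = 0 .. 5

for t in np.arange(T)[::-1]:
    # The error at step t propagates back at most bptt_truncate steps:
    steps = np.arange(max(0, t - bptt_truncate), t + 1)[::-1]
    print("t=%d propagates back through steps %s" % (t, steps.tolist()))
# With truncation 4, the error at t=5 reaches back only to step 1,
# matching the description above.
```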
5. Example
Given a known corpus, randomly generate a sentence. The code is based on the RNN tutorial listed in the appendix, with its errors fixed and everything consolidated into a single executable script. The procedure:
- Download the data from Google BigQuery;
- Preprocess the data with NLTK: split it into sentences, prepend SENTENCE_START to each sentence on the input side and append SENTENCE_END on the output side, which establishes the predecessor/successor relationship between the words of a sentence;
- Count each word's frequency and keep only the top 8000 words, replacing all others with UNKNOWN_TOKEN; number the words and replace the words in each sentence with their indices;
- Build an RNN class with vocabulary dimension (input and output) = 8000, hidden neurons = 100, and a maximum backpropagation span of 4; this determines the weight shapes U = 100x8000, V = 8000x100, W = 100x100, which are randomly initialized;
- Start iterating:
- Forward propagation, keeping all hidden-layer and output-layer values;
- Backpropagation, yielding the weight gradients;
- Update the weights and compute the loss;
- Iterate until convergence;
- Use the trained model to generate sentences.
The complete code is as follows:
```python
import csv
import datetime
import itertools
import operator
import sys

import nltk
import numpy as np


def softmax(z):
    return np.exp(z) / np.sum(np.exp(z))


class RNNNumpy:
    def __init__(self, word_dim, hidden_dim=100, bptt_truncate=4):
        # Assign instance variables
        self.word_dim = word_dim
        self.hidden_dim = hidden_dim
        self.bptt_truncate = bptt_truncate
        # Randomly initialize the network parameters
        self.U = np.random.uniform(-np.sqrt(1. / word_dim), np.sqrt(1. / word_dim), (hidden_dim, word_dim))
        self.V = np.random.uniform(-np.sqrt(1. / hidden_dim), np.sqrt(1. / hidden_dim), (word_dim, hidden_dim))
        self.W = np.random.uniform(-np.sqrt(1. / hidden_dim), np.sqrt(1. / hidden_dim), (hidden_dim, hidden_dim))

    def forward_propagation(self, x):
        # The total number of time steps
        T = len(x)
        # During forward propagation we save all hidden states in s because we need them later.
        # We add one additional element for the initial hidden state, which we set to 0.
        s = np.zeros((T + 1, self.hidden_dim))
        # The outputs at each time step. Again, we save them for later.
        o = np.zeros((T, self.word_dim))
        # For each time step...
        for t in np.arange(T):
            # Note that we are indexing U by x[t]. This is the same as multiplying U with a one-hot vector.
            s[t] = np.tanh(self.U[:, x[t]] + self.W.dot(s[t - 1]))
            o[t] = softmax(self.V.dot(s[t]))
        return [o, s]

    def predict(self, x):
        # Perform forward propagation and return index of the highest score
        o, s = self.forward_propagation(x)
        return np.argmax(o, axis=1)

    def calculate_total_loss(self, x, y):
        L = 0
        # For each sentence...
        for i in np.arange(len(y)):
            o, s = self.forward_propagation(x[i])
            # We only care about our prediction of the "correct" words
            correct_word_predictions = o[np.arange(len(y[i])), y[i]]
            # Add to the loss based on how off we were
            L += -1 * np.sum(np.log(correct_word_predictions))
        return L

    def calculate_loss(self, x, y):
        # Divide the total loss by the number of training words
        N = sum(len(y_i) for y_i in y)
        return self.calculate_total_loss(x, y) / N

    def bptt(self, x, y):
        T = len(y)
        # Perform forward propagation
        o, s = self.forward_propagation(x)
        # We accumulate the gradients in these variables
        dLdU = np.zeros(self.U.shape)
        dLdV = np.zeros(self.V.shape)
        dLdW = np.zeros(self.W.shape)
        # delta_o = dE/dz = y_hat - y (softmax with cross entropy)
        delta_o = o
        delta_o[np.arange(len(y)), y] -= 1.
        # For each output backwards...
        for t in np.arange(T)[::-1]:
            dLdV += np.outer(delta_o[t], s[t].T)
            # Initial delta calculation
            delta_t = self.V.T.dot(delta_o[t]) * (1 - (s[t] ** 2))
            # Backpropagation through time (for at most self.bptt_truncate steps)
            for bptt_step in np.arange(max(0, t - self.bptt_truncate), t + 1)[::-1]:
                # print("Backpropagation step t=%d bptt step=%d" % (t, bptt_step))
                dLdW += np.outer(delta_t, s[bptt_step - 1])
                dLdU[:, x[bptt_step]] += delta_t
                # Update delta for next step
                delta_t = self.W.T.dot(delta_t) * (1 - s[bptt_step - 1] ** 2)
        return [dLdU, dLdV, dLdW]

    # Performs one step of SGD.
    def sgd_step(self, x, y, learning_rate):
        # Calculate the gradients
        dLdU, dLdV, dLdW = self.bptt(x, y)
        # Change parameters according to gradients and learning rate
        self.U -= learning_rate * dLdU
        self.V -= learning_rate * dLdV
        self.W -= learning_rate * dLdW

    def gradient_check(self, x, y, h=0.001, error_threshold=0.01):
        # Calculate the gradients using backpropagation. We want to check if these are correct.
        bptt_gradients = self.bptt(x, y)
        # List of all parameters we want to check.
        model_parameters = ['U', 'V', 'W']
        # Gradient check for each parameter
        for pidx, pname in enumerate(model_parameters):
            # Get the actual parameter value from the model, e.g. model.W
            parameter = operator.attrgetter(pname)(self)
            print("Performing gradient check for parameter %s with size %d." % (pname, np.prod(parameter.shape)))
            # Iterate over each element of the parameter matrix, e.g. (0,0), (0,1), ...
            it = np.nditer(parameter, flags=['multi_index'], op_flags=['readwrite'])
            while not it.finished:
                ix = it.multi_index
                # Save the original value so we can reset it later
                original_value = parameter[ix]
                # Estimate the gradient using (f(x+h) - f(x-h)) / (2*h)
                parameter[ix] = original_value + h
                gradplus = self.calculate_total_loss([x], [y])
                parameter[ix] = original_value - h
                gradminus = self.calculate_total_loss([x], [y])
                estimated_gradient = (gradplus - gradminus) / (2 * h)
                # Reset parameter to original value
                parameter[ix] = original_value
                # The gradient for this parameter calculated using backpropagation
                backprop_gradient = bptt_gradients[pidx][ix]
                # Calculate the relative error: (|x - y| / (|x| + |y|))
                relative_error = np.abs(backprop_gradient - estimated_gradient) / (
                    np.abs(backprop_gradient) + np.abs(estimated_gradient))
                # If the error is too large, fail the gradient check
                if relative_error > error_threshold:
                    print("Gradient Check ERROR: parameter=%s ix=%s" % (pname, ix))
                    print("+h Loss: %f" % gradplus)
                    print("-h Loss: %f" % gradminus)
                    print("Estimated gradient: %f" % estimated_gradient)
                    print("Backpropagation gradient: %f" % backprop_gradient)
                    print("Relative Error: %f" % relative_error)
                    return
                it.iternext()
            print("Gradient check for parameter %s passed." % pname)


# Outer SGD loop
# - model: The RNN model instance
# - X_train: The training data set
# - y_train: The training data labels
# - learning_rate: Initial learning rate for SGD
# - nepoch: Number of times to iterate through the complete dataset
# - evaluate_loss_after: Evaluate the loss after this many epochs
def train_with_sgd(model, X_train, y_train, learning_rate=0.005, nepoch=100, evaluate_loss_after=5):
    # We keep track of the losses so we can plot them later
    losses = []
    num_examples_seen = 0
    for epoch in range(nepoch):
        # Optionally evaluate the loss
        if epoch % evaluate_loss_after == 0:
            loss = model.calculate_loss(X_train, y_train)
            losses.append((num_examples_seen, loss))
            time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print("%s: Loss after num_examples_seen=%d epoch=%d: %f" % (time, num_examples_seen, epoch, loss))
            # Halve the learning rate if the loss increases
            if len(losses) > 1 and losses[-1][1] > losses[-2][1]:
                learning_rate = learning_rate * 0.5
                print("Setting learning rate to %f" % learning_rate)
            sys.stdout.flush()
        # For each training example...
        for i in range(len(y_train)):
            # One SGD step
            model.sgd_step(X_train[i], y_train[i], learning_rate)
            num_examples_seen += 1
    return losses


def generate_sentence(model):
    # We start the sentence with the start token
    new_sentence = [word_to_index[sentence_start_token]]
    # Repeat until we get an end token
    while not new_sentence[-1] == word_to_index[sentence_end_token]:
        next_word_probs, _ = model.forward_propagation(new_sentence)
        sampled_word = word_to_index[unknown_token]
        # We don't want to sample unknown words
        while sampled_word == word_to_index[unknown_token]:
            samples = np.random.multinomial(1, next_word_probs[-1])
            sampled_word = np.argmax(samples)
        new_sentence.append(sampled_word)
    # Strip the start and end tokens before returning the words
    sentence_str = [index_to_word[x] for x in new_sentence[1:-1]]
    return sentence_str


vocabulary_size = 8000
unknown_token = "UNKNOWN_TOKEN"
sentence_start_token = "SENTENCE_START"
sentence_end_token = "SENTENCE_END"

# Read the data and append SENTENCE_START and SENTENCE_END tokens
print("Reading CSV file...")
with open('data/reddit-comments-2015-08.csv', encoding='utf-8', mode='r') as f:
    reader = csv.reader(f, skipinitialspace=True)
    next(reader)  # skip the header row
    # Split full comments into sentences
    sentences = itertools.chain(*[nltk.sent_tokenize(x[0].lower()) for x in reader])
    # Append SENTENCE_START and SENTENCE_END
    sentences = ["%s %s %s" % (sentence_start_token, x, sentence_end_token) for x in sentences]
print("Parsed %d sentences." % len(sentences))

# Tokenize the sentences into words
tokenized_sentences = [nltk.word_tokenize(sent) for sent in sentences]

# Count the word frequencies
word_freq = nltk.FreqDist(itertools.chain(*tokenized_sentences))
print("Found %d unique word tokens." % len(word_freq.items()))

# Get the most common words and build index_to_word and word_to_index vectors
vocab = word_freq.most_common(vocabulary_size - 1)
index_to_word = [x[0] for x in vocab]
index_to_word.append(unknown_token)
word_to_index = dict([(w, i) for i, w in enumerate(index_to_word)])
print("Using vocabulary size %d." % vocabulary_size)
print("The least frequent word in our vocabulary is '%s' and appeared %d times." % (vocab[-1][0], vocab[-1][1]))

# Replace all words not in our vocabulary with the unknown token
for i, sent in enumerate(tokenized_sentences):
    tokenized_sentences[i] = [w if w in word_to_index else unknown_token for w in sent]
print("\nExample sentence: '%s'" % sentences[0])
print("\nExample sentence after pre-processing: '%s'" % tokenized_sentences[0])

# Create the training data: the input is the sentence without its last token,
# the label is the same sentence shifted left by one token.
X_train = np.asarray([[word_to_index[w] for w in sent[:-1]] for sent in tokenized_sentences], dtype=object)
y_train = np.asarray([[word_to_index[w] for w in sent[1:]] for sent in tokenized_sentences], dtype=object)

np.random.seed(10)
# Train on a small subset of the data to see what happens
model = RNNNumpy(vocabulary_size)
losses = train_with_sgd(model, X_train[:100], y_train[:100], nepoch=10, evaluate_loss_after=1)

num_sentences = 10
senten_min_length = 7
for i in range(num_sentences):
    sent = []
    # We want long sentences, not sentences with one or two words
    while len(sent) < senten_min_length:
        sent = generate_sentence(model)
    print(" ".join(sent))
```
6. Appendix
6.1 Chinese-English Glossary
English | Chinese | Abbreviation | Math |
---|---|---|---|
Backpropagation Through Time | 基于時間的反向傳播 | BPTT | |
Cross Entropy | 交叉熵 | | |
Long Short Term Memory Networks | 長短期記憶網絡 | LSTM | |
Natural Language Processing | 自然語言處理 | NLP | |
Recurrent Neural Networks | 循環(遞歸)神經網絡 | RNN | |
6.2 Further Reading
- RNN Tutorial