天天看點

TensorFlow2.0.0_課時52-62_遞歸神經網絡(RNN)

前一時刻的特征對後一時刻産生影響

CNN-視覺

RNN-自然語言

RNN-所有特征都記下來,誤差大

LSTM-合理遺忘

自然語言處理-詞向量模型-Word2Vec

目的是構造向量

先來考慮第一個問題:如何能将文本向量化呢?看起來比較抽象,可以先從人的角度來觀察。

TensorFlow2.0.0_課時52-62_遞歸神經網絡(RNN)

輸入:第一個詞,第二個詞

輸出:第三個詞,找那個詞的機率最大

TensorFlow2.0.0_課時52-62_遞歸神經網絡(RNN)

紅框向右滑動,步長為1

TensorFlow2.0.0_課時52-62_遞歸神經網絡(RNN)

CBOW:輸入是上文、下文,輸出是中間文

Skipgram:輸入中間文,輸出上文、下文

這兩個效果差不多

反向傳播

CNN是對w求導即可

RNN分别對w和x求偏導

如果一個語料庫稍微大一些,可能的結果簡直太多了,最後一層相當于softmax,計算起來十分耗時,有什麼辦法來解決嘛?

初始方案:輸入兩個單詞,看他們是不是前後對應的輸入和輸出,也就相當于一個二分類任務

出發點非常好,但是此時訓練集建構出來的标簽全為1,無法進行較好的訓練

改進方案:加入一些負樣本(負采樣模型)

TensorFlow2.0.0_課時52-62_遞歸神經網絡(RNN)

負采樣推介設為5

執行個體

import collections
import os
import random
import urllib
import zipfile

import numpy as np
import tensorflow as tf
           
# 訓練參數
learning_rate = 0.1		#學習率
batch_size = 128
num_steps = 3000000		#訓練次數,這個案例次數較小,因為樣本小
display_step = 10000		#每隔多少次列印目前損失值
eval_step = 200000		#每隔多少次試驗一下效果

# 測試樣例
eval_words = ['nine', 'of', 'going', 'hardware', 'american', 'britain']

# Word2Vec 參數
embedding_size = 200 # 詞向量次元,可調
max_vocabulary_size = 50000 # 語料庫詞語數,不重複的詞,案例較大
min_occurrence = 10 # 最小詞頻,詞頻少的去掉
skip_window = 3 # 左右視窗大小
num_skips = 2 # 一次制作多少個輸入輸出對
num_sampled = 64 # 負采樣,每次選擇多少樣本
           
# 加載訓練資料,其實什麼資料都行
data_path = 'text8.zip'
with zipfile.ZipFile(data_path) as f:
    text_words = f.read(f.namelist()[0]).lower().split()
           
17005207

詞頻控制

counter,計數器,統計每個詞出現的次數

# 建立一個計數器,計算每個詞出現了多少次
count = [('UNK', -1)]		#UNK:unknow
# 基于詞頻傳回max_vocabulary_size個常用詞嗎,上面設定了50000個
count.extend(collections.Counter(text_words).most_common(max_vocabulary_size - 1))
           

預設已經是排序好了的

[(‘UNK’, -1),

(b’the’, 1061396),

(b’of’, 593677),

(b’and’, 416629),

(b’one’, 411764),

(b’in’, 372201),

(b’a’, 325873),

(b’to’, 316376),

(b’zero’, 264975),

(b’nine’, 250430)]

最小值控制

别忘了,咱們還設定了min_occurrence參數,需要判斷每一個詞是否滿足給定條件

# 剔除掉出現次數少于'min_occurrence'的詞
for i in range(len(count) - 1, -1, -1):# 從start到end每次step多少,每次-1
    if count[i][1] < min_occurrence:
        count.pop(i)
    else:
        # 判斷時,從小到大排序的,是以跳出時候剩下的都是滿足條件的
        break
           

詞-ID映射

每個詞對應一個ID

# 計算語料庫大小
vocabulary_size = len(count)
# 每個詞都配置設定一個ID
word2id = dict()
for i, (word, _)in enumerate(count):
    word2id[word] = i
           
word2id
           
TensorFlow2.0.0_課時52-62_遞歸神經網絡(RNN)

所有詞轉換成ID

data = list()
unk_count = 0
for word in text_words:
    # 全部轉換成id
    index = word2id.get(word, 0)		#0表示unknown
    if index == 0:
        unk_count += 1
    data.append(index)
count[0] = ('UNK', unk_count)
id2word = dict(zip(word2id.values(), word2id.keys()))		#word to ID 的反結構

print("Words count:", len(text_words))
print("Unique words:", len(set(text_words)))		#不重複此的大小
print("Vocabulary size:", vocabulary_size)		#原來50000的那個,有些不滿足詞頻
print("Most common words:", count[:10])
           
TensorFlow2.0.0_課時52-62_遞歸神經網絡(RNN)

建構所需訓練資料

data_index = 0

def next_batch(batch_size, num_skips, skip_window):		#視窗操作
    global data_index
    assert batch_size % num_skips == 0
    assert num_skips <= 2 * skip_window
    batch = np.ndarray(shape=(batch_size), dtype=np.int32)
    labels = np.ndarray(shape=(batch_size, 1), dtype=np.int32)
    # get window size (words left and right + current one).
    span = 2 * skip_window + 1 #7為視窗,左3右3中間1
    buffer = collections.deque(maxlen=span)#建立一個長度為7的隊列
    if data_index + span > len(data):#如果資料被滑完一遍了
        data_index = 0
    buffer.extend(data[data_index:data_index + span])#隊列裡存的是目前視窗,例如deque([5234, 3081, 12, 6, 195, 2, 3134], maxlen=7)		#5234是ID
    data_index += span
    for i in range(batch_size // num_skips):#num_skips表示取多少組不同的詞作為輸出,此例為2,//為除法忽略餘數
        context_words = [w for w in range(span) if w != skip_window]#指定上下文,就是[0, 1, 2, 4, 5, 6],span==7
        words_to_use = random.sample(context_words, num_skips)#在上下文裡随機選2個候選詞
        for j, context_word in enumerate(words_to_use):#周遊每一個候選詞,用其當做輸出也就是标簽
            batch[i * num_skips + j] = buffer[skip_window]#輸入都為目前視窗的中間詞,即3
            labels[i * num_skips + j, 0] = buffer[context_word]#用目前候選詞當做标簽
        if data_index == len(data):
            buffer.extend(data[0:span])
            data_index = span
        else:
            buffer.append(data[data_index])#之前已經傳入7個詞了,視窗要右移了,例如原來為[5234, 3081, 12, 6, 195, 2, 3134],現在為[3081, 12, 6, 195, 2, 3134, 46]
            data_index += 1

    data_index = (data_index + len(data) - span) % len(data)
    return batch, labels
           
with tf.device('/cpu:0'):   	#embedding:詞映射成向量
    embedding = tf.Variable(tf.random.normal([vocabulary_size, embedding_size])) #次元:47135, 200,随機初始化向量
    nce_weights = tf.Variable(tf.random.normal([vocabulary_size, embedding_size]))
    nce_biases = tf.Variable(tf.zeros([vocabulary_size]))
           

通過tf.nn.embedding_lookup函數将索引轉換成詞向量

def get_embedding(x):
    with tf.device('/cpu:0'):
        
        x_embed = tf.nn.embedding_lookup(embedding, x)		#找到索引對應的向量
        return x_embed
           

損失函數定義

先分别計算出正樣本和采樣出的負樣本對應的output和label

再通過 sigmoid cross entropy來計算output和label的loss

def nce_loss(x_embed, y):
    with tf.device('/cpu:0'):
        y = tf.cast(y, tf.int64)
        loss = tf.reduce_mean(
            tf.nn.nce_loss(weights=nce_weights,
                           biases=nce_biases,
                           labels=y,
                           inputs=x_embed,
                           num_sampled=num_sampled,#采樣出多少個負樣本
                           num_classes=vocabulary_size))
        return loss
           

測試觀察子產品

# Evaluation.評估子產品,找到最相近向量
def evaluate(x_embed):
    with tf.device('/cpu:0'):
        # Compute the cosine similarity between input data embedding and every embedding vectors
        x_embed = tf.cast(x_embed, tf.float32)		#訓練好之後某一詞的向量
        x_embed_norm = x_embed / tf.sqrt(tf.reduce_sum(tf.square(x_embed)))#歸一化
        embedding_norm = embedding / tf.sqrt(tf.reduce_sum(tf.square(embedding), 1, keepdims=True), tf.float32)#全部向量的
        cosine_sim_op = tf.matmul(x_embed_norm, embedding_norm, transpose_b=True)#計算餘弦相似度
        return cosine_sim_op

# SGD,驗證子產品
optimizer = tf.optimizers.SGD(learning_rate)
           
# 疊代優化
def run_optimization(x, y):
    with tf.device('/cpu:0'):
        with tf.GradientTape() as g:
            emb = get_embedding(x)
            loss = nce_loss(emb, y)

        # 計算梯度
        gradients = g.gradient(loss, [embedding, nce_weights, nce_biases])

        # 更新
        optimizer.apply_gradients(zip(gradients, [embedding, nce_weights, nce_biases]))
           
# 待測試的幾個詞
x_test = np.array([word2id[w.encode('utf-8')] for w in eval_words])

# 訓練
for step in range(1, num_steps + 1):
    batch_x, batch_y = next_batch(batch_size, num_skips, skip_window)
    run_optimization(batch_x, batch_y)
    
    if step % display_step == 0 or step == 1:
        loss = nce_loss(get_embedding(batch_x), batch_y)
        print("step: %i, loss: %f" % (step, loss))
        
    # Evaluation.
    if step % eval_step == 0 or step == 1:
        print("Evaluation...")
        sim = evaluate(get_embedding(x_test)).numpy()
        for i in range(len(eval_words)):
            top_k = 8  # 傳回前8個最相似的
            nearest = (-sim[i, :]).argsort()[1:top_k + 1]
            log_str = '"%s" nearest neighbors:' % eval_words[i]
            for k in range(top_k):
                log_str = '%s %s,' % (log_str, id2word[nearest[k]])
            print(log_str)
           
TensorFlow2.0.0_課時52-62_遞歸神經網絡(RNN)

繼續閱讀