天天看點

Word2Vec之Skip-Gram與CBOW模型原理

word2vec是一個開源的nlp工具,它可以将所有的詞向量化。至于什麼是詞向量,最開始是我們熟悉的one-hot編碼,但是這種表示方法孤立了每個詞,不能表示詞之間的關系,當然也有次元過大的原因。後面發展出一種方法,術語是詞嵌入。

[1.詞嵌入]

詞嵌入(Word Embedding)是NLP中語言模型與表征學習技術的統稱,它就是将one-hot表示的詞“嵌入”到一個低維空間中,簡單點就是嵌入矩陣E與詞的one-hot的乘積,數學關系為:

Word2Vec之Skip-Gram與CBOW模型原理

舉個例子,如果有2000個詞的字典,那麼每個詞的one-hot形式就是2000*1,如果學習到嵌入矩陣為300 * 2000的化,那麼将生成300 * 1的詞向量,就那麼簡單。還有,嵌入矩陣一般這樣表示,頻數是統計來的:

Word2Vec之Skip-Gram與CBOW模型原理

[2.Word2Vec]

Word2Vec是很流行的詞嵌入算法,它通常包含兩個模型,一個是Skip-Gram模型,這個模型的思想是,標明中間的詞(Context),然後在這個詞的正負n個詞内選擇目标詞(Target)與之對應,構造一個用Context預測輸出為Target的監督學習問題,訓練一個如下圖結構的網絡(吳恩達的課件):

Word2Vec之Skip-Gram與CBOW模型原理

Skip-Gram

其實還有其他的一些細節,但是說的太細了,寫不完,具體我一會慢慢完善。

還有一個是CBOW(Continuous Bag-of-Words Model),它的工作原理與Skip-Gram原理相反,他是用上下文預測中間的詞。

接下來,我将用tensorflow來實作一個Skip-Gram,代碼來自于tensorflow的word2vec的改編,是基于這位

部落客

,如果想看可以去github看tensorflow的例子。不羅嗦,直接上代碼。

3. 高亮一段代碼[^code]

import time
import numpy as np
import tensorflow as tf
import random
from collections import Counter

with open('data/text8') as f:
    text = f.read()


#定義函數來完成資料的預處理
def preprocess(text, freq=5):
    '''
    對文本進行預處理
    :param text: 文本資料
    :param freq: 詞頻門檻值
    '''
    #對文本中的符号進行替換,下面都是用英文翻譯來替代符号
    text = text.lower()
    text = text.replace('.', ' <PERIOD> ')
    text = text.replace(',', ' <COMMA> ')
    text = text.replace('"', ' <QUOTATION_MARK> ')
    text = text.replace(';', ' <SEMICOLON> ')
    text = text.replace('!', ' <EXCLAMATION_MARK> ')
    text = text.replace('?', ' <QUESTION_MARK> ')
    text = text.replace('(', ' <LEFT_PAREN> ')
    text = text.replace(')', ' <RIGHT_PAREN> ')
    text = text.replace('--', ' <HYPHENS> ')
    text = text.replace(':', ' <COLON> ')
    words = text.split()

    #删除低頻詞,減少噪音影響
    word_counts = Counter(words)
    trimmed_words = [word for word in words if word_counts[word] > freq] #這句話很好,可以模仿

    return trimmed_words


#清洗文本并分詞
words = preprocess(text)
# print(words[: 200])

#建構映射表
vocab = set(words)
vocab_to_int = {w: c for c, w in enumerate(vocab)}
int_to_vocab = {c: w for c, w in enumerate(vocab)}

print('total words: {}'.format(len(words))) #還有這種輸出方法,學習一下
print('unique words: {}'.format(len(set(words))))

#對原文本進行vocab到int的轉換
int_words = [vocab_to_int[w] for w in words]


#--------------------------------------------------------------------采樣

t = 1e-5  #t值
threshold = 0.8  #删除機率門檻值

#統計單詞出現頻數
int_word_counts = Counter(int_words)
total_count = len(int_words)
#計算單詞頻率
word_freqs = {w: c / total_count for w, c in int_word_counts.items()}
#計算單詞被删除的機率
prob_drop = {w: 1 - np.sqrt(t / word_freqs[w]) for w in int_word_counts}
#對單詞進行采樣
train_words = [w for w in int_words if prob_drop[w] < threshold]
print(len(train_words))
#-----------------------------------------------------------------------采樣


#獲得input word的上下文單詞清單
def get_targets(words, idx, window_size = 5):
    '''
    獲得input word的上下文單詞清單
    :param words: 單詞清單
    :param idx: input word 的索引号
    :param window_size: 視窗大小
    '''
    target_window = np.random.randint(1, window_size + 1)  #從1到 window_size+1 之間的數,包括1,不包括最後一個
    #這裡要考慮input word前面單詞不夠的情況,但是為什麼沒有寫後面單詞不夠的情況呢,
    #因為python裡面的list取分片越界時會自動隻取到結尾的
    start_point = idx - target_window if (idx - target_window) > 0 else 0  #雖說不能寫三元表達式,但這個挺不錯的
    end_point = idx + target_window
    #output words(即視窗中的上下文單詞,不包含目标詞,隻是它的上下文的詞)
    targets = set(words[start_point: idx] + words[idx + 1: end_point + 1])

    return list(targets)


#構造一個擷取batch的生成器
def get_batches(words, batch_size, window_size = 5):
    '''
    構造一個擷取batch的生成器
    '''
    n_batches = len(words) // batch_size

    #僅取full batches
    words = words[: n_batches * batch_size]

    for idx in range(0, len(words), batch_size):  #range(start, stop[, step])
        x, y = [], []
        batch = words[idx: idx + batch_size]
        for i in range(len(batch)):
            batch_x = batch[i]
            batch_y = get_targets(batch, i, window_size) #從一個batch的第0位開始,一直往下滑,直到最後一個
            #由于一個input word會對應多個output word,是以需要長度統一
            x.extend([batch_x] * len(batch_y))
            y.extend(batch_y)
        yield x, y


#建構網絡,該部分主要包括:輸入層,Embedding,Negative Sampling
train_graph = tf.Graph()
with train_graph.as_default():
    inputs = tf.placeholder(tf.int32, shape=[None], name='inputs')
    labels = tf.placeholder(tf.int32, shape=[None, None], name='labels')  #一般是[batch_size, num_true],num_true一般為1

vocab_size = len(int_to_vocab)
embedding_size = 200 #嵌入次元

with train_graph.as_default():
    #嵌入層權重矩陣
    embedding = tf.Variable(tf.random_uniform([vocab_size, embedding_size], -1, 1)) #tf.random_uniform((4, 4), minval=low,maxval=high,dtype=tf.float32)))傳回4*4的矩陣
                                                                                    # ,産生于low和high之間,産生的值是均勻分布的。
    embed = tf.nn.embedding_lookup(embedding, inputs) #[None, embedding_size],一般是[batch_size, embedding_size]


#-----------------------------------------------------------負采樣(Negative Sampling)
n_sampled = 100

with train_graph.as_default():
    sotfmax_w = tf.Variable(tf.truncated_normal([vocab_size, embedding_size], stddev=0.1)) #[vocab_size, dim], dim就是embedding_size
    sotfmax_b = tf.Variable(tf.zeros(vocab_size)) #[vocab_size]

    #計算negative sampling下的損失
    #tf.nn.sampled_softmax_loss()進行了negative sampling,它主要用在分類的類别較大的情況
    loss = tf.nn.sampled_softmax_loss(sotfmax_w, sotfmax_b, labels, embed, n_sampled, vocab_size)
    cost = tf.reduce_mean(loss)
    optimizer = tf.train.AdamOptimizer().minimize(cost)
#-----------------------------------------------------------負采樣


#為了直覺看到訓練結果,我們将檢視訓練出的相近語義的詞
with train_graph.as_default():
    #随機挑選一些單詞
    valid_size = 16
    valid_window = 100
    #從不同位置各選8個詞
    valid_examples = np.array(random.sample(range(valid_window), valid_size // 2))  #random.sample(seq, n) 從序列seq中選擇n個随機且獨立的元素
    #np.append([1, 2, 3], [[4, 5, 6], [7, 8, 9]]),結果是array([1, 2, 3, 4, 5, 6, 7, 8, 9])
    valid_examples = np.append(valid_examples, random.sample(range(1000, 1000 + valid_window), valid_size // 2))

    valid_size = len(valid_examples)
    #驗證單詞集
    valid_dataset = tf.constant(valid_examples, dtype=tf.int32)

    #計算每個詞向量的模并進行機關化
    norm = tf.sqrt(tf.reduce_sum(tf.squeeze(embedding), 1, keep_dims=True))
    normalized_embedding = embedding / norm
    #查找驗證單詞的詞向量
    valid_embedding = tf.nn.embedding_lookup(normalized_embedding, valid_dataset)
    #計算餘弦相似度
    similarity = tf.matmul(valid_embedding, tf.transpose(normalized_embedding))


#進行訓練
epochs = 10 #疊代次數
batch_size = 1000 #batch大小
window_size = 10 #視窗大小

with train_graph.as_default():
    saver = tf.train.Saver() #檔案存儲

with tf.Session(graph=train_graph) as sess:
    iteration = 1
    loss = 0
    sess.run(tf.global_variables_initializer())

    for e in range(1, epochs + 1):
        batches = get_batches(train_words, batch_size, window_size)
        start = time.time()

        for x, y in batches:
            feed = {
                inputs : x,
                labels : np.array(y)[:, None]
            }
            train_loss, _ = sess.run([cost, optimizer], feed_dict=feed)

            loss += train_loss

            if iteration % 100 == 0:
                end = time.time()
                print('Epoch {}/{}'.format(e, epochs),
                      'Iteration: {}'.format(iteration),
                      'Avg. Training loss: {:.4f}'.format(loss / 100),  #這是一種數字格式化的方法,{:.4f}表示保留小數點後四位
                      '{:.4f} sec / batch'.format((end-start) / 100))
                loss = 0
                start = time.time()

            #計算相似的詞
            if iteration % 1000 == 0:
                #計算similarity
                sim = similarity.eval()
                for i in range(valid_size):
                    valid_word = int_to_vocab[valid_examples[i]]
                    top_k = 8 #取最相似單詞的前8個
                    nearest = (-sim[i, :]).argsort()[1: top_k + 1]
                    log = 'Nearest to [%s]:' % valid_word
                    for k in range(top_k):
                        close_word = int_to_vocab[nearest[k]]
                        log = '%s%s,' % (log, close_word)
                    print(log)

            iteration += 1

    save_path = saver.save(sess, 'model/text8.ckpt')
    embed_mat = sess.run(normalized_embedding)


#下面這部分代碼出錯了,暫時沒明白咋回事
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

viz_words = 500
tsne = TSNE()
embed_tsne = tsne.fit_transform(embed_mat[:viz_words, :])

fig, ax = plt.subplots(figsize=(14, 14))
for idx in range(viz_words):
    plt.scatter(*embed_tsne[idx, :], color='steelblue')
    plt.annotate(int_to_vocab[idx], (embed_tsne[idx, 0], embed_tsne[idx, 1]), alpha=0.7)