
PyTorch Implementation of the BERT Model

1. Data Preprocessing

The text to be processed is a custom dialogue (defined in the code below) that mimics a conversation between two speakers, R (Romeo) and J (Juliet).


import re
import math
import torch
import numpy as np
from random import *
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data

# The custom dialogue text
text = (
    'Hello, how are you? I am Romeo.\n' # R
    'Hello, Romeo My name is Juliet. Nice to meet you.\n' # J
    'Nice meet you too. How are you today?\n' # R
    'Great. My baseball team won the competition.\n' # J
    'Oh Congratulations, Juliet\n' # R
    'Thank you Romeo\n' # J
    'Where are you going today?\n' # R
    'I am going shopping. What about you?\n' # J
    'I am going to visit my grandmother. she is not very well' # R
)
# Remove punctuation with a regular expression and convert everything to lowercase
sentences = re.sub("[.,!?\\-]", '', text.lower()).split('\n') # filter '.', ',', '?', '!'
# Collect the preprocessed tokens in word_list (set() removes duplicates)
word_list = list(set(" ".join(sentences).split())) # ['hello', 'how', 'are', 'you',...]
# Map each token to an index; the special token [PAD] (id 0) pads sentences that are shorter than maxlen
word2idx = {'[PAD]' : 0, '[CLS]' : 1, '[SEP]' : 2, '[MASK]' : 3}
for i, w in enumerate(word_list): # the remaining tokens are added here, starting from 4 since ids 0-3 are already taken
    word2idx[w] = i + 4
# print(word2idx)
# Reverse mapping: index -> token
idx2word = {i: w for w, i in word2idx.items()}
print(idx2word)
vocab_size = len(word2idx)
# print(vocab_size) # number of distinct tokens
token_list = list()
for sentence in sentences:
    arr = [word2idx[s] for s in sentence.split()]
    token_list.append(arr)
           
# 9 lines of text correspond to 9 lists; the number of elements in each list equals the number of tokens in that line
print(token_list)
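# Sanity check (illustrative): 9 lines of text -> 9 token lists; the first sentence,
# 'hello how are you i am romeo', contains 7 tokens
assert len(token_list) == 9 and len(token_list[0]) == 7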
           

2. Defining the BERT Model Parameters

# BERT Parameters
maxlen = 30 # maximum length of a sample (sentence pair); longer inputs would be truncated, shorter ones are padded with [PAD]
batch_size = 6
max_pred = 5 # max tokens of prediction: at most 5 tokens are masked per sample; 15% of the tokens would sometimes be more, so we cap it
n_layers = 6 # number of encoder layers
n_heads = 12 # number of attention heads
d_model = 768 # shared dimension of the word, positional and segment embeddings
d_ff = 768*4 # 4*d_model, FeedForward dimension: hidden size of the position-wise feed-forward network (3072)
d_k = d_v = 64  # dimension of K(=Q) and V per head
n_segments = 2 # each input consists of 2 segments (sentence A and sentence B)
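# Sanity check: d_k * n_heads = 64 * 12 = 768 = d_model, so splitting into heads
# keeps the total hidden size unchanged (the usual BERT-base convention)
assert d_k * n_heads == d_model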
           

3. Preprocessing

Concatenate the special tokens and build training samples for the two pre-training tasks: the masked language model (MLM) task and the next sentence prediction (NSP) task. The masking rule is sketched below and then implemented inside make_data().
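Before walking through make_data(), here is the 80%/10%/10% masking rule in isolation, as a minimal sketch (mask_one is a hypothetical helper used only for illustration; it assumes the word2idx and vocab_size built above):

def mask_one(token_id):
    # decide the fate of one token that was selected for masking
    r = random()
    if r < 0.8:                   # 80%: replace with [MASK]
        return word2idx['[MASK]']
    elif r < 0.9:                 # 10%: replace with a random non-special word (ids 0-3 are special tokens)
        return randint(4, vocab_size - 1)
    else:                         # 10%: keep the original token
        return token_id

make_data() below applies the same rule while also recording the masked positions and the original tokens, building the NSP sentence pairs, and padding everything to fixed lengths.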

# sample IsNext and NotNext so that they are balanced within the (small) batch
def make_data():
    batch = []
    positive = negative = 0 # positive counts pairs of adjacent sentences, negative counts non-adjacent pairs; the two must end up equal
    while positive != batch_size/2 or negative != batch_size/2:
        # tokens_a_index: index of the first sentence; tokens_b_index: index of the second sentence
        # draw two random sentence indices; tokens_a_index + 1 == tokens_b_index means the two sentences are adjacent
        tokens_a_index, tokens_b_index = randrange(len(sentences)), randrange(len(sentences)) # sample random index in sentences
        # look up the token ids of each sentence via its index
        tokens_a, tokens_b = token_list[tokens_a_index], token_list[tokens_b_index]
        # concatenate [CLS] and [SEP] around the tokens
        input_ids = [word2idx['[CLS]']] + tokens_a + [word2idx['[SEP]']] + tokens_b + [word2idx['[SEP]']]
        # segment_ids feeds the segment embedding: the first sentence gets 1 + len(tokens_a) + 1 zeros
        # (the extra two account for [CLS] and [SEP]); the second sentence plus its [SEP] gets ones
        segment_ids = [0] * (1 + len(tokens_a) + 1) + [1] * (len(tokens_b) + 1)

        # MASK LM
        # mask 15% of the tokens in the sequence, but at least 1 (for short sequences, e.g. 6 * 0.15 < 1)
        # and at most max_pred (so long sequences never exceed the cap of 5)
        n_pred =  min(max_pred, max(1, int(len(input_ids) * 0.15))) # 15 % of tokens in one sentence
        # cand_maked_pos: candidate positions for masking; masking the special tokens [CLS] and [SEP] would be pointless, so exclude them
        cand_maked_pos = [i for i, token in enumerate(input_ids)
                          # every position that is not [CLS] or [SEP] is a candidate
                          if token != word2idx['[CLS]'] and token != word2idx['[SEP]']] # candidate masked position
        shuffle(cand_maked_pos) # masking is random, so shuffle the candidate positions
        masked_tokens, masked_pos = [], []
        for pos in cand_maked_pos[:n_pred]: # mask the first n_pred of the shuffled candidate positions
            masked_pos.append(pos) # masked_pos: positions of the masked tokens
            masked_tokens.append(input_ids[pos]) # masked_tokens: original token ids at the masked positions
            # in BERT, 80% of the selected tokens are replaced by [MASK], 10% by a random word from the vocabulary, and 10% are left unchanged
            r = random()
            if r < 0.8:  # 80%: replace with the real [MASK] token
                input_ids[pos] = word2idx['[MASK]'] # make mask
            elif r < 0.9:  # 10%: replace with a random word from the vocabulary
                index = randint(0, vocab_size - 1) # random index in vocabulary (it may even be the original token itself)
                while index < 4: # must not be a special token ([PAD], [CLS], [SEP], [MASK]), i.e. the index must be at least 4
                    index = randint(0, vocab_size - 1) # redraw while the index points at a special token
                input_ids[pos] = index # replace the token at this position with the random word
            # remaining 10%: leave the original token unchanged

        # Zero Paddings
        # pad sequences shorter than maxlen (30) with [PAD]
        n_pad = maxlen - len(input_ids) # maxlen - sequence length = number of padding positions
        input_ids.extend([0] * n_pad) # pad input_ids with 0 up to maxlen
        segment_ids.extend([0] * n_pad) # pad segment_ids with 0 as well

        # Zero Padding (100% - 15%) tokens
        if max_pred > n_pred:
            n_pad = max_pred - n_pred
            masked_tokens.extend([0] * n_pad) # keep masked_tokens and masked_pos at a fixed length of max_pred (5)
            masked_pos.extend([0] * n_pad)

        # check whether the two sentences are adjacent (tokens_a_index + 1 == tokens_b_index) while keeping positive and negative balanced
        if tokens_a_index + 1 == tokens_b_index and positive < batch_size/2:
            batch.append([input_ids, segment_ids, masked_tokens, masked_pos, True]) # IsNext
            positive += 1
        elif tokens_a_index + 1 != tokens_b_index and negative < batch_size/2:
            batch.append([input_ids, segment_ids, masked_tokens, masked_pos, False]) # NotNext
            negative += 1
    print(batch)
    return batch
# Preprocessing finished

batch = make_data()
input_ids, segment_ids, masked_tokens, masked_pos, isNext = zip(*batch)
input_ids, segment_ids, masked_tokens, masked_pos, isNext = \
    torch.LongTensor(input_ids),  torch.LongTensor(segment_ids), torch.LongTensor(masked_tokens),\
    torch.LongTensor(masked_pos), torch.LongTensor(isNext)

class MyDataSet(Data.Dataset):
  def __init__(self, input_ids, segment_ids, masked_tokens, masked_pos, isNext):
    self.input_ids = input_ids
    self.segment_ids = segment_ids
    self.masked_tokens = masked_tokens
    self.masked_pos = masked_pos
    self.isNext = isNext
  
  def __len__(self):
    return len(self.input_ids)
  
  def __getitem__(self, idx):
    return self.input_ids[idx], self.segment_ids[idx], self.masked_tokens[idx], self.masked_pos[idx], self.isNext[idx]

loader = Data.DataLoader(MyDataSet(input_ids, segment_ids, masked_tokens, masked_pos, isNext), batch_size=batch_size, shuffle=True)
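As a quick sanity check (illustrative; the actual token values depend on the random sampling in make_data), the resulting tensors should have these shapes:

print(input_ids.shape)     # torch.Size([6, 30]) -> [batch_size, maxlen]
print(segment_ids.shape)   # torch.Size([6, 30]) -> [batch_size, maxlen]
print(masked_tokens.shape) # torch.Size([6, 5])  -> [batch_size, max_pred]
print(masked_pos.shape)    # torch.Size([6, 5])  -> [batch_size, max_pred]
print(isNext.shape)        # torch.Size([6])     -> [batch_size]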
           

4. Building the BERT Model

def get_attn_pad_mask(seq_q, seq_k):
    batch_size, seq_len = seq_q.size()
    # eq(zero) is PAD token
    pad_attn_mask = seq_q.data.eq(0).unsqueeze(1)  # [batch_size, 1, seq_len]
    return pad_attn_mask.expand(batch_size, seq_len, seq_len)  # [batch_size, seq_len, seq_len]
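# Quick illustration (hypothetical ids): token id 0 is [PAD], so every query position
# is blocked from attending to the padded position:
# >>> get_attn_pad_mask(torch.tensor([[5, 6, 0]]), torch.tensor([[5, 6, 0]]))
# tensor([[[False, False,  True],
#          [False, False,  True],
#          [False, False,  True]]])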

# gelu(): the GELU activation function used by BERT
def gelu(x):
    """
      Implementation of the gelu activation function.
      For information: OpenAI GPT's gelu is slightly different (and gives slightly different results):
      0.5 * x * (1 + torch.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3))))
      Also see https://arxiv.org/abs/1606.08415
    """
    return x * 0.5 * (1.0 + torch.erf(x / math.sqrt(2.0)))
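# Note: this is the exact erf-based GELU; in recent PyTorch versions it should match
# torch.nn.functional.gelu(x) (with the default approximate='none') up to floating-point error.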

class Embedding(nn.Module):
    def __init__(self):
        super(Embedding, self).__init__()
        self.tok_embed = nn.Embedding(vocab_size, d_model)  # token embedding
        self.pos_embed = nn.Embedding(maxlen, d_model)  # position embedding
        self.seg_embed = nn.Embedding(n_segments, d_model)  # segment(token type) embedding
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x, seg):
        seq_len = x.size(1)
        pos = torch.arange(seq_len, dtype=torch.long)
        pos = pos.unsqueeze(0).expand_as(x)  # [seq_len] -> [batch_size, seq_len]
        embedding = self.tok_embed(x) + self.pos_embed(pos) + self.seg_embed(seg) # the final embedding is the sum of the three; they all share the same dimension d_model
        return self.norm(embedding)

class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()
    # compute attention scores from Q and K, then multiply the softmaxed scores with V to get each token's context vector
    # Q: [batch_size, n_heads, seq_len, d_k], K: [batch_size, n_heads, seq_len, d_k], V: [batch_size, n_heads, seq_len, d_v]
    def forward(self, Q, K, V, attn_mask):
        # transpose(-1, -2) transposes the last two dimensions of K
        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size, n_heads, seq_len, seq_len]
        # the scores cannot go straight into the softmax: positions that must be ignored (padding) are first filled with -1e9, effectively negative infinity
        # attn_mask is a boolean tensor whose shape matches scores exactly, otherwise the element-wise fill would be impossible
        scores.masked_fill_(attn_mask, -1e9) # fill elements of scores with -1e9 where attn_mask is True
        attn = nn.Softmax(dim=-1)(scores)
        context = torch.matmul(attn, V) # multiply the softmaxed attention weights with V
        return context

class MultiHeadAttention(nn.Module):
    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        # projection matrices for Q, K and V
        self.W_Q = nn.Linear(d_model, d_k * n_heads)
        self.W_K = nn.Linear(d_model, d_k * n_heads)
        self.W_V = nn.Linear(d_model, d_v * n_heads)
        # the output projection and layer norm are defined here (not inside forward),
        # so that their parameters are registered with the module and actually trained
        self.fc = nn.Linear(n_heads * d_v, d_model)
        self.norm = nn.LayerNorm(d_model)
    def forward(self, Q, K, V, attn_mask):
        # Q, K, V: the three copies of enc_inputs
        # q: [batch_size, seq_len, d_model], k: [batch_size, seq_len, d_model], v: [batch_size, seq_len, d_model]
        residual, batch_size = Q, Q.size(0)
        # (B, S, D) -proj(linear projection)-> (B, S, D) -split-> (B, S, H, W) -trans(swap H and S)-> (B, H, S, W)
        q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2)  # q_s: [batch_size, n_heads, seq_len, d_k]
        k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1,2)  # k_s: [batch_size, n_heads, seq_len, d_k]
        v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1,2)  # v_s: [batch_size, n_heads, seq_len, d_v]

        # expand attn_mask from 3-D to 4-D so that it matches the shape of the attention scores
        attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1) # attn_mask : [batch_size, n_heads, seq_len, seq_len]

        # context: [batch_size, n_heads, seq_len, d_v]
        context = ScaledDotProductAttention()(q_s, k_s, v_s, attn_mask)
        # context.transpose(1, 2): [batch_size, seq_len, n_heads, d_v]
        # contiguous(): lay the tensor out contiguously in memory so view() can merge the last two dims (4-D -> 3-D)
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v) # context: [batch_size, seq_len, n_heads * d_v]
        output = self.fc(context) # project back to d_model
        return self.norm(output + residual) # output: [batch_size, seq_len, d_model]; residual connection followed by layer normalization

class PoswiseFeedForwardNet(nn.Module):
    def __init__(self):
        super(PoswiseFeedForwardNet, self).__init__()
        # maps between dimensions d_model <-> d_ff
        self.fc1 = nn.Linear(d_model, d_ff) 
        self.fc2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        # (batch_size, seq_len, d_model) -> (batch_size, seq_len, d_ff) -> (batch_size, seq_len, d_model)
        return self.fc2(gelu(self.fc1(x))) # gelu activation between the two linear layers

class EncoderLayer(nn.Module):
    def __init__(self):
        super(EncoderLayer, self).__init__()
        self.enc_self_attn = MultiHeadAttention() # multi-head self-attention
        self.pos_ffn = PoswiseFeedForwardNet() # position-wise feed-forward network

    def forward(self, enc_inputs, enc_self_attn_mask):
        # self-attention via MultiHeadAttention: enc_inputs is passed in three times so it can be projected by W_Q, W_K and W_V into Q, K and V
        enc_outputs = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask) # enc_inputs to same Q,K,V
        # feature extraction via the feed-forward network
        enc_outputs = self.pos_ffn(enc_outputs) # enc_outputs: [batch_size, seq_len, d_model]
        return enc_outputs

class BERT(nn.Module):
    def __init__(self):
        super(BERT, self).__init__()
        self.embedding = Embedding() # builds the token/position/segment embeddings
        # stack n_layers (= 6) encoder layers with ModuleList
        self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])
        self.fc = nn.Sequential(
            nn.Linear(d_model, d_model), # fully connected layer (dimension unchanged), used for the NSP classification head on [CLS]
            nn.Dropout(0.5),
            nn.Tanh(), # Tanh activation
        )
        self.classifier = nn.Linear(d_model, 2) # final NSP output: d_model -> 2, i.e. binary classification
        self.linear = nn.Linear(d_model, d_model) # fully connected layer (dimension unchanged), used for the MLM head
        self.activ2 = gelu # second activation: gelu()

        # fc2 is shared with the embedding layer (weight tying)
        embed_weight = self.embedding.tok_embed.weight
        self.fc2 = nn.Linear(d_model, vocab_size, bias=False) # d_model -> vocab_size, decodes hidden states back to the vocabulary
        self.fc2.weight = embed_weight

    # the forward pass takes three inputs: input_ids, segment_ids and masked_pos
    def forward(self, input_ids, segment_ids, masked_pos):
        # build the embeddings corresponding to input_ids and segment_ids (positions are added internally)
        output = self.embedding(input_ids, segment_ids) # [batch_size, seq_len, d_model]
        enc_self_attn_mask = get_attn_pad_mask(input_ids, input_ids) # [batch_size, maxlen, maxlen]
        for layer in self.layers:
            # output: [batch_size, max_len, d_model]
            output = layer(output, enc_self_attn_mask)
        # NSP is decided by the first token ([CLS]); self.fc applies a linear layer, dropout and Tanh to it
        h_pooled = self.fc(output[:, 0]) # [batch_size, d_model]
        logits_clsf = self.classifier(h_pooled) # [batch_size, 2] predict isNext via the final binary classifier

        # e.g. masked_pos = [6, 5, 17, 0, 0] means the tokens at positions 6, 5 and 17 were masked
        masked_pos = masked_pos[:, :, None].expand(-1, -1, d_model) # [batch_size, max_pred, d_model]
        # use gather() to pick the hidden states at positions 6, 5, 17, ... from output
        # (output rows are ordered [0, 1, 2, ...], so they have to be gathered into the masked order [6, 5, 17, ...])
        h_masked = torch.gather(output, 1, masked_pos) # masking position [batch_size, max_pred, d_model]
        # pass the gathered (masked-position) hidden states through the fully connected layer, then the second activation
        h_masked = self.activ2(self.linear(h_masked)) # [batch_size, max_pred, d_model]
        # decode back to the vocabulary size: d_model -> vocab_size
        logits_lm = self.fc2(h_masked) # [batch_size, max_pred, vocab_size]
        return logits_lm, logits_clsf # logits_lm predicts the masked tokens; logits_clsf predicts whether the two sentences are consecutive
model = BERT()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adadelta(model.parameters(), lr=0.001)
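Before training, a single forward pass is a cheap way to confirm the output shapes. A minimal sketch, assuming the loader built above:

# illustrative shape check on one batch (no gradients needed)
with torch.no_grad():
    ids, segs, m_toks, m_pos, nxt = next(iter(loader))
    lm_logits, clsf_logits = model(ids, segs, m_pos)
    print(lm_logits.shape)   # [batch_size, max_pred, vocab_size] = torch.Size([6, 5, vocab_size])
    print(clsf_logits.shape) # [batch_size, 2] = torch.Size([6, 2])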
           

5. What torch.gather() Does

out = torch.gather(input, dim, index)
# out[i][j][k] = input[index[i][j][k]][j][k] # dim = 0
# out[i][j][k] = input[i][index[i][j][k]][k] # dim = 1
# out[i][j][k] = input[i][j][index[i][j][k]] # dim = 2
           

Taking 3-D input as an example:

When dim = 0, the index along the first dimension is replaced by index[i][j][k], while the second and third dimensions stay unchanged;

When dim = 1, the index along the second dimension is replaced by index[i][j][k], while the first and third dimensions stay unchanged;

When dim = 2, the index along the third dimension is replaced by index[i][j][k], while the first and second dimensions stay unchanged.
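
For instance, with dim = 1, out[0][2][5] = input[0][ index[0][2][5] ][5]: the second (row) index is taken from index, while the first and third indices pass through unchanged.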

A concrete example:

index = torch.from_numpy(np.array([[1, 2, 0], [2, 0, 1]])).type(torch.LongTensor)
index = index[:, :, None].expand(-1, -1, 10)
print(index)
           

First define two index rows, [1, 2, 0] and [2, 0, 1], and expand them to 3-D with a last dimension of length 10.


Next, generate a random tensor of shape [2, 3, 10]. You can think of it as 2 batches, each containing 3 sentences of 10 words, except that the "words" here are continuous values rather than integer indices.

input = torch.rand(2, 3, 10)
print(input)
           

Now call torch.gather(input, 1, index):


The first row of index acts on the first batch of input: the three sentences, originally in order [0, 1, 2], are reordered to [1, 2, 0]. The second row of index acts on the second batch: its sentences, originally [0, 1, 2], are reordered to [2, 0, 1].
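
The reordering is easy to verify with a tiny deterministic tensor (a minimal sketch; the values are chosen purely for illustration):

inp = torch.arange(6, dtype=torch.float).view(2, 3, 1)  # 2 batches, 3 "sentences", 1 value each
idx = torch.tensor([[1, 2, 0], [2, 0, 1]])[:, :, None]   # the same index pattern as above
print(torch.gather(inp, 1, idx).squeeze(-1))
# tensor([[1., 2., 0.],
#         [5., 3., 4.]])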

6. What transpose() Does

transpose(-1, -2) swaps the last two dimensions, i.e. it transposes the matrix:

index = torch.from_numpy(np.array([[1, 2, 0], [2, 0, 1]])).type(torch.LongTensor)
print(index)
print(index.transpose(-1, -2))
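
The same call works on higher-dimensional tensors, which is how it is used in ScaledDotProductAttention: K.transpose(-1, -2) swaps only the last two dimensions. A small shape check using the sizes defined earlier (n_heads, maxlen, d_k):

K = torch.rand(2, n_heads, maxlen, d_k)  # [batch_size, n_heads, seq_len, d_k]
print(K.transpose(-1, -2).shape)         # torch.Size([2, 12, 64, 30]) -> [batch_size, n_heads, d_k, seq_len]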
           

七、訓練 & 計算損失

for epoch in range(180):
    for input_ids, segment_ids, masked_tokens, masked_pos, isNext in loader:
      logits_lm, logits_clsf = model(input_ids, segment_ids, masked_pos)
      loss_lm = criterion(logits_lm.view(-1, vocab_size), masked_tokens.view(-1)) # for masked LM
      loss_lm = (loss_lm.float()).mean()
      loss_clsf = criterion(logits_clsf, isNext) # for sentence classification
      loss = loss_lm + loss_clsf # the total loss is the sum of the MLM loss and the NSP loss
      if (epoch + 1) % 10 == 0:
          print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss))
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
           

8. Testing

# Predict masked tokens and isNext
input_ids, segment_ids, masked_tokens, masked_pos, isNext = batch[1]
print(text)
print([idx2word[w] for w in input_ids if idx2word[w] != '[PAD]'])

logits_lm, logits_clsf = model(torch.LongTensor([input_ids]), \
                 torch.LongTensor([segment_ids]), torch.LongTensor([masked_pos]))
logits_lm = logits_lm.data.max(2)[1][0].data.numpy()
print('masked tokens list : ',[pos for pos in masked_tokens if pos != 0])
print('predict masked tokens list : ',[pos for pos in logits_lm if pos != 0])

logits_clsf = logits_clsf.data.max(1)[1].data.numpy()[0]
print('isNext : ', True if isNext else False)
print('predict isNext : ',True if logits_clsf else False)
           