
TextCNN Text Classification (PyTorch Implementation)

Text Classification with textCNN

The key parameters from the paper and what they mean

[Figure: TextCNN architecture from the paper, showing a 7×5 sentence matrix, three kernel sizes with two kernels each, max pooling over each feature map, and concatenation into the final feature vector.]

In the figure, the sentence length is 7 and each character is represented by a 5-dimensional vector, which can be built with nn.Embedding(vocab_num, 5). The second part of the figure contains 6 matrices in total, grouped into 3 blocks (one per kernel size); in the code this is implemented as a separate class. Convolution then produces the feature maps, a max-pooling layer takes the maximum of each, and finally the pooled results are concatenated and fed to the classifier. (The exact shape changes are described below.)
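To make the shape changes concrete before diving into the code, here is a minimal sketch for the toy setting in the figure (sentence length 7, embedding dimension 5, 2 kernels of height 2; a toy vocabulary size of 100 is assumed):

import torch
import torch.nn as nn

x = torch.randint(0, 100, (1, 7))               # one sentence of 7 token indices
emb = nn.Embedding(100, 5)(x)                   # (1, 7, 5)
emb = emb.unsqueeze(1)                          # (1, 1, 7, 5): add the channel dimension
out = nn.Conv2d(1, 2, kernel_size=(2, 5))(emb)  # (1, 2, 6, 1), since 6 = 7 - 2 + 1
pooled = nn.MaxPool1d(6)(out.squeeze(3))        # (1, 2, 1): max over the 6 positions

The other two kernel heights (3 and 4) work the same way; their pooled outputs are concatenated with this one before the classifier.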

Reading the Data and Building the Data Iterator

Reading the data

The data is stored in a txt file in the following format:

中華女子學院:大學層次僅1專業招男生  3
兩天價網站背後重重迷霧:做個網站究竟要多少錢 4
東5環海棠公社230-290平2居準現房98折優惠  1
卡佩羅:告訴你德國腳生猛的原因 不希望英德戰踢點球  7      
def read_data(train_test, num=None):
    # num lets us keep only the first num samples (handy for quick experiments)
    with open(os.path.join('..', 'data', train_test + '.txt'), 'r', encoding='utf-8') as f:
        all_data = f.read().split('\n')
    all_texts = []
    all_labels = []
    for data in all_data:
        if data:
            t, l = data.split('\t')
            all_texts.append(t)
            all_labels.append(l)
    if num is None:
        return all_texts, all_labels
    else:
        return all_texts[:num], all_labels[:num]
           

This returns all the texts and labels (the dataset has 10 classes in total).

Building word2index

def build_corpus(texts):
    word_2_index = {'UNK': 0, 'PAD': 1}
    for text in texts:
        for word in text:
            if word not in word_2_index:
                word_2_index[word] = len(word_2_index)
    return word_2_index, list(word_2_index)
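For example, a quick check of what build_corpus returns on a toy input (the exact indices depend on the order in which characters first appear):

word_2_index, index_2_word = build_corpus(['今天天氣好'])
print(word_2_index)   # {'UNK': 0, 'PAD': 1, '今': 2, '天': 3, '氣': 4, '好': 5}
print(index_2_word)   # ['UNK', 'PAD', '今', '天', '氣', '好']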
           

Building the data iterator

class TextDataset(Dataset):
    def __init__(self, all_texts, all_labels, word_2_index, max_len, ):
        self.all_texts = all_texts
        self.all_labels = all_labels
        self.word_2_index = word_2_index
        self.max_len = max_len

    def __getitem__(self, item):
        text = self.all_texts[item][:self.max_len]
        text_idx = [self.word_2_index.get(i, 0) for i in text]
        text_idx = text_idx + [1] * (self.max_len - len(text))
        label = int(self.all_labels[item])
        return torch.tensor(text_idx), torch.tensor(label)

    def __len__(self):
        return len(self.all_texts)
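A small illustration of the truncation/padding behaviour (toy data; max_len=5 is chosen just for this example):

texts, labels = ['天氣好'], ['3']
w2i, _ = build_corpus(texts)
ds = TextDataset(texts, labels, w2i, max_len=5)
text_idx, label = ds[0]
print(text_idx)   # tensor([2, 3, 4, 1, 1]) -> two PAD tokens (index 1) appended
print(label)      # tensor(3)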
           

Building the Convolutional Part of the TextCNN Model

1. The input

self.cnn = nn.Conv2d(1, out_channel, kernel_size=(kernel_s, embed_num))
           

When using a CNN, text data differs from image data: embedding a sentence produces a two-dimensional matrix of shape (seq_len, embedding_dim), while nn.Conv2d expects an explicit channel dimension. We therefore set in_channels=1 manually and, when processing the data later, add an extra dimension of size 1 so that the input has shape (batch_size, 1, max_len, embedding_dim).

For example:

output = self.emb(batch_idx)
output = output.unsqueeze(dim=1)
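For instance, with assumed toy values (vocab_num=5000, embed_num=5, batch_size=32, max_len=7), the shapes change as follows:

import torch
import torch.nn as nn

emb = nn.Embedding(5000, 5)                     # vocab_num=5000, embed_num=5 (assumed)
batch_idx = torch.randint(0, 5000, (32, 7))     # (batch_size, max_len)
print(emb(batch_idx).shape)                     # torch.Size([32, 7, 5])
print(emb(batch_idx).unsqueeze(dim=1).shape)    # torch.Size([32, 1, 7, 5])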
           

2. The convolution

self.cnn = nn.Conv2d(1, out_channel, kernel_size=(kernel_s, embed_num))
           

out_channel is the number of output channels, i.e. the number of kernels per kernel size. In the paper's figure there are 2 kernels per size (you can of course change this parameter).

For example, the input in the figure is 7×5. After the three convolutions we obtain 2 feature maps of each of the sizes 6×1, 5×1 and 4×1. Where do these come from?
First dimension: 6 = 7 - 2 + 1, 5 = 7 - 3 + 1, 4 = 7 - 4 + 1, i.e. max_len - kernel_s + 1 for kernel_s = 2, 3, 4.
Second dimension: 1 = 5 - 5 + 1, i.e. embed_num - embed_num + 1.
So the second element of kernel_size=(kernel_s, embed_num) must equal the embedding dimension for the output width to be 1.
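These formulas can be verified directly with a toy 7×5 input (a sketch, using the same nn.Conv2d call as above):

import torch
import torch.nn as nn

x = torch.randn(1, 1, 7, 5)   # (batch, channel, max_len, embed_num)
for kernel_s in (2, 3, 4):
    out = nn.Conv2d(1, 2, kernel_size=(kernel_s, 5))(x)
    print(kernel_s, out.shape)
# 2 torch.Size([1, 2, 6, 1])
# 3 torch.Size([1, 2, 5, 1])
# 4 torch.Size([1, 2, 4, 1])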
      

3. Max pooling (MaxPool1d)

Looking at MaxPool1d's input and output shapes, it only changes the size of the last dimension of its input.
self.maxp = nn.MaxPool1d(kernel_size=(max_lens - kernel_s + 1))      

Here kernel_size is the size of the pooling (sliding) window.

When the kernel size is 4×5, the convolution output is 4×1. In that case MaxPool1d(kernel_size=(max_lens - kernel_s + 1)) gives kernel_size = 7 - 4 + 1 = 4, i.e. a window of size 4 slides over the 4×1 feature map and the maximum value inside it is taken.

In the code, the output of the convolution has shape output.shape = torch.Size([1, 2, 6, 1]).

However, MaxPool1d expects a 2- or 3-dimensional input, so the trailing dimension of size 1 has to be removed first:

output1 = output.squeeze(3)
output2 = self.maxp(output1)
      

Finally, the outputs of all blocks are concatenated, giving a 6×1 feature vector (3 kernel sizes × 2 kernels each).

After max pooling the shape is batch × 2 × 1. Since the results need to be concatenated, the pooled output's shape has to be changed by squeezing the last dimension:

output2 = self.maxp(output1)
return output2.squeeze(dim=-1)  # remove the trailing dimension of size 1
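Putting the pooling and concatenation steps together, a sketch of the shape changes with out_channel=2 and the three conv output heights from above (random tensors stand in for the conv outputs):

import torch
import torch.nn as nn

feats = []
for length in (6, 5, 4):                          # conv output heights for kernel sizes 2, 3, 4
    out = torch.randn(1, 2, length, 1)            # stand-in for one block's conv output
    out = out.squeeze(3)                          # (1, 2, length)
    out = nn.MaxPool1d(kernel_size=length)(out)   # (1, 2, 1)
    feats.append(out.squeeze(dim=-1))             # (1, 2)
feature = torch.cat(feats, dim=1)                 # (1, 6) = (batch, 3 * out_channel)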

4. Block code

class Block(nn.Module):
    def __init__(self, out_channel, max_lens, kernel_s, embed_num):
        super(Block, self).__init__()
        # out_channel is the number of kernels (output channels)
        self.cnn = nn.Conv2d(1, out_channel, kernel_size=(kernel_s, embed_num))
        self.act = nn.ReLU()
        self.maxp = nn.MaxPool1d(kernel_size=(max_lens - kernel_s + 1))

    def forward(self, emb):
        # emb.shape = (batch, 1, max_len, embed_num); the channel dimension was added in TextCnnModel.forward
        output = self.cnn(emb)
        # output.shape = (batch, out_channel, max_len - kernel_s + 1, 1), e.g. torch.Size([1, 2, 6, 1])
        output1 = self.act(output)
        # MaxPool1d expects a 2- or 3-D input, so drop the trailing size-1 dimension
        output1 = output1.squeeze(3)
        output2 = self.maxp(output1)
        return output2.squeeze(dim=-1)
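A quick smoke test of Block on the toy 7×5 input (a sketch with assumed toy values):

import torch

block = Block(out_channel=2, max_lens=7, kernel_s=2, embed_num=5)
x = torch.randn(1, 1, 7, 5)    # (batch, 1, max_len, embed_num)
print(block(x).shape)          # torch.Size([1, 2])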
           

Building the TextCNN Model

1. Full model code

class TextCnnModel(nn.Module):
    def __init__(self, vocab_num, out_channel, max_lens, embed_num, class_num):
        super(TextCnnModel, self).__init__()
        self.emb = nn.Embedding(vocab_num, embed_num)
        self.block1 = Block(out_channel, max_lens, 2, embed_num)
        self.block2 = Block(out_channel, max_lens, 3, embed_num)
        self.block3 = Block(out_channel, max_lens, 4, embed_num)

        self.classifier = nn.Linear(3 * out_channel, class_num)
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, batch_idx, batch_label=None):
        output = self.emb(batch_idx)
        output = output.unsqueeze(dim=1)
        b1 = self.block1(output)
        b2 = self.block2(output)
        b3 = self.block3(output)

        feature = torch.cat([b1, b2, b3], dim=1)

        pre = self.classifier(feature)

        if batch_label is not None:
            loss = self.loss_fn(pre, batch_label)
            return loss
        else:
            return torch.argmax(pre, dim=-1)
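A quick sanity check of the whole model with random data (assumed toy values: vocab_num=5000, max_lens=32, embed_num=50, class_num=10):

import torch

model = TextCnnModel(vocab_num=5000, out_channel=2, max_lens=32, embed_num=50, class_num=10)
batch_idx = torch.randint(0, 5000, (8, 32))    # (batch_size, max_len)
batch_label = torch.randint(0, 10, (8,))
print(model(batch_idx, batch_label))           # scalar loss tensor
print(model(batch_idx).shape)                  # torch.Size([8]): predicted class ids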
           

Note:

self.classifier = nn.Linear(3 * out_channel, class_num)

Why (3 * out_channel)?

First, the 3: the paper uses three convolutions (three kernel sizes), which appear in the model code above as b1 = self.block1(output), b2 = self.block2(output) and b3 = self.block3(output). If you add more convolution blocks, this factor must change accordingly!

Next, out_channel: this is the number of kernels per size; each kernel produces one output. In this article the number of kernels is 2, so each block contributes 2 values per sample after pooling.

Finally, concatenating the results of the three blocks gives 3 * out_channel features!

So the classifier is nn.Linear(3 * out_channel, class_num).

class_num is the number of target classes.

Full Code

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
import os


# Read the data
def read_data(train_test, num=None):
    with open(os.path.join('..', 'data', train_test + '.txt'), 'r', encoding='utf-8') as f:
        all_data = f.read().split('\n')
    all_texts = []
    all_labels = []
    for data in all_data:
        if data:
            t, l = data.split('\t')
            all_texts.append(t)
            all_labels.append(l)
    if num is None:
        return all_texts, all_labels
    else:
        return all_texts[:num], all_labels[:num]


# Build the word-to-index vocabulary
def build_corpus(texts):
    word_2_index = {'UNK': 0, 'PAD': 1}
    for text in texts:
        for word in text:
            if word not in word_2_index:
                word_2_index[word] = len(word_2_index)
    return word_2_index, list(word_2_index)


# Dataset class
class TextDataset(Dataset):
    def __init__(self, all_texts, all_labels, word_2_index, max_len, ):
        self.all_texts = all_texts
        self.all_labels = all_labels
        self.word_2_index = word_2_index
        self.max_len = max_len

    def __getitem__(self, item):
        text = self.all_texts[item][:self.max_len]
        text_idx = [self.word_2_index.get(i, 0) for i in text]
        text_idx = text_idx + [1] * (self.max_len - len(text))
        label = int(self.all_labels[item])
        return torch.tensor(text_idx), torch.tensor(label)

    def __len__(self):
        return len(self.all_texts)


# Build the model
class Block(nn.Module):
    def __init__(self, out_channel, max_lens, kernel_s, embed_num):
        super(Block, self).__init__()
        # out_channel is the number of kernels (output channels)
        self.cnn = nn.Conv2d(1, out_channel, kernel_size=(kernel_s, embed_num))
        self.act = nn.ReLU()
        self.maxp = nn.MaxPool1d(kernel_size=(max_lens - kernel_s + 1))

    def forward(self, emb):
        # emb.shape = (batch, 1, max_len, embed_num); the channel dimension was added in TextCnnModel.forward
        output = self.cnn(emb)
        # output.shape = (batch, out_channel, max_len - kernel_s + 1, 1), e.g. torch.Size([1, 2, 6, 1])
        output1 = self.act(output)
        # MaxPool1d expects a 2- or 3-D input, so drop the trailing size-1 dimension
        output1 = output1.squeeze(3)
        output2 = self.maxp(output1)
        return output2.squeeze(dim=-1)


class TextCnnModel(nn.Module):
    def __init__(self, vocab_num, out_channel, max_lens, embed_num, class_num):
        super(TextCnnModel, self).__init__()
        self.emb = nn.Embedding(vocab_num, embed_num)
        self.block1 = Block(out_channel, max_lens, 2, embed_num)
        self.block2 = Block(out_channel, max_lens, 3, embed_num)
        self.block3 = Block(out_channel, max_lens, 4, embed_num)

        self.classifier = nn.Linear(3 * out_channel, class_num)
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, batch_idx, batch_label=None):
        output = self.emb(batch_idx)
        output = output.unsqueeze(dim=1)
        b1 = self.block1(output)
        b2 = self.block2(output)
        b3 = self.block3(output)

        feature = torch.cat([b1, b2, b3], dim=1)

        pre = self.classifier(feature)

        if batch_label is not None:
            loss = self.loss_fn(pre, batch_label)
            return loss
        else:
            return torch.argmax(pre, dim=-1)


if __name__ == '__main__':
    train_text, train_label = read_data('train')
    dev_text, dev_label = read_data('dev')
    word_2_index, _ = build_corpus(train_text)

    batch_size = 32
    max_len = 32
    epochs = 10
    out_channel = 2
    embed_num = 50
    lr = 2e-3

    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    train_set = TextDataset(train_text, train_label, word_2_index, max_len)
    train_loader = DataLoader(train_set, batch_size)

    dev_set = TextDataset(dev_text, dev_label, word_2_index, max_len)
    dev_loader = DataLoader(dev_set, batch_size)

    model = TextCnnModel(len(word_2_index), out_channel, max_len, embed_num, len(set(train_label))).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr)

    for e in range(epochs):
        model.train()
        for batch_idx, batch_label in tqdm(train_loader):
            loss = model(batch_idx.to(device), batch_label.to(device))
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
        print(f'epoch:{e},loss={loss:.3f}')

        model.eval()
        right_num = 0
        with torch.no_grad():
            for batch_idx, batch_label in tqdm(dev_loader):
                pre = model(batch_idx.to(device))
                batch_label = batch_label.to(device)
                right_num += torch.sum(pre == batch_label)
        print(f'acc = {right_num / len(dev_text) * 100:.3f}%')
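After training, a single sentence can be classified with a small helper like the following (a hypothetical sketch, not part of the original script; predict is not defined elsewhere):

def predict(text, model, word_2_index, max_len, device):
    text = text[:max_len]
    idx = [word_2_index.get(w, 0) for w in text]       # 0 = UNK
    idx = idx + [1] * (max_len - len(idx))             # 1 = PAD
    idx = torch.tensor(idx).unsqueeze(0).to(device)    # (1, max_len)
    model.eval()
    with torch.no_grad():
        return model(idx).item()                       # predicted class id

# e.g. print(predict('兩天價網站背後重重迷霧', model, word_2_index, max_len, device))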


           
