使用textCNN進行文本分類
介紹論文的主要參數和意義
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiInVGcq5CMxcDMwYDMlJWOwITO5kTNwE2M5QGM1MGN3IGOwIDO28CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpeg)
圖中,句子的長度是7,每個字的次元是5,我們可以通過nn.Embedding(vocab_num, 5)來建構字向量;其次,圖中第二部分一共有6個矩陣,分為3個塊(3種尺寸的卷積核,每種尺寸2個),在代碼中可建構一個類來表示;然後,得到卷積後的結果;接着通過最大池化層輸出最大值;最後,進行拼接,進行分類。(下文會介紹具體變化過程)
讀取資料和建構資料疊代器
讀取資料
資料儲存在txt檔案中,每行為一筆資料,文本和标簽之間以Tab(\t)分隔,其格式如下:
中華女子學院:大學層次僅1專業招男生 3
兩天價網站背後重重迷霧:做個網站究竟要多少錢 4
東5環海棠公社230-290平2居準現房98折優惠 1
卡佩羅:告訴你德國腳生猛的原因 不希望英德戰踢點球 7
def read_data(train_test, num=None):
    """Load texts and labels from ``../data/<train_test>.txt``.

    Every non-blank line must hold a text and a label separated by a tab.
    The line is split on its LAST tab, so a text that itself contains tabs
    is still parsed instead of failing to unpack.

    Args:
        train_test: File stem of the split to load, e.g. ``'train'``.
        num: Optional cap; when given only the first ``num`` samples are
            returned (plain slice semantics). ``None`` returns everything.

    Returns:
        A tuple ``(all_texts, all_labels)`` of two lists. Labels are kept as
        the raw strings read from the file.

    Raises:
        ValueError: If a non-blank line contains no tab separator.
    """
    path = os.path.join('..', 'data', train_test + '.txt')
    all_texts = []
    all_labels = []
    with open(path, 'r', encoding='utf-8') as f:
        # Stream the file line by line instead of loading it and splitting.
        for line_no, raw_line in enumerate(f, start=1):
            line = raw_line.rstrip('\n')
            if not line.strip():
                # Skip empty / whitespace-only lines (e.g. a trailing newline).
                continue
            text, sep, label = line.rpartition('\t')
            if not sep:
                raise ValueError(
                    f'{path}, line {line_no}: missing tab separator in {line!r}'
                )
            all_texts.append(text)
            all_labels.append(label)
    # Slicing with None keeps the whole list, so one return covers both cases.
    return all_texts[:num], all_labels[:num]
傳回所有的文本和标簽(在該資料集中,一共有10個類别)
建構word2index
def build_corpus(texts):
    """Build the token-to-id vocabulary from an iterable of texts.

    Ids 0 and 1 are reserved for ``'UNK'`` and ``'PAD'``. Every other token
    receives the next free id in order of first appearance. Each text is
    iterated directly, so a ``str`` text contributes its single characters.

    Args:
        texts: Iterable of texts (each itself iterable over tokens).

    Returns:
        A tuple ``(word_2_index, index_2_word)`` where ``index_2_word`` is
        the list of tokens in insertion order, i.e. ordered by id.
    """
    vocab = {'UNK': 0, 'PAD': 1}
    for sentence in texts:
        for token in sentence:
            # setdefault leaves already-known tokens untouched.
            vocab.setdefault(token, len(vocab))
    return vocab, list(vocab)
建構資料疊代器
class TextDataset(Dataset):
    """Dataset yielding ``(token_ids, label)`` tensor pairs of fixed length.

    Each text is truncated to ``max_len`` tokens, mapped through
    ``word_2_index`` (unknown tokens fall back to id 0) and right-padded
    with id 1 up to ``max_len``. Ids 0 and 1 match the ``'UNK'`` and
    ``'PAD'`` entries created by ``build_corpus``.
    """

    def __init__(self, all_texts, all_labels, word_2_index, max_len):
        self.all_texts = all_texts
        self.all_labels = all_labels
        self.word_2_index = word_2_index
        self.max_len = max_len

    def __len__(self):
        """Return the number of samples."""
        return len(self.all_texts)

    def __getitem__(self, item):
        """Return the padded id tensor and the integer label tensor for one sample."""
        clipped = self.all_texts[item][:self.max_len]
        token_ids = [self.word_2_index.get(token, 0) for token in clipped]
        # Right-pad with the padding id so every sample has max_len entries.
        token_ids.extend([1] * (self.max_len - len(clipped)))
        label = int(self.all_labels[item])
        return torch.tensor(token_ids), torch.tensor(label)
建構TextCNN模型的卷積部分
1、輸入部分
self.cnn = nn.Conv2d(1, out_channel, kernel_size=(kernel_s, embed_num))
使用CNN時,文本類型的資料和圖像類型的資料有所不同。在建構字向量的時候,我們會産生一個二維的矩陣(seq_len,embedding_dim),沒有通道這一維;但是nn.Conv2d的輸入需要通道維,我們需要人為地設定in_channels=1,是以在後續資料的處理過程中,我們需要加一個次元1,使其形狀為(batch_size,1,max_len, embedding_dim)
如:
output = self.emb(batch_idx)
output = output.unsqueeze(dim=1)
2、卷積部分
self.cnn = nn.Conv2d(1, out_channel, kernel_size=(kernel_s, embed_num))
out_channel就是輸出的通道數,也是卷積核的個數,在該論文中,卷積核的個數是2(我們也可以自己進行參數的改變)
例如:本文中次元是:7*5,通過卷積之後,獲得2個(4*1,5*1,6*1)的矩陣,如何得來的?
第一個次元 = 7 - kernel_s + 1:kernel_s=4 時,4 = 7 - 4 + 1;kernel_s=3 時,5 = 7 - 3 + 1;kernel_s=2 時,6 = 7 - 2 + 1;
第二個次元:1 = 5 - embed_num + 1
是以,kernel_size=(kernel_s, embed_num)的第二個次元需要和詞向量次元相同,才會輸出最後結果為1維。
3、最大池化層(MaxPool1d)
MaxPool1d的輸入輸出,由下圖可以看出,MaxPool1d主要是改變最後一維的大小。
self.maxp = nn.MaxPool1d(kernel_size=(max_lens - kernel_s + 1))
這裡kernel_size是滑動視窗的大小
當卷積核大小為:4*5,得到 輸出為:4*1,此時, MaxPool1d(kernel_size=(max_lens - kernel_s + 1))----kernel_size=(7 - 4 + 1=4),也就是在4*1的矩陣中,劃出一個視窗為4的内容,從中選取最大值。
在代碼中,我們經過cnn卷積得到的次元是output.shape = torch.Size([1, 2, 6, 1])
但是,最大池化層的輸入需要是2維或者3維,是以,需要把最後的1維去掉
output1 = output.squeeze(3)
output2 = self.maxp(output1)
最後,我們需要将最終的輸出進行拼接,得到一個6*1的矩陣
在最大池化之後,次元變成 batch*2*1,因為需要拼接,是以,需要将池化層輸出的最後1維去掉
output2 = self.maxp(output1)
return output2.squeeze(dim=-1) # 去掉1維的内容
4、cnn代碼
class Block(nn.Module):
    """One TextCNN branch: full-width convolution, ReLU, max-pool over positions.

    Args:
        out_channel: Number of convolution filters (output channels).
        max_lens: Sequence length the pooling window is sized for.
        kernel_s: Number of consecutive tokens covered by each filter.
        embed_num: Embedding size; used as the kernel width, so the
            convolution output has width 1 when the input width equals it.
    """

    def __init__(self, out_channel, max_lens, kernel_s, embed_num):
        super().__init__()
        # Number of window positions for an input of length max_lens.
        window_count = max_lens - kernel_s + 1
        self.cnn = nn.Conv2d(1, out_channel, kernel_size=(kernel_s, embed_num))
        self.act = nn.ReLU()
        self.maxp = nn.MaxPool1d(kernel_size=window_count)

    def forward(self, emb):
        """Return one pooled feature per filter.

        ``emb`` must already carry the channel axis, i.e. be 4-D; for an
        input of shape (batch, 1, max_lens, embed_num) the result is
        (batch, out_channel).
        """
        activated = self.act(self.cnn(emb))
        # MaxPool1d takes 2-D/3-D input, so drop the trailing width axis first.
        pooled = self.maxp(activated.squeeze(3))
        return pooled.squeeze(dim=-1)
建構TextCNN模型
1、完整代碼
class TextCnnModel(nn.Module):
    """TextCNN classifier: embedding, three conv branches (sizes 2/3/4), linear head.

    Args:
        vocab_num: Vocabulary size for the embedding table.
        out_channel: Number of filters in each convolution branch.
        max_lens: Sequence length passed on to every ``Block``.
        embed_num: Embedding dimension.
        class_num: Number of output classes.
    """

    def __init__(self, vocab_num, out_channel, max_lens, embed_num, class_num):
        super().__init__()
        self.emb = nn.Embedding(vocab_num, embed_num)
        self.block1 = Block(out_channel, max_lens, 2, embed_num)
        self.block2 = Block(out_channel, max_lens, 3, embed_num)
        self.block3 = Block(out_channel, max_lens, 4, embed_num)
        # Three branches are concatenated, hence 3 * out_channel input features.
        self.classifier = nn.Linear(3 * out_channel, class_num)
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, batch_idx, batch_label=None):
        """Return the cross-entropy loss if labels are given, else predicted class ids."""
        # Insert a channel axis at dim 1, as required by the Conv2d branches.
        embedded = self.emb(batch_idx).unsqueeze(dim=1)
        branches = (self.block1, self.block2, self.block3)
        feature = torch.cat([branch(embedded) for branch in branches], dim=1)
        logits = self.classifier(feature)
        if batch_label is None:
            return torch.argmax(logits, dim=-1)
        return self.loss_fn(logits, batch_label)
注意:
self.classifier = nn.Linear(3 * out_channel, class_num)
為什麼是 (3 * out_channel)?
先解釋3這個參數。是因為在論文中分别使用了三次卷積,在上面代碼部分(建構TextCNN模型)中有b1-3 = self.block1-3(output);如果你增加卷積塊,那麼就要改變這個參數!
再解釋out_channel這個參數。這個也就是你卷積核的個數,你有幾個卷積核,就會有幾個輸出。在文中,卷積核的個數是2,那麼每次輸出的結果就會有2個矩陣
最後,将三個卷積塊的結果拼接起來,就會得到 (3 * out_channel)!
是以,分類器的參數為nn.Linear(3 * out_channel, class_num)
class_num是分類的類别
完整代碼
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
import os
# 讀取資料
def read_data(train_test, num=None):
    """Load texts and labels from ``../data/<train_test>.txt``.

    Every non-blank line must hold a text and a label separated by a tab.
    The line is split on its LAST tab, so a text that itself contains tabs
    is still parsed instead of failing to unpack.

    Args:
        train_test: File stem of the split to load, e.g. ``'train'``.
        num: Optional cap; when given only the first ``num`` samples are
            returned (plain slice semantics). ``None`` returns everything.

    Returns:
        A tuple ``(all_texts, all_labels)`` of two lists. Labels are kept as
        the raw strings read from the file.

    Raises:
        ValueError: If a non-blank line contains no tab separator.
    """
    path = os.path.join('..', 'data', train_test + '.txt')
    all_texts = []
    all_labels = []
    with open(path, 'r', encoding='utf-8') as f:
        # Stream the file line by line instead of loading it and splitting.
        for line_no, raw_line in enumerate(f, start=1):
            line = raw_line.rstrip('\n')
            if not line.strip():
                # Skip empty / whitespace-only lines (e.g. a trailing newline).
                continue
            text, sep, label = line.rpartition('\t')
            if not sep:
                raise ValueError(
                    f'{path}, line {line_no}: missing tab separator in {line!r}'
                )
            all_texts.append(text)
            all_labels.append(label)
    # Slicing with None keeps the whole list, so one return covers both cases.
    return all_texts[:num], all_labels[:num]
# 建構詞編碼
def build_corpus(texts):
    """Build the token-to-id vocabulary from an iterable of texts.

    Ids 0 and 1 are reserved for ``'UNK'`` and ``'PAD'``. Every other token
    receives the next free id in order of first appearance. Each text is
    iterated directly, so a ``str`` text contributes its single characters.

    Args:
        texts: Iterable of texts (each itself iterable over tokens).

    Returns:
        A tuple ``(word_2_index, index_2_word)`` where ``index_2_word`` is
        the list of tokens in insertion order, i.e. ordered by id.
    """
    vocab = {'UNK': 0, 'PAD': 1}
    for sentence in texts:
        for token in sentence:
            # setdefault leaves already-known tokens untouched.
            vocab.setdefault(token, len(vocab))
    return vocab, list(vocab)
# 建構資料類
class TextDataset(Dataset):
    """Dataset yielding ``(token_ids, label)`` tensor pairs of fixed length.

    Each text is truncated to ``max_len`` tokens, mapped through
    ``word_2_index`` (unknown tokens fall back to id 0) and right-padded
    with id 1 up to ``max_len``. Ids 0 and 1 match the ``'UNK'`` and
    ``'PAD'`` entries created by ``build_corpus``.
    """

    def __init__(self, all_texts, all_labels, word_2_index, max_len):
        self.all_texts = all_texts
        self.all_labels = all_labels
        self.word_2_index = word_2_index
        self.max_len = max_len

    def __len__(self):
        """Return the number of samples."""
        return len(self.all_texts)

    def __getitem__(self, item):
        """Return the padded id tensor and the integer label tensor for one sample."""
        clipped = self.all_texts[item][:self.max_len]
        token_ids = [self.word_2_index.get(token, 0) for token in clipped]
        # Right-pad with the padding id so every sample has max_len entries.
        token_ids.extend([1] * (self.max_len - len(clipped)))
        label = int(self.all_labels[item])
        return torch.tensor(token_ids), torch.tensor(label)
# 建構模型
class Block(nn.Module):
    """One TextCNN branch: full-width convolution, ReLU, max-pool over positions.

    Args:
        out_channel: Number of convolution filters (output channels).
        max_lens: Sequence length the pooling window is sized for.
        kernel_s: Number of consecutive tokens covered by each filter.
        embed_num: Embedding size; used as the kernel width, so the
            convolution output has width 1 when the input width equals it.
    """

    def __init__(self, out_channel, max_lens, kernel_s, embed_num):
        super().__init__()
        # Number of window positions for an input of length max_lens.
        window_count = max_lens - kernel_s + 1
        self.cnn = nn.Conv2d(1, out_channel, kernel_size=(kernel_s, embed_num))
        self.act = nn.ReLU()
        self.maxp = nn.MaxPool1d(kernel_size=window_count)

    def forward(self, emb):
        """Return one pooled feature per filter.

        ``emb`` must already carry the channel axis, i.e. be 4-D; for an
        input of shape (batch, 1, max_lens, embed_num) the result is
        (batch, out_channel).
        """
        activated = self.act(self.cnn(emb))
        # MaxPool1d takes 2-D/3-D input, so drop the trailing width axis first.
        pooled = self.maxp(activated.squeeze(3))
        return pooled.squeeze(dim=-1)
class TextCnnModel(nn.Module):
    """TextCNN classifier: embedding, three conv branches (sizes 2/3/4), linear head.

    Args:
        vocab_num: Vocabulary size for the embedding table.
        out_channel: Number of filters in each convolution branch.
        max_lens: Sequence length passed on to every ``Block``.
        embed_num: Embedding dimension.
        class_num: Number of output classes.
    """

    def __init__(self, vocab_num, out_channel, max_lens, embed_num, class_num):
        super().__init__()
        self.emb = nn.Embedding(vocab_num, embed_num)
        self.block1 = Block(out_channel, max_lens, 2, embed_num)
        self.block2 = Block(out_channel, max_lens, 3, embed_num)
        self.block3 = Block(out_channel, max_lens, 4, embed_num)
        # Three branches are concatenated, hence 3 * out_channel input features.
        self.classifier = nn.Linear(3 * out_channel, class_num)
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, batch_idx, batch_label=None):
        """Return the cross-entropy loss if labels are given, else predicted class ids."""
        # Insert a channel axis at dim 1, as required by the Conv2d branches.
        embedded = self.emb(batch_idx).unsqueeze(dim=1)
        branches = (self.block1, self.block2, self.block3)
        feature = torch.cat([branch(embedded) for branch in branches], dim=1)
        logits = self.classifier(feature)
        if batch_label is None:
            return torch.argmax(logits, dim=-1)
        return self.loss_fn(logits, batch_label)
if __name__ == '__main__':
    # Script entry point: train the TextCNN on the 'train' split and report
    # the mean training loss and the 'dev' accuracy after every epoch.
    train_text, train_label = read_data('train')
    dev_text, dev_label = read_data('dev')
    # The vocabulary is built from the training texts only.
    word_2_index, _ = build_corpus(train_text)

    # Hyper-parameters.
    batch_size = 32
    max_len = 32
    epochs = 10
    out_channel = 2
    embed_num = 50
    lr = 2e-3
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    train_set = TextDataset(train_text, train_label, word_2_index, max_len)
    # Shuffle training batches each epoch so updates do not follow file order.
    train_loader = DataLoader(train_set, batch_size, shuffle=True)
    dev_set = TextDataset(dev_text, dev_label, word_2_index, max_len)
    dev_loader = DataLoader(dev_set, batch_size)

    model = TextCnnModel(len(word_2_index), out_channel, max_len, embed_num, len(set(train_label))).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr)

    for e in range(epochs):
        model.train()
        total_loss = 0.0
        for batch_idx, batch_label in tqdm(train_loader):
            loss = model(batch_idx.to(device), batch_label.to(device))
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            total_loss += loss.item()
        # Report the mean loss over the epoch rather than only the last batch.
        print(f'epoch:{e},loss={total_loss / max(len(train_loader), 1):.3f}')

        model.eval()
        right_num = 0
        # No gradients are needed for evaluation; skip building the graph.
        with torch.no_grad():
            for batch_idx, batch_label in tqdm(dev_loader):
                pre = model(batch_idx.to(device))
                batch_label = batch_label.to(device)
                # .item() keeps the running count a plain Python int.
                right_num += torch.sum(pre == batch_label).item()
        print(f'acc = {right_num/len(dev_text)*100:.3f}%')