本文包括以下流程:
1.下載下傳資料集,這次是使用的斯坦福提供的資料集[IMDB]
2.資料集處理
3.建構網絡模型
4.訓練和測試
一.資料集處理
首先加載資料集.我的項目路徑如下圖:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiIXZ05WZj91YpB3In5GcuIzN1MTNxIjM1ETMxAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
# Read the raw text data; result shape: [[['hello', 'word'], label] ...]
def load_data(path, flag='train'):
    """Load the IMDB reviews under ``path/flag/{pos,neg}``.

    Each review becomes ``[tokens, label]`` where ``tokens`` is a list of
    lower-cased words and ``label`` is 1 for ``pos``, 0 for ``neg``.
    """
    # Punctuation (ASCII plus a few full-width marks) stripped from reviews.
    punct = '[’!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~\n。!,]+'
    samples = []
    for tag, score in (('pos', 1), ('neg', 0)):
        folder = os.path.join(path, flag, tag)
        for name in os.listdir(folder):
            with open(os.path.join(folder, name), 'r', encoding='utf8') as fh:
                text = fh.read().replace('\n', '')
            # HTML line breaks become plain spaces before punctuation removal.
            text = text.replace('<br /><br />', ' ')
            text = re.sub(punct, '', text)
            tokens = [tok.lower() for tok in text.split(' ') if tok != '']
            samples.append([tokens, score])
    return samples
# Build the raw train/test datasets from the local 'Imdb' directory.
train_data = load_data('Imdb', 'train')
test_data = load_data('Imdb', 'test')
得到資料集後,需要統計詞頻和建構詞典
def fit(sentence):
    """Accumulate word frequencies from *sentence* into the global ``word_count``.

    :param sentence: iterable of word tokens
    :return: None
    """
    for token in sentence:
        # Start unseen words at 0, then count this occurrence.
        if token in word_count:
            word_count[token] += 1
        else:
            word_count[token] = 1
def build_vocab(min_count=5, max_count=1000, max_features=25000):
    """Prune the global word-frequency table and assign ids in ``word_idx``.

    :param min_count: words must occur strictly more often than this to survive
    :param max_count: words occurring more often than this are dropped
    :param max_features: keep at most this many of the most frequent words
    :return: None
    """
    global word_count
    global word_idx
    word_count = {w: c for w, c in word_count.items() if c > min_count}
    if max_count is not None:
        word_count = {w: c for w, c in word_count.items() if c <= max_count}
    if max_features is not None:
        # Rank by frequency (descending) and keep only the top words.
        ranked = sorted(word_count.items(), key=lambda item: item[1], reverse=True)
        word_count = dict(ranked[:max_features])
    for w in word_count:
        # Ids continue sequentially after the reserved PAD/UNK entries.
        word_idx[w] = len(word_idx)
然後需要将資料集中的句子由單詞形式轉換成數字形式.
def transform(sentence, max_len=200):
    """Convert a token list into a fixed-length sequence of word ids.

    :param sentence: list of word tokens
    :param max_len: target sequence length (truncate or pad to this)
    :return: list of ``max_len`` integer ids
    """
    padding = max_len - len(sentence)
    if padding < 0:
        # Too long: keep only the first max_len tokens.
        sentence = sentence[:max_len]
    else:
        # Too short: right-pad with the PAD token up to the standard length.
        sentence = sentence + [PAD_TAG] * padding
    # Words missing from the vocabulary map to the UNK id (1).
    return [word_idx.get(token, UNK) for token in sentence]
利用上面三個函數,我們能夠得到可以在模型中使用的資料集.
# Generate the dataset in numeric (id-sequence) form.
def make_data(train_Datas, test_Datas):
    """Build the vocabulary from both splits, then encode every sentence.

    :param train_Datas: list of [tokens, label] training samples
    :param test_Datas: list of [tokens, label] test samples
    :return: (train_inputs, train_labels, test_inputs, test_labels)
    """
    # Count word frequencies over train AND test, then freeze the vocabulary.
    for split in (train_Datas, test_Datas):
        for sample in split:
            fit(sample[0])
    build_vocab()  # assign an id to every surviving word
    # Encode each split into id sequences plus labels.
    train_inputs = [transform(sample[0]) for sample in train_Datas]
    train_labels = [sample[1] for sample in train_Datas]
    test_inputs = [transform(sample[0]) for sample in test_Datas]
    test_labels = [sample[1] for sample in test_Datas]
    return train_inputs, train_labels, test_inputs, test_labels
之後将資料集轉換成torch.LongTensor格式,并将資料集加載成批處理.
# Encode both splits, convert to LongTensor, and wrap in batch-64 loaders.
train_inputs, train_labels, test_inputs, test_labels = make_data(train_data, test_data)
train_inputs, train_labels = torch.LongTensor(train_inputs), torch.LongTensor(train_labels)
test_inputs, test_labels = torch.LongTensor(test_inputs), torch.LongTensor(test_labels)
# Load the training dataset
train_dataset = Data.TensorDataset(train_inputs, train_labels)
train_loader = Data.DataLoader(train_dataset, batch_size=64, shuffle=True)
# Load the test dataset
test_dataset = Data.TensorDataset(test_inputs, test_labels)
test_loader = Data.DataLoader(test_dataset, batch_size=64, shuffle=True)
完整代碼如下:
import os
import torch
from torch import optim
from torch.nn import RNN, LSTM, LSTMCell
import numpy as np
import torch.nn.functional as F
import re
import torch.nn as nn
import random
import torch.utils.data as Data
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
UNK_TAG = "<UNK>"  # placeholder token for out-of-vocabulary words
PAD_TAG = "<PAD>"  # placeholder token used to pad short sentences
UNK = 1  # id reserved for UNK_TAG
PAD = 0  # id reserved for PAD_TAG
word_idx = {PAD_TAG: PAD, UNK_TAG: UNK}  # vocabulary: word -> id
word_count = {}  # word -> frequency, filled by fit()
def fit(sentence):
    """
    Accumulate word frequencies from *sentence* into the global ``word_count``.
    :param sentence: iterable of word tokens
    :return: None
    """
    for word in sentence:
        # dict.get(key, default) returns the stored count, or 0 for unseen words.
        word_count[word] = word_count.get(word, 0) + 1
def build_vocab(min_count=5, max_count=1000, max_features=25000):
    """
    Prune the global ``word_count`` table and assign ids in ``word_idx``.
    :param min_count: keep words occurring strictly more often than this
    :param max_count: drop words occurring more often than this
    :param max_features: keep at most this many of the most frequent words
    :return: None
    """
    global word_count
    global word_idx
    word_count = {word: count for word, count in word_count.items() if count > min_count}
    if max_count is not None:
        word_count = {word: count for word, count in word_count.items() if count <= max_count}
    if max_features is not None:
        # Sort by frequency (descending) and keep the top max_features words.
        word_count = dict(sorted(word_count.items(), key=lambda x: x[-1], reverse=True)[:max_features])
    for word in word_count:
        # Map each word to its own id, continuing after the reserved PAD/UNK.
        word_idx[word] = len(word_idx)  # each word gets the next sequential id
def transform(sentence, max_len=200):
    """
    Convert a token list into a fixed-length sequence of word ids.
    :param sentence: list of word tokens
    :param max_len: target sequence length
    :return: list of ``max_len`` integer ids
    """
    if len(sentence) > max_len:
        # Truncate sentences that are too long.
        sentence = sentence[:max_len]
    else:
        # Pad short sentences up to the standard length.
        sentence = sentence + [PAD_TAG] * (max_len - len(sentence))
    # Words missing from the vocabulary map to the UNK id (1).
    return [word_idx.get(word, UNK) for word in sentence]
# Generate the dataset in numeric (id-sequence) form.
def make_data(train_Datas, test_Datas):
    """Build the vocabulary, then encode every sentence as word-id sequences.

    NOTE(review): the vocabulary is built from train AND test data (loop
    below), which leaks test-set statistics into preprocessing — confirm
    this is intended.
    """
    train_inputs = []
    train_labels = []
    test_inputs = []
    test_labels = []
    for Datas in [train_Datas, test_Datas]:
        for data in Datas:
            fit(data[0])
    build_vocab()  # assign an id to every surviving word
    # Encode the training split into id sequences plus labels.
    for data in train_Datas:
        train_inputs.append(transform(data[0]))
        train_labels.append(data[1])
    # Encode the test split the same way.
    for data in test_Datas:
        test_inputs.append(transform(data[0]))
        test_labels.append(data[1])
    return train_inputs, train_labels, test_inputs, test_labels
# Read the raw text data; result shape: [[['hello', 'word'], label] ...]
def load_data(path, flag='train'):
    """Load IMDB reviews under ``path/flag/{pos,neg}`` as [tokens, label] pairs."""
    labels = ['pos', 'neg']
    data = []
    for label in labels:
        files = os.listdir(os.path.join(path, flag, label))
        # Punctuation (and newlines) stripped from each review.
        r = '[’!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~\n。!,]+'
        for file in files:
            with open(os.path.join(path, flag, label, file), 'r', encoding='utf8') as rf:
                temp = rf.read().replace('\n', '')
                temp = temp.replace('<br /><br />', ' ')
                temp = re.sub(r, '', temp)
                temp = temp.split(' ')
                temp = [temp[i].lower() for i in range(len(temp)) if temp[i] != '']
                if label == 'pos':
                    data.append([temp, 1])
                elif label == 'neg':
                    data.append([temp, 0])
    return data
# Build datasets from the local 'Imdb' directory and encode them as tensors.
train_data = load_data('Imdb', 'train')
test_data = load_data('Imdb', 'test')
train_inputs, train_labels, test_inputs, test_labels = make_data(train_data, test_data)
train_inputs, train_labels = torch.LongTensor(train_inputs), torch.LongTensor(train_labels)
test_inputs, test_labels = torch.LongTensor(test_inputs), torch.LongTensor(test_labels)
# Wrap the training split in a shuffling DataLoader (batches of 64).
train_dataset = Data.TensorDataset(train_inputs, train_labels)
train_loader = Data.DataLoader(train_dataset, batch_size=64, shuffle=True)
# Wrap the test split the same way.
test_dataset = Data.TensorDataset(test_inputs, test_labels)
test_loader = Data.DataLoader(test_dataset, batch_size=64, shuffle=True)
vocab_size = len(word_idx)  # vocabulary size (including PAD/UNK)
dmodel = 512  # embedding dimension
num_filter = 100  # number of convolution filters per kernel size
filter_size = [2, 3, 4]  # kernel heights (n-gram sizes), three variants
output_dim = 2  # number of output classes
# 這個模型也可以使用
# class MyModel(nn.Module):
# def __init__(self):
# super(MyModel, self).__init__()
#
# self.w = nn.Embedding(vocab_size,dmodel)
# self.dropout = nn.Dropout(0.05)
# self.fc2 = nn.Linear(dmodel, 2)
#
# def forward(self,x):
# embedded = self.dropout(self.w(x)) # [batch_size,seq_len,embeding_size]
# # [batch_size, embedding_dim]把單詞長度的次元壓扁為1,并降維
# # embedded 為input_size,(embedded.shape[1], 1)) 為kernel_size
# # squeeze(1)表示删除索引為1的那個次元
# pooled = F.avg_pool2d(embedded, (embedded.shape[1], 1)).squeeze(1)
# return self.fc2(pooled)
# Convolutional (TextCNN-style) implementation for sentence classification.
class MyModel(nn.Module):
    """TextCNN classifier: embedding -> parallel Conv2d -> max-pool -> linear.

    :param vocab_size: number of words in the vocabulary
    :param embedding_dim: size of each word vector
    :param num_filter: number of filters per kernel size
    :param filter_sizes: kernel heights (n-gram sizes), one conv branch each
    :param output_dim: number of output classes
    :param dropout: dropout probability on embeddings and pooled features
    :param pad_idx: embedding index of the padding token
    """

    def __init__(self, vocab_size, embedding_dim, num_filter,
                 filter_sizes, output_dim, dropout=0.2, pad_idx=0):
        super().__init__()
        # Fix: pad_idx was previously accepted but never used. Passing it to
        # nn.Embedding keeps the padding vector at zero and excludes it from
        # gradient updates, as the signature intends.
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        # One Conv2d per n-gram size:
        #   in_channels: 1 — text has a single input channel
        #   out_channels: number of filters per branch
        #   fs: words covered per window, like the n in an n-gram
        # All branches are concatenated later so their features combine.
        self.convs = nn.ModuleList([
            nn.Conv2d(in_channels=1, out_channels=num_filter,
                      kernel_size=(fs, embedding_dim))
            for fs in filter_sizes
        ])
        self.fc = nn.Linear(len(filter_sizes) * num_filter, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Classify a batch of id sequences.

        :param text: LongTensor of shape [batch size, sent len]
        :return: logits of shape [batch size, output_dim]
        """
        embedded = self.dropout(self.embedding(text))  # [batch size, sent len, emb dim]
        # Add a channel dimension to match nn.Conv2d's expected input.
        embedded = embedded.unsqueeze(1)  # [batch size, 1, sent len, emb dim]
        # Each branch yields [batch size, num_filter, sent len - fs + 1]
        # after squeezing the collapsed embedding dimension.
        conved = [F.relu(conv(embedded)).squeeze(3) for conv in self.convs]
        # Max-pool over time: [batch size, num_filter] per branch.
        pooled = [F.max_pool1d(conv, conv.shape[2]).squeeze(2) for conv in conved]
        # Concatenate all branches: [batch size, num_filter * len(filter_sizes)].
        cat = self.dropout(torch.cat(pooled, dim=1))
        return self.fc(cat)
# Instantiate the model, Adam optimizer (lr=0.01) and cross-entropy loss.
model = MyModel(vocab_size, dmodel, num_filter=num_filter, filter_sizes=filter_size, output_dim=output_dim).to(device)
optimizer = optim.Adam(model.parameters(), 0.01)
criterion = nn.CrossEntropyLoss().to(device)
# Training
def train():
    """Run 20 training epochs, checkpointing the best training accuracy."""
    model.train()
    best_acc = 0.0
    for epoch in range(20):
        n_correct = 0.0
        n_seen = 0.0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            loss = criterion(logits, yb)
            # Standard optimisation step.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Track training accuracy from the argmax predictions.
            n_correct += (logits.max(1)[1] == yb).sum()
            n_seen += len(yb)
        print("epoch:", epoch + 1)
        acc = n_correct.item() / n_seen
        print(f'loss: {loss.item():.4f} | acc: {acc*100:.2f}%')
        # Keep the checkpoint with the best accuracy seen so far.
        if acc > best_acc:
            best_acc = acc
            torch.save(model.state_dict(), './model.pkl')
# Reload the best checkpoint if a previous run saved one.
if os.path.exists("./model.pkl"):
    # map_location lets a checkpoint saved on GPU load on a CPU-only machine.
    model.load_state_dict(torch.load("./model.pkl", map_location=device))
def test():
    """Evaluate the model on the test set and print loss/accuracy.

    Fixes: puts the model in eval mode (the original left dropout active
    during evaluation) and disables gradient tracking for the forward passes.
    """
    model.eval()
    correct = 0.0
    num = 0
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            pred = model(batch_x)
            loss = criterion(pred, batch_y)
            # Accuracy from the argmax predictions.
            pred = pred.max(1)[1]
            correct += (pred == batch_y).sum()
            num += len(batch_y)
    acc = correct.item() / num
    # NOTE: the printed loss is that of the last batch only.
    print(f'loss: {loss.item():.4f} | acc: {acc*100:.2f}%')
if __name__ == '__main__':
    # Uncomment the two lines below to train before testing.
    # print("開始訓練")
    # train()
    print("開始測試")
    test()
希望能對您有所幫助!