This article covers the following steps:
1. Download the dataset; this time the Stanford [IMDB] dataset is used
2. Dataset preprocessing
3. Building the network model
4. Training and testing
I. Dataset Preprocessing
First, load the dataset. My project layout is as follows:
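A sketch of the directory structure the loader assumes (the folder name Imdb is my local choice for the unpacked Stanford release; only the train/test and pos/neg subfolders matter):
Imdb/
    train/
        pos/    # one .txt file per positive review
        neg/    # one .txt file per negative review
    test/
        pos/
        neg/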
# Read the raw text data; data format: [[['hello', 'word'], label], ...]
def load_data(path, flag='train'):
labels = ['pos', 'neg']
data = []
for label in labels:
files = os.listdir(os.path.join(path, flag, label))
        # punctuation characters to strip
r = '[’!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~\n。!,]+'
for file in files:
with open(os.path.join(path, flag, label, file), 'r', encoding='utf8') as rf:
temp = rf.read().replace('\n', '')
temp = temp.replace('<br /><br />', ' ')
temp = re.sub(r, '', temp)
temp = temp.split(' ')
temp = [temp[i].lower() for i in range(len(temp)) if temp[i] != '']
if label == 'pos':
data.append([temp, 1])
elif label == 'neg':
data.append([temp, 0])
return data
train_data = load_data('Imdb', 'train')
test_data = load_data('Imdb', 'test')
After obtaining the dataset, we need to count word frequencies and build a vocabulary.
def fit(sentence):
"""
    Count word frequencies.
:param sentence:
:return:
"""
for word in sentence:
        # dict.get(key, default=None) returns the value for the given key, or the default if the key is missing
word_count[word] = word_count.get(word, 0) + 1
def build_vocab(min_count=5, max_count=1000, max_features=25000):
"""
    :param min_count: minimum word frequency (words with count <= min_count are dropped)
    :param max_count: maximum word frequency
    :param max_features: maximum vocabulary size
:return:
"""
global word_count
global word_idx
word_count = {word: count for word, count in word_count.items() if count > min_count}
if max_count is not None:
word_count = {word: count for word, count in word_count.items() if count <= max_count}
if max_features is not None:
        # sort by frequency (descending) and keep the top max_features words
word_count = dict(sorted(word_count.items(), key=lambda x: x[-1], reverse=True)[:max_features])
for word in word_count:
        # map each word to its own id
        word_idx[word] = len(word_idx)  # each word gets the next consecutive index
Then the sentences in the dataset need to be converted from words into numeric sequences.
def transform(sentence, max_len=200):
"""
    Convert a sentence (list of words) into a sequence of ids.
    :param sentence:
    :param max_len: maximum sentence length
:return:
"""
if len(sentence) > max_len:
        # truncate sentences that are too long
sentence = sentence[:max_len]
else:
        # pad sentences that are shorter than the standard length
sentence = sentence + [PAD_TAG] * (max_len - len(sentence))
    # words that do not appear in the vocabulary are mapped to 1 (UNK)
return [word_idx.get(word, UNK) for word in sentence]
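As a quick sanity check (not part of the original pipeline), the three helpers can be exercised on a toy corpus. Note that fit and build_vocab mutate the global word_count and word_idx, so this snippet is best run in a separate session; build_vocab keeps only words with count > min_count, hence min_count=0 here:
# minimal sketch on toy data (illustrative only; mutates the global vocabulary)
toy = [['this', 'movie', 'is', 'great'], ['this', 'movie', 'is', 'bad']]
for sent in toy:
    fit(sent)
build_vocab(min_count=0)
print(word_idx)                       # e.g. {'<PAD>': 0, '<UNK>': 1, 'this': 2, ...}
print(transform(toy[0], max_len=6))   # id sequence padded with 0 (<PAD>) to length 6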
With the three functions above we can produce a dataset that the model can consume.
# Build the numeric (id-sequence) datasets
def make_data(train_Datas, test_Datas):
train_inputs = []
train_labels = []
test_inputs = []
test_labels = []
for Datas in [train_Datas, test_Datas]:
for data in Datas:
fit(data[0])
    build_vocab()  # assign an id to every word
    # convert the inputs to id sequences and collect the labels
    # training set
for data in train_Datas:
train_inputs.append(transform(data[0]))
train_labels.append(data[1])
for data in test_Datas:
test_inputs.append(transform(data[0]))
test_labels.append(data[1])
return train_inputs, train_labels, test_inputs, test_labels
Next, convert the data to torch.LongTensor and wrap it in batched DataLoaders.
train_inputs, train_labels, test_inputs, test_labels = make_data(train_data, test_data)
train_inputs, train_labels = torch.LongTensor(train_inputs), torch.LongTensor(train_labels)
test_inputs, test_labels = torch.LongTensor(test_inputs), torch.LongTensor(test_labels)
# wrap the training set in a DataLoader
train_dataset = Data.TensorDataset(train_inputs, train_labels)
train_loader = Data.DataLoader(train_dataset, batch_size=64, shuffle=True)
# wrap the test set in a DataLoader
test_dataset = Data.TensorDataset(test_inputs, test_labels)
test_loader = Data.DataLoader(test_dataset, batch_size=64, shuffle=True)
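To confirm the loaders produce what the model expects, it is worth peeking at a single batch (a small check added here, not in the original code):
# each batch should be [batch_size, max_len] word ids and [batch_size] labels
batch_x, batch_y = next(iter(train_loader))
print(batch_x.shape, batch_x.dtype)   # expected: torch.Size([64, 200]) torch.int64
print(batch_y.shape)                  # expected: torch.Size([64])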
The complete code is as follows:
import os
import torch
from torch import optim
from torch.nn import RNN, LSTM, LSTMCell
import numpy as np
import torch.nn.functional as F
import re
import torch.nn as nn
import random
import torch.utils.data as Data
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
UNK_TAG = "<UNK>"  # token for unknown words
PAD_TAG = "<PAD>"  # token used to pad short sentences
UNK = 1
PAD = 0
word_idx = {PAD_TAG: PAD, UNK_TAG: UNK}  # vocabulary: word -> id
word_count = {}  # word frequency counts
def fit(sentence):
"""
    Count word frequencies.
:param sentence:
:return:
"""
for word in sentence:
        # dict.get(key, default=None) returns the value for the given key, or the default if the key is missing
word_count[word] = word_count.get(word, 0) + 1
def build_vocab(min_count=5, max_count=1000, max_features=25000):
"""
    :param min_count: minimum word frequency (words with count <= min_count are dropped)
    :param max_count: maximum word frequency
    :param max_features: maximum vocabulary size
:return:
"""
global word_count
global word_idx
word_count = {word: count for word, count in word_count.items() if count > min_count}
if max_count is not None:
word_count = {word: count for word, count in word_count.items() if count <= max_count}
if max_features is not None:
        # sort by frequency (descending) and keep the top max_features words
word_count = dict(sorted(word_count.items(), key=lambda x: x[-1], reverse=True)[:max_features])
for word in word_count:
        # map each word to its own id
        word_idx[word] = len(word_idx)  # each word gets the next consecutive index
def transform(sentence, max_len=200):
"""
    Convert a sentence (list of words) into a sequence of ids.
    :param sentence:
    :param max_len: maximum sentence length
:return:
"""
if len(sentence) > max_len:
        # truncate sentences that are too long
sentence = sentence[:max_len]
else:
        # pad sentences that are shorter than the standard length
sentence = sentence + [PAD_TAG] * (max_len - len(sentence))
    # words that do not appear in the vocabulary are mapped to 1 (UNK)
return [word_idx.get(word, UNK) for word in sentence]
# Build the numeric (id-sequence) datasets
def make_data(train_Datas, test_Datas):
train_inputs = []
train_labels = []
test_inputs = []
test_labels = []
for Datas in [train_Datas, test_Datas]:
for data in Datas:
fit(data[0])
    build_vocab()  # assign an id to every word
    # convert the inputs to id sequences and collect the labels
    # training set
for data in train_Datas:
train_inputs.append(transform(data[0]))
train_labels.append(data[1])
for data in test_Datas:
test_inputs.append(transform(data[0]))
test_labels.append(data[1])
return train_inputs, train_labels, test_inputs, test_labels
# Read the raw text data; data format: [[['hello', 'word'], label], ...]
def load_data(path, flag='train'):
labels = ['pos', 'neg']
data = []
for label in labels:
files = os.listdir(os.path.join(path, flag, label))
        # punctuation characters to strip
r = '[’!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~\n。!,]+'
for file in files:
with open(os.path.join(path, flag, label, file), 'r', encoding='utf8') as rf:
temp = rf.read().replace('\n', '')
temp = temp.replace('<br /><br />', ' ')
temp = re.sub(r, '', temp)
temp = temp.split(' ')
temp = [temp[i].lower() for i in range(len(temp)) if temp[i] != '']
if label == 'pos':
data.append([temp, 1])
elif label == 'neg':
data.append([temp, 0])
return data
train_data = load_data('Imdb', 'train')
test_data = load_data('Imdb', 'test')
train_inputs, train_labels, test_inputs, test_labels = make_data(train_data, test_data)
train_inputs, train_labels = torch.LongTensor(train_inputs), torch.LongTensor(train_labels)
test_inputs, test_labels = torch.LongTensor(test_inputs), torch.LongTensor(test_labels)
# wrap the training set in a DataLoader
train_dataset = Data.TensorDataset(train_inputs, train_labels)
train_loader = Data.DataLoader(train_dataset, batch_size=64, shuffle=True)
# wrap the test set in a DataLoader
test_dataset = Data.TensorDataset(test_inputs, test_labels)
test_loader = Data.DataLoader(test_dataset, batch_size=64, shuffle=True)
vocab_size = len(word_idx)  # vocabulary size
dmodel = 512  # embedding dimension
num_filter = 100  # number of convolution filters per size
filter_size = [2, 3, 4]  # filter heights (n-gram sizes); three kinds are used
output_dim = 2  # number of classes
# The following simpler averaging model also works:
# class MyModel(nn.Module):
# def __init__(self):
# super(MyModel, self).__init__()
#
# self.w = nn.Embedding(vocab_size,dmodel)
# self.dropout = nn.Dropout(0.05)
# self.fc2 = nn.Linear(dmodel, 2)
#
# def forward(self,x):
# embedded = self.dropout(self.w(x)) # [batch_size, seq_len, embedding_size]
# # average-pool over the sequence-length dimension -> [batch_size, embedding_dim]
# # embedded is the input; (embedded.shape[1], 1) is the pooling kernel size
# # squeeze(1) removes the dimension at index 1
# pooled = F.avg_pool2d(embedded, (embedded.shape[1], 1)).squeeze(1)
# return self.fc2(pooled)
# This version uses a convolutional neural network (TextCNN) instead
class MyModel(nn.Module):
def __init__(self, vocab_size, embedding_dim, num_filter,
filter_sizes, output_dim, dropout=0.2, pad_idx=0):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.convs = nn.ModuleList([
nn.Conv2d(in_channels=1, out_channels=num_filter,
kernel_size=(fs, embedding_dim))
for fs in filter_sizes
])
        # in_channels: number of input channels; for text this is 1
        # out_channels: number of output channels (filters)
        # fs: how many words each sliding window covers, i.e. the n in n-gram
        # one conv layer per filter size; their outputs are concatenated later
self.fc = nn.Linear(len(filter_sizes) * num_filter, output_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, text):
embedded = self.dropout(self.embedding(text)) # [batch size, sent len, emb dim]
embedded = embedded.unsqueeze(1) # [batch size, 1, sent len, emb dim]
        # add a channel dimension so the shape matches nn.Conv2d's expected input
conved = [F.relu(conv(embedded)).squeeze(3) for conv in self.convs]
        # conved: each element is [batch size, num_filter, sent len - filter_size + 1]
        # there is one conved tensor per filter size
pooled = [F.max_pool1d(conv, conv.shape[2]).squeeze(2) for conv in conved] # [batch,num_filter]
cat = self.dropout(torch.cat(pooled, dim=1))
# cat = [batch size, num_filter * len(filter_sizes)]
        # concatenate the len(filter_sizes) pooled outputs and feed them to the fully connected layer
return self.fc(cat)
model = MyModel(vocab_size, dmodel, num_filter=num_filter, filter_sizes=filter_size, output_dim=output_dim).to(device)
optimizer = optim.Adam(model.parameters(), 0.01)
criterion = nn.CrossEntropyLoss().to(device)
# Training
def train():
model.train()
acc_M = 0.0
for epoch in range(20):
correct = 0.0
num = 0.0
for batch_x, batch_y in train_loader:
batch_x, batch_y = batch_x.to(device), batch_y.to(device)
pred = model(batch_x)
loss = criterion(pred, batch_y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
            # compute accuracy
pred = pred.max(1)[1]
correct += (pred == batch_y).sum()
num += len(batch_y)
# if (epoch + 1) % 10 == 0:
print("epoch:", epoch + 1)
# print('loss=', '{:.6f}'.format(loss.item()))
acc = correct.item() / num
print(f'loss: {loss.item():.4f} | acc: {acc*100:.2f}%')
if acc > acc_M:
acc_M = acc
torch.save(model.state_dict(), './model.pkl')
# load the trained model if a checkpoint exists
if os.path.exists("./model.pkl"):
model.load_state_dict(torch.load("./model.pkl"))
def test():
    model.eval()  # disable dropout for evaluation
    loss_M = 0.0
    correct = 0.0
num = 0
for batch_x, batch_y in test_loader:
batch_x, batch_y = batch_x.to(device), batch_y.to(device)
pred = model(batch_x)
loss = criterion(pred, batch_y)
# loss_M += loss.item()
        # compute accuracy
pred = pred.max(1)[1]
correct += (pred == batch_y).sum()
num += len(batch_y)
acc = correct.item() / num
# loss = loss_M / len(test_loader)
print(f'loss: {loss.item():.4f} | acc: {acc*100:.2f}%')
if __name__ == '__main__':
    # print("Start training")
    # train()
    print("Start testing")
test()
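Finally, a minimal sketch of how the trained model might be used on a single raw review. The predict helper below is hypothetical (not part of the original code) and simply mirrors the preprocessing done in load_data and transform:
def predict(review_text):
    # hypothetical helper: classify one raw review string with the trained model
    punct = '[’!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~\n。!,]+'  # same pattern as load_data
    text = review_text.replace('<br /><br />', ' ')
    text = re.sub(punct, '', text)
    words = [w.lower() for w in text.split(' ') if w != '']
    ids = torch.LongTensor([transform(words)]).to(device)  # shape: [1, 200]
    model.eval()
    with torch.no_grad():
        label = model(ids).max(1)[1].item()
    return 'pos' if label == 1 else 'neg'
# e.g. print(predict('An absolutely wonderful film, I loved every minute.'))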
Hope this helps!