Text classification with PyTorch in practice

This article covers the following steps:

1. Download the dataset; here we use the IMDB dataset provided by Stanford

2. Preprocess the dataset

3. Build the network model

4. Train and test

I. Dataset processing

First, load the dataset. The loading code below assumes the standard IMDB directory layout: a root folder (here named Imdb) containing train/ and test/ subfolders, each with pos/ and neg/ directories of review files.

# Read the text data; resulting format: [[['hello', 'word'], label], ...]
def load_data(path, flag='train'):
    labels = ['pos', 'neg']
    data = []
    for label in labels:
        files = os.listdir(os.path.join(path, flag, label))
        # Punctuation characters to strip
        r = '[’!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~\n。!,]+'
        for file in files:
            with open(os.path.join(path, flag, label, file), 'r', encoding='utf8') as rf:
                temp = rf.read().replace('\n', ' ')  # join lines with a space so words are not glued together
                temp = temp.replace('<br /><br />', ' ')  # drop the HTML line breaks before removing punctuation
                temp = re.sub(r, '', temp)
                temp = temp.split(' ')
                temp = [word.lower() for word in temp if word != '']
                if label == 'pos':
                    data.append([temp, 1])  # positive review -> label 1
                elif label == 'neg':
                    data.append([temp, 0])  # negative review -> label 0
    return data

train_data = load_data('Imdb', 'train')
test_data = load_data('Imdb', 'test')
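
A quick sanity check after loading helps confirm the data looks as expected; the counts in the comments below assume the full 25,000/25,000 IMDB train/test split:

# Sanity check (expected output shown in the comments)
print(len(train_data), len(test_data))         # 25000 25000
print(train_data[0][0][:5], train_data[0][1])  # first five tokens of one review and its label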
           

After loading the dataset, we count word frequencies and build the vocabulary.

def fit(sentence):
    """
    Count word frequencies.
    :param sentence: list of tokens
    :return:
    """
    for word in sentence:
        # dict.get(key, default=None) returns the value for key, or the default if the key is absent.
        word_count[word] = word_count.get(word, 0) + 1


def build_vocab(min_count=5, max_count=1000, max_features=25000):
    """
    Build the vocabulary from the accumulated word counts.
    :param min_count: minimum word frequency
    :param max_count: maximum word frequency
    :param max_features: maximum vocabulary size
    :return:
    """
    global word_count
    global word_idx
    word_count = {word: count for word, count in word_count.items() if count > min_count}
    if max_count is not None:
        word_count = {word: count for word, count in word_count.items() if count <= max_count}
    if max_features is not None:
        # Sort by frequency (descending) and keep only the top max_features words
        word_count = dict(sorted(word_count.items(), key=lambda x: x[-1], reverse=True)[:max_features])

    for word in word_count:
        # Assign each word its own id; ids continue after the reserved PAD and UNK entries
        word_idx[word] = len(word_idx)
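
A small toy illustration of how these two functions work together (not part of the pipeline; it relies on the global word_count and word_idx defined in the complete listing further down, and relaxes the thresholds so the toy words survive the frequency filter):

fit(['great', 'movie', 'great'])
build_vocab(min_count=0, max_count=None, max_features=None)
print(word_count)  # {'great': 2, 'movie': 1}
print(word_idx)    # {'<PAD>': 0, '<UNK>': 1, 'great': 2, 'movie': 3}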
           

Next, convert each sentence in the dataset from a list of words into a sequence of indices.

def transform(sentence, max_len=200):
    """
    Convert a sentence into a sequence of indices.
    :param sentence: list of tokens
    :param max_len: maximum sentence length
    :return:
    """
    if len(sentence) > max_len:
        # Truncate sentences that are too long
        sentence = sentence[:max_len]
    else:
        # Pad sentences that are shorter than the target length
        sentence = sentence + [PAD_TAG] * (max_len - len(sentence))
    # Words that are not in the vocabulary are mapped to UNK (1)
    return [word_idx.get(word, UNK) for word in sentence]
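
For intuition, here is what transform produces on a short input, assuming the ids from the toy vocabulary above ('great' -> 2, 'movie' -> 3): the unknown word falls back to UNK and the rest of the sequence is padded with PAD:

print(transform(['great', 'movie', 'qwerty'], max_len=6))
# -> [2, 3, 1, 0, 0, 0]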
           

Using the three functions above, we can produce a dataset the model can consume.

# Build the numeric datasets
def make_data(train_Datas, test_Datas):
    train_inputs = []
    train_labels = []
    test_inputs = []
    test_labels = []
    # Count word frequencies over both splits (note: the vocabulary is built from train and test together)
    for Datas in [train_Datas, test_Datas]:
        for data in Datas:
            fit(data[0])

    build_vocab()  # assign ids to the words

    # Convert the sentences to index sequences and collect the labels
    # Training set
    for data in train_Datas:
        train_inputs.append(transform(data[0]))
        train_labels.append(data[1])
    # Test set
    for data in test_Datas:
        test_inputs.append(transform(data[0]))
        test_labels.append(data[1])
    return train_inputs, train_labels, test_inputs, test_labels
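
After make_data() the vocabulary is fixed; its size is what the embedding layer will use as its input dimension, and a quick check might look like this:

print(len(word_idx))  # at most 25000 words plus the two special tokens <PAD> and <UNK>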
           

Next, convert the data to torch.LongTensor and wrap it in batched DataLoaders.

train_inputs, train_labels, test_inputs, test_labels = make_data(train_data, test_data)

train_inputs, train_labels = torch.LongTensor(train_inputs), torch.LongTensor(train_labels)

test_inputs, test_labels = torch.LongTensor(test_inputs), torch.LongTensor(test_labels)

# Wrap the training set in a DataLoader
train_dataset = Data.TensorDataset(train_inputs, train_labels)
train_loader = Data.DataLoader(train_dataset, batch_size=64, shuffle=True)

# Wrap the test set in a DataLoader
test_dataset = Data.TensorDataset(test_inputs, test_labels)
test_loader = Data.DataLoader(test_dataset, batch_size=64, shuffle=True)
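
Peeking at one batch confirms the shapes the model will receive (with batch_size=64 and max_len=200):

batch_x, batch_y = next(iter(train_loader))
print(batch_x.shape, batch_y.shape)  # torch.Size([64, 200]) torch.Size([64])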
           

The complete code is as follows:

import os
import torch
from torch import optim
from torch.nn import RNN, LSTM, LSTMCell
import numpy as np
import torch.nn.functional as F
import re
import torch.nn as nn
import random
import torch.utils.data as Data

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

UNK_TAG = "<UNK>"  # placeholder for unknown words
PAD_TAG = "<PAD>"  # placeholder used to pad short sentences
UNK = 1
PAD = 0

word_idx = {PAD_TAG: PAD, UNK_TAG: UNK}  # vocabulary: word -> id
word_count = {}  # word frequency counts


def fit(sentence):
    """
    Count word frequencies.
    :param sentence: list of tokens
    :return:
    """
    for word in sentence:
        # dict.get(key, default=None) returns the value for key, or the default if the key is absent.
        word_count[word] = word_count.get(word, 0) + 1


def build_vocab(min_count=5, max_count=1000, max_features=25000):
    """
    Build the vocabulary from the accumulated word counts.
    :param min_count: minimum word frequency
    :param max_count: maximum word frequency
    :param max_features: maximum vocabulary size
    :return:
    """
    global word_count
    global word_idx
    word_count = {word: count for word, count in word_count.items() if count > min_count}
    if max_count is not None:
        word_count = {word: count for word, count in word_count.items() if count <= max_count}
    if max_features is not None:
        # Sort by frequency (descending) and keep only the top max_features words
        word_count = dict(sorted(word_count.items(), key=lambda x: x[-1], reverse=True)[:max_features])

    for word in word_count:
        # Assign each word its own id; ids continue after the reserved PAD and UNK entries
        word_idx[word] = len(word_idx)


def transform(sentence, max_len=200):
    """
    Convert a sentence into a sequence of indices.
    :param sentence: list of tokens
    :param max_len: maximum sentence length
    :return:
    """
    if len(sentence) > max_len:
        # Truncate sentences that are too long
        sentence = sentence[:max_len]
    else:
        # Pad sentences that are shorter than the target length
        sentence = sentence + [PAD_TAG] * (max_len - len(sentence))
    # Words that are not in the vocabulary are mapped to UNK (1)
    return [word_idx.get(word, UNK) for word in sentence]


# Build the numeric datasets
def make_data(train_Datas, test_Datas):
    train_inputs = []
    train_labels = []
    test_inputs = []
    test_labels = []
    # Count word frequencies over both splits (note: the vocabulary is built from train and test together)
    for Datas in [train_Datas, test_Datas]:
        for data in Datas:
            fit(data[0])

    build_vocab()  # assign ids to the words

    # Convert the sentences to index sequences and collect the labels
    # Training set
    for data in train_Datas:
        train_inputs.append(transform(data[0]))
        train_labels.append(data[1])
    # Test set
    for data in test_Datas:
        test_inputs.append(transform(data[0]))
        test_labels.append(data[1])
    return train_inputs, train_labels, test_inputs, test_labels


# Read the text data; resulting format: [[['hello', 'word'], label], ...]
def load_data(path, flag='train'):
    labels = ['pos', 'neg']
    data = []
    for label in labels:
        files = os.listdir(os.path.join(path, flag, label))
        # Punctuation characters to strip
        r = '[’!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~\n。!,]+'
        for file in files:
            with open(os.path.join(path, flag, label, file), 'r', encoding='utf8') as rf:
                temp = rf.read().replace('\n', ' ')  # join lines with a space so words are not glued together
                temp = temp.replace('<br /><br />', ' ')  # drop the HTML line breaks before removing punctuation
                temp = re.sub(r, '', temp)
                temp = temp.split(' ')
                temp = [word.lower() for word in temp if word != '']
                if label == 'pos':
                    data.append([temp, 1])  # positive review -> label 1
                elif label == 'neg':
                    data.append([temp, 0])  # negative review -> label 0
    return data

train_data = load_data('Imdb', 'train')
test_data = load_data('Imdb', 'test')

train_inputs, train_labels, test_inputs, test_labels = make_data(train_data, test_data)

train_inputs, train_labels = torch.LongTensor(train_inputs), torch.LongTensor(train_labels)

test_inputs, test_labels = torch.LongTensor(test_inputs), torch.LongTensor(test_labels)

# Wrap the training set in a DataLoader
train_dataset = Data.TensorDataset(train_inputs, train_labels)
train_loader = Data.DataLoader(train_dataset, batch_size=64, shuffle=True)

# Wrap the test set in a DataLoader
test_dataset = Data.TensorDataset(test_inputs, test_labels)
test_loader = Data.DataLoader(test_dataset, batch_size=64, shuffle=True)

vocab_size = len(word_idx)  # vocabulary size
dmodel = 512  # embedding dimension

num_filter = 100  # number of filters per filter size
filter_size = [2, 3, 4]  # filter heights, i.e. three different n-gram sizes
output_dim = 2  # number of classes


# This simpler model also works: average-pool the word embeddings and classify
# class MyModel(nn.Module):
#     def __init__(self):
#         super(MyModel, self).__init__()
#
#         self.w = nn.Embedding(vocab_size, dmodel)
#         self.dropout = nn.Dropout(0.05)
#         self.fc2 = nn.Linear(dmodel, 2)
#
#     def forward(self, x):
#         embedded = self.dropout(self.w(x))  # [batch_size, seq_len, embedding_size]
#         # Average-pool over the sequence dimension to get [batch_size, embedding_dim];
#         # embedded is the input and (embedded.shape[1], 1) is the kernel size.
#         # squeeze(1) removes the dimension at index 1.
#         pooled = F.avg_pool2d(embedded, (embedded.shape[1], 1)).squeeze(1)
#         return self.fc2(pooled)

# CNN-based text classification model
class MyModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, num_filter,
                 filter_sizes, output_dim, dropout=0.2, pad_idx=0):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.convs = nn.ModuleList([
            nn.Conv2d(in_channels=1, out_channels=num_filter,
                      kernel_size=(fs, embedding_dim))
            for fs in filter_sizes
        ])
        # in_channels: number of input channels; text has a single channel
        # out_channels: number of output channels (filters)
        # fs: how many words each sliding window covers, like the n in n-gram
        # one Conv2d per filter size; their outputs are concatenated later

        self.fc = nn.Linear(len(filter_sizes) * num_filter, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        embedded = self.dropout(self.embedding(text))  # [batch size, sent len, emb dim]
        embedded = embedded.unsqueeze(1)  # [batch size, 1, sent len, emb dim]
        # Add a channel dimension so the shape matches what nn.Conv2d expects.
        conved = [F.relu(conv(embedded)).squeeze(3) for conv in self.convs]
        # conved = [batch size, num_filter, sent len - filter_size + 1]
        # one such tensor per filter size

        pooled = [F.max_pool1d(conv, conv.shape[2]).squeeze(2) for conv in conved]  # [batch, num_filter]

        cat = self.dropout(torch.cat(pooled, dim=1))
        # cat = [batch size, num_filter * len(filter_sizes)]
        # Concatenate the len(filter_sizes) pooled outputs and feed them to the fully connected layer.

        return self.fc(cat)


model = MyModel(vocab_size, dmodel, num_filter=num_filter, filter_sizes=filter_size, output_dim=output_dim).to(device)
optimizer = optim.Adam(model.parameters(), 0.01)
criterion = nn.CrossEntropyLoss().to(device)


# Training
def train():
    model.train()
    acc_M = 0.0
    for epoch in range(20):
        correct = 0.0
        num = 0.0
        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            pred = model(batch_x)

            loss = criterion(pred, batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Accumulate accuracy
            pred = pred.max(1)[1]
            correct += (pred == batch_y).sum()
            num += len(batch_y)
        print("epoch:", epoch + 1)
        acc = correct.item() / num
        print(f'loss: {loss.item():.4f}  | acc: {acc*100:.2f}%')
        # Save the model whenever the training accuracy improves
        if acc > acc_M:
            acc_M = acc
            torch.save(model.state_dict(), './model.pkl')


# Load a previously trained model if one exists
if os.path.exists("./model.pkl"):
    model.load_state_dict(torch.load("./model.pkl"))


def test():
    model.eval()  # disable dropout for evaluation
    loss_M = 0.0
    correct = 0.0
    num = 0

    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            pred = model(batch_x)
            loss = criterion(pred, batch_y)
            loss_M += loss.item()

            # Accumulate accuracy
            pred = pred.max(1)[1]
            correct += (pred == batch_y).sum()
            num += len(batch_y)
    acc = correct.item() / num
    loss = loss_M / len(test_loader)  # average loss over all test batches
    print(f'loss: {loss:.4f}  | acc: {acc*100:.2f}%')


if __name__ == '__main__':
    # print("start training")
    # train()
    print("start testing")
    test()
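
For reference, here is a sketch of how the tensor shapes evolve inside the CNN model for one batch, using the hyperparameters above (batch_size=64, max_len=200, dmodel=512, num_filter=100, filter_size=[2, 3, 4]); this is for intuition only and is not part of the script:

# text:               [64, 200]
# embedded:           [64, 200, 512] -> unsqueeze(1) -> [64, 1, 200, 512]
# conved (per size):  [64, 100, 199], [64, 100, 198], [64, 100, 197]
# pooled (per size):  [64, 100]
# cat:                [64, 300]
# fc output:          [64, 2]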
           

Hope this helps!