
Hands-on text classification with PyTorch

This post covers the following steps:

1. Download the dataset; here we use the IMDB dataset released by Stanford.

2. Dataset processing

3. Building the network model

4. Training and testing

I. Dataset processing

First, load the dataset. My project path is laid out as shown below:

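A sketch of the directory structure that load_data expects (inferred from its os.path.join calls below; file names are illustrative):

Imdb/
    train/
        pos/    # one plain-text review per file
        neg/
    test/
        pos/
        neg/
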
# Read the text data; data format: [[['hello', 'word'], label], ...]
def load_data(path, flag='train'):
    labels = ['pos', 'neg']
    data = []
    for label in labels:
        files = os.listdir(os.path.join(path, flag, label))
        # Punctuation to strip
        r = '[’!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~\n。!,]+'
        for file in files:
            with open(os.path.join(path, flag, label, file), 'r', encoding='utf8') as rf:
                temp = rf.read().replace('\n', '')
                temp = temp.replace('<br /><br />', ' ')
                temp = re.sub(r, '', temp)
                temp = temp.split(' ')
                temp = [temp[i].lower() for i in range(len(temp)) if temp[i] != '']
                if label == 'pos':
                    data.append([temp, 1])
                elif label == 'neg':
                    data.append([temp, 0])
    return data

train_data = load_data('Imdb', 'train')
test_data = load_data('Imdb', 'test')
           

Once the dataset is loaded, we need to count word frequencies and build a vocabulary.

def fit(sentence):
    """
    統計詞頻
    :param sentence:
    :return:
    """
    for word in sentence:
        # 字典(Dictionary) get(key,default=None) 函數傳回指定鍵的值,如果值不在字典中傳回預設值。
        word_count[word] = word_count.get(word, 0) + 1


def build_vocab(min_count=5, max_count=1000, max_features=25000):
    """

    :param min_count: 最小詞頻
    :param max_count: 最大詞頻
    :param max_features: 最大詞語數
    :return:
    """
    global word_count
    global word_idx
    word_count = {word: count for word, count in word_count.items() if count > min_count}
    if max_count is not None:
        word_count = {word: count for word, count in word_count.items() if count <= max_count}
    if max_features is not None:
        # 排序
        word_count = dict(sorted(word_count.items(), key=lambda x: x[-1], reverse=True)[:max_features])

    for word in word_count:
        # 将單詞對應自己的id
        word_idx[word] = len(word_idx)  # 每次word對應一個序号
           
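For example, on a toy corpus (a minimal sketch; it assumes the PAD/UNK constants and the word_count / word_idx globals defined in the complete code further down):

word_idx = {"<PAD>": 0, "<UNK>": 1}   # vocabulary starts with the two special tokens
word_count = {}

toy_corpus = [["this", "movie", "was", "great", "great"],
              ["this", "movie", "was", "awful"]]
for sent in toy_corpus:
    fit(sent)                          # accumulate word frequencies into word_count

build_vocab(min_count=0)               # min_count=0 so the tiny toy counts survive the frequency filter
print(word_idx)
# {'<PAD>': 0, '<UNK>': 1, 'this': 2, 'movie': 3, 'was': 4, 'great': 5, 'awful': 6}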

Next, the sentences in the dataset need to be converted from word form into numeric form.

def transform(sentence, max_len=200):
    """
    把句子轉換為數字序列
    :param sentence:
    :param max_len: 句子的最大長度
    :return:
    """
    if len(sentence) > max_len:
        # 句子太長時進行截斷
        sentence = sentence[:max_len]
    else:
        # 句子長度不夠标準長度時,進行填充
        sentence = sentence + [PAD_TAG] * (max_len - len(sentence))
    # 句子中的單詞沒有出現過再詞典中的設定為數字1
    return [word_idx.get(word, UNK) for word in sentence]
           
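For instance, with a hypothetical tiny vocabulary, transform pads short sentences with PAD (0), truncates long ones, and maps unseen words to UNK (1):

# suppose word_idx = {'<PAD>': 0, '<UNK>': 1, 'good': 2, 'film': 3}
transform(['good', 'film'], max_len=5)            # -> [2, 3, 0, 0, 0]
transform(['good', 'unseen', 'film'], max_len=2)  # -> [2, 1]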

Using the three functions above, we can produce a dataset that the model can consume directly.

# Build the numeric dataset
def make_data(train_Datas, test_Datas):
    train_inputs = []
    train_labels = []
    test_inputs = []
    test_labels = []
    for Datas in [train_Datas, test_Datas]:
        for data in Datas:
            fit(data[0])

    build_vocab()  # assign an index to every word

    # Convert inputs and labels into numeric sequences
    # Training set first
    for data in train_Datas:
        train_inputs.append(transform(data[0]))
        train_labels.append(data[1])

    for data in test_Datas:
        test_inputs.append(transform(data[0]))
        test_labels.append(data[1])
    return train_inputs, train_labels, test_inputs, test_labels
           

Then convert the data to torch.LongTensor and wrap it in DataLoaders so it can be consumed in batches.

train_inputs, train_labels, test_inputs, test_labels = make_data(train_data, test_data)

train_inputs, train_labels = torch.LongTensor(train_inputs), torch.LongTensor(train_labels)

test_inputs, test_labels = torch.LongTensor(test_inputs), torch.LongTensor(test_labels)

# Build the training DataLoader
train_dataset = Data.TensorDataset(train_inputs, train_labels)
train_loader = Data.DataLoader(train_dataset, batch_size=64, shuffle=True)

# Build the test DataLoader
test_dataset = Data.TensorDataset(test_inputs, test_labels)
test_loader = Data.DataLoader(test_dataset, batch_size=64, shuffle=True)
           
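A quick sanity check on one batch (a sketch; the shapes follow from batch_size=64 and transform's max_len=200):

batch_x, batch_y = next(iter(train_loader))
print(batch_x.shape)  # torch.Size([64, 200])  -> [batch_size, max_len]
print(batch_y.shape)  # torch.Size([64])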

The complete code is as follows:

import os
import torch
from torch import optim
from torch.nn import RNN, LSTM, LSTMCell
import numpy as np
import torch.nn.functional as F
import re
import torch.nn as nn
import random
import torch.utils.data as Data

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

UNK_TAG = "<UNK>"  # 未知單詞指派
PAD_TAG = "<PAD>"  # 句子長度不夠補值
UNK = 1
PAD = 0

word_idx = {PAD_TAG: PAD, UNK_TAG: UNK}  # vocabulary: word -> index
word_count = {}  # word frequency counts


def fit(sentence):
    """
    統計詞頻
    :param sentence:
    :return:
    """
    for word in sentence:
        # 字典(Dictionary) get(key,default=None) 函數傳回指定鍵的值,如果值不在字典中傳回預設值。
        word_count[word] = word_count.get(word, 0) + 1


def build_vocab(min_count=5, max_count=1000, max_features=25000):
    """

    :param min_count: 最小詞頻
    :param max_count: 最大詞頻
    :param max_features: 最大詞語數
    :return:
    """
    global word_count
    global word_idx
    word_count = {word: count for word, count in word_count.items() if count > min_count}
    if max_count is not None:
        word_count = {word: count for word, count in word_count.items() if count <= max_count}
    if max_features is not None:
        # 排序
        word_count = dict(sorted(word_count.items(), key=lambda x: x[-1], reverse=True)[:max_features])

    for word in word_count:
        # 将單詞對應自己的id
        word_idx[word] = len(word_idx)  # 每次word對應一個序号


def transform(sentence, max_len=200):
    """
    把句子轉換為數字序列
    :param sentence:
    :param max_len: 句子的最大長度
    :return:
    """
    if len(sentence) > max_len:
        # 句子太長時進行截斷
        sentence = sentence[:max_len]
    else:
        # 句子長度不夠标準長度時,進行填充
        sentence = sentence + [PAD_TAG] * (max_len - len(sentence))
    # 句子中的單詞沒有出現過再詞典中的設定為數字1
    return [word_idx.get(word, UNK) for word in sentence]


# Build the numeric dataset
def make_data(train_Datas, test_Datas):
    train_inputs = []
    train_labels = []
    test_inputs = []
    test_labels = []
    for Datas in [train_Datas, test_Datas]:
        for data in Datas:
            fit(data[0])

    build_vocab()  # assign an index to every word

    # Convert inputs and labels into numeric sequences
    # Training set first
    for data in train_Datas:
        train_inputs.append(transform(data[0]))
        train_labels.append(data[1])

    for data in test_Datas:
        test_inputs.append(transform(data[0]))
        test_labels.append(data[1])
    return train_inputs, train_labels, test_inputs, test_labels


# Read the text data; data format: [[['hello', 'word'], label], ...]
def load_data(path, flag='train'):
    labels = ['pos', 'neg']
    data = []
    for label in labels:
        files = os.listdir(os.path.join(path, flag, label))
        # Punctuation to strip
        r = '[’!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~\n。!,]+'
        for file in files:
            with open(os.path.join(path, flag, label, file), 'r', encoding='utf8') as rf:
                temp = rf.read().replace('\n', '')
                temp = temp.replace('<br /><br />', ' ')
                temp = re.sub(r, '', temp)
                temp = temp.split(' ')
                temp = [temp[i].lower() for i in range(len(temp)) if temp[i] != '']
                if label == 'pos':
                    data.append([temp, 1])
                elif label == 'neg':
                    data.append([temp, 0])
    return data

train_data = load_data('Imdb', 'train')
test_data = load_data('Imdb', 'test')

train_inputs, train_labels, test_inputs, test_labels = make_data(train_data, test_data)

train_inputs, train_labels = torch.LongTensor(train_inputs), torch.LongTensor(train_labels)

test_inputs, test_labels = torch.LongTensor(test_inputs), torch.LongTensor(test_labels)

# Build the training DataLoader
train_dataset = Data.TensorDataset(train_inputs, train_labels)
train_loader = Data.DataLoader(train_dataset, batch_size=64, shuffle=True)

# Build the test DataLoader
test_dataset = Data.TensorDataset(test_inputs, test_labels)
test_loader = Data.DataLoader(test_dataset, batch_size=64, shuffle=True)

vocab_size = len(word_idx)  # vocabulary size
dmodel = 512  # embedding dimension

num_filter = 100  # number of convolution filters per filter size
filter_size = [2, 3, 4]  # filter heights (n-gram sizes); three are used here
output_dim = 2  # number of classes


# This simpler average-pooling model can also be used:
# class MyModel(nn.Module):
#     def __init__(self):
#         super(MyModel, self).__init__()
#
#         self.w = nn.Embedding(vocab_size,dmodel)
#         self.dropout = nn.Dropout(0.05)
#         self.fc2 = nn.Linear(dmodel, 2)
#
#     def forward(self,x):
#         embedded = self.dropout(self.w(x))  # [batch_size, seq_len, embedding_size]
#         # avg_pool2d over the sequence dimension gives [batch_size, embedding_dim]
#         # embedded is the input; (embedded.shape[1], 1) is the kernel_size
#         # squeeze(1) removes the dimension at index 1
#         pooled = F.avg_pool2d(embedded, (embedded.shape[1], 1)).squeeze(1)
#         return self.fc2(pooled)

# This one is a convolutional neural network (TextCNN) implementation
class MyModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, num_filter,
                 filter_sizes, output_dim, dropout=0.2, pad_idx=0):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.convs = nn.ModuleList([
            nn.Conv2d(in_channels=1, out_channels=num_filter,
                      kernel_size=(fs, embedding_dim))
            for fs in filter_sizes
        ])
        # in_channels: number of input channels; for text this is 1
        # out_channels: number of output channels (num_filter feature maps)
        # fs: how many words each sliding window covers, i.e. the n in an n-gram
        # for fs in filter_sizes: several convolutions are used and their outputs are concatenated later

        self.fc = nn.Linear(len(filter_sizes) * num_filter, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        embedded = self.dropout(self.embedding(text))  # [batch size, sent len, emb dim]
        embedded = embedded.unsqueeze(1)  # [batch size, 1, sent len, emb dim]
        # unsqueeze adds the channel dimension so the input matches what nn.Conv2d expects
        conved = [F.relu(conv(embedded)).squeeze(3) for conv in self.convs]
        # conved[i] = [batch size, num_filter, sent len - filter_sizes[i] + 1]
        # there is one conved tensor per filter size

        pooled = [F.max_pool1d(conv, conv.shape[2]).squeeze(2) for conv in conved]  # [batch,num_filter]

        cat = self.dropout(torch.cat(pooled, dim=1))
        # cat = [batch size, num_filter * len(filter_sizes)]
        # the len(filter_sizes) pooled outputs are concatenated and passed to the fully connected layer

        return self.fc(cat)
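
# Shape trace for one batch (a sketch assuming batch_size=64, max_len=200, dmodel=512,
# num_filter=100, filter_size=[2, 3, 4]):
#   text:       [64, 200]
#   embedded:   [64, 1, 200, 512]          after unsqueeze(1)
#   conved[i]:  [64, 100, 200 - fs + 1]    one tensor per fs in (2, 3, 4)
#   pooled[i]:  [64, 100]
#   cat:        [64, 300]                  100 filters * 3 filter sizes
#   output:     [64, 2]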


model = MyModel(vocab_size, dmodel, num_filter=num_filter, filter_sizes=filter_size, output_dim=output_dim).to(device)
optimizer = optim.Adam(model.parameters(), 0.01)
criterion = nn.CrossEntropyLoss().to(device)


# Training
def train():
    model.train()
    acc_M = 0.0
    for epoch in range(20):
        correct = 0.0
        num = 0.0
        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            pred = model(batch_x)

            loss = criterion(pred, batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # accumulate accuracy statistics
            pred = pred.max(1)[1]
            correct += (pred == batch_y).sum()
            num += len(batch_y)
        # if (epoch + 1) % 10 == 0:
        print("epoch:", epoch + 1)
        # print('loss=', '{:.6f}'.format(loss.item()))
        acc = correct.item() / num
        print(f'loss: {loss.item():.4f}  | acc: {acc*100:.2f}%')
        if acc > acc_M:
            acc_M = acc
            torch.save(model.state_dict(), './model.pkl')


# Load previously trained weights if a checkpoint exists
if os.path.exists("./model.pkl"):
    model.load_state_dict(torch.load("./model.pkl", map_location=device))


def test():
    loss_M = 0.0
    correct = 0.0
    num = 0

    for batch_x, batch_y in test_loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        pred = model(batch_x)
        loss = criterion(pred, batch_y)
        # loss_M += loss.item()

        # accumulate accuracy statistics
        pred = pred.max(1)[1]
        correct += (pred == batch_y).sum()
        num += len(batch_y)
    acc = correct.item() / num
    # loss = loss_M / len(test_loader)
    print(f'loss: {loss.item():.4f}  | acc: {acc*100:.2f}%')


if __name__ == '__main__':
    # print("開始訓練")
    # train()
    print("開始測試")
    test()
           
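On a first run there is no saved checkpoint yet, so enable training before testing; a minimal sketch of the intended workflow:

print("Start training")
train()   # trains for 20 epochs and saves the best-accuracy weights to ./model.pkl
print("Start testing")
test()    # evaluates on the test set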

I hope this helps!