【自然語言處理(NLP)】基于LSTM的謠言檢測
作者簡介:在校大學生一枚,華為雲享專家,阿里雲專家部落客,騰雲先鋒(TDP)成員,雲曦智劃項目總負責人,全國高等學校計算機教學與産業實踐資源建設專家委員會(TIPCC)志願者,以及程式設計愛好者,期待和大家一起學習,一起進步~
.
部落格首頁:ぃ靈彧が的學習日志
.
本文專欄:機器學習
.
專欄寄語:若你決定燦爛,山無遮,海無攔
.
(文章目錄)
前言
(一)、任務描述
本次實踐使用基于循環神經網絡(RNN)的謠言檢測模型,将文本中的謠言事件向量化,通過循環神經網絡的學習訓練來挖掘表示文本深層的特征,避免了特征建構的問題,并能發現那些不容易被人發現的特征,進而産生更好的效果。
資料集介紹:
本次實踐所使用的資料是從新浪微博不實資訊舉報平台抓取的中文謠言資料,資料集中共包含1538條謠言和1849條非謠言。如下圖所示,每條資料均為json格式,其中text字段代表微博原文的文字内容。
更多資料集介紹請參考https://github.com/thunlp/Chinese_Rumor_Dataset
(二)、環境配置
本示例基于飛槳開源架構2.0版本。
import paddle
import numpy as np
import matplotlib.pyplot as plt
# Print the installed PaddlePaddle version (the article states it is based on 2.0).
print(paddle.__version__)
輸出結果如下圖1所示:
一、資料準備
(1)解壓資料,讀取并解析資料,生成all_data.txt
(2)生成資料字典,即dict.txt
(3)生成資料清單,并進行訓練集與驗證集的劃分,train_list.txt 、eval_list.txt
(4)定義訓練資料集提供器
(一)、解壓資料
import os
import zipfile

# Archive holding the raw dataset and the directory it is unpacked into.
src_path="data/data20519/Rumor_Dataset.zip"
target_path="/home/aistudio/data/Chinese_Rumor_Dataset-master"

# Extract only once: nothing is done when the target directory already exists.
if not os.path.isdir(target_path):
    # The context manager closes the archive even when extraction raises.
    with zipfile.ZipFile(src_path, 'r') as z:
        z.extractall(path=target_path)
import io
import random
import json
# Entry names found in the rumor repost folder.
rumor_class_dirs = os.listdir(target_path+"/Chinese_Rumor_Dataset-master/CED_Dataset/rumor-repost/")
# Entry names found in the non-rumor repost folder.
non_rumor_class_dirs = os.listdir(target_path+"/Chinese_Rumor_Dataset-master/CED_Dataset/non-rumor-repost/")
# Folder with the original posts; each entry name above is opened again here.
original_microblog = target_path+"/Chinese_Rumor_Dataset-master/CED_Dataset/original-microblog/"
# Label "0" marks a rumor, label "1" a non-rumor.
rumor_label="0"
non_rumor_label="1"


def _read_posts(file_names, label):
    """Build one record per original post: label, tab, post text, newline.

    Args:
        file_names: entry names to look up under ``original_microblog``.
        label: label string prepended to every record.

    Returns:
        list of record strings, in the order of ``file_names``.
    """
    records = []
    for file_name in file_names:
        # macOS folder metadata is not a post.
        if file_name == '.DS_Store':
            continue
        # JSON is read as UTF-8 explicitly instead of the platform default,
        # matching the encoding used when the list files are read back.
        with open(original_microblog + file_name, 'r', encoding='utf-8') as f:
            post = json.load(f)
        records.append(label + "\t" + post["text"] + "\n")
    return records


# Parse both classes and count how many records each one produced.
all_rumor_list = _read_posts(rumor_class_dirs, rumor_label)
all_non_rumor_list = _read_posts(non_rumor_class_dirs, non_rumor_label)
rumor_num = len(all_rumor_list)
non_rumor_num = len(all_non_rumor_list)
print("謠言資料總量為:"+str(rumor_num))
print("非謠言資料總量為:"+str(non_rumor_num))
輸出結果如下圖2所示:
(二)、寫入all_data.txt
# Shuffle all records and write them to all_data.txt.
data_list_path="/home/aistudio/data/"
all_data_path=data_list_path + "all_data.txt"
all_data_list = all_rumor_list + all_non_rumor_list
random.shuffle(all_data_list)
# Mode 'w' already empties an existing file, so no separate truncation pass is
# needed. UTF-8 is explicit because create_dict and create_data_list read this
# file with encoding='utf-8'.
with open(all_data_path, 'w', encoding='utf-8') as f:
    f.writelines(all_data_list)
(三)、生成資料字典
# Build the character-level vocabulary.
def create_dict(data_path, dict_path):
    """Write a character -> id dictionary for the texts in ``data_path``.

    Each line of ``data_path`` is split on tabs and the last field (without
    its newline) is taken as the text. Every distinct character gets an id
    starting at 0; ``"<unk>"`` and ``"<pad>"`` are appended with the next two
    ids. The mapping is written to ``dict_path`` as the ``str()`` of a dict.

    Args:
        data_path: path of the tab-separated data file (read as UTF-8).
        dict_path: path of the dictionary file to (over)write as UTF-8.
    """
    chars = set()
    with open(data_path, 'r', encoding='utf-8') as f:
        for line in f:
            chars.update(line.split('\t')[-1].replace('\n', ''))
    # Sort before numbering: set iteration order is not stable between runs,
    # which would give the same character different ids on each run.
    dict_txt = {ch: idx for idx, ch in enumerate(sorted(chars))}
    # Special tokens take the two ids right after the real characters.
    dict_txt["<unk>"] = len(chars)
    dict_txt["<pad>"] = len(chars) + 1
    # Mode 'w' replaces any previous dictionary file.
    with open(dict_path, 'w', encoding='utf-8') as f:
        f.write(str(dict_txt))
    print("資料字典生成完成!")
(四)、資料集劃分
# Encode every sample as character ids and split the result into
# train_list.txt and eval_list.txt.
def create_data_list(data_list_path):
    """Write ``train_list.txt`` and ``eval_list.txt`` under ``data_list_path``.

    Reads ``dict.txt`` (character -> id) and ``all_data.txt`` from the same
    directory. Each output line is the comma-joined ids of the text, a tab,
    the label and a newline. Samples at index 0, 8, 16, ... go to the eval
    list, all others to the train list. Prints the longest text length seen.

    Args:
        data_list_path: directory holding the input files and receiving the
            two output files.

    Raises:
        KeyError: if a character of a text is missing from the dictionary.
    """
    import ast

    with open(os.path.join(data_list_path, 'dict.txt'), 'r', encoding='utf-8') as f_data:
        # The file holds a dict literal; literal_eval parses it without
        # executing arbitrary code the way eval() would.
        dict_txt = ast.literal_eval(f_data.read())
    with open(os.path.join(data_list_path, 'all_data.txt'), 'r', encoding='utf-8') as f_data:
        lines = f_data.readlines()

    maxlen = 0
    eval_path = os.path.join(data_list_path, 'eval_list.txt')
    train_path = os.path.join(data_list_path, 'train_list.txt')
    # Mode 'w' empties both files, so no separate truncation pass is needed.
    with open(eval_path, 'w', encoding='utf-8') as f_eval, \
            open(train_path, 'w', encoding='utf-8') as f_train:
        for i, line in enumerate(lines):
            fields = line.split('\t')
            label = fields[0]
            words = fields[-1].replace('\n', '')
            maxlen = max(maxlen, len(words))
            record = ','.join(str(dict_txt[s]) for s in words) + '\t' + label + '\n'
            # Every 8th sample is held out for evaluation.
            target = f_eval if i % 8 == 0 else f_train
            target.write(record)
    print("資料清單生成完成!")
    print("樣本最長長度:" + str(maxlen))
# All generated list files live in this single data directory.
data_root_path = "/home/aistudio/data/"
dict_path = os.path.join(data_root_path, "dict.txt")
data_path = os.path.join(data_root_path, 'all_data.txt')
# First build the vocabulary from the raw data ...
create_dict(data_path=data_path, dict_path=dict_path)
# ... then encode the samples and split them into train / eval lists.
create_data_list(data_list_path=data_root_path)
輸出結果如下圖3所示:
(五)、定義訓練資料集提供器
def load_vocab(file_path):
    """Load the vocabulary dict stored as a Python literal in ``file_path``.

    Args:
        file_path: path of a UTF-8 text file containing a dict literal.

    Returns:
        The parsed dictionary.

    Raises:
        ValueError, SyntaxError: if the content is not a plain literal.
    """
    import ast

    # ``with`` closes the file even when parsing fails; literal_eval turns the
    # stored text into a dict without executing code the way eval() would.
    with open(file_path, 'r', encoding='utf8') as fr:
        return ast.literal_eval(fr.read())
# Print the first 2 training records (done by the loop further down).
# The vocabulary is loaded first because ids_to_str reads it as a global.
vocab = load_vocab(os.path.join(data_root_path, 'dict.txt'))
def ids_to_str(ids):
    """Map a sequence of ids back to their tokens, joined by single spaces.

    Uses the module-level ``vocab`` (token -> id) in reverse.

    Args:
        ids: iterable of values accepted by ``int()``.

    Returns:
        The tokens joined with ``" "``; an empty string for empty input.

    Raises:
        ValueError: if an id has no token in ``vocab``.
    """
    # Invert the vocabulary once per call instead of rebuilding two lists and
    # searching them linearly for every id. setdefault keeps the first token
    # for an id, which is what list.index() returned.
    id_to_word = {}
    for word, idx in vocab.items():
        id_to_word.setdefault(idx, word)

    words = []
    for k in ids:
        try:
            w = id_to_word[int(k)]
        except KeyError:
            raise ValueError("{} is not in vocab".format(int(k))) from None
        # Tokens that are not str are decoded as ASCII, as before.
        words.append(w if isinstance(w, str) else w.decode('ASCII'))
    return " ".join(words)
# sys is needed for the stderr notice below and is not imported elsewhere.
import sys

file_path = os.path.join(data_root_path, 'train_list.txt')
with io.open(file_path, "r", encoding='utf8') as fin:
    i = 0
    for line in fin:
        i += 1
        # Expected format: "<comma-separated ids>\t<label>".
        cols = line.strip().split("\t")
        if len(cols) != 2:
            sys.stderr.write("[NOTICE] Error Format Line!")
        else:
            label = int(cols[1])
            wids = cols[0].split(",")
            print(str(i)+":")
            print('sentence list id is:', wids)
            print('sentence list is: ', ids_to_str(wids))
            print('sentence label id is:', label)
            print('---------------------------------')
        # Stop after the second line even if that line was malformed; the
        # previous "== 2" check placed after "continue" could be skipped and
        # the loop would then run through the whole file.
        if i >= 2:
            break
輸出結果如下圖4所示:
# Load the vocabulary again; RumorDataset below reads vocab["<pad>"] for padding.
vocab = load_vocab(os.path.join(data_root_path, 'dict.txt'))
class RumorDataset(paddle.io.Dataset):
    """In-memory dataset built from a ``<ids>\\t<label>`` list file.

    Every sample is a pair ``(wids, label)`` of int64 NumPy arrays: ``wids``
    has exactly ``max_len`` ids (truncated, or right-padded with the module
    level ``vocab["<pad>"]``), ``label`` has a single element.
    """

    def __init__(self, data_dir, max_len=150):
        """Read and encode all samples.

        Args:
            data_dir: path of the list file (despite the name, a file path).
            max_len: fixed sequence length; defaults to the previously
                hard-coded 150. NOTE(review): the model must be built for the
                same length.
        """
        # Local import: sys is not imported at module level in this file.
        import sys

        super().__init__()
        self.data_dir = data_dir
        self.max_len = max_len
        self.all_data = []
        pad_id = vocab["<pad>"]
        with io.open(self.data_dir, "r", encoding='utf8') as fin:
            for line in fin:
                cols = line.strip().split("\t")
                # Skip lines that are not exactly "<ids>\t<label>".
                if len(cols) != 2:
                    sys.stderr.write("[NOTICE] Error Format Line!")
                    continue
                # Convert to int first so truncation and padding work on one
                # numeric type rather than mixing str and int in NumPy.
                ids = [int(w) for w in cols[0].split(",")][:max_len]
                ids.extend([pad_id] * (max_len - len(ids)))
                wids = np.array(ids, dtype='int64')
                label = np.array([int(cols[1])], dtype='int64')
                self.all_data.append((wids, label))

    def __getitem__(self, index):
        """Return the ``(wids, label)`` pair stored at ``index``."""
        data, label = self.all_data[index]
        return data, label

    def __len__(self):
        """Return the number of samples that were loaded."""
        return len(self.all_data)
batch_size = 32
train_dataset = RumorDataset(os.path.join(data_root_path, 'train_list.txt'))
test_dataset = RumorDataset(os.path.join(data_root_path, 'eval_list.txt'))
# Training loader: shuffled, trailing partial batch dropped (unchanged).
train_loader = paddle.io.DataLoader(train_dataset, places=paddle.CPUPlace(), return_list=True,
                                    shuffle=True, batch_size=batch_size, drop_last=True)
# Evaluation loader: keep the sample order and the last partial batch so that
# every evaluation sample is scored; shuffling and dropping would leave some
# samples out of the reported metrics.
test_loader = paddle.io.DataLoader(test_dataset, places=paddle.CPUPlace(), return_list=True,
                                   shuffle=False, batch_size=batch_size, drop_last=False)
# Sanity check: show the first sample of each split, if the split has one.
for _banner, _split in (
    ('=============train_dataset =============', train_dataset),
    ('=============test_dataset =============', test_dataset),
):
    print(_banner)
    if len(_split) > 0:
        data, label = _split[0]
        print(data)
        print(np.array(data).shape)
        print(label)
輸出結果如下圖5所示:
二、模型配置
import paddle
from paddle.nn import Conv2D, Linear, Embedding
from paddle import to_tensor
import paddle.nn.functional as F
class RNN(paddle.nn.Layer):
    """Character-level classifier: Embedding -> Linear -> LSTM -> Linear.

    The vocabulary size is taken from the module-level ``vocab``: the
    embedding table has ``vocab["<pad>"] + 1`` rows.
    """

    def __init__(self, seq_len=150):
        """Create the layers.

        Args:
            seq_len: number of ids per input sequence. The final layer
                consumes the LSTM output of every step flattened together,
                so its input width is ``seq_len * hid_dim`` (150 * 128 =
                19200, the previously hard-coded value).
        """
        super(RNN, self).__init__()
        self.dict_dim = vocab["<pad>"]
        self.emb_dim = 128
        self.hid_dim = 128
        self.class_dim = 2
        self.seq_len = seq_len
        self.embedding = Embedding(
            self.dict_dim + 1, self.emb_dim,
            sparse=False)
        self._fc1 = Linear(self.emb_dim, self.hid_dim)
        self.lstm = paddle.nn.LSTM(self.hid_dim, self.hid_dim)
        self.fc2 = Linear(self.seq_len * self.hid_dim, self.class_dim)

    def forward(self, inputs):
        """Return unnormalized class scores of shape [batch, class_dim].

        The scores are raw logits: the loss used on them in this file,
        ``paddle.nn.functional.cross_entropy``, applies softmax itself by
        default, so applying softmax here as well would normalize twice.
        Apply softmax to the result when probabilities are needed.
        """
        # [batch, seq_len] -> [batch, seq_len, emb_dim]
        emb = self.embedding(inputs)
        # [batch, seq_len, hid_dim]
        fc_1 = self._fc1(emb)
        # The LSTM layer returns a tuple; element 0 holds the per-step outputs.
        x = self.lstm(fc_1)
        # Flatten all steps: a 0 in the shape keeps the batch dimension.
        x = paddle.reshape(x[0], [0, -1])
        x = self.fc2(x)
        return x
# Instantiate the network and print a per-layer summary for an int64 input
# of 32 sequences with 150 ids each.
rnn = RNN()
paddle.summary(rnn, input_size=(32, 150), dtypes="int64")
輸出結果如下圖6所示:
三、模型訓練
# NOTE(review): `outputs`, `hub`, `dataset`, `tokenizer` and `config` are not
# defined anywhere in the visible source, so this snippet raises NameError if
# run as-is. It looks like it was pasted from a PaddleHub/ERNIE text-matching
# example and is unrelated to the LSTM model above -- confirm and remove or
# replace.
# Build the transfer network, using ERNIE's token-level output
query = outputs["sequence_output"]
title = outputs['sequence_output_2']
# Create the pointwise text-matching task
pointwise_matching_task = hub.PointwiseTextMatchingTask(
dataset=dataset,
query_feature=query,
title_feature=title,
tokenizer=tokenizer,
config=config)
四、開始Finetune
def draw_process(title,color,iters,data,label):
    """Plot one curve of ``data`` against ``iters`` and show the figure.

    Args:
        title: figure title.
        color: line color passed to matplotlib.
        iters: x values (iteration numbers).
        data: y values, one per entry of ``iters``.
        label: used both as the y-axis label and as the legend entry.
    """
    # Draw the curve first, then decorate the axes around it.
    plt.plot(iters, data, label=label, color=color)
    plt.title(title, fontsize=24)
    plt.xlabel("iter", fontsize=20)
    plt.ylabel(label, fontsize=20)
    plt.grid()
    plt.legend()
    plt.show()
def train(model, epochs=3, learning_rate=0.002, log_every=50):
    """Train ``model``, evaluate after every epoch, save and plot the curves.

    Uses the module-level ``train_loader`` and ``test_loader``. After the
    last epoch the weights are saved to ``model_final.pdparams`` and the
    recorded loss / accuracy curves are drawn with ``draw_process``.

    Args:
        model: the network to optimize; called as ``model(sent)``.
        epochs: number of passes over ``train_loader`` (default 3, the
            previously hard-coded value).
        learning_rate: Adam learning rate (default 0.002, as before).
        log_every: record and print the training loss every this many
            batches (default 50, as before).
    """
    model.train()
    opt = paddle.optimizer.Adam(learning_rate=learning_rate, parameters=model.parameters())
    steps = 0
    Iters, total_loss, total_acc = [], [], []
    for epoch in range(epochs):
        for batch_id, data in enumerate(train_loader):
            steps += 1
            sent = data[0]
            label = data[1]
            logits = model(sent)
            # NOTE(review): cross_entropy applies softmax by default; confirm
            # that the model returns unnormalized scores.
            loss = paddle.nn.functional.cross_entropy(logits, label)
            acc = paddle.metric.accuracy(logits, label)
            if batch_id % log_every == 0:
                Iters.append(steps)
                # .item() works whether the tensor has shape [1] or is 0-D;
                # indexing with [0] fails on a 0-D array.
                total_loss.append(loss.numpy().item())
                total_acc.append(acc.numpy().item())
                print("epoch: {}, batch_id: {}, loss is: {}".format(epoch, batch_id, loss.numpy()))
            loss.backward()
            opt.step()
            opt.clear_grad()

        # Evaluate the model after each epoch.
        model.eval()
        accuracies = []
        losses = []
        # No gradients are needed for evaluation.
        with paddle.no_grad():
            for data in test_loader:
                sent = data[0]
                label = data[1]
                logits = model(sent)
                loss = paddle.nn.functional.cross_entropy(logits, label)
                acc = paddle.metric.accuracy(logits, label)
                accuracies.append(acc.numpy())
                losses.append(loss.numpy())
        avg_acc, avg_loss = np.mean(accuracies), np.mean(losses)
        print("[validation] accuracy: {}, loss: {}".format(avg_acc, avg_loss))
        # Switch back to training mode for the next epoch.
        model.train()

    paddle.save(model.state_dict(),"model_final.pdparams")
    draw_process("trainning loss","red",Iters,total_loss,"trainning loss")
    draw_process("trainning acc","green",Iters,total_acc,"trainning acc")
# Create a fresh network and train it; train() also saves the weights to
# model_final.pdparams and draws the loss / accuracy curves.
model = RNN()
train(model)
輸出結果如下圖7、8、9所示:
五、模型評估
代碼如下:
# Model evaluation: reload the saved weights, score the evaluation loader and
# print one decoded sample together with its predicted label.
model_state_dict = paddle.load('model_final.pdparams')
model = RNN()
model.set_state_dict(model_state_dict)
model.eval()
# Class id -> answer to "is it a rumor?" (rumors are labelled 0 in the data).
label_map = {0:"是", 1:"否"}
samples = []
predictions = []
accuracies = []
losses = []
# Inference only: no gradients are needed.
with paddle.no_grad():
    for batch_id, data in enumerate(test_loader):
        sent = data[0]
        label = data[1]
        logits = model(sent)
        # One argmax over the whole batch instead of a Python loop over rows.
        pred_ids = np.argmax(logits.numpy(), axis=1)
        predictions.extend(label_map[int(p)] for p in pred_ids)
        # Iterating the 2-D array yields one row (one sample) at a time.
        samples.extend(sent.numpy())
        loss = paddle.nn.functional.cross_entropy(logits, label)
        acc = paddle.metric.accuracy(logits, label)
        accuracies.append(acc.numpy())
        losses.append(loss.numpy())
avg_acc, avg_loss = np.mean(accuracies), np.mean(losses)
print("[validation] accuracy: {}, loss: {}".format(avg_acc, avg_loss))
print('資料: {} \n\n是否謠言: {}'.format(ids_to_str(samples[0]), predictions[0]))
輸出結果如下圖10所示:
總結
本系列文章内容為根據清華社出版的《自然語言處理實踐》所作的相關筆記和感悟,其中代碼均為基于百度飛槳開發,若有任何侵權和不妥之處,請私信于我,定積極配合處理,看到必回!!!