Project code on GitHub:
-
Algorithm introduction
For the details of the algorithm, see my write-up; download link:
-
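In brief, the script selects features with the chi-square (CHI) statistic. Writing N for the total number of documents and A, B, C, D for the four contingency counts defined in the code below, the score computed for a term t and category c is

$$\chi^2(t,c) = \frac{N\,(AD - CB)^2}{(A+C)(B+D)(A+B)(C+D)}$$

and, within each category, the terms scoring above that category's mean CHI are kept as features.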
Data used by the code
Original site:
File structure
├─doc_classification.py
├─stopwords.txt
├─vocabulary.txt
├─train.data
├─train.label
├─train.map
├─test.data
├─test.label
└─test.map
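Judging from how doc_classification.py parses these files (a sketch of the expected layout; the actual files come from the original site above): each line of train.data / test.data is a `docIdx wordIdx count` triple, train.label / test.label hold one category id per line with the line number serving as the document id, and the .map files list the category names with their ids. For example, a train.data line

1 5 3

means document 1 contains vocabulary word 5 three times.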
-
Python code
Required libraries:
pandas, liblinearutil
Note: prebuilt liblinearutil packages for Windows (32/64-bit)
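If you build liblinear yourself, make sure liblinearutil.py and the compiled library are on the import path. Alternatively (an assumption, not something the original post covers) recent versions are published on PyPI as liblinear-official, in which case the import would be `from liblinear.liblinearutil import *` rather than the bare `from liblinearutil import *` used below.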
# doc_classification.py
import pandas as pd
import math
from liblinearutil import *
import time
# load the raw data
def loadOriginData(src='train'):
    # train.data
    dataSrc = r'%s.data' % src
    # train.label
    labelSrc = r'%s.label' % src
    label = pd.read_table(labelSrc, sep=' ', names=['label'])
    # train.map
    mapSrc = r'%s.map' % src
    # terms contained in each document
    doc2term = {}
    # documents each term occurs in
    term2doc = {}
    # documents under each category
    cate2docs = {}
    # TF values
    TF = {}
    with open(dataSrc, 'r') as f:
        for line in f:
            str_docIdx, str_wordIdx, str_cnt = line.split()
            docIdx = int(str_docIdx)
            wordIdx = int(str_wordIdx)
            cnt = int(str_cnt)
            # update the data structures
            doc2term.setdefault(docIdx, []).append(wordIdx)
            term2doc.setdefault(wordIdx, []).append(docIdx)
            TF.setdefault(docIdx, {})[wordIdx] = cnt
    # record which documents belong to each category
    # (documents are numbered from 1, hence enumerate(f, 1))
    with open(labelSrc, 'r') as f:
        for line_index, line in enumerate(f, 1):
            labelVal = int(line.strip())
            cate2docs.setdefault(labelVal, []).append(line_index)
    return TF, doc2term, term2doc, cate2docs, label
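
# Note: document and word indices in the data files are 1-based, which is
# why the vocabulary lookup below uses voc['voc'][term - 1] and the SVM
# builder indexes x[doc - 1] / y[doc - 1].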
# feature selection
def featureSel(doc2term, term2doc, cate2docs):
    # CHI measures the degree of association between a term ti and a
    # category Cj; A, B, C, D are the four contingency counts
    CHI_cat2term = {}
    # N: total number of documents
    N = len(doc2term)
    # A + B + C + D = N
    # A: number of documents of the category that contain the term
    A = {}
    # B: number of documents outside the category that contain the term
    B = {}
    # C: number of documents of the category that do not contain the term
    C = {}
    # D: number of documents outside the category that do not contain the term
    D = {}
    DF = {}
    # all categories
    categories = list(cate2docs.keys())
    # stopword list
    stopwords = {}
    stopwordsSrc = r'stopwords.txt'
    with open(stopwordsSrc) as f:
        for line in f:
            stopwords[line.strip()] = True
    # vocabulary of the training data
    vocSrc = r'vocabulary.txt'
    voc = pd.read_table(vocSrc, names=['voc'])
    # all selected features
    features = set()
    # compute the CHI of every term under each category label
    for category in categories:
        # docs: the documents belonging to this category
        docs = cate2docs[category]
        sumVal = 0
        for term in term2doc:
            # stopwords get a CHI of zero
            if stopwords.get(voc['voc'][term - 1], False):
                CHI_cat2term.setdefault(category, {})[term] = 0
                continue
            # in the category and containing the term
            AVal = len(set(term2doc[term]).intersection(set(docs)))
            # outside the category but containing the term
            BVal = len(term2doc[term]) - AVal
            # in the category but not containing the term
            CVal = len(docs) - AVal
            # outside the category and not containing the term
            DVal = N - AVal - BVal - CVal
            CHIVal = N * (AVal * DVal - CVal * BVal)**2 / ((AVal + CVal) * (BVal + DVal) * (AVal + BVal) * (CVal + DVal))
            # CHIVal = math.log(AVal * N / ((AVal + CVal) * (AVal + BVal)))
            A.setdefault((term, category), AVal)
            B.setdefault((term, category), BVal)
            C.setdefault((term, category), CVal)
            D.setdefault((term, category), DVal)
            CHI_cat2term.setdefault(category, {})[term] = CHIVal
            DF[term] = AVal + BVal
            sumVal += CHIVal
        # keep the terms whose CHI exceeds the category average
        terms = CHI_cat2term[category]
        meanVal = sumVal / len(terms)
        for term in terms:
            if CHI_cat2term[category][term] > meanVal:
                features.add(term)
    # for feature in features:
    #     print(voc['voc'][feature])
    print('There are %d features in VSM model.\n' % len(features))
    return features, DF
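
# A quick sanity check of the CHI formula with made-up numbers: if N = 100,
# a category holds 30 documents, and a term occurs in 20 documents of which
# 15 fall inside the category, then A = 15, B = 5, C = 15, D = 65 and
#   CHI = 100 * (15*65 - 15*5)**2 / (30 * 70 * 20 * 80) ≈ 24.11
# The higher the score, the stronger the term-category association.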
def buildSVMData(TF, DF, features, N, label, cate2docs, doc2terms):
    isFeatures = dict(zip(features, [True] * len(features)))
    categories = list(cate2docs.keys())
    # compute a per-document normalization factor and apply it
    # y: label values
    y = [0] * N
    # x: sparse matrix
    x = []
    for i in range(N):
        x.append({})
    for category in categories:
        for doc in cate2docs[category]:
            # assign the category label to y
            y[doc - 1] = label.iat[doc - 1, 0]
            scale_factor = -1
            for term in doc2terms[doc]:
                if isFeatures.get(term, False):  # the term is a feature
                    # TF value
                    TFVal = TF[doc].get(term, 0)
                    # TF-IDF value
                    tf_idf = TFVal * math.log(N / DF[term])
                    x[doc - 1][term] = tf_idf
                    # track the largest feature value of the document
                    if scale_factor < tf_idf:
                        scale_factor = tf_idf
            alpha = 0  # smoothing constant of the alternative scaling below (value assumed)
            # normalize every feature of the document by its largest TF-IDF value
            for term in doc2terms[doc]:
                if isFeatures.get(term, False):  # the term is a feature
                    # x[doc - 1][term] = alpha + (1 - alpha) * x[doc - 1][term] / scale_factor
                    x[doc - 1][term] /= scale_factor
    print("Data for SVM has been built.\n")
    return x, y
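
# Note: x and y are already in the sparse format the liblinear Python API
# accepts: x is a list of {featureIndex: value} dicts, one per document,
# and y is a list of integer labels, e.g. x = [{1: 0.5, 7: 1.0}], y = [3].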
# compute DF
def getDF(doc2term, term2doc, cate2docs):
    DF = {}
    for term in term2doc:
        DF[term] = len(term2doc[term])
    return DF
if __name__ == '__main__':
    start = time.time()
    # main program
    # load the training data (train.data)
    TF, doc2term, term2doc, cate2docs, label = loadOriginData()
    # feature selection
    features, DF = featureSel(doc2term, term2doc, cate2docs)
    # build the SVM training data
    x, y = buildSVMData(TF, DF, features, len(doc2term), label, cate2docs, doc2term)
    # load the test data (test.data)
    TF_test, doc2term_test, term2doc_test, cate2docs_test, label_test = loadOriginData(src='test')
    DF_test = getDF(doc2term_test, term2doc_test, cate2docs_test)
    x_test, y_test = buildSVMData(TF_test, DF_test, features, len(doc2term_test), label_test, cate2docs_test, doc2term_test)
    print("Data preparation took %s s.\n" % (time.time() - start))
    # classify with the liblinear library
    prob = problem(y, x)
    param = parameter('-s 0 -c 4 -B 1')
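    # liblinear options: '-s 0' selects L2-regularized logistic regression,
    # '-c 4' sets the cost parameter C, and '-B 1' adds a bias term; the
    # '-b 1' passed to predict() below asks for probability estimates, which
    # liblinear supports for logistic-regression solvers.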
    # train
    m = train(prob, param)
    # predict on test.data
    p_label, p_acc, p_vals = predict(y_test, x_test, m, '-b 1')
    # evaluate
    ACC, MSE, SCC = evaluations(y_test, p_label)
    print('ACC:\n', ACC)
    print('MSE', MSE)
    print('SCC', SCC)
    # per-category error statistics
    categoriesErrs = {}
    for doc_index, doc_label in enumerate(y_test):
        if doc_label != int(p_label[doc_index]):
            category = label_test.iat[doc_index, 0]
            categoriesErrs.setdefault(category, []).append(doc_index + 1)
    # with open('outcome.txt', 'wb') as f:
    print("Misclassified samples:\n")
    for categoryErr in categoriesErrs:
        numOfErr = len(categoriesErrs[categoryErr])
        print('Category %d: %d samples in total, %d misclassified, i.e. %f %%.\n'
              % (categoryErr, len(cate2docs_test[categoryErr]), numOfErr,
                 100 * numOfErr / len(cate2docs_test[categoryErr])))
    end = time.time()
    print("Total time cost is %s s.\n" % (end - start))