Text classification with SVM in Python

Project code is available on GitHub:

  1. Algorithm introduction

    For details, see my write-up (download link)
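
    In short, the pipeline below selects features with the chi-square (CHI) statistic and weights them with TF-IDF before training a linear classifier. Restating the two formulas as they appear in the code (the linked write-up is not reproduced here): for a term $t$ and a category $c$,

    $$\chi^2(t, c) = \frac{N \, (AD - CB)^2}{(A + C)(B + D)(A + B)(C + D)},$$

    where $N = A + B + C + D$ is the total number of documents and $A$, $B$, $C$, $D$ count documents by category membership and term occurrence (defined in the code). Each selected feature is then weighted as

    $$w_{t,d} = \mathrm{TF}_{t,d} \cdot \log \frac{N}{\mathrm{DF}_t},$$

    and each document's weights are finally scaled by that document's largest TF-IDF value.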

  2. Data used by the code

    Original site

    File structure

    ├─doc_classification.py

    ├─stopwords.txt

    ├─vocabulary.txt

    ├─train.data

    ├─train.label

    ├─train.map

    ├─test.data

    ├─test.label

    └─test.map
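
    For orientation: the loader below reads each line of train.data / test.data as a whitespace-separated triple "docIdx wordIdx count" (1-based indices), and each line of *.label as one category id per document. The lines below are illustrative values only, not taken from the actual dataset:

    1 17 2
    1 42 1
    2 17 3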

  3. Python code

    Required libraries:

    pandas, liblinearutil

    Note: prebuilt liblinearutil packages for Windows (32/64-bit)
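
    If installing from PyPI instead, one plausible route is the following (liblinear-official is the LIBLINEAR project's own Python package and exposes the liblinearutil module; verify the package name against your environment):

    pip install pandas
    pip install liblinear-official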

# doc_classification.py
import pandas as pd
import math
from liblinearutil import *
import time

# Load the raw data
def loadOriginData(src='train'):
    # train.data
    dataSrc = r'%s.data' % src
    # train.label
    labelSrc = r'%s.label' % src
    label = pd.read_table(labelSrc, sep=' ', names=['label'])
    # train.map
    mapSrc = r'%s.map' % src

    # terms contained in each document
    doc2term = {}
    # documents in which each term appears
    term2doc = {}
    # documents under each category
    cate2docs = {}
    # term-frequency counts: TF[doc][term] = count
    TF = {}
    with open(dataSrc, 'r') as f:
        for line in f:
            str_docIdx, str_wordIdx, str_cnt = line.split()
            docIdx = int(str_docIdx)
            wordIdx = int(str_wordIdx)
            cnt = int(str_cnt)
            # update the index structures
            doc2term.setdefault(docIdx, []).append(wordIdx)
            term2doc.setdefault(wordIdx, []).append(docIdx)
            TF.setdefault(docIdx, {})[wordIdx] = cnt
    # record which documents belong to each category
    with open(labelSrc, 'r') as f:
        for line_index, line in enumerate(f, 1):  # doc ids in *.data are 1-based
            labelVal = int(line.strip())
            cate2docs.setdefault(labelVal, []).append(line_index)
    return TF, doc2term, term2doc, cate2docs, label


# Feature selection
def featureSel(doc2term, term2doc, cate2docs):
    # CHI measures the association between a term t_i and a category C_j;
    # A, B, C, D are the four counts of the term/category contingency table
    CHI_cat2term = {}
    # N: total number of documents
    N = len(doc2term)
    # A + B + C + D = N
    # A: documents in the category that contain the term
    A = {}
    # B: documents outside the category that contain the term
    B = {}
    # C: documents in the category that do not contain the term
    C = {}
    # D: documents outside the category that do not contain the term
    D = {}
    DF = {}
    # all categories
    categories = list(cate2docs.keys())
    # stop-word list
    stopwords = {}
    stopwordsSrc = r'stopwords.txt'
    with open(stopwordsSrc) as f:
        for line in f:
            stopwords[line.strip()] = True
    # vocabulary of the training data
    vocSrc = r'vocabulary.txt'
    voc = pd.read_table(vocSrc, names=['voc'])
    # set of all selected features
    features = set()
    # compute the CHI of every term under each category label
    for category in categories:
        # docs: the documents belonging to this category
        docs = cate2docs[category]
        sumVal = 0
        for term in term2doc:
            # stop words get a CHI of zero
            if stopwords.get(voc['voc'][term - 1], False):
                CHI_cat2term.setdefault(category, {})[term] = 0
                continue
            # A: in the category and containing the term
            AVal = len(set(term2doc[term]).intersection(set(docs)))
            # B: containing the term but outside the category
            BVal = len(term2doc[term]) - AVal
            # C: in the category but not containing the term
            CVal = len(docs) - AVal
            # D: outside the category and not containing the term
            DVal = N - AVal - BVal - CVal
            CHIVal = N * (AVal * DVal - CVal * BVal) ** 2 / ((AVal + CVal) * (BVal + DVal) * (AVal + BVal) * (CVal + DVal))
            # CHIVal = math.log(AVal * N / ((AVal + CVal) * (AVal + BVal)))
            A.setdefault((term, category), AVal)
            B.setdefault((term, category), BVal)
            C.setdefault((term, category), CVal)
            D.setdefault((term, category), DVal)

            CHI_cat2term.setdefault(category, {})[term] = CHIVal
            DF[term] = AVal + BVal
            sumVal += CHIVal
        # keep the terms whose CHI is above the category average
        terms = CHI_cat2term[category]
        meanVal = sumVal / len(terms)
        for term in terms:
            if CHI_cat2term[category][term] > meanVal:
                features.add(term)
    # for feature in features:
    #     print(voc['voc'][feature])
    print('There are %d features in the VSM model.\n' % len(features))
    return features, DF


def buildSVMData(TF, DF, features, N, label, cate2docs, doc2terms):
    isFeatures = dict(zip(features, [True] * len(features)))
    categories = list(cate2docs.keys())
    # for training samples, compute a per-document normalization scale factor along the way
    # y: label values
    y = [0] * N
    # x: sparse feature matrix (one dict per document)
    x = []
    for i in range(N):
        x.append({})
    for category in categories:
        for doc in cate2docs[category]:
            # assign the category label for this document
            y[doc - 1] = label.iat[doc - 1, 0]
            scale_factor = -1
            for term in doc2terms[doc]:
                if isFeatures.get(term, False):  # if the term is a selected feature
                    # TF value
                    TFVal = TF[doc].get(term, 0)
                    # TF-IDF value
                    tf_idf = TFVal * math.log(N / DF[term])
                    x[doc - 1][term] = tf_idf
                    # track the largest TF-IDF in this document
                    if scale_factor < tf_idf:
                        scale_factor = tf_idf
            alpha = 0  # placeholder; only used by the smoothed variant commented out below
            # normalize every feature in the document by its largest TF-IDF
            for term in doc2terms[doc]:
                if isFeatures.get(term, False):  # if the term is a selected feature
                    # x[doc - 1][term] = alpha + (1 - alpha) * x[doc - 1][term] / scale_factor
                    x[doc - 1][term] /= scale_factor
    print("Data for SVM has been built.\n")
    return x, y

# Compute document frequency (DF)
def getDF(doc2term, term2doc, cate2docs):
    DF = {}
    for term in term2doc:
        DF[term] = len(term2doc[term])
    return DF

if __name__ == '__main__':
    start = time.time()
    # main program: load the training data (train.data)
    TF, doc2term, term2doc, cate2docs, label = loadOriginData()
    # feature selection
    features, DF = featureSel(doc2term, term2doc, cate2docs)
    # build SVM model
    x, y = buildSVMData(TF, DF, features, len(doc2term), label, cate2docs, doc2term)
    # load the test data (test.data)
    TF_test, doc2term_test, term2doc_test, cate2docs_test, label_test = loadOriginData(src='test')
    DF_test = getDF(doc2term_test, term2doc_test, cate2docs_test)
    x_test, y_test = buildSVMData(TF_test, DF_test, features, len(doc2term_test), label_test, cate2docs_test, doc2term_test)

    print("處理資料使用了 %s s時間。\n" % (time.time() - start))
    # # 調用 liblinear 庫進行分類
    prob = problem(y, x)
    param = parameter('-s 0 -c 4 -B 1')
    # 訓練
    m = train(prob, param)
    # 預測test.data
    p_label, p_acc, p_vals = predict(y_test, x_test, m, '-b 1')
    # 評價
    ACC, MSE, SCC = evaluations(y_test, p_label)
    print('ACC:\n', ACC)
    print('MSE', MSE)
    print('SCC', SCC)
    # per-category error statistics
    categoriesErrs = {}
    for doc_index, doc_label in enumerate(y_test):
        if doc_label != int(p_label[doc_index]):
            category = label_test.iat[doc_index, 0]
            categoriesErrs.setdefault(category, []).append(doc_index + 1)
    print("Misclassified samples:\n")
    for categoryErr in categoriesErrs:
        numOfErr = len(categoriesErrs[categoryErr])
        print('Category %d: %d samples in total, %d misclassified (%f%%).\n' % (categoryErr, len(cate2docs_test[categoryErr]), numOfErr, 100 * numOfErr / len(cate2docs_test[categoryErr])))

    end = time.time()
    print("Total time cost is  %s s.\n" % (end - start))