天天看點

HMM 隱馬爾可夫模型 代碼實作

#encoding:utf-8
import sys
import pickle
from copy import deepcopy

# True: fit the model from train_path and pickle it; False: load pickled parameters.
is_train = False

# Floor probability substituted when an entry is missing from the model tables.
DEFAULT_PROB = 1e-12
# Negative infinity: a sentinel that compares lower than any probability.
MIN_PROB = float('-inf')

# Input/output files, resolved relative to the current working directory.
train_path = "train.in"
test_path = "test.in"
output_path = "test.out"

# Estimate every probability as a normalised event count
def train():
    """Fit the HMM parameters from the file at ``train_path``.

    Each data line is expected to hold ``observation<TAB>state``. Any line
    that does not split into exactly two tab-separated fields (for example a
    blank line) ends the current sentence.

    Returns:
        A tuple ``(A, B, PI, V, Q)``:
        A  -- transition probabilities, ``A[prev_state][state]``
        B  -- emission probabilities, ``B[state][observation]``
        PI -- probability of each state opening a sentence
        V  -- set of observations seen
        Q  -- set of states seen
    """
    print("start training ...")

    V = set()  # observation vocabulary
    Q = set()  # state inventory
    A = {}     # prev_state -> (state -> count, later probability)
    B = {}     # state -> (observation -> count, later probability)
    PI = {}    # state -> sentence-initial count, later probability

    # Pass 1: accumulate raw counts.
    with open(train_path, "rb") as infile:
        prev_state = -1  # -1 marks "no previous state" (start of a sentence)
        for raw in infile:
            fields = raw.rstrip().split('\t')
            if len(fields) != 2:
                # Sentence boundary: the next token starts a new sentence.
                prev_state = -1
                continue

            obs, state = fields
            V.add(obs)
            Q.add(state)

            emit_counts = B.setdefault(state, {})
            emit_counts[obs] = emit_counts.get(obs, 0) + 1

            if prev_state == -1:
                PI[state] = PI.get(state, 0) + 1
            else:
                trans_counts = A.setdefault(prev_state, {})
                trans_counts[state] = trans_counts.get(state, 0) + 1

            prev_state = state

    # Pass 2: turn each row of counts into a probability distribution.
    for table in (A, B):
        for row in table.values():
            total = sum(row.values())
            for key in row:
                row[key] = 1.0 * row[key] / total

    total = sum(PI.values())
    for state in PI:
        PI[state] = 1.0 * PI[state] / total

    print("finished training ...")

    return A, B, PI, V, Q

def saveModel(A, B, PI, V, Q):
    """Pickle each model component to its own ``.param`` file.

    The files ``A.param``, ``B.param``, ``PI.param``, ``V.param`` and
    ``Q.param`` are written, in that order, relative to the current working
    directory; existing files are overwritten.
    """
    targets = (
        ("A.param", A),
        ("B.param", B),
        ("PI.param", PI),
        ("V.param", V),
        ("Q.param", Q),
    )
    for filename, component in targets:
        with open(filename, "wb") as outfile:
            pickle.dump(component, outfile)

# Viterbi decoding
def predict(X, A, B, PI, V, Q):
    """Return the most probable state sequence for the observations ``X``.

    Args:
        X:  sequence of observations (one sentence).
        A:  transition probabilities, ``A[prev_state][state]``.
        B:  emission probabilities, ``B[state][observation]``.
        PI: initial-state probabilities, ``PI[state]``.
        V:  observation vocabulary; not used by the decoder, kept so the
            signature matches the rest of the model API.
        Q:  iterable of candidate states.

    Returns:
        A list of states with the same length as ``X``; ``[]`` when ``X`` is
        empty. Ties are resolved in favour of the state that compares
        greatest, because candidates are ranked as ``(score, state)`` tuples.

    Raises:
        ValueError: if ``X`` is non-empty but ``Q`` is empty.
    """
    import math  # function-scope import keeps this block self-contained

    if not X:
        return []

    def log_prob(p):
        # A missing entry (None) or a zero probability falls back to the
        # smoothing floor, which also keeps math.log away from log(0).
        return math.log(p if p else DEFAULT_PROB)

    # Scores are sums of log-probabilities: multiplying the raw probabilities
    # underflows to 0.0 on long sentences, after which every path ties.
    # delta[t][s] is the best log-score of any path that ends in s at time t
    # (the textbook delta); back[t][s] is the predecessor of s on that path.
    delta = [{} for _ in X]
    back = [{} for _ in X]

    for s in Q:
        delta[0][s] = log_prob(PI.get(s)) + log_prob(B.get(s, {}).get(X[0]))

    for t in range(1, len(X)):
        for s in Q:
            # The emission term does not depend on the predecessor, so it is
            # added once after the best predecessor has been chosen.
            emit = log_prob(B.get(s, {}).get(X[t]))
            best_score, best_prev = max(
                (delta[t - 1][pre_s] + log_prob(A.get(pre_s, {}).get(s)), pre_s)
                for pre_s in Q)
            delta[t][s] = best_score + emit
            back[t][s] = best_prev

    # Pick the best final state, then follow the back-pointers to the start.
    _, state = max((delta[-1][s], s) for s in Q)
    states = [state]
    for t in range(len(X) - 1, 0, -1):
        state = back[t][state]
        states.append(state)
    states.reverse()
    return states

def getModel():
    """Load the pickled model components from the current working directory.

    Reads ``A.param``, ``B.param``, ``PI.param``, ``V.param`` and ``Q.param``
    in that order and returns the unpickled objects as the tuple
    ``(A, B, PI, V, Q)``.

    NOTE: unpickling executes code embedded in the file, so only load
    parameter files from a trusted source.
    """
    components = []
    for filename in ("A.param", "B.param", "PI.param", "V.param", "Q.param"):
        with open(filename, "rb") as infile:
            components.append(pickle.load(infile))
    A, B, PI, V, Q = components
    return A, B, PI, V, Q

def test(A, B, PI, V, Q):
    """Decode every sentence in ``test_path`` and write results to ``output_path``.

    Input lines hold ``observation<TAB>gold_state``. Any line that does not
    split into exactly two tab-separated fields (for example a blank line)
    ends the current sentence. For each sentence one line
    ``observation<TAB>gold_state<TAB>predicted_state`` is written per token,
    followed by a blank line.

    Args:
        A, B, PI, V, Q: model parameters, passed through to ``predict``.
    """
    print("start testing")
    with open(test_path, "rb") as infile, \
         open(output_path, "wb") as outfile:
        X_test = []  # observations of the sentence being collected
        y_test = []  # gold states of the sentence being collected

        def flush_sentence():
            # Decode the buffered sentence, write it out and reset the buffers.
            if not X_test:
                return
            preds = predict(X_test, A, B, PI, V, Q)
            for vals in zip(X_test, y_test, preds):
                outfile.write("\t".join(vals) + "\n")
            outfile.write("\n")
            # Clear in place so the closure keeps seeing the same list objects.
            del X_test[:]
            del y_test[:]

        for line in infile:
            # rstrip() rather than strip(): this is how train() parses its
            # input, so an observation that is itself leading whitespace is
            # tokenised the same way at training and test time.
            segs = line.rstrip().split('\t')
            if len(segs) != 2:  # sentence boundary
                flush_sentence()
            else:
                X_test.append(segs[0])  # observation at time t
                y_test.append(segs[1])  # gold state at time t

        # The file may end without a trailing blank line; without this final
        # flush the last sentence would be dropped.
        flush_sentence()

    print("finished testing")

def main():
    """Obtain the model (train and save it, or load it), then run the test pass.

    When ``is_train`` is true the parameters are estimated with ``train`` and
    persisted with ``saveModel``; otherwise they are read back with
    ``getModel``. Either way the resulting parameters are handed to ``test``.
    """
    if not is_train:
        model = getModel()
    else:
        model = train()
        saveModel(*model)

    test(*model)

# Call main() only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()