#encoding:utf-8
import sys
import pickle
from copy import deepcopy
# Run mode: True trains and saves a model, False loads a saved model and tags
# the test file.
is_train = False

# Probability substituted for any event missing from the model tables.
DEFAULT_PROB = 1e-12
# Lowest possible score; used as the starting value when searching for a maximum.
MIN_PROB = float('-inf')

# Input and output file locations.
train_path = "train.in"
test_path = "test.in"
output_path = "test.out"
# Training: count how often each event occurs, then turn the counts into
# probabilities.
def _normalize(counts):
    """Scale the values of *counts* in place so that they sum to 1."""
    total = sum(counts.values())
    for key in counts:
        # 1.0 * ... forces float division on Python 2 as well.
        counts[key] = 1.0 * counts[key] / total


def train(path=None):
    """Estimate the HMM parameters from a tab-separated training file.

    Every line that splits into exactly two tab-separated fields is read as
    ``observation<TAB>state``. Any other line (typically a blank one) ends
    the current sentence, so the next pair starts a new sentence.

    The file is read in binary mode, so observations and states are byte
    strings (plain ``str`` on Python 2).

    Args:
        path: File to read. Defaults to the module-level ``train_path``.

    Returns:
        A tuple ``(A, B, PI, V, Q)``:
            A:  previous state -> {state -> transition probability}
            B:  state -> {observation -> emission probability}
            PI: state -> probability of being the first state of a sentence
            V:  set of all observations seen
            Q:  set of all states seen
    """
    if path is None:
        path = train_path
    print("start training ...")

    V = set()   # observation set
    Q = set()   # state set
    A = {}      # transition counts, later probabilities
    B = {}      # emission counts, later probabilities
    PI = {}     # sentence-initial state counts, later probabilities

    # Collect the raw counts.
    with open(path, "rb") as infile:
        pre_s = None  # state at time t-1; None at the start of a sentence
        for line in infile:
            # The handle is binary, so the separator must be a bytes literal
            # (identical to '\t' on Python 2).
            segs = line.rstrip().split(b'\t')
            if len(segs) != 2:
                # Sentence boundary: the next pair starts a new sentence.
                pre_s = None
                continue

            o, s = segs  # observation and state at time t
            emissions = B.setdefault(s, {})
            emissions[o] = emissions.get(o, 0) + 1
            V.add(o)
            Q.add(s)

            if pre_s is None:
                PI[s] = PI.get(s, 0) + 1
            else:
                transitions = A.setdefault(pre_s, {})
                transitions[s] = transitions.get(s, 0) + 1
            pre_s = s

    # Turn each row of counts into a probability distribution.
    for row in A.values():
        _normalize(row)
    for row in B.values():
        _normalize(row)
    _normalize(PI)

    print("finished training ...")
    return A, B, PI, V, Q
def saveModel(A, B, PI, V, Q):
    """Pickle each model component into its own ``*.param`` file.

    The files are written relative to the current working directory, in the
    order A, B, PI, V, Q.

    Args:
        A: Transition probability table.
        B: Emission probability table.
        PI: Initial state probabilities.
        V: Observation set.
        Q: State set.
    """
    components = (
        ("A.param", A),
        ("B.param", B),
        ("PI.param", PI),
        ("V.param", V),
        ("Q.param", Q),
    )
    for filename, component in components:
        with open(filename, "wb") as outfile:
            pickle.dump(component, outfile)
#維特比
def predict(X, A, B, PI, V, Q):
W = [{} for t in range(len(X))] #相當于書上的δ
path = {}
for s in Q:
W[0][s] = 1.0 * PI.get(s, DEFAULT_PROB) * B.get(s, {}).get(X[0], DEFAULT_PROB) #0時刻狀态為s的機率
path[s] = [s]
for t in range(1, len(X)):
new_path = {}
for s in Q: #兩輪循環暴力求解
max_prob = MIN_PROB
max_s = ''
for pre_s in Q:
prob = W[t-1][pre_s] * \
A.get(pre_s, {}).get(s, DEFAULT_PROB) * \
B.get(s, {}).get(X[t], DEFAULT_PROB)
(max_prob, max_s) = max((max_prob, max_s), (prob, pre_s)) #全由第一個prob決定
W[t][s] = max_prob #t時刻狀态為s的最大機率
tmp = deepcopy(path[max_s])
tmp.append(s)
new_path[s] = tmp
path = new_path
(max_prob, max_s) = max((W[len(X)-1][s], s) for s in Q)# 最後一個時刻各個狀态的機率的最大的
return path[max_s]
def getModel():
    """Load the five pickled model components from the working directory.

    NOTE(review): unpickling can execute arbitrary code, so only load
    ``*.param`` files from a source you trust.

    Returns:
        The tuple ``(A, B, PI, V, Q)`` read from ``A.param``, ``B.param``,
        ``PI.param``, ``V.param`` and ``Q.param`` respectively.
    """
    loaded = []
    for filename in ("A.param", "B.param", "PI.param", "V.param", "Q.param"):
        with open(filename, "rb") as infile:
            loaded.append(pickle.load(infile))
    A, B, PI, V, Q = loaded
    return A, B, PI, V, Q
def _tag_and_write(outfile, observations, gold, A, B, PI, V, Q):
    """Predict states for one sentence and write it, followed by a blank line.

    Does nothing when *observations* is empty.
    """
    if len(observations) == 0:
        return
    preds = predict(observations, A, B, PI, V, Q)
    for vals in zip(observations, gold, preds):
        # Bytes separators: the output handle is opened in binary mode
        # (these literals are ordinary str on Python 2).
        outfile.write(b"\t".join(vals) + b"\n")
    outfile.write(b"\n")


def test(A, B, PI, V, Q):
    """Tag every sentence in ``test_path`` and write the result to ``output_path``.

    Input lines that split into exactly two tab-separated fields are read as
    ``observation<TAB>gold state``; any other line ends the current sentence.
    Each output line is ``observation<TAB>gold state<TAB>predicted state``
    and every sentence is followed by a blank line.

    A sentence still pending when the input ends is written as well, so the
    input does not need to finish with a blank line.

    Args:
        A: Transition probability table.
        B: Emission probability table.
        PI: Initial state probabilities.
        V: Observation set.
        Q: State set.
    """
    print("start testing")
    with open(test_path, "rb") as infile, \
            open(output_path, "wb") as outfile:
        X_test = []
        y_test = []
        for line in infile:
            # rstrip(), as train() does, so an observation with leading
            # whitespace is looked up under the key it was counted under.
            segs = line.rstrip().split(b'\t')
            if len(segs) == 2:
                X_test.append(segs[0])  # observation at time t
                y_test.append(segs[1])  # gold state at time t
                continue
            # Sentence boundary: emit what has been collected so far.
            _tag_and_write(outfile, X_test, y_test, A, B, PI, V, Q)
            X_test = []
            y_test = []
        # Emit the last sentence when the file has no trailing blank line.
        _tag_and_write(outfile, X_test, y_test, A, B, PI, V, Q)
    print("finished testing")
def main():
    """Train and save a model, or load a saved model and tag the test file.

    The module-level ``is_train`` flag selects which of the two runs.
    """
    if not is_train:
        A, B, PI, V, Q = getModel()
        test(A, B, PI, V, Q)
        return

    A, B, PI, V, Q = train()
    saveModel(A, B, PI, V, Q)


if __name__ == '__main__':
    main()