天天看點

基于CRF的命名實體識别思路與實作

本文參考了https://github.com/liuhuanyong的CRF實作分詞的思路

CRF的實作思路類似于HMM,需要求解幾個機率(詞與詞的轉移機率,狀态與狀态的轉移機率、發射機率、初始詞機率),然後用verbiter方法求解,verbiter方法的原理簡單來說就是給出目前狀态,求解最有可能轉移至該狀态的上一個狀态,這個原理和思路也是實作CRF的核心。

首先給出宗成慶老師PPT的一個關于CRF中文分詞例子(實體識别無非是把字轉為詞,訓練樣本是帶有标記的):

基于CRF的命名實體識别思路與實作
基于CRF的命名實體識别思路與實作
基于CRF的命名實體識别思路與實作
基于CRF的命名實體識别思路與實作
基于CRF的命名實體識别思路與實作
基于CRF的命名實體識别思路與實作

由宗成慶老師PPT的例子可以看到,若使用CRF實作中文分詞,總結需要以下幾個機率:

1.詞與詞的轉移機率:如下圖第一項目前字被标記為B時上一個字為null的機率,但是在本人的實作中,僅僅計算了詞與詞之間的轉移機率,即某詞和詞之間轉移機率不為None,則f為1,λ為某詞和詞之間的轉移機率,否則f=0。如下圖若第三項'乒'轉移至'乒'的機率不為None,則f(乒,乓,B) = word_trans(乒,乓) 

基于CRF的命名實體識别思路與實作

2.初始詞機率:從上圖第一項f(null,乒,B)則發現又要計算一個機率:null->句子的首詞的轉移機率,本人的實作中,直接計算每個句子的首詞在訓練樣本中出現的機率strat_word(乒)代替f(null,乒,B)

3.發射機率:即上圖的第二項f(乒,B),所謂發射機率即為在某個狀态中,某個詞出現的可能性有多大。如狀态B中有['乒':0.03,'乓':0.02,'我':0.06]

4.轉移機率:verbiter方法的原理簡單來說就是給出目前狀态,求解最有可能轉移至該狀态的上一個狀态。基于這種思路,和原理,下圖的式子便很好了解了,即若目前狀态為B,求解最有可能轉移至狀态B的上一狀态,式中Teb則是E轉移至B的機率,Tsb則是S轉移至B的機率

基于CRF的命名實體識别思路與實作

故求解出以上幾個機率,則實作verbiter方法無非是套公式了。

現給出實作思路:

                1.根據語料庫求解狀态轉移機率 (根據tag求解 B-LOG --> I--LOG)

                2.根據語料庫求解詞與詞的轉移機率

                3.根據語料庫求解發射機率 (B-LOG中南京的機率)

                4.根據語料庫求解初始詞機率

                5.根據vebiter方法求解

                    句子:陳鼎立畢業于西南科技大學

                    輸入分詞結果:陳 鼎立 畢業 于 西南 科技 大學

                    1)初始化, R1x = W1x = l1*f(null,陳,B) + l2*f(陳,B) + l3*f(陳,B,鼎立)

                        約定:第一項的初始詞為陳的機率,第二項為狀态B中陳的發射機率,第三項為陳->鼎立的轉移機率

                        注意:句子的第二個詞開始則是第i-1個詞轉移至第i個詞的機率 + 第i個詞屬于某個狀态的機率 + 第i個詞轉移至第i+1個詞的機率

                    2)循環,Rb = max{Teb*Re,Tsb*Rs}*Wb

                        約定:Teb為E--B的權重

                    3)回溯

                        根據最後一個詞的狀态回溯路徑

代碼:首先說明,本人在實作的過程中忘記有幾個機率要求,故訓練模型(參數估計)部分代碼寫的非常混亂,不過了解上面的内容和思路,本人的代碼是否參考都無太大意義。

1.訓練狀态轉移機率和發射機率

class CRF_train:
    def __init__(self):
        self.state_list = ['O','B-LOCATION','I-LOCATION','O-LOCATION','B-ORGANIZATION','I-ORGANIZATION','O-ORGANIZATION','B-PERSON','B-TIME']
        self.trans_dict = {}
        self.emit_dict = {}
        self.count_dict = {}
        self.word_trans = {}
        self.word_count = {}
        self.words_list = []

    def init(self):
        for state in self.state_list:
            self.emit_dict[state] = {}
            self.count_dict[state] = 0

        for state in self.state_list:
            self.trans_dict[state] = {}
            for state1 in self.state_list:
                self.trans_dict[state][state1] = 0

    def train(self):
        self.init()
        for line in open('../data/train.txt',encoding='utf-8'):
            if line:
                line = line.strip()
                word_list = line.split(' ')
                char_list = []

                for word in word_list:
                    word1,tag = word.split('/')
                    char_list.append((word1,tag))

                for i in range(len(char_list) - 1):
                    self.trans_dict[char_list[i][1]][char_list[i+1][1]] += 1
                    self.count_dict[char_list[i][1]] += 1

                for i in range(len(char_list)):
                    state = char_list[i][1]
                    word = char_list[i][0]
                    if word not in self.emit_dict[state]:
                        self.emit_dict[state][word] = 1
                    else:
                        self.emit_dict[state][word] += 1

                for i in range(len(char_list)):
                    word = char_list[i][0]
                    if word not in self.word_count:
                        self.word_count[word] = 1
                    else:
                        self.word_count[word] += 1
            else:
                continue

        for state in self.state_list:
            for state1 in self.state_list:
                self.trans_dict[state][state1] = self.trans_dict[state][state1] / self.count_dict[state]

        for state in self.state_list:
            for word in self.emit_dict[state]:
                self.emit_dict[state][word] = self.emit_dict[state][word] / self.count_dict[state]

        self.save_model(self.emit_dict, './model/emit_dict.txt')
        self.save_model(self.trans_dict, './model/trans_dict.txt')

    def save_model(self,word_dict,model_path):
        f = open(model_path,'w')
        f.write(str(word_dict))
        f.close()

if __name__ == '__main__':
    ct = CRF_train()
    ct.train()
           

2.訓練詞與詞之間的轉移機率,注意的是求解詞與詞之間的轉移機率前,首先需要将訓練樣本中的不重複的詞提取出來,然後憑此來求解詞與詞之間的轉移機率,還有的是,在我的實際實作中,訓練樣本中不重複的詞有80000多個,若采用求解狀态轉移機率的思路,初始化和計算一個80000乘80000的矩陣是非常不現實的,故正确的思路是應像是求解發射機率的思路,即先初始化各個詞為 {詞1:{},詞2:{}},然後根據訓練樣本統計詞與詞轉移的詞頻,再除以每個詞出現的總次數(需要統計每個詞出現的頻率,即詞1、詞2出現的頻率)即求解出詞與詞之間的轉移機率。結果應如{詞1:{詞2:0.03,詞3:0.05},詞2:{詞1:0.02}}的形式。

def save_model(model_path, word_dict):
    f = open(model_path, 'w')
    f.write(str(word_dict))
    f.close()


def load_model(model_path):
    f = open(model_path,'r')
    a = f.read()
    word_dict = eval(a)
    f.close()
    return word_dict


def init(word_dict):
    word_trans = {}
    for word in word_dict:
        word_trans[word] = {}
    return word_trans


if __name__ == '__main__':
    word_dict = load_model('./model/set_list.txt')
    word_trans = init(word_dict)
    count_dict = {}

    for line in open('../data/train.txt', encoding='utf-8'):
        words_list = []
        if line:
            line = line.strip()
            wl = line.split(' ')
            for w in wl:
                word,tag = w.split('/')
                words_list.append(word)
                if word not in count_dict:
                    count_dict[word] = 1
                else:
                    count_dict[word] += 1

            for i in range(len(words_list) - 1):
                if words_list[i+1] not in word_trans[words_list[i]]:
                    word_trans[words_list[i]][words_list[i + 1]] = 1
                else:
                    word_trans[words_list[i]][words_list[i + 1]] += 1
    save_model('./model/word_trans.txt', word_trans)
    save_model('./model/count_dict.txt', word_trans)
    for key in word_trans.keys():
        for key1 in word_trans[key].keys():
            word_trans[key][key1] = word_trans[key][key1]/count_dict[key]


    save_model('./model/prob_word_trans.txt',word_trans)
           

3.初始詞機率,你懂的,就那麼求

def save_model(word_dict,model_path):
    f = open(model_path,'w')
    f.write(str(word_dict))
    f.close()

if __name__ == '__main__':
    start_dict = {}
    line_index = 0
    for line in open('../data/train.txt', encoding='utf-8'):
        if line:
            line = line.strip()
            word_list = line.split(' ')
            init_word,init_tag = word_list[0].split('/')
            if init_word not in start_dict:
                start_dict[init_word] = 1
            else:
                start_dict[init_word] += 1

            line_index += 1

    for key in start_dict:
        start_dict[key] = start_dict[key] / line_index

    save_model(start_dict,'./model/start_word.txt')
           

4.實體識别代碼

verbiter的原理我上面說過,就是在目前狀态下,求解上一最有可能轉移至該狀态的狀态。

class CRF_ner:
    def __init__(self):
        trans_path = './model/trans_dict.txt'
        emit_path = './model/emit_dict.txt'
        word_trans_path = './model/prob_word_trans.txt'
        start_word_path = './model/start_word.txt'

        self.prob_trans = self.load_model(trans_path)
        self.prob_emit = self.load_model(emit_path)
        self.prob_word_trans = self.load_model(word_trans_path)
        self.prob_start_word = self.load_model(start_word_path)

    def load_model(self,model_path):
        f = open(model_path,'r')
        a = f.read()
        word_dict = eval(a)
        f.close()
        return word_dict

    def verbiter(self,sent,state_list):
        V = [{}]
        path = {}
        #state_list = ['O', 'B-LOCATION', 'I-LOCATION', 'O-LOCATION', 'B-ORGANIZATION', 'I-ORGANIZATION',
        #'O-ORGANIZATION', 'B-PERSON', 'B-TIME']
        #初始化
        for state in state_list:
            if self.prob_word_trans.get(sent[0],0) == 0:
                V[0][state] = self.prob_start_word.get(sent[0],0) + self.prob_emit[state].get(sent[0],0)
            else:
                V[0][state] = self.prob_start_word.get(sent[0], 0) + self.prob_emit[state].get(sent[0], 0) + self.prob_word_trans[sent[0]].get(sent[1], 0)
            path[state] = [state]
        for i in range(1,len(sent)):
            V.append({})
            newpath = {}
            state_path = []
            for state in state_list:
                if i == len(sent) - 1:
                    if self.prob_word_trans.get(sent[i-1], 0) == 0:
                        W = self.prob_emit[state].get(sent[i], 0)
                    else:
                        W = self.prob_word_trans[sent[i - 1]].get(sent[i], 0) + self.prob_emit[state].get(sent[i], 0)
                else:
                    W = (self.prob_word_trans[sent[i - 1]].get(sent[i], 0) if self.prob_word_trans.get(sent[i-1],0) != 0 else 0) + self.prob_emit[state].get(sent[i],0) + (self.prob_word_trans[sent[i]].get(sent[i + 1], 0) if self.prob_word_trans.get(sent[i],0) != 0 else 0)
                for state1 in state_list:
                    R = V[i-1][state1] * self.prob_trans[state1].get(state,0)
                    state_path.append((R,state1))
                if state_path == []:
                    (prob,y) = (0.0,'O')
                else:
                    (prob,y) = max(state_path)
                V[i][state] = prob * W
                newpath[state] = path[y] + [state]
            path = newpath
        (prob, state) = max([(V[len(sent) - 1][y], y) for y in state_list])
        return (prob,path[state])

    def cut(self,sent):
        print('========開始計算========')
        state_list = ['O', 'B-LOCATION', 'I-LOCATION', 'O-LOCATION', 'B-ORGANIZATION', 'I-ORGANIZATION','O-ORGANIZATION', 'B-PERSON', 'B-TIME']
        prob,pos_list = self.verbiter(sent,state_list)
        result = []
        sub_result = []
        for i in range(len(pos_list)-1):
            if pos_list[i] in ['B-ORGANIZATION','I-ORGANIZATION','O-ORGANIZATION'] and pos_list[i+1] in ['B-ORGANIZATION','I-ORGANIZATION','O-ORGANIZATION']:
                sub_result.append(sent[i])
            elif pos_list[i] in ['B-ORGANIZATION','I-ORGANIZATION','O-ORGANIZATION'] and pos_list[i+1] not in ['B-ORGANIZATION','I-ORGANIZATION','O-ORGANIZATION']:
                result.append(sub_result)
                sub_result = []

        last = len(pos_list) - 1
        print(pos_list)
        if pos_list[last-1] in ['B-ORGANIZATION','I-ORGANIZATION','O-ORGANIZATION'] and pos_list[last] in ['B-ORGANIZATION','I-ORGANIZATION','O-ORGANIZATION']:
            sub_result.append(sent[last])
            result.append(sub_result)
        elif pos_list[last-1] not in ['B-ORGANIZATION','I-ORGANIZATION','O-ORGANIZATION'] and pos_list[last] in ['B-ORGANIZATION','I-ORGANIZATION','O-ORGANIZATION']:
            sub_result.append(sent[last])
            result.append(sub_result)

        entities = []
        for entity_list in result:
            entity = ''
            for tmp in entity_list:
                entity += tmp
            entities.append(entity)
        return entities

if __name__ == '__main__':
    sent = ['陳','鼎立','畢業','于','西南','科技','大學']
    # sent = ['中國','政府','要求','美方','遵守','條約']
    # sent = ['清華大學', '在', '北京市', '海', '澱', '區', '清', '華', '園', '1', '号']
    # sent = ['清華大學','副','校長','尤政','宣布','成立','人工','智能','研究院','張钹','院士','擔任','新','研究院','院長']
    ce = CRF_ner()
    result = ce.cut(sent)
    print(result)
           

以上就是全部内容,各位看官需要注意的是在計算詞與詞的轉移機率之前,需要在訓練樣本提取不重複的各詞作為訓練詞與詞轉移機率的前提,在代碼中沒有給出(因為寫完就給删了。。。。。懶的寫),沒幾行,看官可以自行實作。

本人水準較差。若沒有使有緣看到此文的看官浪費時間 ,便好。

本文中沒有給出CRF的公式詳細推導和原理介紹,我覺得給出一個例子用以實作代碼這樣效果是最好的,若各位看官需要詳細學習和了解CRF,嗯,自行百度

繼續閱讀