天天看點

中文命名實體識别ner

轉載自:https://github.com/stephen-v/zh-NER-keras

原理方面知識:https://www.cnblogs.com/vipyoumay/p/ner-chinese-keras.html

中文命名實體識别一般來說采用的方法為bilstm+crf

這裡不說原理方面,直接用keras實作中文命名實體識别

首先crf函數不在keras官方裡,由keras社群裡貢獻

首先安裝keras-contrib

pip install git+https://www.github.com/keras-team/keras-contrib.git

資料和代碼直接在github上git下來

1. 資料預處理

import numpy as np
from collections import Counter
from keras.preprocessing.sequence import pad_sequences
import pickle
import platform
from keras.models import Sequential
from keras.layers import Embedding, Bidirectional, LSTM
from keras_contrib.layers import CRF


def load_data():
    train = _parse_data(open(r'C:\Users\admin\Desktop\text_cf\ner\train_data.data','rb'))
    test = _parse_data(open(r'C:\Users\admin\Desktop\text_cf\ner\test_data.data', 'rb'))

    word_counts = Counter(row[0].lower() for sample in train for row in sample)
    vocab = [w for w, f in iter(word_counts.items()) if f >= 2]
    chunk_tags = ['O', 'B-PER', 'I-PER', 'B-LOC', 'I-LOC', "B-ORG", "I-ORG"]

    # save initial config data
    with open(r'C:\Users\admin\Desktop\text_cf\ner\config.pkl', 'wb') as outp:
        pickle.dump((vocab, chunk_tags), outp)

    train = _process_data(train, vocab, chunk_tags)
    test = _process_data(test, vocab, chunk_tags)
    return train, test, (vocab, chunk_tags)


def _parse_data(fh):
    #  in windows the new line is '\r\n\r\n' the space is '\r\n' . 
    #  so if you use windows system,
    #  you have to use recorsponding instructions

    if platform.system() == 'Windows':
        split_text = '\n'
    else:
        split_text = '\n'

    string = fh.read().decode('utf-8')
    data = [[row.split() for row in sample.split(split_text)] for
            sample in
            string.strip().split(split_text + split_text)]
    fh.close()
    return data


def _process_data(data, vocab, chunk_tags, maxlen=None, onehot=False):
    if maxlen is None:
        maxlen = max(len(s) for s in data)
    word2idx = dict((w, i) for i, w in enumerate(vocab))
    # set to <unk> (index 1) if not in vocab
    x = [[word2idx.get(w[0].lower(), 1) for w in s] for s in data]  

    y_chunk = [[chunk_tags.index(w[1]) for w in s] for s in data]

    x = pad_sequences(x, maxlen)  # left padding

    y_chunk = pad_sequences(y_chunk, maxlen, value=-1)

    if onehot:
        y_chunk = np.eye(len(chunk_tags), dtype='float32')[y_chunk]
    else:
        y_chunk = np.expand_dims(y_chunk, 2)
    return x, y_chunk


def process_data(data, vocab, maxlen=100):
    word2idx = dict((w, i) for i, w in enumerate(vocab))
    x = [word2idx.get(w[0].lower(), 1) for w in data]
    length = len(x)
    x = pad_sequences([x], maxlen)  # left padding
    return x, length
           

2. 建立模型并訓練和驗證

EMBED_DIM = 200
BiRNN_UNITS = 200


def create_model(train=True):
    if train:
        (train_x, train_y), (test_x, test_y), (vocab, chunk_tags) = load_data()
    else:
        with open(r'C:\Users\admin\Desktop\text_cf\ner\config.pkl', 'rb') as inp:
            (vocab, chunk_tags) = pickle.load(inp)
    model = Sequential()
    model.add(Embedding(len(vocab), EMBED_DIM, mask_zero=True))  # Random embedding
    model.add(Bidirectional(LSTM(BiRNN_UNITS // 2, return_sequences=True)))
    crf = CRF(len(chunk_tags), sparse_target=True)
    model.add(crf)
    model.summary()
    model.compile('adam', loss=crf.loss_function, metrics=[crf.accuracy])
    if train:
        return model, (train_x, train_y), (test_x, test_y)
    else:
        return model, (vocab, chunk_tags)

EPOCHS = 10
model, (train_x, train_y), (test_x, test_y) = create_model()
# train model
model.fit(train_x, train_y,batch_size=16,epochs=EPOCHS, validation_data=[test_x, test_y])

model.save(r'C:\Users\admin\Desktop\text_cf\ner\crf.h5')

model, (vocab, chunk_tags) = create_model(train=False)
predict_text = '中華人民共和國國務院總理周恩來在外交部長陳毅的陪同下,\
連續通路了埃塞俄比亞等非洲10國以及阿爾巴尼亞'
str, length = process_data(predict_text, vocab)
model.load_weights(r'C:\Users\admin\Desktop\text_cf\ner\crf.h5')
raw = model.predict(str)[0][-length:]
result = [np.argmax(row) for row in raw]
result_tags = [chunk_tags[i] for i in result]

per, loc, org = '', '', ''

for s, t in zip(predict_text, result_tags):
    if t in ('B-PER', 'I-PER'):
        per += ' ' + s if (t == 'B-PER') else s
    if t in ('B-ORG', 'I-ORG'):
        org += ' ' + s if (t == 'B-ORG') else s
    if t in ('B-LOC', 'I-LOC'):
        loc += ' ' + s if (t == 'B-LOC') else s

print(['person:' + per, 'location:' + loc, 'organzation:' + org])
           

繼續閱讀