
Dependency Parsing Based on Neural Networks

Annotated code for the paper A Fast and Accurate Dependency Parser using Neural Networks

Dependency Parsing

I recently started looking into dependency parsing, following Chen & Manning's 2014 paper (A Fast and Accurate Dependency Parser using Neural Networks). I downloaded an implementation (original link) and annotated it here as a personal reference.

Background

This paper is reportedly well known because its authors were among the first to apply neural networks to dependency parsing; much of the follow-up work mainly swaps the network type or adjusts the number of layers or the activation function.

In dependency parsing, every word in a sentence depends, through some relation, on another word or on a designated root node. The parsing algorithm in this paper additionally requires the sentence structure to be projective. Informally, projectivity means that every larger arc fully contains the smaller arcs beneath it, so no arc ever crosses another. The figures below show a non-projective and a projective example, respectively:

[Figure: a non-projective dependency tree and a projective dependency tree]
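
To make the definition concrete, here is a small projectivity check of my own (an illustration only, not part of the paper or the annotated program). A tree is given as a list of 0-based head indices, with -1 marking the root's head:

def is_projective(heads):
    # collect arcs as (left, right) intervals and reject any crossing pair
    arcs = [(min(h, d), max(h, d)) for d, h in enumerate(heads) if h >= 0]
    for a1, b1 in arcs:
        for a2, b2 in arcs:
            if a1 < a2 < b1 < b2:  # the two arcs cross
                return False
    return True

print(is_projective([1, -1, 1]))     # "He saw her": projective -> True
print(is_projective([2, 3, -1, 2]))  # arcs (0,2) and (1,3) cross -> False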

I also organized the related material for myself as a mind map:

[Figure: mind map of dependency parsing concepts]

The neural network's input consists of three parts (the resulting 48-feature layout is spelled out right after the list):

  1. Some of the words on the stack and in the buffer at the current transition step, together with the dependents of some of the stack words
  2. The POS tags of those words
  3. The dependency labels of those dependents
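
Concretely, following the paper's feature templates (which the FeatureExtractor class below implements), one training instance contains 48 input ids: 18 word ids (the top 3 stack words, the first 3 buffer words, the first and second left/right children of the top 2 stack words, and the leftmost-of-leftmost / rightmost-of-rightmost grandchildren of the top 2 stack words), 18 POS ids for the same 18 tokens, and 12 dependency-label ids for the 12 child and grandchild tokens.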

The word embeddings in this implementation are the C&W embeddings; the structure of the network is shown below:

[Figure: architecture of the neural-network classifier (Chen & Manning, 2014)]
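
As a rough illustration of that architecture, here is a minimal numpy sketch of my own (the real model is built elsewhere in the repository, so the names and shapes here are illustrative): the embeddings of the 48 features are concatenated, passed through one hidden layer with the paper's cube activation, and a softmax produces a distribution over the three transitions.

import numpy as np

def forward(word_ids, pos_ids, dep_ids, E_w, E_p, E_d, W1, b1, W2):
    # E_w/E_p/E_d: embedding matrices (vocab_size x 50); W1: (hidden x 2400); W2: (3 x hidden)
    x = np.concatenate([E_w[word_ids].ravel(),   # 18 x 50 word embeddings
                        E_p[pos_ids].ravel(),    # 18 x 50 POS embeddings
                        E_d[dep_ids].ravel()])   # 12 x 50 dependency-label embeddings
    h = (W1.dot(x) + b1) ** 3                    # cube activation from the paper
    scores = W2.dot(h)                           # one score per transition
    e = np.exp(scores - scores.max())
    return e / e.sum()                           # softmax over [left arc, right arc, shift]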
The main classes in the program are:
[Figure: the main classes in the program]
Below is the annotated feature-processing code; see my GitHub for the remaining parts.
# -*- coding:utf-8 -*-
# Training parameters, model parameters, and the classes needed during training
import os
import numpy as np
import datetime
from enum import Enum
from general_utils import get_pickle, dump_pickle, get_vocab_dict

NULL = "<null>"  # padding / null token
UNK = "<unk>"  # token not in the vocabulary, e.g. words that occur only once
ROOT = "<root>"  # root node
pos_prefix = "<p>:"
dep_prefix = "<d>:"
punc_pos = ["''", "``", ":", ".", ","]

today_date = str(datetime.datetime.now().date())


class DataConfig:  # data, embedding, model path etc.
    # Data Paths
    data_dir_path = "./data"  # data directory
    train_path = "train.conll"  # training data
    valid_path = "dev.conll"
    test_path = "test.conll"

    # embedding
    embedding_file = "en-cw.txt"  # word embedding file (C&W vectors)

    # model saver: where to store the model
    model_dir = "params_" + today_date
    model_name = "parser.weights"

    # where to store summaries
    summary_dir = "params_" + today_date
    train_summ_dir = "train_summaries"
    test_summ_dir = "valid_summaries"

    # dump - vocabulary index mappings
    dump_dir = "./data/dump"
    word_vocab_file = "word2idx.pkl"
    pos_vocab_file = "pos2idx.pkl"
    dep_vocab_file = "dep2idx.pkl"

    # dump - embedding matrix file names
    word_emb_file = "word_emb.pkl"  # 2d array
    pos_emb_file = "pos_emb.pkl"  # 2d array
    dep_emb_file = "dep_emb.pkl"  # 2d array


class ModelConfig(object):  # model hyper-parameters
    # Input
    word_features_types = None
    pos_features_types = None
    dep_features_types = None
    num_features_types = None
    embedding_dim = 50  # dimensionality of the input embeddings (the C&W vectors are 50-dimensional)

    # hidden_size
    l1_hidden_size = 200
    l2_hidden_size = 200

    # output
    num_classes = 3  # left arc, right arc, shift

    # Vocab
    word_vocab_size = None
    pos_vocab_size = None
    dep_vocab_size = None

    # num_epochs
    n_epochs = 10

    # batch_size
    batch_size = 2048

    # dropout
    keep_prob = 0.5
    reg_val = 1e-8

    # learning_rate
    lr = 0.001

    # load existing vocab
    load_existing_vocab = False

    # summary
    write_summary_after_epochs = 1

    # valid run
    run_valid_after_epochs = 1


class SettingsConfig:  # enabling and disabling features, feature types
    # Features
    use_word = True
    use_pos = True
    use_dep = True
    is_lower = True


class Flags(Enum):
    TRAIN = 1
    VALID = 2
    TEST = 3


class Token(object):
    '''Token class: holds one token of the training data, including the word, POS tag, dependency label and head.'''

    def __init__(self, token_id, word, pos, dep, head_id):
        self.token_id = token_id  # token index
        self.word = word.lower() if SettingsConfig.is_lower else word
        self.pos = pos_prefix + pos
        self.dep = dep_prefix + dep
        self.head_id = head_id  # head token index
        self.predicted_head_id = None
        self.left_children = list()
        self.right_children = list()

    def is_root_token(self):
        if self.word == ROOT:
            return True
        return False

    def is_null_token(self):
        if self.word == NULL:
            return True
        return False

    def is_unk_token(self):
        if self.word == UNK:
            return True
        return False

    def reset_predicted_head_id(self):
        self.predicted_head_id = None


NULL_TOKEN = Token(-1, NULL, NULL, NULL, -1)
ROOT_TOKEN = Token(-1, ROOT, ROOT, ROOT, -1)
UNK_TOKEN = Token(-1, UNK, UNK, UNK, -1)


class Sentence(object):
    # Sentence class: holds the stack, buffer, gold dependencies, predicted dependencies, etc.
    def __init__(self, tokens):
        self.Root = Token(-1, ROOT, ROOT, ROOT, -1)
        self.tokens = tokens
        self.buff = [token for token in self.tokens]
        self.stack = [self.Root]
        self.dependencies = []
        self.predicted_dependencies = []

    def load_gold_dependency_mapping(self):
        for token in self.tokens:
            if token.head_id != -1:
                token.parent = self.tokens[token.head_id]
                if token.head_id > token.token_id:
                    token.parent.left_children.append(token.token_id)
                else:
                    token.parent.right_children.append(token.token_id)
            else:
                token.parent = self.Root

        for token in self.tokens:
            token.left_children.sort()
            token.right_children.sort()

    def update_child_dependencies(self, curr_transition):  # record the new child after a left/right arc
        if curr_transition == 0:  # left arc: the top of the stack is the head
            head = self.stack[-1]
            dependent = self.stack[-2]
        elif curr_transition == 1:  # right arc: the second item on the stack is the head
            head = self.stack[-2]
            dependent = self.stack[-1]

        if head.token_id > dependent.token_id:
            head.left_children.append(dependent.token_id)
            head.left_children.sort()
        else:
            head.right_children.append(dependent.token_id)
            head.right_children.sort()
            # dependent.head_id = head.token_id

    # Return the requested child token if it exists, otherwise NULL_TOKEN
    def get_child_by_index_and_depth(self, token, index, direction, depth):
        if depth == 0:
            return token

        if direction == "left":
            if len(token.left_children) > index:
                return self.get_child_by_index_and_depth(
                    self.tokens[token.left_children[index]], index, direction, depth - 1)
            return NULL_TOKEN
        else:
            if len(token.right_children) > index:
                return self.get_child_by_index_and_depth(
                    self.tokens[token.right_children[::-1][index]], index, direction, depth - 1)
            return NULL_TOKEN

    def get_legal_labels(self):  # transitions legal in the current state: [left arc, right arc, shift]
        labels = ([1] if len(self.stack) > 2 else [0])
        labels += ([1] if len(self.stack) >= 2 else [0])
        labels += [1] if len(self.buff) > 0 else [0]
        return labels
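    # For example, in the initial state (stack [ROOT], full buffer) this returns
    # [0, 0, 1] (only shift is legal); with stack [ROOT, w1, w2] and a non-empty
    # buffer it returns [1, 1, 1].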

    def get_transition_from_current_state(self):  # arc-standard oracle: derive the next transition from the stack and buffer
        if len(self.stack) < 2:
            return 2  # shift

        stack_token_0 = self.stack[-1]  # top of the stack
        stack_token_1 = self.stack[-2]
        if stack_token_1.token_id >= 0 and stack_token_1.head_id == stack_token_0.token_id:  # left arc
            return 0  # the head of the second item is the top item, so build a left arc
        elif stack_token_1.token_id >= -1 and stack_token_0.head_id == stack_token_1.token_id \
                and stack_token_0.token_id not in map(lambda x: x.head_id, self.buff):
            return 1  # the head of the top item is the second item and the top item heads nothing still in the buffer, so build a right arc
        else:
            return 2 if len(self.buff) != 0 else None  # otherwise shift, or None once the buffer is empty
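    # Example for "He saw her" with gold heads He->saw, saw->ROOT, her->saw:
    #   shift, shift, left arc (He <- saw), shift, right arc (saw -> her),
    #   right arc (ROOT -> saw) -- 2n = 6 transitions for n = 3 words.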

    def update_state_by_transition(self, transition, gold=True):  # apply a transition to the stack and buffer
        if transition is not None:
            if transition == 2:  # shift
                self.stack.append(self.buff[0])  # push the first buffer token onto the stack
                self.buff = self.buff[1:] if len(self.buff) > 1 else []  # and remove it from the buffer
            elif transition == 0:  # left arc
                self.dependencies.append(
                    (self.stack[-1], self.stack[-2])) if gold else self.predicted_dependencies.append(
                    (self.stack[-1], self.stack[-2]))
                self.stack = self.stack[:-2] + self.stack[-1:]
            elif transition == 1:  # right arc
                self.dependencies.append(
                    (self.stack[-2], self.stack[-1])) if gold else self.predicted_dependencies.append(
                    (self.stack[-2], self.stack[-1]))
                self.stack = self.stack[:-1]

    def reset_to_initial_state(self):  # restore the stack and buffer to the initial state
        self.buff = [token for token in self.tokens]
        self.stack = [self.Root]

    def clear_prediction_dependencies(self):
        self.predicted_dependencies = []

    def clear_children_info(self):
        for token in self.tokens:
            token.left_children = []
            token.right_children = []


class Dataset(object):  # dataset plus vocabularies, embedding matrices and model inputs
    def __init__(self, model_config, train_data, valid_data, test_data, feature_extractor):
        self.model_config = model_config
        self.train_data = train_data
        self.valid_data = valid_data
        self.test_data = test_data
        self.feature_extractor = feature_extractor

        # Vocab
        self.word2idx = None
        self.idx2word = None
        self.pos2idx = None
        self.idx2pos = None
        self.dep2idx = None
        self.idx2dep = None

        # Embedding Matrix
        self.word_embedding_matrix = None
        self.pos_embedding_matrix = None
        self.dep_embedding_matrix = None

        # input & outputs
        self.train_inputs, self.train_targets = None, None
        self.valid_inputs, self.valid_targets = None, None
        self.test_inputs, self.test_targets = None, None

    def build_vocab(self):
        '''Build the index mappings for the three input types (words, POS tags, dependency labels).'''
        all_words = set()
        all_pos = set()
        all_dep = set()

        for sentence in self.train_data:
            all_words.update(set(map(lambda x: x.word, sentence.tokens)))
            all_pos.update(set(map(lambda x: x.pos, sentence.tokens)))
            all_dep.update(set(map(lambda x: x.dep, sentence.tokens)))

        all_words.add(ROOT_TOKEN.word)
        all_words.add(NULL_TOKEN.word)
        all_words.add(UNK_TOKEN.word)

        all_pos.add(ROOT_TOKEN.pos)
        all_pos.add(NULL_TOKEN.pos)
        all_pos.add(UNK_TOKEN.pos)

        all_dep.add(ROOT_TOKEN.dep)
        all_dep.add(NULL_TOKEN.dep)
        all_dep.add(UNK_TOKEN.dep)

        word_vocab = list(all_words)
        pos_vocab = list(all_pos)
        dep_vocab = list(all_dep)

        word2idx = get_vocab_dict(word_vocab)
        idx2word = {idx: word for (word, idx) in word2idx.items()}

        pos2idx = get_vocab_dict(pos_vocab)
        idx2pos = {idx: pos for (pos, idx) in pos2idx.items()}

        dep2idx = get_vocab_dict(dep_vocab)
        idx2dep = {idx: dep for (dep, idx) in dep2idx.items()}

        self.word2idx = word2idx
        self.idx2word = idx2word

        self.pos2idx = pos2idx
        self.idx2pos = idx2pos

        self.dep2idx = dep2idx
        self.idx2dep = idx2dep

    def build_embedding_matrix(self):
        '''Build the embedding matrices; word embeddings are read from the C&W embedding file.'''
        # load word vectors
        word_vectors = {}
        embedding_lines = open(os.path.join(DataConfig.data_dir_path, DataConfig.embedding_file), "r").readlines()
        for line in embedding_lines:
            sp = line.strip().split()
            word_vectors[sp[0]] = [float(x) for x in sp[1:]]

        # word embedding
        self.model_config.word_vocab_size = len(self.word2idx)
        word_embedding_matrix = np.asarray(
            np.random.normal(0, 0.9, size=(self.model_config.word_vocab_size, self.model_config.embedding_dim)),
            dtype=np.float32)
        for (word, idx) in self.word2idx.items():
            if word in word_vectors:
                word_embedding_matrix[idx] = word_vectors[word]
            elif word.lower() in word_vectors:
                word_embedding_matrix[idx] = word_vectors[word.lower()]
        self.word_embedding_matrix = word_embedding_matrix

        # pos embedding
        self.model_config.pos_vocab_size = len(self.pos2idx)
        pos_embedding_matrix = np.asarray(
            np.random.normal(0, 0.9, size=(self.model_config.pos_vocab_size, self.model_config.embedding_dim)),
            dtype=np.float32)
        self.pos_embedding_matrix = pos_embedding_matrix

        # dep embedding
        self.model_config.dep_vocab_size = len(self.dep2idx)
        dep_embedding_matrix = np.asarray(
            np.random.normal(0, 0.9, size=(self.model_config.dep_vocab_size, self.model_config.embedding_dim)),
            dtype=np.float32)
        self.dep_embedding_matrix = dep_embedding_matrix

    def convert_data_to_ids(self):  # convert the data into model inputs
        self.train_inputs, self.train_targets = self.feature_extractor. \
            create_instances_for_data(self.train_data, self.word2idx, self.pos2idx, self.dep2idx)

        # self.valid_inputs, self.valid_targets = self.feature_extractor.\
        #     create_instances_for_data(self.valid_data, self.word2idx)
        # self.test_inputs, self.test_targets = self.feature_extractor.\
        #     create_instances_for_data(self.test_data, self.word2idx)

    def add_to_vocab(self, words, prefix=""):
        idx = len(self.word2idx)
        for token in words:
            if prefix + token not in self.word2idx:
                self.word2idx[prefix + token] = idx
                self.idx2word[idx] = prefix + token
                idx += 1


class FeatureExtractor(object):
    '''Extracts features from the stack and buffer for the current parser state.'''

    def __init__(self, model_config):
        self.model_config = model_config

    def extract_from_stack_and_buffer(self, sentence, num_words=3):
        tokens = []
        # the last three stack items and the first three buffer items, padded with NULL where fewer exist
        tokens.extend([NULL_TOKEN for _ in range(num_words - len(sentence.stack))])
        tokens.extend(sentence.stack[-num_words:])

        tokens.extend(sentence.buff[:num_words])
        tokens.extend([NULL_TOKEN for _ in range(num_words - len(sentence.buff))])
        return tokens  # 6 features
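    # e.g. stack [ROOT, He, saw], buffer [her] -> [ROOT, He, saw, her, NULL, NULL]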

    def extract_children_from_stack(self, sentence, num_stack_words=2):
        children_tokens = []
        # left/right children of the top two stack words (depth 2 for the outermost ones)
        for i in range(num_stack_words):
            if len(sentence.stack) > i:
                # lc0: leftmost left child of the i-th stack word from the top
                # rc0: rightmost right child
                # lc1: second left child
                # rc1: second right child
                # llc0: leftmost child of the leftmost child
                # rrc0: rightmost child of the rightmost child
                lc0 = sentence.get_child_by_index_and_depth(sentence.stack[-i - 1], 0, "left", 1)
                rc0 = sentence.get_child_by_index_and_depth(sentence.stack[-i - 1], 0, "right", 1)

                lc1 = sentence.get_child_by_index_and_depth(sentence.stack[-i - 1], 1, "left",
                                                            1) if lc0 != NULL_TOKEN else NULL_TOKEN
                rc1 = sentence.get_child_by_index_and_depth(sentence.stack[-i - 1], 1, "right",
                                                            1) if rc0 != NULL_TOKEN else NULL_TOKEN

                llc0 = sentence.get_child_by_index_and_depth(sentence.stack[-i - 1], 0, "left",
                                                             2) if lc0 != NULL_TOKEN else NULL_TOKEN
                rrc0 = sentence.get_child_by_index_and_depth(sentence.stack[-i - 1], 0, "right",
                                                             2) if rc0 != NULL_TOKEN else NULL_TOKEN

                children_tokens.extend([lc0, rc0, lc1, rc1, llc0, rrc0])
            else:
                [children_tokens.append(NULL_TOKEN) for _ in range(6)]

        return children_tokens  # 12 features


    def extract_for_current_state(self, sentence, word2idx, pos2idx, dep2idx):
        direct_tokens = self.extract_from_stack_and_buffer(sentence, num_words=3)  # top 3 stack tokens and first 3 buffer tokens
        children_tokens = self.extract_children_from_stack(sentence, num_stack_words=2)  # children of the top 2 stack words

        word_features = []
        pos_features = []
        dep_features = []

        # Word features -> 18
        word_features.extend(map(lambda x: x.word, direct_tokens))  # words
        word_features.extend(map(lambda x: x.word, children_tokens))

        # pos features -> 18
        pos_features.extend(map(lambda x: x.pos, direct_tokens))  # POS tags
        pos_features.extend(map(lambda x: x.pos, children_tokens))

        # dep features -> 12 (only children)
        dep_features.extend(map(lambda x: x.dep, children_tokens))  # dependency labels

        word_input_ids = [word2idx[word] if word in word2idx else word2idx[UNK_TOKEN.word] for word in
                          word_features]  # map to vocabulary indices (unknown words map to UNK)
        pos_input_ids = [pos2idx[pos] if pos in pos2idx else pos2idx[UNK_TOKEN.pos] for pos in pos_features]
        dep_input_ids = [dep2idx[dep] if dep in dep2idx else dep2idx[UNK_TOKEN.dep] for dep in dep_features]

        return [word_input_ids, pos_input_ids, dep_input_ids]  # 48 features

    def create_instances_for_data(self, data, word2idx, pos2idx, dep2idx):
        labels = []
        word_inputs = []
        pos_inputs = []
        dep_inputs = []
        for i, sentence in enumerate(data):  # one sentence at a time
            num_words = len(sentence.tokens)  # number of words in the sentence

            for _ in range(num_words * 2):  # a projective sentence of n words is parsed in exactly 2n transitions
                word_input, pos_input, dep_input = self.extract_for_current_state(sentence, word2idx, pos2idx, dep2idx)
                legal_labels = sentence.get_legal_labels()  # transitions that are legal in the current state
                curr_transition = sentence.get_transition_from_current_state()  # arc-standard oracle: 0 = left arc, 1 = right arc, 2 = shift
                if curr_transition is None:
                    break
                assert legal_labels[curr_transition] == 1  # the oracle transition must be legal

                # Update left/right children
                if curr_transition != 2:
                    sentence.update_child_dependencies(curr_transition)  # on left/right arcs, record the new child

                sentence.update_state_by_transition(curr_transition)  # apply the transition to the stack and buffer
                labels.append(curr_transition)  # the transition is the training label
                word_inputs.append(word_input)  # the extracted features are the training input
                pos_inputs.append(pos_input)
                dep_inputs.append(dep_input)

            else:
                sentence.reset_to_initial_state()  # sentence finished

            # reset stack and buffer to default state
            sentence.reset_to_initial_state()

        targets = np.zeros((len(labels), self.model_config.num_classes), dtype=np.int32)
        targets[np.arange(len(targets)), labels] = 1  # one-hot targets, e.g. labels [2, 0, 1] -> [[0,0,1], [1,0,0], [0,1,0]]

        return [word_inputs, pos_inputs, dep_inputs], targets


class DataReader(object):
    def __init__(self):
        print "A"

    def read_conll(self, token_lines):
        tokens = []
        for each in token_lines:
            fields = each.strip().split("\t")
            token_index = int(fields[0]) - 1  # CoNLL token ids are 1-based; convert to 0-based
            word = fields[1]
            pos = fields[4]
            dep = fields[7]
            head_index = int(fields[6]) - 1  # head id; the root's head becomes -1
            token = Token(token_index, word, pos, dep, head_index)
            tokens.append(token)
        sentence = Sentence(tokens)

        # sentence.load_gold_dependency_mapping()
        return sentence

    def read_data(self, data_lines):
        data_objects = []
        token_lines = []
        for token_conll in data_lines:
            token_conll = token_conll.strip()
            if len(token_conll) > 0:
                token_lines.append(token_conll)
            else:
                data_objects.append(self.read_conll(token_lines))
                token_lines = []
        if len(token_lines) > 0:
            data_objects.append(self.read_conll(token_lines))
        return data_objects


def load_datasets(load_existing_dump=False):  # load_existing_dump: whether to load previously dumped vocabularies and embeddings
    model_config = ModelConfig()

    data_reader = DataReader()
    train_lines = open(os.path.join(DataConfig.data_dir_path, DataConfig.train_path), "r").readlines()  # load the data files
    valid_lines = open(os.path.join(DataConfig.data_dir_path, DataConfig.valid_path), "r").readlines()
    test_lines = open(os.path.join(DataConfig.data_dir_path, DataConfig.test_path), "r").readlines()

    # Load data
    train_data = data_reader.read_data(train_lines)
    print ("Loaded Train data")
    valid_data = data_reader.read_data(valid_lines)
    print ("Loaded Dev data")
    test_data = data_reader.read_data(test_lines)
    print ("Loaded Test data")

    feature_extractor = FeatureExtractor(model_config)
    dataset = Dataset(model_config, train_data, valid_data, test_data,
                      feature_extractor)  # the Dataset object holds the data, vocabularies, embedding matrices and model inputs/outputs

    # Vocab processing
    if load_existing_dump:  # load previously dumped vocabularies and embedding matrices
        dataset.word2idx = get_pickle(os.path.join(DataConfig.dump_dir, DataConfig.word_vocab_file))
        dataset.idx2word = {idx: word for (word, idx) in dataset.word2idx.items()}
        dataset.pos2idx = get_pickle(os.path.join(DataConfig.dump_dir, DataConfig.pos_vocab_file))
        dataset.idx2pos = {idx: pos for (pos, idx) in dataset.pos2idx.items()}
        dataset.dep2idx = get_pickle(os.path.join(DataConfig.dump_dir, DataConfig.dep_vocab_file))
        dataset.idx2dep = {idx: dep for (dep, idx) in dataset.dep2idx.items()}

        dataset.model_config.load_existing_vocab = True
        print "loaded existing Vocab!"
        dataset.word_embedding_matrix = get_pickle(os.path.join(DataConfig.dump_dir, DataConfig.word_emb_file))
        dataset.pos_embedding_matrix = get_pickle(os.path.join(DataConfig.dump_dir, DataConfig.pos_emb_file))
        dataset.dep_embedding_matrix = get_pickle(os.path.join(DataConfig.dump_dir, DataConfig.dep_emb_file))
        print "loaded existing embedding matrix!"

    else:
        dataset.build_vocab()
        dump_pickle(dataset.word2idx, os.path.join(DataConfig.dump_dir, DataConfig.word_vocab_file))
        dump_pickle(dataset.pos2idx, os.path.join(DataConfig.dump_dir, DataConfig.pos_vocab_file))
        dump_pickle(dataset.dep2idx, os.path.join(DataConfig.dump_dir, DataConfig.dep_vocab_file))
        dataset.model_config.load_existing_vocab = True
        print "Vocab Build Done!"
        dataset.build_embedding_matrix()
        print "embedding matrix Build Done"
        dump_pickle(dataset.word_embedding_matrix, os.path.join(DataConfig.dump_dir, DataConfig.word_emb_file))
        dump_pickle(dataset.pos_embedding_matrix, os.path.join(DataConfig.dump_dir, DataConfig.pos_emb_file))
        dump_pickle(dataset.dep_embedding_matrix, os.path.join(DataConfig.dump_dir, DataConfig.dep_emb_file))

    print "converting data into ids.."
    dataset.convert_data_to_ids()  # 轉換成直接訓練的格式
    print "Done!"
    dataset.model_config.word_features_types = len(dataset.train_inputs[0][0])
    dataset.model_config.pos_features_types = len(dataset.train_inputs[1][0])
    dataset.model_config.dep_features_types = len(dataset.train_inputs[2][0])
    dataset.model_config.num_features_types = dataset.model_config.word_features_types + \
                                              dataset.model_config.pos_features_types + dataset.model_config.dep_features_types
    dataset.model_config.num_classes = len(dataset.train_targets[0])

    return dataset
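
# Example usage (a sketch; it assumes train.conll / dev.conll / test.conll and
# en-cw.txt exist under ./data as configured in DataConfig):
#   dataset = load_datasets(load_existing_dump=False)
#   (word_inputs, pos_inputs, dep_inputs), targets = dataset.train_inputs, dataset.train_targets
#   print(dataset.model_config.num_features_types)  # 48 = 18 word + 18 POS + 12 dep features
#   print(dataset.model_config.num_classes)          # 3 transitions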
           
