Annotated Code for the Paper "A Fast and Accurate Dependency Parser using Neural Networks"
Dependency Parsing
I recently started working with dependency parsing, following Chen & Manning's 2014 paper (A Fast and Accurate Dependency Parser using Neural Networks). I downloaded an implementation (original link) and annotated it as a personal reference.
Background
This paper is well known largely because it was among the first to apply a neural network to dependency parsing; much of the later work keeps the same framework and mainly changes the network type, the number of layers, or the activation function.
In dependency parsing, every word in a sentence depends, with some relation, on exactly one other word or on a designated root node. The parsing algorithm used in this paper additionally requires the sentence structure to be projective: roughly speaking, every larger arc fully contains the smaller arcs beneath it, and no arc crosses another. The figures below show a non-projective and a projective example, respectively:

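As a quick aside (not part of the annotated program): a brute-force way to test projectivity is to check whether any two arcs cross. A minimal sketch, assuming arcs are given as (head, dependent) pairs of 0-based word positions with -1 standing for the root:

# Illustrative projectivity check (hypothetical helper, not used by the parser code below).
def is_projective(arcs):
    # arcs: list of (head, dependent) index pairs; the root's position is taken to be -1
    for (h1, d1) in arcs:
        lo1, hi1 = sorted((h1, d1))
        for (h2, d2) in arcs:
            lo2, hi2 = sorted((h2, d2))
            # two arcs cross when exactly one endpoint of the second lies strictly inside the first
            if lo1 < lo2 < hi1 < hi2:
                return False
    return True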
I organized the related material as a mind map, shown below:
The input to the neural network consists of three parts (a sketch of the resulting feature layout follows the list):
- a subset of the words on the stack and in the buffer at the current transition step, plus some of the children (dependents) of the stack words
- the POS tags of those words
- the dependency (arc) labels of those child words
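In total this yields 48 input ids per parser state, as the small tally below illustrates (the variable names are only for illustration; the actual extraction happens in FeatureExtractor further down):

# Per-state feature tally (illustration only)
direct_tokens = 3 + 3      # top 3 words of the stack + first 3 words of the buffer
child_tokens = 2 * 6       # 1st/2nd left and right children of the top 2 stack words,
                           # plus their leftmost-left / rightmost-right grandchildren
word_features = direct_tokens + child_tokens   # 18
pos_features = direct_tokens + child_tokens    # 18
dep_features = child_tokens                    # 12 (arc labels exist only for children)
total_features = word_features + pos_features + dep_features   # 48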
The word embeddings used in this program are the C&W (Collobert & Weston) vectors, and the network structure is shown in the figure below:
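For reference, the paper's classifier concatenates the word, POS and label embeddings of the 48 features, applies one hidden layer with a cube activation, and a softmax over the transitions. A minimal NumPy sketch with hypothetical weight names (the actual TensorFlow model lives elsewhere in the repository):

import numpy as np

def score_transitions(x_w, x_p, x_d, W1_w, W1_p, W1_d, b1, W2, b2):
    # x_w, x_p, x_d: concatenated word / POS / label embeddings of one parser state
    h = (W1_w.dot(x_w) + W1_p.dot(x_p) + W1_d.dot(x_d) + b1) ** 3  # cube activation
    scores = W2.dot(h) + b2                                        # one score per transition
    exp = np.exp(scores - scores.max())
    return exp / exp.sum()                                         # softmax over transitions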
The main classes used in the program are described below. What follows is the annotated feature-processing code; the remaining parts are in my personal GitHub repository.
# -*- coding:utf-8 -*-
# Training/model configuration and the helper classes used during training.
import os
import numpy as np
import datetime
from enum import Enum
from general_utils import get_pickle, dump_pickle, get_vocab_dict
NULL = "<null>"  # padding token
UNK = "<unk>"  # token for words missing from the vocabulary (e.g. words seen only once)
ROOT = "<root>"  # root node
pos_prefix = "<p>:"
dep_prefix = "<d>:"
punc_pos = ["''", "``", ":", ".", ","]
today_date = str(datetime.datetime.now().date())
class DataConfig:  # data, embedding and model paths
    # Data Paths
    data_dir_path = "./data"  # data directory
    train_path = "train.conll"  # training data
    valid_path = "dev.conll"
    test_path = "test.conll"
    # embedding
    embedding_file = "en-cw.txt"  # pre-trained word-embedding file
    # model saver: where trained weights are stored
    model_dir = "params_" + today_date
    model_name = "parser.weights"
    # summary: where training summaries are written
    summary_dir = "params_" + today_date
    train_summ_dir = "train_summaries"
    test_summ_dir = "valid_summaries"
    # dump - vocabulary lookup tables
    dump_dir = "./data/dump"
    word_vocab_file = "word2idx.pkl"
    pos_vocab_file = "pos2idx.pkl"
    dep_vocab_file = "dep2idx.pkl"
    # dump - embedding matrices
    word_emb_file = "word_emb.pkl"  # 2d array
    pos_emb_file = "pos_emb.pkl"  # 2d array
    dep_emb_file = "dep_emb.pkl"  # 2d array
class ModelConfig(object):  # model hyper-parameters
    # NOTE: the numeric values below are reasonable defaults in the spirit of
    # Chen & Manning (2014); adjust them to reproduce a particular setup.
    # Input
    word_features_types = None
    pos_features_types = None
    dep_features_types = None
    num_features_types = None
    embedding_dim = 50  # dimension of the input embeddings (the C&W vectors are 50-d)
    # hidden_size
    l1_hidden_size = 200
    l2_hidden_size = 200
    # output
    num_classes = 3  # left arc, right arc, shift
    # Vocab
    word_vocab_size = None
    pos_vocab_size = None
    dep_vocab_size = None
    # num_epochs
    n_epochs = 10
    # batch_size
    batch_size = 2048
    # dropout
    keep_prob = 0.5
    reg_val = 1e-8
    # learning_rate
    lr = 0.001
    # load existing vocab
    load_existing_vocab = False
    # summary
    write_summary_after_epochs = 1
    # valid run
    run_valid_after_epochs = 1
class SettingsConfig:  # enabling and disabling features / feature types
    # Features
    use_word = True
    use_pos = True
    use_dep = True
    is_lower = True
class Flags(Enum):
    TRAIN = 1
    VALID = 2
    TEST = 3
class Token(object):
    '''One token of the training data: word, POS tag, dependency label, head index, etc.'''

    def __init__(self, token_id, word, pos, dep, head_id):
        self.token_id = token_id  # token index
        self.word = word.lower() if SettingsConfig.is_lower else word
        self.pos = pos_prefix + pos
        self.dep = dep_prefix + dep
        self.head_id = head_id  # head token index
        self.predicted_head_id = None
        self.left_children = list()
        self.right_children = list()

    def is_root_token(self):
        if self.word == ROOT:
            return True
        return False

    def is_null_token(self):
        if self.word == NULL:
            return True
        return False

    def is_unk_token(self):
        if self.word == UNK:
            return True
        return False

    def reset_predicted_head_id(self):
        self.predicted_head_id = None


NULL_TOKEN = Token(-1, NULL, NULL, NULL, -1)
ROOT_TOKEN = Token(-1, ROOT, ROOT, ROOT, -1)
UNK_TOKEN = Token(-1, UNK, UNK, UNK, -1)
class Sentence(object):
    # A sentence: holds the stack, buffer, gold dependencies and predicted dependencies.
    def __init__(self, tokens):
        self.Root = Token(-1, ROOT, ROOT, ROOT, -1)
        self.tokens = tokens
        self.buff = [token for token in self.tokens]
        self.stack = [self.Root]
        self.dependencies = []
        self.predicted_dependencies = []

    def load_gold_dependency_mapping(self):
        for token in self.tokens:
            if token.head_id != -1:
                token.parent = self.tokens[token.head_id]
                if token.head_id > token.token_id:
                    token.parent.left_children.append(token.token_id)
                else:
                    token.parent.right_children.append(token.token_id)
            else:
                token.parent = self.Root
        for token in self.tokens:
            token.left_children.sort()
            token.right_children.sort()

    def update_child_dependencies(self, curr_transition):  # update the head's left/right child lists after an arc
        if curr_transition == 0:  # left arc: the top of the stack is the head
            head = self.stack[-1]
            dependent = self.stack[-2]
        elif curr_transition == 1:  # right arc: the second item on the stack is the head
            head = self.stack[-2]
            dependent = self.stack[-1]
        if head.token_id > dependent.token_id:
            head.left_children.append(dependent.token_id)
            head.left_children.sort()
        else:
            head.right_children.append(dependent.token_id)
            head.right_children.sort()
        # dependent.head_id = head.token_id

    # Return the requested child of a token if it exists, otherwise NULL_TOKEN.
    def get_child_by_index_and_depth(self, token, index, direction, depth):
        if depth == 0:
            return token
        if direction == "left":
            if len(token.left_children) > index:
                return self.get_child_by_index_and_depth(
                    self.tokens[token.left_children[index]], index, direction, depth - 1)
            return NULL_TOKEN
        else:
            if len(token.right_children) > index:
                return self.get_child_by_index_and_depth(
                    self.tokens[token.right_children[::-1][index]], index, direction, depth - 1)
            return NULL_TOKEN

    def get_legal_labels(self):  # which transitions are legal in the current state
        labels = ([1] if len(self.stack) > 2 else [0])    # left arc
        labels += ([1] if len(self.stack) >= 2 else [0])  # right arc
        labels += [1] if len(self.buff) > 0 else [0]      # shift
        return labels

    def get_transition_from_current_state(self):  # arc-standard oracle: derive the gold transition from the stack and buffer
        if len(self.stack) < 2:
            return 2  # shift
        stack_token_0 = self.stack[-1]  # top of the stack
        stack_token_1 = self.stack[-2]  # second item on the stack
        if stack_token_1.token_id >= 0 and stack_token_1.head_id == stack_token_0.token_id:  # left arc
            return 0  # the head of the second item is the top item: produce a left arc
        elif stack_token_1.token_id >= -1 and stack_token_0.head_id == stack_token_1.token_id \
                and stack_token_0.token_id not in map(lambda x: x.head_id, self.buff):
            return 1  # the head of the top item is the second item and no buffer word depends on the top item: right arc
        else:
            return 2 if len(self.buff) != 0 else None

    def update_state_by_transition(self, transition, gold=True):  # apply a transition to the stack and buffer
        if transition is not None:
            if transition == 2:  # shift
                self.stack.append(self.buff[0])  # push the first buffer item onto the stack
                self.buff = self.buff[1:] if len(self.buff) > 1 else []  # remove it from the buffer
            elif transition == 0:  # left arc
                self.dependencies.append(
                    (self.stack[-1], self.stack[-2])) if gold else self.predicted_dependencies.append(
                    (self.stack[-1], self.stack[-2]))
                self.stack = self.stack[:-2] + self.stack[-1:]
            elif transition == 1:  # right arc
                self.dependencies.append(
                    (self.stack[-2], self.stack[-1])) if gold else self.predicted_dependencies.append(
                    (self.stack[-2], self.stack[-1]))
                self.stack = self.stack[:-1]

    def reset_to_initial_state(self):  # restore the stack and buffer to the initial state
        self.buff = [token for token in self.tokens]
        self.stack = [self.Root]

    def clear_prediction_dependencies(self):
        self.predicted_dependencies = []

    def clear_children_info(self):
        for token in self.tokens:
            token.left_children = []
            token.right_children = []
class Dataset(object):  # dataset plus vocabulary indices
    def __init__(self, model_config, train_data, valid_data, test_data, feature_extractor):
        self.model_config = model_config
        self.train_data = train_data
        self.valid_data = valid_data
        self.test_data = test_data
        self.feature_extractor = feature_extractor
        # Vocab
        self.word2idx = None
        self.idx2word = None
        self.pos2idx = None
        self.idx2pos = None
        self.dep2idx = None
        self.idx2dep = None
        # Embedding Matrix
        self.word_embedding_matrix = None
        self.pos_embedding_matrix = None
        self.dep_embedding_matrix = None
        # input & outputs
        self.train_inputs, self.train_targets = None, None
        self.valid_inputs, self.valid_targets = None, None
        self.test_inputs, self.test_targets = None, None

    def build_vocab(self):
        '''Build the index dictionaries for the three input types (words, POS tags, dependency labels).'''
        all_words = set()
        all_pos = set()
        all_dep = set()
        for sentence in self.train_data:
            all_words.update(set(map(lambda x: x.word, sentence.tokens)))
            all_pos.update(set(map(lambda x: x.pos, sentence.tokens)))
            all_dep.update(set(map(lambda x: x.dep, sentence.tokens)))
        all_words.add(ROOT_TOKEN.word)
        all_words.add(NULL_TOKEN.word)
        all_words.add(UNK_TOKEN.word)
        all_pos.add(ROOT_TOKEN.pos)
        all_pos.add(NULL_TOKEN.pos)
        all_pos.add(UNK_TOKEN.pos)
        all_dep.add(ROOT_TOKEN.dep)
        all_dep.add(NULL_TOKEN.dep)
        all_dep.add(UNK_TOKEN.dep)
        word_vocab = list(all_words)
        pos_vocab = list(all_pos)
        dep_vocab = list(all_dep)
        word2idx = get_vocab_dict(word_vocab)
        idx2word = {idx: word for (word, idx) in word2idx.items()}
        pos2idx = get_vocab_dict(pos_vocab)
        idx2pos = {idx: pos for (pos, idx) in pos2idx.items()}
        dep2idx = get_vocab_dict(dep_vocab)
        idx2dep = {idx: dep for (dep, idx) in dep2idx.items()}
        self.word2idx = word2idx
        self.idx2word = idx2word
        self.pos2idx = pos2idx
        self.idx2pos = idx2pos
        self.dep2idx = dep2idx
        self.idx2dep = idx2dep
    def build_embedding_matrix(self):
        '''Build the embedding matrices; word vectors are read from the C&W embedding file.'''
        # load word vectors
        word_vectors = {}
        embedding_lines = open(os.path.join(DataConfig.data_dir_path, DataConfig.embedding_file), "r").readlines()
        for line in embedding_lines:
            sp = line.strip().split()
            word_vectors[sp[0]] = [float(x) for x in sp[1:]]
        # word embedding: small random initialization, overwritten by pre-trained vectors where available
        self.model_config.word_vocab_size = len(self.word2idx)
        word_embedding_matrix = np.asarray(
            np.random.normal(0, 0.01, size=(self.model_config.word_vocab_size, self.model_config.embedding_dim)),
            dtype=np.float32)
        for (word, idx) in self.word2idx.items():
            if word in word_vectors:
                word_embedding_matrix[idx] = word_vectors[word]
            elif word.lower() in word_vectors:
                word_embedding_matrix[idx] = word_vectors[word.lower()]
        self.word_embedding_matrix = word_embedding_matrix
        # pos embedding
        self.model_config.pos_vocab_size = len(self.pos2idx)
        pos_embedding_matrix = np.asarray(
            np.random.normal(0, 0.01, size=(self.model_config.pos_vocab_size, self.model_config.embedding_dim)),
            dtype=np.float32)
        self.pos_embedding_matrix = pos_embedding_matrix
        # dep embedding
        self.model_config.dep_vocab_size = len(self.dep2idx)
        dep_embedding_matrix = np.asarray(
            np.random.normal(0, 0.01, size=(self.model_config.dep_vocab_size, self.model_config.embedding_dim)),
            dtype=np.float32)
        self.dep_embedding_matrix = dep_embedding_matrix
    def convert_data_to_ids(self):  # convert the raw data into model inputs
        self.train_inputs, self.train_targets = self.feature_extractor. \
            create_instances_for_data(self.train_data, self.word2idx, self.pos2idx, self.dep2idx)
        # self.valid_inputs, self.valid_targets = self.feature_extractor.\
        #     create_instances_for_data(self.valid_data, self.word2idx)
        # self.test_inputs, self.test_targets = self.feature_extractor.\
        #     create_instances_for_data(self.test_data, self.word2idx)

    def add_to_vocab(self, words, prefix=""):
        idx = len(self.word2idx)
        for token in words:
            if prefix + token not in self.word2idx:
                self.word2idx[prefix + token] = idx
                self.idx2word[idx] = prefix + token
                idx += 1
class FeatureExtractor(object):
    '''Extracts features from the parser state (stack and buffer) and generates gold transitions.'''

    def __init__(self, model_config):
        self.model_config = model_config

    def extract_from_stack_and_buffer(self, sentence, num_words=3):
        tokens = []
        # top three stack items and first three buffer items, padded with NULL where missing
        tokens.extend([NULL_TOKEN for _ in range(num_words - len(sentence.stack))])
        tokens.extend(sentence.stack[-num_words:])
        tokens.extend(sentence.buff[:num_words])
        tokens.extend([NULL_TOKEN for _ in range(num_words - len(sentence.buff))])
        return tokens  # 6 features

    def extract_children_from_stack(self, sentence, num_stack_words=2):
        children_tokens = []
        # left/right children of the top two stack words (using recursive lookups)
        for i in range(num_stack_words):
            if len(sentence.stack) > i:
                # lc0: first left child of this stack word
                # rc0: first right child of this stack word
                # lc1: second left child of this stack word
                # rc1: second right child of this stack word
                # llc0: leftmost child of the first left child (depth 2)
                # rrc0: rightmost child of the first right child (depth 2)
                lc0 = sentence.get_child_by_index_and_depth(sentence.stack[-i - 1], 0, "left", 1)
                rc0 = sentence.get_child_by_index_and_depth(sentence.stack[-i - 1], 0, "right", 1)
                lc1 = sentence.get_child_by_index_and_depth(sentence.stack[-i - 1], 1, "left",
                                                            1) if lc0 != NULL_TOKEN else NULL_TOKEN
                rc1 = sentence.get_child_by_index_and_depth(sentence.stack[-i - 1], 1, "right",
                                                            1) if rc0 != NULL_TOKEN else NULL_TOKEN
                llc0 = sentence.get_child_by_index_and_depth(sentence.stack[-i - 1], 0, "left",
                                                             2) if lc0 != NULL_TOKEN else NULL_TOKEN
                rrc0 = sentence.get_child_by_index_and_depth(sentence.stack[-i - 1], 0, "right",
                                                             2) if rc0 != NULL_TOKEN else NULL_TOKEN
                children_tokens.extend([lc0, rc0, lc1, rc1, llc0, rrc0])
            else:
                [children_tokens.append(NULL_TOKEN) for _ in range(6)]
        return children_tokens  # 12 features

    def extract_for_current_state(self, sentence, word2idx, pos2idx, dep2idx):
        direct_tokens = self.extract_from_stack_and_buffer(sentence, num_words=3)  # top 3 of the stack + first 3 of the buffer
        children_tokens = self.extract_children_from_stack(sentence, num_stack_words=2)  # children of the top two stack words
        word_features = []
        pos_features = []
        dep_features = []
        # Word features -> 18
        word_features.extend(map(lambda x: x.word, direct_tokens))  # words
        word_features.extend(map(lambda x: x.word, children_tokens))
        # pos features -> 18
        pos_features.extend(map(lambda x: x.pos, direct_tokens))  # POS tags
        pos_features.extend(map(lambda x: x.pos, children_tokens))
        # dep features -> 12 (only children)
        dep_features.extend(map(lambda x: x.dep, children_tokens))  # dependency labels
        word_input_ids = [word2idx[word] if word in word2idx else word2idx[UNK_TOKEN.word] for word in
                          word_features]  # map to vocabulary ids
        pos_input_ids = [pos2idx[pos] if pos in pos2idx else pos2idx[UNK_TOKEN.pos] for pos in pos_features]
        dep_input_ids = [dep2idx[dep] if dep in dep2idx else dep2idx[UNK_TOKEN.dep] for dep in dep_features]
        return [word_input_ids, pos_input_ids, dep_input_ids]  # 48 features

    def create_instances_for_data(self, data, word2idx, pos2idx, dep2idx):
        labels = []
        word_inputs = []
        pos_inputs = []
        dep_inputs = []
        for i, sentence in enumerate(data):  # process the data one sentence at a time
            num_words = len(sentence.tokens)  # number of words in the sentence
            for _ in range(num_words * 2):  # a projective sentence is parsed in exactly 2 * num_words transitions
                word_input, pos_input, dep_input = self.extract_for_current_state(sentence, word2idx, pos2idx, dep2idx)
                legal_labels = sentence.get_legal_labels()  # which transitions are currently legal
                curr_transition = sentence.get_transition_from_current_state()  # arc-standard oracle: 0 left arc, 1 right arc, 2 shift
                if curr_transition is None:
                    break
                assert legal_labels[curr_transition] == 1  # the oracle transition must be legal
                # Update left/right children
                if curr_transition != 2:
                    sentence.update_child_dependencies(curr_transition)  # record the new arc for left/right-arc transitions
                sentence.update_state_by_transition(curr_transition)  # apply the transition to the stack and buffer
                labels.append(curr_transition)  # gold transition for this state
                word_inputs.append(word_input)  # store the features of every state as training input
                pos_inputs.append(pos_input)
                dep_inputs.append(dep_input)
            else:
                sentence.reset_to_initial_state()  # the sentence has been fully parsed
            # reset stack and buffer to default state
            sentence.reset_to_initial_state()
        targets = np.zeros((len(labels), self.model_config.num_classes), dtype=np.int32)
        targets[np.arange(len(targets)), labels] = 1  # one-hot transition labels
        return [word_inputs, pos_inputs, dep_inputs], targets
class DataReader(object):
    '''Reads CoNLL-formatted data and builds Sentence objects.'''

    def __init__(self):
        pass  # nothing to initialise

    def read_conll(self, token_lines):
        tokens = []
        for each in token_lines:
            fields = each.strip().split("\t")
            token_index = int(fields[0]) - 1
            word = fields[1]
            pos = fields[4]
            dep = fields[7]
            head_index = int(fields[6]) - 1
            token = Token(token_index, word, pos, dep, head_index)
            tokens.append(token)
        sentence = Sentence(tokens)
        # sentence.load_gold_dependency_mapping()
        return sentence
    def read_data(self, data_lines):
        data_objects = []
        token_lines = []
        for token_conll in data_lines:
            token_conll = token_conll.strip()
            if len(token_conll) > 0:
                token_lines.append(token_conll)
            else:
                data_objects.append(self.read_conll(token_lines))
                token_lines = []
        if len(token_lines) > 0:
            data_objects.append(self.read_conll(token_lines))
        return data_objects
def load_datasets(load_existing_dump=False):  # load_existing_dump: whether to load previously dumped vocabularies/embeddings
    model_config = ModelConfig()
    data_reader = DataReader()
    train_lines = open(os.path.join(DataConfig.data_dir_path, DataConfig.train_path), "r").readlines()  # read the data files
    valid_lines = open(os.path.join(DataConfig.data_dir_path, DataConfig.valid_path), "r").readlines()
    test_lines = open(os.path.join(DataConfig.data_dir_path, DataConfig.test_path), "r").readlines()
    # Load data
    train_data = data_reader.read_data(train_lines)
    print("Loaded Train data")
    valid_data = data_reader.read_data(valid_lines)
    print("Loaded Dev data")
    test_data = data_reader.read_data(test_lines)
    print("Loaded Test data")
    feature_extractor = FeatureExtractor(model_config)
    dataset = Dataset(model_config, train_data, valid_data, test_data,
                      feature_extractor)  # Dataset holds the data, vocabularies, embedding matrices and model inputs/outputs
    # Vocab processing
    if load_existing_dump:  # load previously dumped vocabularies and embedding matrices
        dataset.word2idx = get_pickle(os.path.join(DataConfig.dump_dir, DataConfig.word_vocab_file))
        dataset.idx2word = {idx: word for (word, idx) in dataset.word2idx.items()}
        dataset.pos2idx = get_pickle(os.path.join(DataConfig.dump_dir, DataConfig.pos_vocab_file))
        dataset.idx2pos = {idx: pos for (pos, idx) in dataset.pos2idx.items()}
        dataset.dep2idx = get_pickle(os.path.join(DataConfig.dump_dir, DataConfig.dep_vocab_file))
        dataset.idx2dep = {idx: dep for (dep, idx) in dataset.dep2idx.items()}
        dataset.model_config.load_existing_vocab = True
        print("loaded existing Vocab!")
        dataset.word_embedding_matrix = get_pickle(os.path.join(DataConfig.dump_dir, DataConfig.word_emb_file))
        dataset.pos_embedding_matrix = get_pickle(os.path.join(DataConfig.dump_dir, DataConfig.pos_emb_file))
        dataset.dep_embedding_matrix = get_pickle(os.path.join(DataConfig.dump_dir, DataConfig.dep_emb_file))
        print("loaded existing embedding matrix!")
    else:
        dataset.build_vocab()
        dump_pickle(dataset.word2idx, os.path.join(DataConfig.dump_dir, DataConfig.word_vocab_file))
        dump_pickle(dataset.pos2idx, os.path.join(DataConfig.dump_dir, DataConfig.pos_vocab_file))
        dump_pickle(dataset.dep2idx, os.path.join(DataConfig.dump_dir, DataConfig.dep_vocab_file))
        dataset.model_config.load_existing_vocab = True
        print("Vocab Build Done!")
        dataset.build_embedding_matrix()
        print("embedding matrix Build Done")
        dump_pickle(dataset.word_embedding_matrix, os.path.join(DataConfig.dump_dir, DataConfig.word_emb_file))
        dump_pickle(dataset.pos_embedding_matrix, os.path.join(DataConfig.dump_dir, DataConfig.pos_emb_file))
        dump_pickle(dataset.dep_embedding_matrix, os.path.join(DataConfig.dump_dir, DataConfig.dep_emb_file))
    print("converting data into ids..")
    dataset.convert_data_to_ids()  # convert into the format fed directly into training
    print("Done!")
    dataset.model_config.word_features_types = len(dataset.train_inputs[0][0])
    dataset.model_config.pos_features_types = len(dataset.train_inputs[1][0])
    dataset.model_config.dep_features_types = len(dataset.train_inputs[2][0])
    dataset.model_config.num_features_types = dataset.model_config.word_features_types + \
                                              dataset.model_config.pos_features_types + \
                                              dataset.model_config.dep_features_types
    dataset.model_config.num_classes = len(dataset.train_targets[0])
    return dataset
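A minimal sketch of how this module might be exercised, assuming the data and embedding files configured in DataConfig are in place:

if __name__ == "__main__":
    # Build (or load) the vocabularies and embedding matrices, then extract training instances.
    dataset = load_datasets(load_existing_dump=False)
    word_inputs, pos_inputs, dep_inputs = dataset.train_inputs
    print("training states: %d" % len(dataset.train_targets))
    print("feature counts (word/pos/dep): %d / %d / %d" % (
        dataset.model_config.word_features_types,
        dataset.model_config.pos_features_types,
        dataset.model_config.dep_features_types))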