天天看點

HIT 語音信号處理 Lab3 HMM實作部分

Description

《語音信号處理》Lab3

基于HMM模型實作孤立詞識别系統

  • 本着不重複造輪子的原則,使用hmmlearn庫搭建viterbi-BaumWelch的算法架構
  • 首先使用viterbi算法疊代計算模型初始參數,然後使用BaumWelch算法進行模型參數的疊代重估
  • 使用scikits.talkbox中的mfcc計算代碼(該庫在python3環境下無法安裝,因此直接拷貝其源代碼使用)

封裝類

import pickle
import re
from collections import Counter
from talk.mfcc import mfcc
from scipy.io import wavfile
from hmmlearn import hmm
import numpy as np
from scipy.special import logsumexp


class Speech:
    '''
    Speech
    --------------
    Wrapper around one recorded utterance (a single wav file).

    The file name must start with ``<personId>_<categoryId>_<idx>.wav``;
    the three numeric fields are kept as strings exactly as they appear
    in the name.
    '''

    # Compiled once for all instances. The dot is escaped so that only a
    # literal ".wav" suffix is accepted after the three numeric fields.
    _FILENAME_PATTERN = re.compile(r'(\d+)_(\d+)_(\d+)\.wav')

    def __init__(self, dirName, fileName):
        '''
        Parse the ids out of ``fileName`` and load the audio samples.

        Parameters
        ----------
        dirName : str
            Directory prefix. It is concatenated directly with
            ``fileName``, so it must already end with a path separator.
        fileName : str
            Name of the wav file, e.g. ``"1_2_3.wav"``.

        Raises
        ------
        ValueError
            If ``fileName`` does not follow the expected naming scheme.
        '''
        # Validate the name before touching the disk so a misnamed file
        # fails with a clear message instead of an AttributeError on None.
        matched = self._FILENAME_PATTERN.match(fileName)
        if matched is None:
            raise ValueError(
                "unexpected speech file name {!r}; expected "
                "'<person>_<category>_<index>.wav'".format(fileName))

        self.fileName = fileName    # file name
        self.dirName = dirName
        self.features = None    # feature matrix, filled by extractFeature()
        self.soundSamplerate, self.sound = wavfile.read(dirName + fileName)
        self.personId, self.categoryId, self.idx = matched.group(1, 2, 3)

    def extractFeature(self):
        '''
        Compute MFCC features for the loaded audio.

        Uses a window of 0.03 * sample rate samples and 24 cepstral
        coefficients, and stores the first element of the value returned
        by ``mfcc`` in ``self.features``.
        '''
        # NOTE(review): element [0] is presumably the cepstral coefficient
        # matrix (frames x nceps) - confirm against talk.mfcc.
        self.features = mfcc(self.sound, nwin=int(
            self.soundSamplerate * 0.03), fs=self.soundSamplerate, nceps=24)[0]


class SpeechRecognizer:
    '''
    SpeechRecognizer
    ----------------------
    Wrapper around one Gaussian HMM that models a single word category.

    Training data is collected in ``trainData`` as a list of per-utterance
    feature matrices, stacked with ``stackTrainData``, and then used to
    initialise (``initHmmModel``), optionally refine (``viterbi``) and
    finally train (``trainHmmModel``) the model.
    '''

    def __init__(self, categoryId):
        '''
        Create an untrained recognizer for ``categoryId``.

        ``categoryId`` is also used to build the model file name in
        ``saveHmmModel`` / ``loadHmmModel``.
        '''
        self.categoryId = categoryId
        self.trainData = []             # list of feature matrices until stacked
        self.flagDataStacked = False    # True once stackTrainData() has run
        self.trainDataLengths = None    # rows per utterance, set when stacking
        self.hmmModel = None

        self.nComp = 5  # number of states
        self.n_iter = 10    # number of iterations

    def initModelParam(self, nComp, n_iter):
        ''' Set the number of hidden states and of training iterations. '''
        self.nComp = nComp  # number of states
        self.n_iter = n_iter    # number of iterations

    def stackTrainData(self):
        '''
        Concatenate the collected feature matrices into one 2-D array.

        The number of rows of each matrix is remembered in
        ``trainDataLengths`` so the utterances can be separated again.
        Calling this more than once is a no-op.
        '''
        if self.flagDataStacked:
            return
        self.trainDataLengths = [x.shape[0] for x in self.trainData]
        # [(frames_1 x width), (frames_2 x width), ...] => (sum(frames) x width)
        self.trainData = np.vstack(self.trainData)
        self.flagDataStacked = True

    def saveHmmModel(self):
        '''
        Pickle the model to ``models/md_<categoryId>.pkl``.

        The ``models`` directory is not created here.
        '''
        with open("models/md_{0}.pkl".format(self.categoryId), "wb") as file:
            pickle.dump(self.hmmModel, file)

    def loadHmmModel(self):
        ''' Load the model from ``models/md_<categoryId>.pkl``. '''
        # NOTE: unpickling executes arbitrary code; only load model files
        # that come from a trusted source.
        with open("models/md_{0}.pkl".format(self.categoryId), "rb") as file:
            self.hmmModel = pickle.load(file)

    def initHmmModel(self, load_model=False):
        '''
        initHmmModel
        -----------------
        Initialise the HMM. When ``load_model`` is true the previously
        saved model is loaded instead of creating a new one.
        '''
        if load_model:
            self.loadHmmModel()
        else:
            self.getHmmModel()

    def getHmmModel(self):
        ''' Create a new Gaussian HMM and initialise it from the training data. '''

        # hmm.GaussianHMM arguments of interest:
        #   n_components     number of hidden states
        #   transmat_prior   prior of the transition probability matrix
        #   startprob_prior  prior of the initial state distribution
        #   covariance_type  kind of covariance matrix; the default 'diag'
        #                    uses one diagonal covariance matrix per state
        #   n_iter           maximum number of EM iterations
        model = hmm.GaussianHMM(n_components=self.nComp, n_iter=self.n_iter)
        self.hmmModel = model
        # NOTE(review): _init is a private hmmlearn method whose signature
        # may differ between library versions - confirm for the one in use.
        self.hmmModel._init(self.trainData, self.trainDataLengths)

    def trainHmmModel(self):
        ''' Train the Gaussian HMM on the stacked training data. '''
        # Empty init_params asks fit() not to re-initialise any parameter,
        # so values set by getHmmModel() / viterbi() are the starting point.
        self.hmmModel.init_params = ''
        self.hmmModel.fit(self.trainData, self.trainDataLengths)

    def viterbi(self):
        '''
        Re-estimate start and transition probabilities from Viterbi paths.

        Decodes the most likely hidden state sequence of every training
        utterance under the current model, then replaces the model's
        ``startprob_`` and ``transmat_`` with relative frequencies counted
        on those sequences. Requires ``stackTrainData`` to have been called.
        '''
        state_seq_list = []
        for (i, j) in iter_from_X_lengths(self.trainData, self.trainDataLengths):
            # NOTE(review): both calls below are private hmmlearn methods;
            # confirm they exist in the installed version.
            framelogprob = self.hmmModel._compute_log_likelihood(
                self.trainData[i:j])
            _, state_sequence = self.hmmModel._do_viterbi_pass(
                framelogprob)
            state_seq_list.append(state_sequence)

        self._reestimateFromStateSequences(state_seq_list)

    def _reestimateFromStateSequences(self, state_seq_list):
        ''' Set startprob_ and transmat_ from counts over decoded state paths. '''
        n_states = self.nComp

        # Start probabilities: share of sequences beginning in each state.
        start_counts = Counter(int(seq[0]) for seq in state_seq_list)
        self.hmmModel.startprob_ = np.array(
            [start_counts[s] / len(state_seq_list) for s in range(n_states)],
            dtype=np.float64)

        # Transition counts between consecutive frames (t -> t + 1).
        trans_counts = np.zeros((n_states, n_states))
        for seq in state_seq_list:
            seq = np.asarray(seq)
            # add.at accumulates correctly when the same pair repeats.
            np.add.at(trans_counts, (seq[:-1], seq[1:]), 1)

        row_totals = trans_counts.sum(axis=1, keepdims=True)
        # A state that never occurs as a source has no evidence; give it a
        # uniform row instead of dividing by zero and storing NaN.
        unseen = row_totals[:, 0] == 0
        trans_counts[unseen] = 1.0
        row_totals[unseen] = n_states
        self.hmmModel.transmat_ = trans_counts / row_totals
        
def iter_from_X_lengths(X, lengths):
    '''
    Yield ``(start, end)`` row bounds of each sequence stacked in ``X``.

    Parameters
    ----------
    X : array-like
        Stacked observations; ``X.shape[0]`` is read when ``lengths`` is
        given, ``len(X)`` otherwise.
    lengths : sequence of int or None
        Number of rows of each sequence. ``None`` means ``X`` holds one
        single sequence.

    Raises
    ------
    ValueError
        If the lengths add up to more rows than ``X`` has.
    '''
    if lengths is None:
        # No segmentation given: the whole of X is one sequence.
        yield 0, len(X)
        return

    total_rows = X.shape[0]
    stops = np.cumsum(lengths).astype(np.int32)
    starts = stops - lengths
    if stops[-1] > total_rows:
        raise ValueError("more than {:d} samples in lengths array {!s}"
                         .format(total_rows, lengths))

    yield from zip(starts, stops)
           

Code

github

繼續閱讀