天天看點

FastSpeech2 代碼閱讀筆記——資料處理(2)

在進行資料處理前,先将LJSpeech資料集下載下傳至本地,在FastSpeech2論文中使用強制對齊工具MFA從文本和音頻中提取對齊資訊,代碼解析時使用的是作者提供的已經提取好的對齊資訊檔案,感興趣的讀者也可以自行下載下傳、安裝MFA提取對齊資訊。根據倉庫作者提供的連結下載下傳的每一個*.TextGrid檔案與一個音頻對應,其中記錄了word_level和phone_level兩個級别的文本、對應持續時間(機關為秒)等資訊,具體格式如下圖所示,主要差別就是phone_level比word_level經精細,顆粒度更小。

1.TextGrid檔案(MFA對齊檔案)詳解

FastSpeech2 代碼閱讀筆記——資料處理(2)

2.prepare_align.py

該檔案就是相當于一個接口,針對不同的資料集調用對應的檔案函數進行資料準備,主要就是調用資料集對應的prepare_align函數處理資料

import argparse
 
 import yaml
 
 from preprocessor import ljspeech, aishell3, libritts
 
 
 def main(config):
     if "LJSpeech" in config["dataset"]:
         ljspeech.prepare_align(config)
     if "AISHELL3" in config["dataset"]:
         aishell3.prepare_align(config)
     if "LibriTTS" in config["dataset"]:
         libritts.prepare_align(config)
 
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument("config", type=str, help="path to preprocess.yaml")  # 加載對應的yaml檔案,便于後面添加相應參數
     args = parser.parse_args()
 
     config = yaml.load(open(args.config, "r"), Loader=yaml.FullLoader)
     main(config)
            

3.LibriTTS.py

雖然該檔案隻定義了prepare_align函數,但是該函數也隻是簡單的将LJSpeech資料集中的音頻資料和文本資料進行了處理并儲存,并沒有提取對齊資訊。

import os
 import librosa
 import numpy as np
 from scipy.io import wavfile
 from tqdm import tqdm
 from text import _clean_text
 
 
 def prepare_align(config):
     in_dir = config["path"]["corpus_path"] # "/home/ming/Data/LibriTTS/train-clean-360"
     out_dir = config["path"]["raw_path"] # "./raw_data/LibriTTS"
     sampling_rate = config["preprocessing"]["audio"]["sampling_rate"] # 22050
     max_wav_value = config["preprocessing"]["audio"]["max_wav_value"] # 32768
     cleaners = config["preprocessing"]["text"]["text_cleaners"] # english_cleaners
     # os.listdir() 傳回指定目錄下的所有檔案名和目錄名
     # 檔案名為speaker
     for speaker in tqdm(os.listdir(in_dir)):
         # os.path.join()将in_dir與speaker連接配接起來,傳回目錄下的所有檔案名和目錄名
         # in_dir/speaker/chapter
         for chapter in os.listdir(os.path.join(in_dir, speaker)):
             # 傳回in_dir/speaker/chapter目錄下的所有檔案
             # 該目錄下的檔案包括三種:.normalized.txt  .wav  .original.txt
             # file_name檔案名
             for file_name in os.listdir(os.path.join(in_dir, speaker, chapter)):
                 if file_name[-4:] != ".wav":
                     continue
                 # 100_121669_000001_000000.normalized.txt
                 # 100_121669_000001_000000.original.txt
                 # 100_121669_000001_000000.wav
                 # 不是wav檔案就跳過,取wav檔案的檔案名,不取字尾
                 base_name = file_name[:-4]
                 # .normalized.txt檔案中存着一句英語句子,如Tom, the Piper's Son
                 text_path = os.path.join(
                     in_dir, speaker, chapter, "{}.normalized.txt".format(base_name)
                 )
                 wav_path = os.path.join(
                     in_dir, speaker, chapter, "{}.wav".format(base_name)
                 )
                 # 讀取文本内容,如text=Tom, the Piper's Son
                 with open(text_path) as f:
                     text = f.readline().strip("\n")
                     
                 # ######## ①
                 
                 # 亂碼處理、大小寫處理、縮寫展開、空格處理、數字處理
                 text = _clean_text(text, cleaners)
                 # 建立檔案夾out_dir/speaker,且目錄存在不會觸發目錄存在異常
                 os.makedirs(os.path.join(out_dir, speaker), exist_ok=True)
                 # librosa音頻信号處理庫函數
                 # load 從檔案加載音頻資料,而且可以通過參數設定是否保留雙聲道,采樣率,重采樣類型
                 # 傳回類型wav為numpy.ndarray  _為sampling_rate
                 wav, _ = librosa.load(wav_path, sampling_rate)
                 # wav = wav / (max(|wav|) * 32768)
                 # 歸一化,好處1,消除奇異樣本資料的影響,好處2,cond
                 wav = wav / max(abs(wav)) * max_wav_value # 32768.0  ???
 
                 # 将numpy格式的wav寫入到指定檔案中,out_dir/speaker/{base_name}.wav,sr,數值類型轉換
                 wavfile.write(
                     os.path.join(out_dir, speaker, "{}.wav".format(base_name)),
                     sampling_rate,
                     # 設定改類型是由ndarray中的數值大小範圍決定的,int16:-32768~32768
                     wav.astype(np.int16),
                 )
                 # 打開out_dir/speaker/{base_name}.lab,
                 # 将從{base_name}.normalized.txt檔案中讀取出來,然後經過處理的text寫入到{base_name}.lab檔案中
                 with open(
                     os.path.join(out_dir, speaker, "{}.lab".format(base_name)),
                     "w",
                 ) as f1:
                     f1.write(text)           

4.preprocessor.py

該檔案中才是從下載下傳的TextGrid檔案中提取每條音頻對應的duration、pitch和energy資訊;其中的config是通過config/LibriTTS/preprocess.yaml檔案加載而來。

import os
 import random
 import json
 
 import tgt
 import librosa
 import numpy as np
 import pyworld as pw
 from scipy.interpolate import interp1d
 
 from tqdm import tqdm
 
 import audio as Audio
 
 # 定義處理所有資料的處理類
 class Preprocessor:
     def __init__(self, config):
         self.config = config
         # 原始資料的存放路徑"./raw_data/LibriTTS"
         self.in_dir = config["path"]["raw_path"] 
         # 資料處理後的儲存路徑./preprocessed_data/LibriTTS"
         self.out_dir = config["path"]["preprocessed_path"] 
         self.val_size = config["preprocessing"]["val_size"] # 512
         self.sampling_rate = config["preprocessing"]["audio"]["sampling_rate"] # 22050
         self.hop_length = config["preprocessing"]["stft"]["hop_length"] # 256
 
         # assert condition==False,raise AssertionError()
         # pitch.feature == phoneme_level或者frame_level時繼續運作,否則抛出異常
         assert config["preprocessing"]["pitch"]["feature"] in [
             "phoneme_level",
             "frame_level",
         ]
         # energy.feature == phoneme_level或者frame_level時繼續運作,否則抛出異常
         assert config["preprocessing"]["energy"]["feature"] in [
             "phoneme_level",
             "frame_level",
         ]
         # 是否進行pitch_phoneme_averaging
         self.pitch_phoneme_averaging = (
             config["preprocessing"]["pitch"]["feature"] == "phoneme_level"
         )
         # 是否進行energy_phoneme_averaging
         self.energy_phoneme_averaging = (
             config["preprocessing"]["energy"]["feature"] == "phoneme_level"
         )
         # 是否進行正則化 is True
         self.pitch_normalization = config["preprocessing"]["pitch"]["normalization"]
         self.energy_normalization = config["preprocessing"]["energy"]["normalization"]
         
         # 初始化STFT子產品
         self.STFT = Audio.stft.TacotronSTFT(
             config["preprocessing"]["stft"]["filter_length"],  # 1024
             config["preprocessing"]["stft"]["hop_length"],  # 256
             config["preprocessing"]["stft"]["win_length"],  # 1024
             config["preprocessing"]["mel"]["n_mel_channels"],  # 80
             config["preprocessing"]["audio"]["sampling_rate"],  # 22050
             config["preprocessing"]["mel"]["mel_fmin"],  # 0
             config["preprocessing"]["mel"]["mel_fmax"],  # 8000
         )
     
     # 提取所需要的資料
     def build_from_path(self):
         # out_dir:"./preprocessed_data/LibriTTS"
         # os.makedirs() 建立檔案夾,exist_ok = True, 存在的話不會抛出異常
         os.makedirs((os.path.join(self.out_dir, "mel")), exist_ok=True)
         os.makedirs((os.path.join(self.out_dir, "pitch")), exist_ok=True)
         os.makedirs((os.path.join(self.out_dir, "energy")), exist_ok=True)
         os.makedirs((os.path.join(self.out_dir, "duration")), exist_ok=True)
 
         print("Processing Data ...")
         out = list()
         n_frames = 0
         pitch_scaler = StandardScaler()
         energy_scaler = StandardScaler()
 
         # Compute pitch, energy, duration, and mel-spectrogram
         speakers = {}
         # in_dir "./raw_data/LibriTTS" 下的所有檔案都是說話人編号
         # tqdm 添加一個進度條
         for i, speaker in enumerate(tqdm(os.listdir(self.in_dir))):
             speakers[speaker] = i
             # in_dir/speaker 目錄下的檔案包括兩種類型:{base_name}.lab , {base_name}.wav
             for wav_name in os.listdir(os.path.join(self.in_dir, speaker)):
                 if ".wav" not in wav_name:
                     continue
                 # 基于音頻檔案的basename建構對應的對齊檔案路徑名
                 basename = wav_name.split(".")[0]
                 # out_dir/TextGrid/speaker/{base_name}.TextGrid
                 # tg_path 為某個句子的TextGrid檔案路徑
                 tg_path = os.path.join(
                     self.out_dir, "TextGrid", speaker, "{}.TextGrid".format(basename)
                 )
                 if os.path.exists(tg_path):
                     print(tg_path)
 
                     # process_utterance() return
                     #     "|".join([basename, speaker, text(音素串), raw_text(音素串對應的文本)]),
                     #     self.remove_outlier(pitch),
                     #     self.remove_outlier(energy),
                     #     mel_spectrogram.shape[1],
 
                     ret = self.process_utterance(speaker, basename) # 提取單個音頻的mel、pitch、energy資料
                     if ret is None:
                         continue
                     else:
                         info, pitch, energy, n = ret # n是mel譜圖序列的總幀數
                     out.append(info) # 記錄info中文本相關的資料,是一個用“|”分割的字元串
 
                 if len(pitch) > 0:
                     # reshape(-1,1)轉換成一列
                     pitch_scaler.partial_fit(pitch.reshape((-1, 1)))
                 if len(energy) > 0:
                     energy_scaler.partial_fit(energy.reshape((-1, 1)))
 
                 n_frames += n
 
         print("Computing statistic quantities ...")
         # Perform normalization if necessary
         if self.pitch_normalization:
             pitch_mean = pitch_scaler.mean_[0]
             pitch_std = pitch_scaler.scale_[0]
         else:
             # A numerical trick to avoid normalization...
             pitch_mean = 0
             pitch_std = 1
         if self.energy_normalization:
             energy_mean = energy_scaler.mean_[0]
             energy_std = energy_scaler.scale_[0]
         else:
             energy_mean = 0
             energy_std = 1
         # ./preprocessed_data/LibriTTS/pitch
         # normalize() 進行歸一化并且将歸一化資料儲存,傳回最大值和最小值
         pitch_min, pitch_max = self.normalize(
             os.path.join(self.out_dir, "pitch"), pitch_mean, pitch_std
         )
         energy_min, energy_max = self.normalize(
             os.path.join(self.out_dir, "energy"), energy_mean, energy_std
         )
 
         # Save files
         # json.dump() 将一個python資料結構轉為json格式
         with open(os.path.join(self.out_dir, "speakers.json"), "w") as f:
             f.write(json.dumps(speakers))
 
         with open(os.path.join(self.out_dir, "stats.json"), "w") as f:
             stats = {
                 "pitch": [
                     float(pitch_min),
                     float(pitch_max),
                     float(pitch_mean),
                     float(pitch_std),
                 ],
                 "energy": [
                     float(energy_min),
                     float(energy_max),
                     float(energy_mean),
                     float(energy_std),
                 ],
             }
             f.write(json.dumps(stats))
 
         print(
             "Total time: {} hours".format(
                 n_frames * self.hop_length / self.sampling_rate / 3600
             )
         )
 
         random.shuffle(out)
         out = [r for r in out if r is not None]
 
         # Write metadata 劃分訓練集文本資料和驗證集文本資料
         # val_size = 512
         with open(os.path.join(self.out_dir, "train.txt"), "w", encoding="utf-8") as f:
             for m in out[self.val_size :]:
                 f.write(m + "\n")
         with open(os.path.join(self.out_dir, "val.txt"), "w", encoding="utf-8") as f:
             for m in out[: self.val_size]:
                 f.write(m + "\n")
 
         return out
     # 基于檔案路徑提取音頻檔案的mel、pitch、energy、duration資料
     def process_utterance(self, speaker, basename):
         # ./raw_data/LibriTTS/speaker/{basename}.wav
         # ./raw_data/LibriTTS/speaker/{basename}.lab
         # lab 檔案存儲音頻對印的文本
         wav_path = os.path.join(self.in_dir, speaker, "{}.wav".format(basename))
         text_path = os.path.join(self.in_dir, speaker, "{}.lab".format(basename))
         # tg_path = ./preprocessed_data/LibriTTS/TextGrid/speaker/{base_name}.TextGrid
         tg_path = os.path.join(
             self.out_dir, "TextGrid", speaker, "{}.TextGrid".format(basename)
         )
 
         # Get alignments
         # 讀取textgrid标注檔案
         textgrid = tgt.io.read_textgrid(tg_path)
         # 擷取文本的對應音素phone[],duration[],start,end
         # 資料提取。
         # phone中是textgrid對象中文本轉為音素的清單,
         # duration中為音素清單中每個元素對應的mel幀數,即每個音素的持續時間,
         # start為音頻開始時間,end為結束時間
         phone, duration, start, end = self.get_alignment(
             textgrid.get_tier_by_name("phones")
         )
         # text = {phone1 phone2 ... phone(n)}
         # 文本資訊拼接成字元串友善存儲
         text = "{" + " ".join(phone) + "}"
         if start >= end:
             return None
 
         # Read and trim wav files
         # 讀取到該音素集合對應的音頻片段
         wav, _ = librosa.load(wav_path)
         wav = wav[
             int(self.sampling_rate * start) : int(self.sampling_rate * end)
         ].astype(np.float32)
 
         # Read raw text
         # 讀取到該音素集合對應的文本内容
         with open(text_path, "r") as f:
             raw_text = f.readline().strip("\n")
 
         # Compute fundamental frequency
         # 提取基頻F0
         pitch, t = pw.dio(
             wav.astype(np.float64),
             self.sampling_rate,
             frame_period=self.hop_length / self.sampling_rate * 1000,
         )
         pitch = pw.stonemask(wav.astype(np.float64), pitch, t, self.sampling_rate)
 
         pitch = pitch[: sum(duration)] # 與總的mel譜圖幀數對齊
         if np.sum(pitch != 0) <= 1:
             return None
         # 計算音素對應的mel——specttogram和能量
         # Compute mel-scale spectrogram and energy
         mel_spectrogram, energy = Audio.tools.get_mel_from_wav(wav, self.STFT) # 計算mel譜圖
         mel_spectrogram = mel_spectrogram[:, : sum(duration)]
         energy = energy[: sum(duration)]
 
         if self.pitch_phoneme_averaging:
             # phoneme_level
             # perform linear interpolation,線性插值,就是将pitch序列中為0的值指派一個合理的數值
             nonzero_ids = np.where(pitch != 0)[0] # 擷取pitch中不為值不為0的索引
             # interp1d()
             # bounds_error = False, 超界的值由fill_value指定。
             interp_fn = interp1d(
                 nonzero_ids,
                 pitch[nonzero_ids],
                 fill_value=(pitch[nonzero_ids[0]], pitch[nonzero_ids[-1]]),
                 bounds_error=False,
             )
             pitch = interp_fn(np.arange(0, len(pitch))) # 插值後,pitch中為0的部分通過插值得到了補充
 
             # Phoneme-level average
             pos = 0
             for i, d in enumerate(duration):
                 if d > 0:
                     pitch[i] = np.mean(pitch[pos : pos + d])
                 else:
                     pitch[i] = 0
                 pos += d
             pitch = pitch[: len(duration)]
 
         if self.energy_phoneme_averaging:
             # Phoneme-level average
             pos = 0
             for i, d in enumerate(duration):
                 if d > 0:
                     energy[i] = np.mean(energy[pos : pos + d])
                 else:
                     energy[i] = 0
                 pos += d
             energy = energy[: len(duration)]
 
         # Save files
         # ./preprocessed_data/LibriTTS/
         dur_filename = "{}-duration-{}.npy".format(speaker, basename)
         np.save(os.path.join(self.out_dir, "duration", dur_filename), duration) # 儲存時序時間
 
         pitch_filename = "{}-pitch-{}.npy".format(speaker, basename)
         np.save(os.path.join(self.out_dir, "pitch", pitch_filename), pitch) # 儲存pitch
 
         energy_filename = "{}-energy-{}.npy".format(speaker, basename)
         np.save(os.path.join(self.out_dir, "energy", energy_filename), energy) # 儲存energy
 
         mel_filename = "{}-mel-{}.npy".format(speaker, basename)
         np.save(os.path.join(self.out_dir, "mel", mel_filename),mel_spectrogram.T)
 
         return (
             "|".join([basename, speaker, text, raw_text]), # 存儲文本形式的資料,字元串
             self.remove_outlier(pitch), # 去除離群值的pitch序列
             self.remove_outlier(energy), # 去除離群值的energy序列
             mel_spectrogram.shape[1], # 記錄mel譜圖序列幀數
         )
         
     # 提取對齊資訊
     def get_alignment(self, tier):
         sil_phones = ["sil", "sp", "spn"]
         # tier中存儲的主要内容就是音頻的持續時間,以及文中中每個音素對應的持續時間資訊
         phones = [] # 音素
         durations = [] # 持續時間
         start_time = 0 # 開始時間
         end_time = 0 # 結束時間
         end_idx = 0
         # t的類型是Interval(0.0, 0.04, "P"),第一個開始時間,第二個是結束時間,第三個即為該段對應的文本,這裡是音素
         for t in tier._objects:
             s, e, p = t.start_time, t.end_time, t.text
 
             # Trim leading silences
             # 對于句子開頭的sil phones
             if phones == []:
                 if p in sil_phones:
                     continue
                 else:
                     start_time = s
 
             if p not in sil_phones:
                 # For ordinary phones
                 phones.append(p)
                 end_time = e
                 end_idx = len(phones) # 記錄已記錄的音素的個數
             else:
                 # 對于句子中的sil phones
                 # For silent phones
                 phones.append(p)
 
             # np.round()傳回浮點數的四舍五入值
             # e = end_time
             # 記錄持續時間,将時間機關秒轉換為mel幀數
             durations.append(
                 int(
                     np.round(e * self.sampling_rate / self.hop_length)
                     - np.round(s * self.sampling_rate / self.hop_length)
                 )
             )
 
         # Trim tailing silences
         phones = phones[:end_idx]
         durations = durations[:end_idx]
 
         return phones, durations, start_time, end_time
     
     # 删除離群值,使用箱型圖的邏輯
     def remove_outlier(self, values):
         values = np.array(values)
         # 計算分位數值。
         p25 = np.percentile(values, 25)
         p75 = np.percentile(values, 75)
         lower = p25 - 1.5 * (p75 - p25)
         upper = p75 + 1.5 * (p75 - p25)
 
         normal_indices = np.logical_and(values > lower, values < upper)
 
         return values[normal_indices]
 
     # ./preprocessed_data/LibriTTS/pitch or energy
     def normalize(self, in_dir, mean, std):
         max_value = np.finfo(np.float64).min
         min_value = np.finfo(np.float64).max
         for filename in os.listdir(in_dir):
             filename = os.path.join(in_dir, filename)
             # normalize
             values = (np.load(filename) - mean) / std
             np.save(filename, values)
 
             max_value = max(max_value, max(values))
             min_value = min(min_value, min(values))
 
         return min_value, max_value