Before any data processing, first download the LJSpeech dataset to a local directory. The FastSpeech 2 paper uses the forced-alignment tool MFA to extract alignment information from the text and audio. This walkthrough uses the already-extracted alignment files provided by the repository author, but interested readers can also download and install MFA and extract the alignments themselves.
1. TextGrid files (MFA alignment files) explained
Each *.TextGrid file downloaded via the link the repository author provides corresponds to one audio file. It records the text at two granularities, word_level and phone_level, together with the duration of each unit (in seconds); the main difference is simply that phone_level is finer-grained than word_level. A short sketch of inspecting one of these files follows.
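This is a minimal, hypothetical example of reading such a file; substitute any *.TextGrid from the downloaded alignments. It only uses tgt calls that preprocessor.py itself relies on later.

import tgt

# read one alignment file (the file name here is made up)
textgrid = tgt.io.read_textgrid("100_121669_000001_000000.TextGrid")

# walk the phone-level tier; each interval carries a start time and an
# end time in seconds plus a label (here, a phone)
phones = textgrid.get_tier_by_name("phones")
for interval in phones._objects:  # preprocessor.py reads the same list
    print(interval.start_time, interval.end_time, interval.text)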
2. prepare_align.py
This file is essentially an entry point: for whichever dataset is named in the config, it calls that dataset's prepare_align function to prepare the data.
import argparse

import yaml

from preprocessor import ljspeech, aishell3, libritts


def main(config):
    if "LJSpeech" in config["dataset"]:
        ljspeech.prepare_align(config)
    if "AISHELL3" in config["dataset"]:
        aishell3.prepare_align(config)
    if "LibriTTS" in config["dataset"]:
        libritts.prepare_align(config)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # load the matching preprocess.yaml so its parameters are available later
    parser.add_argument("config", type=str, help="path to preprocess.yaml")
    args = parser.parse_args()

    config = yaml.load(open(args.config, "r"), Loader=yaml.FullLoader)
    main(config)
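The script takes the dataset's preprocess.yaml as its single positional argument; assuming the config layout of the original repository, the LibriTTS run discussed below would be launched as:

python3 prepare_align.py config/LibriTTS/preprocess.yaml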
3. libritts.py
Although this file only defines a prepare_align function, that function merely cleans and re-saves the audio and text of the LibriTTS dataset; it does not extract any alignment information.
import os

import librosa
import numpy as np
from scipy.io import wavfile
from tqdm import tqdm

from text import _clean_text


def prepare_align(config):
    in_dir = config["path"]["corpus_path"]  # "/home/ming/Data/LibriTTS/train-clean-360"
    out_dir = config["path"]["raw_path"]  # "./raw_data/LibriTTS"
    sampling_rate = config["preprocessing"]["audio"]["sampling_rate"]  # 22050
    max_wav_value = config["preprocessing"]["audio"]["max_wav_value"]  # 32768
    cleaners = config["preprocessing"]["text"]["text_cleaners"]  # english_cleaners
    # os.listdir() returns every file and directory name under in_dir;
    # each directory name here is a speaker ID
    for speaker in tqdm(os.listdir(in_dir)):
        # os.path.join() joins in_dir with speaker; the layout is in_dir/speaker/chapter
        for chapter in os.listdir(os.path.join(in_dir, speaker)):
            # files under in_dir/speaker/chapter come in three kinds, e.g.:
            #   100_121669_000001_000000.normalized.txt
            #   100_121669_000001_000000.original.txt
            #   100_121669_000001_000000.wav
            for file_name in os.listdir(os.path.join(in_dir, speaker, chapter)):
                # skip anything that is not a wav file
                if file_name[-4:] != ".wav":
                    continue
                # keep the wav file name without its extension
                base_name = file_name[:-4]
                # the .normalized.txt file holds one English sentence,
                # e.g. "Tom, the Piper's Son"
                text_path = os.path.join(
                    in_dir, speaker, chapter, "{}.normalized.txt".format(base_name)
                )
                wav_path = os.path.join(
                    in_dir, speaker, chapter, "{}.wav".format(base_name)
                )
                # read the text content, e.g. text = "Tom, the Piper's Son"
                with open(text_path) as f:
                    text = f.readline().strip("\n")
                # ######## ①
                # text cleaning: fix garbled characters, normalize case,
                # expand abbreviations, collapse whitespace, spell out numbers
                text = _clean_text(text, cleaners)

                # create out_dir/speaker; exist_ok=True suppresses the error
                # if the directory already exists
                os.makedirs(os.path.join(out_dir, speaker), exist_ok=True)
                # librosa.load() reads audio from a file; its parameters control
                # mono/stereo, the target sampling rate and the resampling type.
                # wav is a numpy.ndarray; the second return value is the sampling rate
                wav, _ = librosa.load(wav_path, sampling_rate)
                # wav = wav / max(|wav|) * 32768: peak-normalize to [-1, 1], then
                # scale into the int16 range; normalization removes the influence
                # of outlier amplitudes across recordings
                wav = wav / max(abs(wav)) * max_wav_value
                # write the numpy wav to out_dir/speaker/{base_name}.wav at the
                # given sampling rate; int16 covers -32768..32767, which matches
                # the value range produced above
                wavfile.write(
                    os.path.join(out_dir, speaker, "{}.wav".format(base_name)),
                    sampling_rate,
                    wav.astype(np.int16),
                )
                # write the cleaned text read from {base_name}.normalized.txt
                # into out_dir/speaker/{base_name}.lab
                with open(
                    os.path.join(out_dir, speaker, "{}.lab".format(base_name)),
                    "w",
                ) as f1:
                    f1.write(text)
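The peak-normalization arithmetic right before wavfile.write is worth checking on a toy signal (the numbers below are made up):

import numpy as np

# a quiet synthetic signal whose peak amplitude is 0.25
wav = np.array([0.1, -0.25, 0.2], dtype=np.float32)

# peak-normalize to [-1, 1], then scale into the int16 range
scaled = wav / max(abs(wav)) * 32768.0
print(scaled)                   # ≈ [ 13107.2 -32768.   26214.4]
print(scaled.astype(np.int16))  # [ 13107 -32768  26214]

The loudest sample always maps to ±32768 and the cast truncates toward zero; -32768 is exactly the lower bound of int16, while a positive peak would land at +32768.0 and overflow the cast, so some implementations scale by 32767 instead.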
4. preprocessor.py
This is the file that actually extracts the duration, pitch, and energy of every utterance from the downloaded TextGrid files. The config here is loaded from config/LibriTTS/preprocess.yaml.
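Before diving into the class, it may help to see the shape of the config it expects. The sketch below only assembles, as a plain Python dict, the fields that Preprocessor actually reads, using the values quoted in the code comments; the actual preprocess.yaml in the repository contains additional entries.

config = {
    "dataset": "LibriTTS",
    "path": {
        "raw_path": "./raw_data/LibriTTS",
        "preprocessed_path": "./preprocessed_data/LibriTTS",
    },
    "preprocessing": {
        "val_size": 512,
        "audio": {"sampling_rate": 22050, "max_wav_value": 32768.0},
        "stft": {"filter_length": 1024, "hop_length": 256, "win_length": 1024},
        "mel": {"n_mel_channels": 80, "mel_fmin": 0, "mel_fmax": 8000},
        "pitch": {"feature": "phoneme_level", "normalization": True},
        "energy": {"feature": "phoneme_level", "normalization": True},
    },
}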
import os
import random
import json

import tgt
import librosa
import numpy as np
import pyworld as pw
from scipy.interpolate import interp1d
from sklearn.preprocessing import StandardScaler  # needed for the pitch/energy statistics below
from tqdm import tqdm

import audio as Audio
# the class that drives preprocessing of the whole dataset
class Preprocessor:
    def __init__(self, config):
        self.config = config
        # where the raw data lives: "./raw_data/LibriTTS"
        self.in_dir = config["path"]["raw_path"]
        # where the processed data is saved: "./preprocessed_data/LibriTTS"
        self.out_dir = config["path"]["preprocessed_path"]
        self.val_size = config["preprocessing"]["val_size"]  # 512
        self.sampling_rate = config["preprocessing"]["audio"]["sampling_rate"]  # 22050
        self.hop_length = config["preprocessing"]["stft"]["hop_length"]  # 256

        # assert raises AssertionError when its condition is False:
        # execution continues only if pitch.feature is phoneme_level or frame_level
        assert config["preprocessing"]["pitch"]["feature"] in [
            "phoneme_level",
            "frame_level",
        ]
        # likewise for energy.feature
        assert config["preprocessing"]["energy"]["feature"] in [
            "phoneme_level",
            "frame_level",
        ]
        # whether to average pitch over each phoneme
        self.pitch_phoneme_averaging = (
            config["preprocessing"]["pitch"]["feature"] == "phoneme_level"
        )
        # whether to average energy over each phoneme
        self.energy_phoneme_averaging = (
            config["preprocessing"]["energy"]["feature"] == "phoneme_level"
        )
        # whether to normalize pitch/energy (True here)
        self.pitch_normalization = config["preprocessing"]["pitch"]["normalization"]
        self.energy_normalization = config["preprocessing"]["energy"]["normalization"]
        # initialize the STFT module
        self.STFT = Audio.stft.TacotronSTFT(
            config["preprocessing"]["stft"]["filter_length"],  # 1024
            config["preprocessing"]["stft"]["hop_length"],  # 256
            config["preprocessing"]["stft"]["win_length"],  # 1024
            config["preprocessing"]["mel"]["n_mel_channels"],  # 80
            config["preprocessing"]["audio"]["sampling_rate"],  # 22050
            config["preprocessing"]["mel"]["mel_fmin"],  # 0
            config["preprocessing"]["mel"]["mel_fmax"],  # 8000
        )
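A quick aside before the next method: one mel frame advances hop_length samples, so at 22050 Hz a hop of 256 samples is about 11.6 ms per frame. This is exactly the frame_period later passed to pyworld in process_utterance, which is why pitch values line up one-to-one with mel frames. A small arithmetic check:

sampling_rate, hop_length = 22050, 256

# frame period in milliseconds, as passed to pw.dio() later
print(hop_length / sampling_rate * 1000)   # ≈ 11.61 ms per frame

# frames produced per second of audio
print(sampling_rate / hop_length)          # ≈ 86.13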
    # extract all of the required features
    def build_from_path(self):
        # out_dir: "./preprocessed_data/LibriTTS"
        # os.makedirs() with exist_ok=True does not raise if the folder already exists
        os.makedirs((os.path.join(self.out_dir, "mel")), exist_ok=True)
        os.makedirs((os.path.join(self.out_dir, "pitch")), exist_ok=True)
        os.makedirs((os.path.join(self.out_dir, "energy")), exist_ok=True)
        os.makedirs((os.path.join(self.out_dir, "duration")), exist_ok=True)

        print("Processing Data ...")
        out = list()
        n_frames = 0
        pitch_scaler = StandardScaler()
        energy_scaler = StandardScaler()

        # Compute pitch, energy, duration, and mel-spectrogram
        speakers = {}
        # every entry under in_dir ("./raw_data/LibriTTS") is a speaker ID;
        # tqdm wraps the loop with a progress bar
        for i, speaker in enumerate(tqdm(os.listdir(self.in_dir))):
            speakers[speaker] = i
            # in_dir/speaker holds two kinds of files: {base_name}.lab and {base_name}.wav
            for wav_name in os.listdir(os.path.join(self.in_dir, speaker)):
                if ".wav" not in wav_name:
                    continue

                # build the alignment file path from the wav file's basename:
                # out_dir/TextGrid/speaker/{base_name}.TextGrid
                basename = wav_name.split(".")[0]
                tg_path = os.path.join(
                    self.out_dir, "TextGrid", speaker, "{}.TextGrid".format(basename)
                )
                if os.path.exists(tg_path):
                    print(tg_path)  # debug: show which alignment file is being processed
                    # process_utterance() extracts mel, pitch and energy for one
                    # utterance and returns
                    #   "|".join([basename, speaker, text (phone string), raw_text]),
                    #   self.remove_outlier(pitch),
                    #   self.remove_outlier(energy),
                    #   mel_spectrogram.shape[1]
                    ret = self.process_utterance(speaker, basename)
                    if ret is None:
                        continue
                    else:
                        info, pitch, energy, n = ret  # n is the number of mel frames
                    out.append(info)  # info is a "|"-separated string of text metadata

                    if len(pitch) > 0:
                        # reshape(-1, 1) turns the sequence into a single column
                        pitch_scaler.partial_fit(pitch.reshape((-1, 1)))
                    if len(energy) > 0:
                        energy_scaler.partial_fit(energy.reshape((-1, 1)))

                    n_frames += n

        print("Computing statistic quantities ...")
        # Perform normalization if necessary
        if self.pitch_normalization:
            pitch_mean = pitch_scaler.mean_[0]
            pitch_std = pitch_scaler.scale_[0]
        else:
            # A numerical trick to avoid normalization...
            pitch_mean = 0
            pitch_std = 1
        if self.energy_normalization:
            energy_mean = energy_scaler.mean_[0]
            energy_std = energy_scaler.scale_[0]
        else:
            energy_mean = 0
            energy_std = 1

        # normalize() standardizes the saved files under
        # ./preprocessed_data/LibriTTS/pitch (or energy) in place
        # and returns their min and max
        pitch_min, pitch_max = self.normalize(
            os.path.join(self.out_dir, "pitch"), pitch_mean, pitch_std
        )
        energy_min, energy_max = self.normalize(
            os.path.join(self.out_dir, "energy"), energy_mean, energy_std
        )

        # Save files
        # json.dumps() serializes a Python structure to JSON
        with open(os.path.join(self.out_dir, "speakers.json"), "w") as f:
            f.write(json.dumps(speakers))

        with open(os.path.join(self.out_dir, "stats.json"), "w") as f:
            stats = {
                "pitch": [
                    float(pitch_min),
                    float(pitch_max),
                    float(pitch_mean),
                    float(pitch_std),
                ],
                "energy": [
                    float(energy_min),
                    float(energy_max),
                    float(energy_mean),
                    float(energy_std),
                ],
            }
            f.write(json.dumps(stats))

        print(
            "Total time: {} hours".format(
                n_frames * self.hop_length / self.sampling_rate / 3600
            )
        )

        random.shuffle(out)
        out = [r for r in out if r is not None]

        # Write metadata: split the text metadata into a training set and a
        # validation set (val_size = 512)
        with open(os.path.join(self.out_dir, "train.txt"), "w", encoding="utf-8") as f:
            for m in out[self.val_size :]:
                f.write(m + "\n")
        with open(os.path.join(self.out_dir, "val.txt"), "w", encoding="utf-8") as f:
            for m in out[: self.val_size]:
                f.write(m + "\n")

        return out
    # extract the mel, pitch, energy, and duration of one utterance from its files
    def process_utterance(self, speaker, basename):
        # ./raw_data/LibriTTS/speaker/{basename}.wav
        # ./raw_data/LibriTTS/speaker/{basename}.lab (the transcript of the audio)
        wav_path = os.path.join(self.in_dir, speaker, "{}.wav".format(basename))
        text_path = os.path.join(self.in_dir, speaker, "{}.lab".format(basename))
        # tg_path = ./preprocessed_data/LibriTTS/TextGrid/speaker/{basename}.TextGrid
        tg_path = os.path.join(
            self.out_dir, "TextGrid", speaker, "{}.TextGrid".format(basename)
        )

        # Get alignments: read the TextGrid annotation file
        textgrid = tgt.io.read_textgrid(tg_path)
        # get_alignment() walks the "phones" tier and returns
        #   phone:    the phone sequence of the utterance,
        #   duration: the number of mel frames each phone lasts,
        #   start:    where the speech starts (seconds),
        #   end:      where the speech ends (seconds)
        phone, duration, start, end = self.get_alignment(
            textgrid.get_tier_by_name("phones")
        )
        # join the phones into one string for easy storage,
        # e.g. text = "{phone1 phone2 ... phoneN}"
        text = "{" + " ".join(phone) + "}"
        if start >= end:
            return None

        # Read and trim wav files: keep only the segment covered by the phones
        wav, _ = librosa.load(wav_path)
        wav = wav[
            int(self.sampling_rate * start) : int(self.sampling_rate * end)
        ].astype(np.float32)

        # Read raw text: the transcript that corresponds to the phone sequence
        with open(text_path, "r") as f:
            raw_text = f.readline().strip("\n")

        # Compute fundamental frequency: extract F0 with pyworld
        pitch, t = pw.dio(
            wav.astype(np.float64),
            self.sampling_rate,
            frame_period=self.hop_length / self.sampling_rate * 1000,
        )
        pitch = pw.stonemask(wav.astype(np.float64), pitch, t, self.sampling_rate)

        pitch = pitch[: sum(duration)]  # align with the total number of mel frames
        if np.sum(pitch != 0) <= 1:
            return None

        # Compute mel-scale spectrogram and energy
        mel_spectrogram, energy = Audio.tools.get_mel_from_wav(wav, self.STFT)
        mel_spectrogram = mel_spectrogram[:, : sum(duration)]
        energy = energy[: sum(duration)]

        if self.pitch_phoneme_averaging:
            # phoneme_level: perform linear interpolation, i.e. replace the zero
            # (unvoiced) values in the pitch sequence with plausible ones
            nonzero_ids = np.where(pitch != 0)[0]  # indices of the nonzero pitch values
            # interp1d() with bounds_error=False maps out-of-range inputs to fill_value
            interp_fn = interp1d(
                nonzero_ids,
                pitch[nonzero_ids],
                fill_value=(pitch[nonzero_ids[0]], pitch[nonzero_ids[-1]]),
                bounds_error=False,
            )
            # after interpolation the zero stretches of pitch have been filled in
            pitch = interp_fn(np.arange(0, len(pitch)))

            # Phoneme-level average
            pos = 0
            for i, d in enumerate(duration):
                if d > 0:
                    pitch[i] = np.mean(pitch[pos : pos + d])
                else:
                    pitch[i] = 0
                pos += d
            pitch = pitch[: len(duration)]  # pitch is now one value per phone

        if self.energy_phoneme_averaging:
            # Phoneme-level average
            pos = 0
            for i, d in enumerate(duration):
                if d > 0:
                    energy[i] = np.mean(energy[pos : pos + d])
                else:
                    energy[i] = 0
                pos += d
            energy = energy[: len(duration)]

        # Save files under ./preprocessed_data/LibriTTS/
        dur_filename = "{}-duration-{}.npy".format(speaker, basename)
        np.save(os.path.join(self.out_dir, "duration", dur_filename), duration)  # durations

        pitch_filename = "{}-pitch-{}.npy".format(speaker, basename)
        np.save(os.path.join(self.out_dir, "pitch", pitch_filename), pitch)  # pitch

        energy_filename = "{}-energy-{}.npy".format(speaker, basename)
        np.save(os.path.join(self.out_dir, "energy", energy_filename), energy)  # energy

        mel_filename = "{}-mel-{}.npy".format(speaker, basename)
        np.save(os.path.join(self.out_dir, "mel", mel_filename), mel_spectrogram.T)

        return (
            "|".join([basename, speaker, text, raw_text]),  # text metadata string
            self.remove_outlier(pitch),    # pitch sequence with outliers removed
            self.remove_outlier(energy),   # energy sequence with outliers removed
            mel_spectrogram.shape[1],      # number of mel frames
        )
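An aside on the interpolation step above, which is easy to verify in isolation. A minimal sketch with a made-up pitch contour, where zeros mark unvoiced frames:

import numpy as np
from scipy.interpolate import interp1d

pitch = np.array([0.0, 110.0, 0.0, 0.0, 220.0, 0.0])  # zeros = unvoiced frames
nonzero_ids = np.where(pitch != 0)[0]                  # [1, 4]

interp_fn = interp1d(
    nonzero_ids,
    pitch[nonzero_ids],
    fill_value=(pitch[nonzero_ids[0]], pitch[nonzero_ids[-1]]),
    bounds_error=False,
)
print(interp_fn(np.arange(len(pitch))))
# ≈ [110. 110. 146.67 183.33 220. 220.]: the zeros are replaced by a linear
# ramp, and the edges are clamped to the first/last voiced values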
    # extract the alignment information from one TextGrid tier
    def get_alignment(self, tier):
        sil_phones = ["sil", "sp", "spn"]

        # the tier records, for each phone in the text, its time span
        phones = []      # phone sequence
        durations = []   # per-phone durations
        start_time = 0   # speech start time
        end_time = 0     # speech end time
        end_idx = 0
        # each t is an Interval(0.0, 0.04, "P"): a start time, an end time,
        # and the text of that span (here, a phone)
        for t in tier._objects:
            s, e, p = t.start_time, t.end_time, t.text

            # Trim leading silences: skip silence phones at the start of the
            # sentence, and start timing at the first real phone
            if phones == []:
                if p in sil_phones:
                    continue
                else:
                    start_time = s

            if p not in sil_phones:
                # For ordinary phones
                phones.append(p)
                end_time = e
                end_idx = len(phones)  # number of phones recorded so far
            else:
                # For silent phones inside the sentence
                phones.append(p)

            # record the duration, converting seconds to mel frames;
            # np.round() rounds to the nearest integer
            durations.append(
                int(
                    np.round(e * self.sampling_rate / self.hop_length)
                    - np.round(s * self.sampling_rate / self.hop_length)
                )
            )

        # Trim trailing silences
        phones = phones[:end_idx]
        durations = durations[:end_idx]

        return phones, durations, start_time, end_time
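The seconds-to-frames conversion deserves a worked example. With sampling_rate 22050 and hop_length 256, a hypothetical phone spanning 0.04 s to 0.12 s maps to seven mel frames:

import numpy as np

sampling_rate, hop_length = 22050, 256
s, e = 0.04, 0.12  # made-up phone boundaries in seconds

frames = int(
    np.round(e * sampling_rate / hop_length)    # round(10.34) = 10
    - np.round(s * sampling_rate / hop_length)  # round(3.45)  = 3
)
print(frames)  # 7 mel frames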
    # remove outliers using the boxplot (IQR) rule
    def remove_outlier(self, values):
        values = np.array(values)
        # compute the 25th and 75th percentiles
        p25 = np.percentile(values, 25)
        p75 = np.percentile(values, 75)
        lower = p25 - 1.5 * (p75 - p25)
        upper = p75 + 1.5 * (p75 - p25)
        normal_indices = np.logical_and(values > lower, values < upper)

        return values[normal_indices]
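As a concrete check of the boxplot rule, take the made-up sequence [1, 2, 3, 4, 100]: the quartiles are 2 and 4, so the fences are 2 - 1.5·2 = -1 and 4 + 1.5·2 = 7, and the outlier 100 is dropped:

import numpy as np

values = np.array([1, 2, 3, 4, 100])
p25, p75 = np.percentile(values, 25), np.percentile(values, 75)  # 2.0, 4.0
lower, upper = p25 - 1.5 * (p75 - p25), p75 + 1.5 * (p75 - p25)  # -1.0, 7.0
print(values[np.logical_and(values > lower, values < upper)])    # [1 2 3 4]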
    # standardize the saved .npy files under
    # ./preprocessed_data/LibriTTS/pitch (or energy) in place
    def normalize(self, in_dir, mean, std):
        max_value = np.finfo(np.float64).min
        min_value = np.finfo(np.float64).max
        for filename in os.listdir(in_dir):
            filename = os.path.join(in_dir, filename)
            # standardize with the accumulated mean and std, then overwrite the file
            values = (np.load(filename) - mean) / std
            np.save(filename, values)

            max_value = max(max_value, max(values))
            min_value = min(min_value, min(values))

        return min_value, max_value
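Finally, a note on how this class is driven: in the repository, a small preprocess.py script instantiates Preprocessor with the loaded yaml config and calls build_from_path(). A minimal sketch of that entry point, assuming Preprocessor is importable from the preprocessor package as in the original repository:

import argparse

import yaml

from preprocessor.preprocessor import Preprocessor

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("config", type=str, help="path to preprocess.yaml")
    args = parser.parse_args()

    config = yaml.load(open(args.config, "r"), Loader=yaml.FullLoader)
    preprocessor = Preprocessor(config)
    preprocessor.build_from_path()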