final structure. prepared to solve dependencies

This commit is contained in:
SWivid
2024-10-24 00:55:41 +08:00
parent 8ed1beac1e
commit 8e0edfcf8f
10 changed files with 587 additions and 583 deletions

View File

@@ -0,0 +1,397 @@
import math
import os
import random
import string

from tqdm import tqdm

import torch
import torch.nn.functional as F
import torchaudio

from f5_tts.model.modules import MelSpec
from f5_tts.model.utils import convert_char_to_pinyin
from f5_tts.eval.ecapa_tdnn import ECAPA_TDNN_SMALL


# seedtts testset metainfo: utt, prompt_text, prompt_wav, gt_text, gt_wav
def get_seedtts_testset_metainfo(metalst):
    f = open(metalst)
    lines = f.readlines()
    f.close()
    metainfo = []
    for line in lines:
        if len(line.strip().split("|")) == 5:
            utt, prompt_text, prompt_wav, gt_text, gt_wav = line.strip().split("|")
        elif len(line.strip().split("|")) == 4:
            utt, prompt_text, prompt_wav, gt_text = line.strip().split("|")
            gt_wav = os.path.join(os.path.dirname(metalst), "wavs", utt + ".wav")
        if not os.path.isabs(prompt_wav):
            prompt_wav = os.path.join(os.path.dirname(metalst), prompt_wav)
        metainfo.append((utt, prompt_text, prompt_wav, gt_text, gt_wav))
    return metainfo


# librispeech test-clean metainfo: gen_utt, ref_txt, ref_wav, gen_txt, gen_wav
def get_librispeech_test_clean_metainfo(metalst, librispeech_test_clean_path):
    f = open(metalst)
    lines = f.readlines()
    f.close()
    metainfo = []
    for line in lines:
        ref_utt, ref_dur, ref_txt, gen_utt, gen_dur, gen_txt = line.strip().split("\t")
        # ref_txt = ref_txt[0] + ref_txt[1:].lower() + '.'  # if use librispeech test-clean (no-pc)
        ref_spk_id, ref_chaptr_id, _ = ref_utt.split("-")
        ref_wav = os.path.join(librispeech_test_clean_path, ref_spk_id, ref_chaptr_id, ref_utt + ".flac")
        # gen_txt = gen_txt[0] + gen_txt[1:].lower() + '.'  # if use librispeech test-clean (no-pc)
        gen_spk_id, gen_chaptr_id, _ = gen_utt.split("-")
        gen_wav = os.path.join(librispeech_test_clean_path, gen_spk_id, gen_chaptr_id, gen_utt + ".flac")
        metainfo.append((gen_utt, ref_txt, ref_wav, " " + gen_txt, gen_wav))
    return metainfo


# padded to max length mel batch
def padded_mel_batch(ref_mels):
    max_mel_length = torch.LongTensor([mel.shape[-1] for mel in ref_mels]).amax()
    padded_ref_mels = []
    for mel in ref_mels:
        padded_ref_mel = F.pad(mel, (0, max_mel_length - mel.shape[-1]), value=0)
        padded_ref_mels.append(padded_ref_mel)
    padded_ref_mels = torch.stack(padded_ref_mels)
    padded_ref_mels = padded_ref_mels.permute(0, 2, 1)
    return padded_ref_mels


# get prompts from metainfo containing: utt, prompt_text, prompt_wav, gt_text, gt_wav
def get_inference_prompt(
    metainfo,
    speed=1.0,
    tokenizer="pinyin",
    polyphone=True,
    target_sample_rate=24000,
    n_mel_channels=100,
    hop_length=256,
    target_rms=0.1,
    use_truth_duration=False,
    infer_batch_size=1,
    num_buckets=200,
    min_secs=3,
    max_secs=40,
):
    prompts_all = []

    min_tokens = min_secs * target_sample_rate // hop_length
    max_tokens = max_secs * target_sample_rate // hop_length

    batch_accum = [0] * num_buckets
    utts, ref_rms_list, ref_mels, ref_mel_lens, total_mel_lens, final_text_list = (
        [[] for _ in range(num_buckets)] for _ in range(6)
    )

    mel_spectrogram = MelSpec(
        target_sample_rate=target_sample_rate, n_mel_channels=n_mel_channels, hop_length=hop_length
    )

    for utt, prompt_text, prompt_wav, gt_text, gt_wav in tqdm(metainfo, desc="Processing prompts..."):
        # Audio
        ref_audio, ref_sr = torchaudio.load(prompt_wav)
        ref_rms = torch.sqrt(torch.mean(torch.square(ref_audio)))
        if ref_rms < target_rms:
            ref_audio = ref_audio * target_rms / ref_rms
        assert ref_audio.shape[-1] > 5000, f"Empty prompt wav: {prompt_wav}, or torchaudio backend issue."
        if ref_sr != target_sample_rate:
            resampler = torchaudio.transforms.Resample(ref_sr, target_sample_rate)
            ref_audio = resampler(ref_audio)

        # Text
        if len(prompt_text[-1].encode("utf-8")) == 1:
            prompt_text = prompt_text + " "
        text = [prompt_text + gt_text]
        if tokenizer == "pinyin":
            text_list = convert_char_to_pinyin(text, polyphone=polyphone)
        else:
            text_list = text

        # Duration, mel frame length
        ref_mel_len = ref_audio.shape[-1] // hop_length
        if use_truth_duration:
            gt_audio, gt_sr = torchaudio.load(gt_wav)
            if gt_sr != target_sample_rate:
                resampler = torchaudio.transforms.Resample(gt_sr, target_sample_rate)
                gt_audio = resampler(gt_audio)
            total_mel_len = ref_mel_len + int(gt_audio.shape[-1] / hop_length / speed)

            # # test vocoder resynthesis
            # ref_audio = gt_audio
        else:
            ref_text_len = len(prompt_text.encode("utf-8"))
            gen_text_len = len(gt_text.encode("utf-8"))
            total_mel_len = ref_mel_len + int(ref_mel_len / ref_text_len * gen_text_len / speed)

        # to mel spectrogram
        ref_mel = mel_spectrogram(ref_audio)
        ref_mel = ref_mel.squeeze(0)

        # deal with batch
        assert infer_batch_size > 0, "infer_batch_size should be greater than 0."
        assert (
            min_tokens <= total_mel_len <= max_tokens
        ), f"Audio {utt} has duration {total_mel_len*hop_length//target_sample_rate}s out of range [{min_secs}, {max_secs}]."
        bucket_i = math.floor((total_mel_len - min_tokens) / (max_tokens - min_tokens + 1) * num_buckets)

        utts[bucket_i].append(utt)
        ref_rms_list[bucket_i].append(ref_rms)
        ref_mels[bucket_i].append(ref_mel)
        ref_mel_lens[bucket_i].append(ref_mel_len)
        total_mel_lens[bucket_i].append(total_mel_len)
        final_text_list[bucket_i].extend(text_list)

        batch_accum[bucket_i] += total_mel_len

        if batch_accum[bucket_i] >= infer_batch_size:
            # print(f"\n{len(ref_mels[bucket_i][0][0])}\n{ref_mel_lens[bucket_i]}\n{total_mel_lens[bucket_i]}")
            prompts_all.append(
                (
                    utts[bucket_i],
                    ref_rms_list[bucket_i],
                    padded_mel_batch(ref_mels[bucket_i]),
                    ref_mel_lens[bucket_i],
                    total_mel_lens[bucket_i],
                    final_text_list[bucket_i],
                )
            )
            batch_accum[bucket_i] = 0
            (
                utts[bucket_i],
                ref_rms_list[bucket_i],
                ref_mels[bucket_i],
                ref_mel_lens[bucket_i],
                total_mel_lens[bucket_i],
                final_text_list[bucket_i],
            ) = [], [], [], [], [], []

    # add residual
    for bucket_i, bucket_frames in enumerate(batch_accum):
        if bucket_frames > 0:
            prompts_all.append(
                (
                    utts[bucket_i],
                    ref_rms_list[bucket_i],
                    padded_mel_batch(ref_mels[bucket_i]),
                    ref_mel_lens[bucket_i],
                    total_mel_lens[bucket_i],
                    final_text_list[bucket_i],
                )
            )
    # not only leave easy work for last workers
    random.seed(666)
    random.shuffle(prompts_all)

    return prompts_all


# get wav_res_ref_text of seed-tts test metalst
# https://github.com/BytedanceSpeech/seed-tts-eval
def get_seed_tts_test(metalst, gen_wav_dir, gpus):
    f = open(metalst)
    lines = f.readlines()
    f.close()

    test_set_ = []
    for line in tqdm(lines):
        if len(line.strip().split("|")) == 5:
            utt, prompt_text, prompt_wav, gt_text, gt_wav = line.strip().split("|")
        elif len(line.strip().split("|")) == 4:
            utt, prompt_text, prompt_wav, gt_text = line.strip().split("|")

        if not os.path.exists(os.path.join(gen_wav_dir, utt + ".wav")):
            continue
        gen_wav = os.path.join(gen_wav_dir, utt + ".wav")
        if not os.path.isabs(prompt_wav):
            prompt_wav = os.path.join(os.path.dirname(metalst), prompt_wav)

        test_set_.append((gen_wav, prompt_wav, gt_text))

    num_jobs = len(gpus)
    if num_jobs == 1:
        return [(gpus[0], test_set_)]

    wav_per_job = len(test_set_) // num_jobs + 1
    test_set = []
    for i in range(num_jobs):
        test_set.append((gpus[i], test_set_[i * wav_per_job : (i + 1) * wav_per_job]))

    return test_set


# get librispeech test-clean cross sentence test
def get_librispeech_test(metalst, gen_wav_dir, gpus, librispeech_test_clean_path, eval_ground_truth=False):
    f = open(metalst)
    lines = f.readlines()
    f.close()

    test_set_ = []
    for line in tqdm(lines):
        ref_utt, ref_dur, ref_txt, gen_utt, gen_dur, gen_txt = line.strip().split("\t")

        if eval_ground_truth:
            gen_spk_id, gen_chaptr_id, _ = gen_utt.split("-")
            gen_wav = os.path.join(librispeech_test_clean_path, gen_spk_id, gen_chaptr_id, gen_utt + ".flac")
        else:
            if not os.path.exists(os.path.join(gen_wav_dir, gen_utt + ".wav")):
                raise FileNotFoundError(f"Generated wav not found: {gen_utt}")
            gen_wav = os.path.join(gen_wav_dir, gen_utt + ".wav")

        ref_spk_id, ref_chaptr_id, _ = ref_utt.split("-")
        ref_wav = os.path.join(librispeech_test_clean_path, ref_spk_id, ref_chaptr_id, ref_utt + ".flac")

        test_set_.append((gen_wav, ref_wav, gen_txt))

    num_jobs = len(gpus)
    if num_jobs == 1:
        return [(gpus[0], test_set_)]

    wav_per_job = len(test_set_) // num_jobs + 1
    test_set = []
    for i in range(num_jobs):
        test_set.append((gpus[i], test_set_[i * wav_per_job : (i + 1) * wav_per_job]))

    return test_set


# load asr model
def load_asr_model(lang, ckpt_dir=""):
    if lang == "zh":
        from funasr import AutoModel

        model = AutoModel(
            model=os.path.join(ckpt_dir, "paraformer-zh"),
            # vad_model = os.path.join(ckpt_dir, "fsmn-vad"),
            # punc_model = os.path.join(ckpt_dir, "ct-punc"),
            # spk_model = os.path.join(ckpt_dir, "cam++"),
            disable_update=True,
        )  # following seed-tts setting
    elif lang == "en":
        from faster_whisper import WhisperModel

        model_size = "large-v3" if ckpt_dir == "" else ckpt_dir
        model = WhisperModel(model_size, device="cuda", compute_type="float16")
    return model


# WER Evaluation, the way Seed-TTS does
def run_asr_wer(args):
    rank, lang, test_set, ckpt_dir = args

    if lang == "zh":
        import zhconv

        torch.cuda.set_device(rank)
    elif lang == "en":
        os.environ["CUDA_VISIBLE_DEVICES"] = str(rank)
    else:
        raise NotImplementedError(
            "lang support only 'zh' (funasr paraformer-zh), 'en' (faster-whisper-large-v3), for now."
        )

    asr_model = load_asr_model(lang, ckpt_dir=ckpt_dir)

    from zhon.hanzi import punctuation

    punctuation_all = punctuation + string.punctuation
    wers = []

    from jiwer import compute_measures

    for gen_wav, prompt_wav, truth in tqdm(test_set):
        if lang == "zh":
            res = asr_model.generate(input=gen_wav, batch_size_s=300, disable_pbar=True)
            hypo = res[0]["text"]
            hypo = zhconv.convert(hypo, "zh-cn")
        elif lang == "en":
            segments, _ = asr_model.transcribe(gen_wav, beam_size=5, language="en")
            hypo = ""
            for segment in segments:
                hypo = hypo + " " + segment.text

        # raw_truth = truth
        # raw_hypo = hypo

        for x in punctuation_all:
            truth = truth.replace(x, "")
            hypo = hypo.replace(x, "")

        truth = truth.replace("  ", " ")
        hypo = hypo.replace("  ", " ")

        if lang == "zh":
            truth = " ".join([x for x in truth])
            hypo = " ".join([x for x in hypo])
        elif lang == "en":
            truth = truth.lower()
            hypo = hypo.lower()

        measures = compute_measures(truth, hypo)
        wer = measures["wer"]

        # ref_list = truth.split(" ")
        # subs = measures["substitutions"] / len(ref_list)
        # dele = measures["deletions"] / len(ref_list)
        # inse = measures["insertions"] / len(ref_list)

        wers.append(wer)

    return wers


# SIM Evaluation
def run_sim(args):
    rank, test_set, ckpt_dir = args
    device = f"cuda:{rank}"

    model = ECAPA_TDNN_SMALL(feat_dim=1024, feat_type="wavlm_large", config_path=None)
    state_dict = torch.load(ckpt_dir, weights_only=True, map_location=lambda storage, loc: storage)
    model.load_state_dict(state_dict["model"], strict=False)

    use_gpu = True if torch.cuda.is_available() else False
    if use_gpu:
        model = model.cuda(device)
    model.eval()

    sim_list = []
    for wav1, wav2, truth in tqdm(test_set):
        wav1, sr1 = torchaudio.load(wav1)
        wav2, sr2 = torchaudio.load(wav2)

        resample1 = torchaudio.transforms.Resample(orig_freq=sr1, new_freq=16000)
        resample2 = torchaudio.transforms.Resample(orig_freq=sr2, new_freq=16000)
        wav1 = resample1(wav1)
        wav2 = resample2(wav2)

        if use_gpu:
            wav1 = wav1.cuda(device)
            wav2 = wav2.cuda(device)
        with torch.no_grad():
            emb1 = model(wav1)
            emb2 = model(wav2)

        sim = F.cosine_similarity(emb1, emb2)[0].item()
        # print(f"VSim score between two audios: {sim:.4f} (-1.0, 1.0).")
        sim_list.append(sim)

    return sim_list
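
A minimal sketch of how these helpers chain together for a seed-tts-eval style run, assuming the new module is importable as f5_tts.eval.utils_eval; the metalst, wav directory, and checkpoint locations below are hypothetical placeholders:

from f5_tts.eval.utils_eval import (
    get_seedtts_testset_metainfo,
    get_inference_prompt,
    get_seed_tts_test,
    run_asr_wer,
    run_sim,
)

metalst = "data/seedtts_testset/zh/meta.lst"  # hypothetical metalst path
gen_wav_dir = "results/seedtts_zh"  # hypothetical dir where generated <utt>.wav files land

# build bucketed, batched prompts (utts, rms, padded mels, lengths, texts) for inference
metainfo = get_seedtts_testset_metainfo(metalst)
prompts_all = get_inference_prompt(metainfo, tokenizer="pinyin")

# ... run TTS inference over prompts_all and write <utt>.wav into gen_wav_dir ...

# score the generated wavs: one (gpu, subset) job per GPU
gpus = [0]
jobs = get_seed_tts_test(metalst, gen_wav_dir, gpus)
wers = run_asr_wer((gpus[0], "zh", jobs[0][1], "checkpoints/funasr"))  # hypothetical ASR ckpt dir
sims = run_sim((gpus[0], jobs[0][1], "checkpoints/wavlm_large_finetune.pth"))  # hypothetical WavLM ckpt
print(f"WER: {sum(wers) / len(wers):.4f}  SIM: {sum(sims) / len(sims):.4f}")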

View File

@@ -11,7 +11,7 @@ import tomli
from cached_path import cached_path
from f5_tts.model import DiT, UNetT
from f5_tts.model.utils_infer import (
from f5_tts.infer.utils_infer import (
    load_vocoder,
    load_model,
    preprocess_ref_audio_text,

View File

@@ -28,15 +28,13 @@ def gpu_decorator(func):
from f5_tts.model import DiT, UNetT
from f5_tts.model.utils import (
    save_spectrogram,
)
from f5_tts.model.utils_infer import (
from f5_tts.infer.utils_infer import (
    load_vocoder,
    load_model,
    preprocess_ref_audio_text,
    infer_process,
    remove_silence_for_generated_wav,
    save_spectrogram,
)
vocos = load_vocoder()

View File

@@ -10,8 +10,8 @@ from f5_tts.model.utils import (
    load_checkpoint,
    get_tokenizer,
    convert_char_to_pinyin,
    save_spectrogram,
)
from f5_tts.infer.utils_infer import save_spectrogram
device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"

View File

@@ -4,6 +4,11 @@
import re
import tempfile
import matplotlib
matplotlib.use("Agg")
import matplotlib.pylab as plt
import numpy as np
import torch
import torchaudio
@@ -14,7 +19,6 @@ from vocos import Vocos
from f5_tts.model import CFM
from f5_tts.model.utils import (
    load_checkpoint,
    get_tokenizer,
    convert_char_to_pinyin,
)
@@ -104,6 +108,38 @@ def initialize_asr_pipeline(device=device):
)
# load model checkpoint for inference
def load_checkpoint(model, ckpt_path, device, use_ema=True):
    if device == "cuda":
        model = model.half()

    ckpt_type = ckpt_path.split(".")[-1]
    if ckpt_type == "safetensors":
        from safetensors.torch import load_file

        checkpoint = load_file(ckpt_path)
    else:
        checkpoint = torch.load(ckpt_path, weights_only=True)

    if use_ema:
        if ckpt_type == "safetensors":
            checkpoint = {"ema_model_state_dict": checkpoint}
        checkpoint["model_state_dict"] = {
            k.replace("ema_model.", ""): v
            for k, v in checkpoint["ema_model_state_dict"].items()
            if k not in ["initted", "step"]
        }
        model.load_state_dict(checkpoint["model_state_dict"])
    else:
        if ckpt_type == "safetensors":
            checkpoint = {"model_state_dict": checkpoint}
        model.load_state_dict(checkpoint["model_state_dict"])

    return model.to(device)
# load model for inference
@@ -355,3 +391,14 @@ def remove_silence_for_generated_wav(filename):
        non_silent_wave += non_silent_seg
    aseg = non_silent_wave
    aseg.export(filename, format="wav")


# save spectrogram
def save_spectrogram(spectrogram, path):
    plt.figure(figsize=(12, 4))
    plt.imshow(spectrogram, origin="lower", aspect="auto")
    plt.colorbar()
    plt.savefig(path)
    plt.close()
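
As a quick, self-contained check of the save_spectrogram helper added here (illustrative only; assumes the f5_tts package is installed, and the random array and output filename are arbitrary stand-ins):

import numpy as np

from f5_tts.infer.utils_infer import save_spectrogram

mel = np.random.randn(100, 400)  # random (n_mel_channels, frames) array, stand-in for a real mel
save_spectrogram(mel, "mel_preview.png")  # renders the array with matplotlib and writes it to disk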

View File

@@ -1,29 +1,16 @@
from __future__ import annotations
import os
import math
import random
import string
from importlib.resources import files
from tqdm import tqdm
from collections import defaultdict
import matplotlib
matplotlib.use("Agg")
import matplotlib.pylab as plt
import torch
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
import torchaudio
import jieba
from pypinyin import lazy_pinyin, Style
from f5_tts.model.ecapa_tdnn import ECAPA_TDNN_SMALL
from f5_tts.model.modules import MelSpec
# seed everything
@@ -183,399 +170,6 @@ def convert_char_to_pinyin(text_list, polyphone=True):
    return final_text_list


# save spectrogram
def save_spectrogram(spectrogram, path):
    plt.figure(figsize=(12, 4))
    plt.imshow(spectrogram, origin="lower", aspect="auto")
    plt.colorbar()
    plt.savefig(path)
    plt.close()


# seedtts testset metainfo: utt, prompt_text, prompt_wav, gt_text, gt_wav
def get_seedtts_testset_metainfo(metalst):
    f = open(metalst)
    lines = f.readlines()
    f.close()
    metainfo = []
    for line in lines:
        if len(line.strip().split("|")) == 5:
            utt, prompt_text, prompt_wav, gt_text, gt_wav = line.strip().split("|")
        elif len(line.strip().split("|")) == 4:
            utt, prompt_text, prompt_wav, gt_text = line.strip().split("|")
            gt_wav = os.path.join(os.path.dirname(metalst), "wavs", utt + ".wav")
        if not os.path.isabs(prompt_wav):
            prompt_wav = os.path.join(os.path.dirname(metalst), prompt_wav)
        metainfo.append((utt, prompt_text, prompt_wav, gt_text, gt_wav))
    return metainfo


# librispeech test-clean metainfo: gen_utt, ref_txt, ref_wav, gen_txt, gen_wav
def get_librispeech_test_clean_metainfo(metalst, librispeech_test_clean_path):
    f = open(metalst)
    lines = f.readlines()
    f.close()
    metainfo = []
    for line in lines:
        ref_utt, ref_dur, ref_txt, gen_utt, gen_dur, gen_txt = line.strip().split("\t")
        # ref_txt = ref_txt[0] + ref_txt[1:].lower() + '.'  # if use librispeech test-clean (no-pc)
        ref_spk_id, ref_chaptr_id, _ = ref_utt.split("-")
        ref_wav = os.path.join(librispeech_test_clean_path, ref_spk_id, ref_chaptr_id, ref_utt + ".flac")
        # gen_txt = gen_txt[0] + gen_txt[1:].lower() + '.'  # if use librispeech test-clean (no-pc)
        gen_spk_id, gen_chaptr_id, _ = gen_utt.split("-")
        gen_wav = os.path.join(librispeech_test_clean_path, gen_spk_id, gen_chaptr_id, gen_utt + ".flac")
        metainfo.append((gen_utt, ref_txt, ref_wav, " " + gen_txt, gen_wav))
    return metainfo


# padded to max length mel batch
def padded_mel_batch(ref_mels):
    max_mel_length = torch.LongTensor([mel.shape[-1] for mel in ref_mels]).amax()
    padded_ref_mels = []
    for mel in ref_mels:
        padded_ref_mel = F.pad(mel, (0, max_mel_length - mel.shape[-1]), value=0)
        padded_ref_mels.append(padded_ref_mel)
    padded_ref_mels = torch.stack(padded_ref_mels)
    padded_ref_mels = padded_ref_mels.permute(0, 2, 1)
    return padded_ref_mels


# get prompts from metainfo containing: utt, prompt_text, prompt_wav, gt_text, gt_wav
def get_inference_prompt(
    metainfo,
    speed=1.0,
    tokenizer="pinyin",
    polyphone=True,
    target_sample_rate=24000,
    n_mel_channels=100,
    hop_length=256,
    target_rms=0.1,
    use_truth_duration=False,
    infer_batch_size=1,
    num_buckets=200,
    min_secs=3,
    max_secs=40,
):
    prompts_all = []

    min_tokens = min_secs * target_sample_rate // hop_length
    max_tokens = max_secs * target_sample_rate // hop_length

    batch_accum = [0] * num_buckets
    utts, ref_rms_list, ref_mels, ref_mel_lens, total_mel_lens, final_text_list = (
        [[] for _ in range(num_buckets)] for _ in range(6)
    )

    mel_spectrogram = MelSpec(
        target_sample_rate=target_sample_rate, n_mel_channels=n_mel_channels, hop_length=hop_length
    )

    for utt, prompt_text, prompt_wav, gt_text, gt_wav in tqdm(metainfo, desc="Processing prompts..."):
        # Audio
        ref_audio, ref_sr = torchaudio.load(prompt_wav)
        ref_rms = torch.sqrt(torch.mean(torch.square(ref_audio)))
        if ref_rms < target_rms:
            ref_audio = ref_audio * target_rms / ref_rms
        assert ref_audio.shape[-1] > 5000, f"Empty prompt wav: {prompt_wav}, or torchaudio backend issue."
        if ref_sr != target_sample_rate:
            resampler = torchaudio.transforms.Resample(ref_sr, target_sample_rate)
            ref_audio = resampler(ref_audio)

        # Text
        if len(prompt_text[-1].encode("utf-8")) == 1:
            prompt_text = prompt_text + " "
        text = [prompt_text + gt_text]
        if tokenizer == "pinyin":
            text_list = convert_char_to_pinyin(text, polyphone=polyphone)
        else:
            text_list = text

        # Duration, mel frame length
        ref_mel_len = ref_audio.shape[-1] // hop_length
        if use_truth_duration:
            gt_audio, gt_sr = torchaudio.load(gt_wav)
            if gt_sr != target_sample_rate:
                resampler = torchaudio.transforms.Resample(gt_sr, target_sample_rate)
                gt_audio = resampler(gt_audio)
            total_mel_len = ref_mel_len + int(gt_audio.shape[-1] / hop_length / speed)

            # # test vocoder resynthesis
            # ref_audio = gt_audio
        else:
            ref_text_len = len(prompt_text.encode("utf-8"))
            gen_text_len = len(gt_text.encode("utf-8"))
            total_mel_len = ref_mel_len + int(ref_mel_len / ref_text_len * gen_text_len / speed)

        # to mel spectrogram
        ref_mel = mel_spectrogram(ref_audio)
        ref_mel = ref_mel.squeeze(0)

        # deal with batch
        assert infer_batch_size > 0, "infer_batch_size should be greater than 0."
        assert (
            min_tokens <= total_mel_len <= max_tokens
        ), f"Audio {utt} has duration {total_mel_len*hop_length//target_sample_rate}s out of range [{min_secs}, {max_secs}]."
        bucket_i = math.floor((total_mel_len - min_tokens) / (max_tokens - min_tokens + 1) * num_buckets)

        utts[bucket_i].append(utt)
        ref_rms_list[bucket_i].append(ref_rms)
        ref_mels[bucket_i].append(ref_mel)
        ref_mel_lens[bucket_i].append(ref_mel_len)
        total_mel_lens[bucket_i].append(total_mel_len)
        final_text_list[bucket_i].extend(text_list)

        batch_accum[bucket_i] += total_mel_len

        if batch_accum[bucket_i] >= infer_batch_size:
            # print(f"\n{len(ref_mels[bucket_i][0][0])}\n{ref_mel_lens[bucket_i]}\n{total_mel_lens[bucket_i]}")
            prompts_all.append(
                (
                    utts[bucket_i],
                    ref_rms_list[bucket_i],
                    padded_mel_batch(ref_mels[bucket_i]),
                    ref_mel_lens[bucket_i],
                    total_mel_lens[bucket_i],
                    final_text_list[bucket_i],
                )
            )
            batch_accum[bucket_i] = 0
            (
                utts[bucket_i],
                ref_rms_list[bucket_i],
                ref_mels[bucket_i],
                ref_mel_lens[bucket_i],
                total_mel_lens[bucket_i],
                final_text_list[bucket_i],
            ) = [], [], [], [], [], []

    # add residual
    for bucket_i, bucket_frames in enumerate(batch_accum):
        if bucket_frames > 0:
            prompts_all.append(
                (
                    utts[bucket_i],
                    ref_rms_list[bucket_i],
                    padded_mel_batch(ref_mels[bucket_i]),
                    ref_mel_lens[bucket_i],
                    total_mel_lens[bucket_i],
                    final_text_list[bucket_i],
                )
            )
    # not only leave easy work for last workers
    random.seed(666)
    random.shuffle(prompts_all)

    return prompts_all


# get wav_res_ref_text of seed-tts test metalst
# https://github.com/BytedanceSpeech/seed-tts-eval
def get_seed_tts_test(metalst, gen_wav_dir, gpus):
    f = open(metalst)
    lines = f.readlines()
    f.close()

    test_set_ = []
    for line in tqdm(lines):
        if len(line.strip().split("|")) == 5:
            utt, prompt_text, prompt_wav, gt_text, gt_wav = line.strip().split("|")
        elif len(line.strip().split("|")) == 4:
            utt, prompt_text, prompt_wav, gt_text = line.strip().split("|")

        if not os.path.exists(os.path.join(gen_wav_dir, utt + ".wav")):
            continue
        gen_wav = os.path.join(gen_wav_dir, utt + ".wav")
        if not os.path.isabs(prompt_wav):
            prompt_wav = os.path.join(os.path.dirname(metalst), prompt_wav)

        test_set_.append((gen_wav, prompt_wav, gt_text))

    num_jobs = len(gpus)
    if num_jobs == 1:
        return [(gpus[0], test_set_)]

    wav_per_job = len(test_set_) // num_jobs + 1
    test_set = []
    for i in range(num_jobs):
        test_set.append((gpus[i], test_set_[i * wav_per_job : (i + 1) * wav_per_job]))

    return test_set


# get librispeech test-clean cross sentence test
def get_librispeech_test(metalst, gen_wav_dir, gpus, librispeech_test_clean_path, eval_ground_truth=False):
    f = open(metalst)
    lines = f.readlines()
    f.close()

    test_set_ = []
    for line in tqdm(lines):
        ref_utt, ref_dur, ref_txt, gen_utt, gen_dur, gen_txt = line.strip().split("\t")

        if eval_ground_truth:
            gen_spk_id, gen_chaptr_id, _ = gen_utt.split("-")
            gen_wav = os.path.join(librispeech_test_clean_path, gen_spk_id, gen_chaptr_id, gen_utt + ".flac")
        else:
            if not os.path.exists(os.path.join(gen_wav_dir, gen_utt + ".wav")):
                raise FileNotFoundError(f"Generated wav not found: {gen_utt}")
            gen_wav = os.path.join(gen_wav_dir, gen_utt + ".wav")

        ref_spk_id, ref_chaptr_id, _ = ref_utt.split("-")
        ref_wav = os.path.join(librispeech_test_clean_path, ref_spk_id, ref_chaptr_id, ref_utt + ".flac")

        test_set_.append((gen_wav, ref_wav, gen_txt))

    num_jobs = len(gpus)
    if num_jobs == 1:
        return [(gpus[0], test_set_)]

    wav_per_job = len(test_set_) // num_jobs + 1
    test_set = []
    for i in range(num_jobs):
        test_set.append((gpus[i], test_set_[i * wav_per_job : (i + 1) * wav_per_job]))

    return test_set


# load asr model
def load_asr_model(lang, ckpt_dir=""):
    if lang == "zh":
        from funasr import AutoModel

        model = AutoModel(
            model=os.path.join(ckpt_dir, "paraformer-zh"),
            # vad_model = os.path.join(ckpt_dir, "fsmn-vad"),
            # punc_model = os.path.join(ckpt_dir, "ct-punc"),
            # spk_model = os.path.join(ckpt_dir, "cam++"),
            disable_update=True,
        )  # following seed-tts setting
    elif lang == "en":
        from faster_whisper import WhisperModel

        model_size = "large-v3" if ckpt_dir == "" else ckpt_dir
        model = WhisperModel(model_size, device="cuda", compute_type="float16")
    return model


# WER Evaluation, the way Seed-TTS does
def run_asr_wer(args):
    rank, lang, test_set, ckpt_dir = args

    if lang == "zh":
        import zhconv

        torch.cuda.set_device(rank)
    elif lang == "en":
        os.environ["CUDA_VISIBLE_DEVICES"] = str(rank)
    else:
        raise NotImplementedError(
            "lang support only 'zh' (funasr paraformer-zh), 'en' (faster-whisper-large-v3), for now."
        )

    asr_model = load_asr_model(lang, ckpt_dir=ckpt_dir)

    from zhon.hanzi import punctuation

    punctuation_all = punctuation + string.punctuation
    wers = []

    from jiwer import compute_measures

    for gen_wav, prompt_wav, truth in tqdm(test_set):
        if lang == "zh":
            res = asr_model.generate(input=gen_wav, batch_size_s=300, disable_pbar=True)
            hypo = res[0]["text"]
            hypo = zhconv.convert(hypo, "zh-cn")
        elif lang == "en":
            segments, _ = asr_model.transcribe(gen_wav, beam_size=5, language="en")
            hypo = ""
            for segment in segments:
                hypo = hypo + " " + segment.text

        # raw_truth = truth
        # raw_hypo = hypo

        for x in punctuation_all:
            truth = truth.replace(x, "")
            hypo = hypo.replace(x, "")

        truth = truth.replace("  ", " ")
        hypo = hypo.replace("  ", " ")

        if lang == "zh":
            truth = " ".join([x for x in truth])
            hypo = " ".join([x for x in hypo])
        elif lang == "en":
            truth = truth.lower()
            hypo = hypo.lower()

        measures = compute_measures(truth, hypo)
        wer = measures["wer"]

        # ref_list = truth.split(" ")
        # subs = measures["substitutions"] / len(ref_list)
        # dele = measures["deletions"] / len(ref_list)
        # inse = measures["insertions"] / len(ref_list)

        wers.append(wer)

    return wers


# SIM Evaluation
def run_sim(args):
    rank, test_set, ckpt_dir = args
    device = f"cuda:{rank}"

    model = ECAPA_TDNN_SMALL(feat_dim=1024, feat_type="wavlm_large", config_path=None)
    state_dict = torch.load(ckpt_dir, weights_only=True, map_location=lambda storage, loc: storage)
    model.load_state_dict(state_dict["model"], strict=False)

    use_gpu = True if torch.cuda.is_available() else False
    if use_gpu:
        model = model.cuda(device)
    model.eval()

    sim_list = []
    for wav1, wav2, truth in tqdm(test_set):
        wav1, sr1 = torchaudio.load(wav1)
        wav2, sr2 = torchaudio.load(wav2)

        resample1 = torchaudio.transforms.Resample(orig_freq=sr1, new_freq=16000)
        resample2 = torchaudio.transforms.Resample(orig_freq=sr2, new_freq=16000)
        wav1 = resample1(wav1)
        wav2 = resample2(wav2)

        if use_gpu:
            wav1 = wav1.cuda(device)
            wav2 = wav2.cuda(device)
        with torch.no_grad():
            emb1 = model(wav1)
            emb2 = model(wav2)

        sim = F.cosine_similarity(emb1, emb2)[0].item()
        # print(f"VSim score between two audios: {sim:.4f} (-1.0, 1.0).")
        sim_list.append(sim)

    return sim_list
# filter func for dirty data with many repetitions
@@ -588,35 +182,3 @@ def repetition_found(text, length=2, tolerance=10):
        if count > tolerance:
            return True
    return False
# load model checkpoint for inference
def load_checkpoint(model, ckpt_path, device, use_ema=True):
    if device == "cuda":
        model = model.half()

    ckpt_type = ckpt_path.split(".")[-1]
    if ckpt_type == "safetensors":
        from safetensors.torch import load_file

        checkpoint = load_file(ckpt_path)
    else:
        checkpoint = torch.load(ckpt_path, weights_only=True)

    if use_ema:
        if ckpt_type == "safetensors":
            checkpoint = {"ema_model_state_dict": checkpoint}
        checkpoint["model_state_dict"] = {
            k.replace("ema_model.", ""): v
            for k, v in checkpoint["ema_model_state_dict"].items()
            if k not in ["initted", "step"]
        }
        model.load_state_dict(checkpoint["model_state_dict"])
    else:
        if ckpt_type == "safetensors":
            checkpoint = {"model_state_dict": checkpoint}
        model.load_state_dict(checkpoint["model_state_dict"])

    return model.to(device)

View File

@@ -1,138 +1,138 @@
import sys
import os

sys.path.append(os.getcwd())

from pathlib import Path
import json
import shutil
import argparse
import csv
import torchaudio
from tqdm import tqdm

from datasets.arrow_writer import ArrowWriter

from f5_tts.model.utils import (
    convert_char_to_pinyin,
)


PRETRAINED_VOCAB_PATH = Path(__file__).parent.parent / "data/Emilia_ZH_EN_pinyin/vocab.txt"


def is_csv_wavs_format(input_dataset_dir):
    fpath = Path(input_dataset_dir)
    metadata = fpath / "metadata.csv"
    wavs = fpath / "wavs"
    return metadata.exists() and metadata.is_file() and wavs.exists() and wavs.is_dir()


def prepare_csv_wavs_dir(input_dir):
    assert is_csv_wavs_format(input_dir), f"not csv_wavs format: {input_dir}"
    input_dir = Path(input_dir)
    metadata_path = input_dir / "metadata.csv"
    audio_path_text_pairs = read_audio_text_pairs(metadata_path.as_posix())

    sub_result, durations = [], []
    vocab_set = set()
    polyphone = True
    for audio_path, text in audio_path_text_pairs:
        if not Path(audio_path).exists():
            print(f"audio {audio_path} not found, skipping")
            continue
        audio_duration = get_audio_duration(audio_path)
        # assume tokenizer = "pinyin"  ("pinyin" | "char")
        text = convert_char_to_pinyin([text], polyphone=polyphone)[0]
        sub_result.append({"audio_path": audio_path, "text": text, "duration": audio_duration})
        durations.append(audio_duration)
        vocab_set.update(list(text))

    return sub_result, durations, vocab_set


def get_audio_duration(audio_path):
    audio, sample_rate = torchaudio.load(audio_path)
    num_channels = audio.shape[0]
    return audio.shape[1] / (sample_rate * num_channels)


def read_audio_text_pairs(csv_file_path):
    audio_text_pairs = []

    parent = Path(csv_file_path).parent

    with open(csv_file_path, mode="r", newline="", encoding="utf-8") as csvfile:
        reader = csv.reader(csvfile, delimiter="|")
        next(reader)  # Skip the header row
        for row in reader:
            if len(row) >= 2:
                audio_file = row[0].strip()  # First column: audio file path
                text = row[1].strip()  # Second column: text
                audio_file_path = parent / audio_file
                audio_text_pairs.append((audio_file_path.as_posix(), text))

    return audio_text_pairs


def save_prepped_dataset(out_dir, result, duration_list, text_vocab_set, is_finetune):
    out_dir = Path(out_dir)
    # save preprocessed dataset to disk
    out_dir.mkdir(exist_ok=True, parents=True)
    print(f"\nSaving to {out_dir} ...")

    # dataset = Dataset.from_dict({"audio_path": audio_path_list, "text": text_list, "duration": duration_list})  # oom
    # dataset.save_to_disk(f"data/{dataset_name}/raw", max_shard_size="2GB")
    raw_arrow_path = out_dir / "raw.arrow"
    with ArrowWriter(path=raw_arrow_path.as_posix(), writer_batch_size=1) as writer:
        for line in tqdm(result, desc="Writing to raw.arrow ..."):
            writer.write(line)

    # dup a json separately saving duration in case for DynamicBatchSampler ease
    dur_json_path = out_dir / "duration.json"
    with open(dur_json_path.as_posix(), "w", encoding="utf-8") as f:
        json.dump({"duration": duration_list}, f, ensure_ascii=False)

    # vocab map, i.e. tokenizer
    # add alphabets and symbols (optional, if plan to ft on de/fr etc.)
    # if tokenizer == "pinyin":
    #     text_vocab_set.update([chr(i) for i in range(32, 127)] + [chr(i) for i in range(192, 256)])
    voca_out_path = out_dir / "vocab.txt"
    with open(voca_out_path.as_posix(), "w") as f:
        for vocab in sorted(text_vocab_set):
            f.write(vocab + "\n")

    if is_finetune:
        file_vocab_finetune = PRETRAINED_VOCAB_PATH.as_posix()
        shutil.copy2(file_vocab_finetune, voca_out_path)
    else:
        with open(voca_out_path, "w") as f:
            for vocab in sorted(text_vocab_set):
                f.write(vocab + "\n")

    dataset_name = out_dir.stem
    print(f"\nFor {dataset_name}, sample count: {len(result)}")
    print(f"For {dataset_name}, vocab size is: {len(text_vocab_set)}")
    print(f"For {dataset_name}, total {sum(duration_list)/3600:.2f} hours")


def prepare_and_save_set(inp_dir, out_dir, is_finetune: bool = True):
    if is_finetune:
        assert PRETRAINED_VOCAB_PATH.exists(), f"pretrained vocab.txt not found: {PRETRAINED_VOCAB_PATH}"
    sub_result, durations, vocab_set = prepare_csv_wavs_dir(inp_dir)
    save_prepped_dataset(out_dir, sub_result, durations, vocab_set, is_finetune)


def cli():
    # finetune: python scripts/prepare_csv_wavs.py /path/to/input_dir /path/to/output_dir_pinyin
    # pretrain: python scripts/prepare_csv_wavs.py /path/to/output_dir_pinyin --pretrain
    parser = argparse.ArgumentParser(description="Prepare and save dataset.")
    parser.add_argument("inp_dir", type=str, help="Input directory containing the data.")
    parser.add_argument("out_dir", type=str, help="Output directory to save the prepared data.")
    parser.add_argument("--pretrain", action="store_true", help="Enable for new pretrain, otherwise is a fine-tune")

    args = parser.parse_args()

    prepare_and_save_set(args.inp_dir, args.out_dir, is_finetune=not args.pretrain)


if __name__ == "__main__":
    cli()
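
The same preparation can also be driven programmatically instead of through cli(); a minimal sketch, assuming this script is importable as a module from the repo checkout (the input and output paths are hypothetical):

from prepare_csv_wavs import prepare_and_save_set  # assumed module name for this script

# expects <inp_dir>/metadata.csv with "audio|text" rows (header skipped) and <inp_dir>/wavs/
inp_dir = "data/my_dataset_csv_wavs"  # hypothetical input dir
out_dir = "data/my_dataset_pinyin"  # raw.arrow, duration.json, vocab.txt are written here

prepare_and_save_set(inp_dir, out_dir, is_finetune=True)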