|
|
|
|
|
|
|
|
|
|
|
import os |
|
import torch |
|
import numpy as np |
|
|
|
import json |
|
from tqdm import tqdm |
|
from sklearn.preprocessing import StandardScaler |
|
from utils.io import save_feature, save_txt, save_torch_audio |
|
from utils.util import has_existed |
|
from utils.tokenizer import extract_encodec_token |
|
from utils.stft import TacotronSTFT |
|
from utils.dsp import compress, audio_to_label |
|
from utils.data_utils import remove_outlier |
|
from preprocessors.metadata import replace_augment_name |
|
from scipy.interpolate import interp1d |
|
from utils.mel import ( |
|
extract_mel_features, |
|
extract_linear_features, |
|
extract_mel_features_tts, |
|
) |
|
|
|
ZERO = 1e-12 |
|
|
|
|
|
def extract_utt_acoustic_features_parallel(metadata, dataset_output, cfg, n_workers=1):
    """Extract acoustic features for every utterance in ``metadata``.

    NOTE(review): despite the name, utterances are processed sequentially in
    this implementation; ``n_workers`` is accepted for interface compatibility
    but currently unused.

    Args:
        metadata (dict): dictionary that stores data in train.json and test.json files
        dataset_output (str): directory to store acoustic features
        cfg (dict): dictionary that stores configurations
        n_workers (int, optional): num of processes to extract features in parallel. Defaults to 1.
    """
    # Dispatch on the task type; unknown task types are silently skipped,
    # matching the original if-chain behavior.
    extractors = {
        "tts": extract_utt_acoustic_features_tts,
        "svc": extract_utt_acoustic_features_svc,
        "vocoder": extract_utt_acoustic_features_vocoder,
        "tta": extract_utt_acoustic_features_tta,
    }
    for utt in tqdm(metadata):
        extractor = extractors.get(cfg.task_type)
        if extractor is not None:
            extractor(dataset_output, cfg, utt)
|
|
|
|
|
def avg_phone_feature(feature, duration, interpolation=False):
    """Average a frame-level feature into one value per phone.

    Args:
        feature: 1-D array of frame-level values (e.g. F0 or energy).
        duration: per-phone frame counts; sum(duration) frames are used.
        interpolation: if True, linearly fill zero-valued frames from the
            surrounding nonzero frames before averaging (used for F0).

    Returns:
        Array of length ``len(duration)`` with one averaged value per phone
        (0 for zero-duration phones).
    """
    total_frames = sum(duration)
    feature = feature[:total_frames]

    if interpolation:
        # Interpolate over zero (unvoiced) frames; clamp beyond the first and
        # last voiced frames to their respective values.
        voiced_ids = np.where(feature != 0)[0]
        interp_fn = interp1d(
            voiced_ids,
            feature[voiced_ids],
            fill_value=(feature[voiced_ids[0]], feature[voiced_ids[-1]]),
            bounds_error=False,
        )
        feature = interp_fn(np.arange(0, len(feature)))

    # Collapse each phone's frame span to its mean, writing the result into
    # the leading positions of the same buffer (matches original in-place
    # semantics).
    frame_cursor = 0
    for phone_idx, num_frames in enumerate(duration):
        if num_frames > 0:
            feature[phone_idx] = np.mean(
                feature[frame_cursor : frame_cursor + num_frames]
            )
        else:
            feature[phone_idx] = 0
        frame_cursor += num_frames

    return feature[: len(duration)]
|
|
|
|
|
def extract_utt_acoustic_features_serial(metadata, dataset_output, cfg):
    """Extract acoustic features from utterances, one at a time.

    Args:
        metadata (dict): dictionary that stores data in train.json and test.json files
        dataset_output (str): directory to store acoustic features
        cfg (dict): dictionary that stores configurations
    """
    for utt in tqdm(metadata):
        task = cfg.task_type
        if task == "tts":
            extract_utt_acoustic_features_tts(dataset_output, cfg, utt)
        elif task == "svc":
            extract_utt_acoustic_features_svc(dataset_output, cfg, utt)
        elif task == "vocoder":
            extract_utt_acoustic_features_vocoder(dataset_output, cfg, utt)
        elif task == "tta":
            extract_utt_acoustic_features_tta(dataset_output, cfg, utt)
|
|
|
|
|
def __extract_utt_acoustic_features(dataset_output, cfg, utt):
    """Extract and save acoustic features for one utterance (SVC/TTA path).

    Each feature is gated by a switch in ``cfg.preprocess`` (duration, linear
    spectrogram, mel, energy, pitch/F0, unvoiced mask, raw audio, mu-law
    labels, Encodec tokens) and saved under its configured subdirectory of
    ``dataset_output``.

    Args:
        dataset_output (str): directory to store acoustic features
        cfg (dict): dictionary that stores configurations
        utt (dict): utterance info including dataset, singer, uid:{singer}_{song}_{index},
                    path to utternace, duration, utternace index
    """
    # Imported lazily so module import stays cheap; `world` appears unused here.
    from utils import audio, f0, world, duration

    uid = utt["Uid"]
    wav_path = utt["Path"]
    # Prefer the dataset-local raw-data copy (organized by singer/uid) when it
    # exists, overriding the metadata path.
    if os.path.exists(os.path.join(dataset_output, cfg.preprocess.raw_data)):
        wav_path = os.path.join(
            dataset_output, cfg.preprocess.raw_data, utt["Singer"], uid + ".wav"
        )

    with torch.no_grad():
        # Load (and resample to cfg.preprocess.sample_rate) once; keep a torch
        # copy for spectral features and a numpy copy for F0/energy.
        wav_torch, _ = audio.load_audio_torch(wav_path, cfg.preprocess.sample_rate)
        wav = wav_torch.cpu().numpy()

        if cfg.preprocess.extract_duration:
            # Per-phone frame counts plus the start/end of the aligned speech.
            durations, phones, start, end = duration.get_duration(
                utt, wav, cfg.preprocess
            )
            save_feature(dataset_output, cfg.preprocess.duration_dir, uid, durations)
            save_txt(dataset_output, cfg.preprocess.lab_dir, uid, phones)
            # Trim to the aligned segment so later features match the durations.
            wav = wav[start:end].astype(np.float32)
            wav_torch = torch.from_numpy(wav).to(wav_torch.device)

        if cfg.preprocess.extract_linear_spec:
            linear = extract_linear_features(wav_torch.unsqueeze(0), cfg.preprocess)
            save_feature(
                dataset_output, cfg.preprocess.linear_dir, uid, linear.cpu().numpy()
            )

        if cfg.preprocess.extract_mel:
            if cfg.preprocess.mel_extract_mode == "taco":
                # Tacotron-style extraction through an explicit STFT module.
                _stft = TacotronSTFT(
                    sampling_rate=cfg.preprocess.sample_rate,
                    win_length=cfg.preprocess.win_size,
                    hop_length=cfg.preprocess.hop_size,
                    filter_length=cfg.preprocess.n_fft,
                    n_mel_channels=cfg.preprocess.n_mel,
                    mel_fmin=cfg.preprocess.fmin,
                    mel_fmax=cfg.preprocess.fmax,
                )
                mel = extract_mel_features(
                    wav_torch.unsqueeze(0), cfg.preprocess, taco=True, _stft=_stft
                )
                if cfg.preprocess.extract_duration:
                    # Clip mel frames to the total phone duration.
                    mel = mel[:, : sum(durations)]
            else:
                mel = extract_mel_features(wav_torch.unsqueeze(0), cfg.preprocess)
            save_feature(dataset_output, cfg.preprocess.mel_dir, uid, mel.cpu().numpy())

        if cfg.preprocess.extract_energy:
            if (
                cfg.preprocess.energy_extract_mode == "from_mel"
                and cfg.preprocess.extract_mel
            ):
                # Frame energy = L2 norm across mel channels; exp() because the
                # mel here is log-scale.
                energy = (mel.exp() ** 2).sum(0).sqrt().cpu().numpy()
            elif cfg.preprocess.energy_extract_mode == "from_waveform":
                energy = audio.energy(wav, cfg.preprocess)
            elif cfg.preprocess.energy_extract_mode == "from_tacotron_stft":
                _stft = TacotronSTFT(
                    sampling_rate=cfg.preprocess.sample_rate,
                    win_length=cfg.preprocess.win_size,
                    hop_length=cfg.preprocess.hop_size,
                    filter_length=cfg.preprocess.n_fft,
                    n_mel_channels=cfg.preprocess.n_mel,
                    mel_fmin=cfg.preprocess.fmin,
                    mel_fmax=cfg.preprocess.fmax,
                )
                _, energy = audio.get_energy_from_tacotron(wav, _stft)
            else:
                assert cfg.preprocess.energy_extract_mode in [
                    "from_mel",
                    "from_waveform",
                    "from_tacotron_stft",
                ], f"{cfg.preprocess.energy_extract_mode} not in supported energy_extract_mode [from_mel, from_waveform, from_tacotron_stft]"
            if cfg.preprocess.extract_duration:
                energy = energy[: sum(durations)]
                # Also save the phone-averaged energy.
                phone_energy = avg_phone_feature(energy, durations)
                save_feature(
                    dataset_output, cfg.preprocess.phone_energy_dir, uid, phone_energy
                )

            save_feature(dataset_output, cfg.preprocess.energy_dir, uid, energy)

        if cfg.preprocess.extract_pitch:
            pitch = f0.get_f0(wav, cfg.preprocess)
            if cfg.preprocess.extract_duration:
                pitch = pitch[: sum(durations)]
                # Phone-level F0 with interpolation across unvoiced frames.
                phone_pitch = avg_phone_feature(pitch, durations, interpolation=True)
                save_feature(
                    dataset_output, cfg.preprocess.phone_pitch_dir, uid, phone_pitch
                )
            save_feature(dataset_output, cfg.preprocess.pitch_dir, uid, pitch)

        if cfg.preprocess.extract_uv:
            # NOTE(review): relies on extract_pitch also being enabled —
            # `pitch` is undefined otherwise.
            assert isinstance(pitch, np.ndarray)
            # F0 == 0 marks unvoiced frames.
            uv = pitch != 0
            save_feature(dataset_output, cfg.preprocess.uv_dir, uid, uv)

        if cfg.preprocess.extract_audio:
            # SVC/TTA path stores the waveform as a .npy feature.
            save_feature(dataset_output, cfg.preprocess.audio_dir, uid, wav)

        if cfg.preprocess.extract_label:
            if cfg.preprocess.is_mu_law:
                # mu-law compress before quantizing into discrete labels.
                wav = compress(wav, cfg.preprocess.bits)
            label = audio_to_label(wav, cfg.preprocess.bits)
            save_feature(dataset_output, cfg.preprocess.label_dir, uid, label)

        if cfg.preprocess.extract_acoustic_token:
            if cfg.preprocess.acoustic_token_extractor == "Encodec":
                # Encodec reads the (untrimmed) file from disk directly.
                codes = extract_encodec_token(wav_path)
                save_feature(
                    dataset_output, cfg.preprocess.acoustic_token_dir, uid, codes
                )
|
|
|
|
|
|
|
def extract_utt_acoustic_features_tts(dataset_output, cfg, utt):
    """Extract and save acoustic features for one utterance (TTS path).

    Mirrors ``__extract_utt_acoustic_features`` with TTS-specific differences:
    a .flac fallback for the source audio, ``extract_mel_features_tts`` in
    Tacotron mel mode, and audio saved as a waveform file rather than .npy.

    Args:
        dataset_output (str): directory to store acoustic features
        cfg (dict): dictionary that stores configurations
        utt (dict): utterance info including dataset, singer, uid:{singer}_{song}_{index},
                    path to utternace, duration, utternace index
    """
    # Imported lazily so module import stays cheap; `world` appears unused here.
    from utils import audio, f0, world, duration

    uid = utt["Uid"]
    wav_path = utt["Path"]
    # Prefer the dataset-local raw-data copy; fall back from .wav to .flac.
    if os.path.exists(os.path.join(dataset_output, cfg.preprocess.raw_data)):
        wav_path = os.path.join(
            dataset_output, cfg.preprocess.raw_data, utt["Singer"], uid + ".wav"
        )
        if not os.path.exists(wav_path):
            wav_path = os.path.join(
                dataset_output, cfg.preprocess.raw_data, utt["Singer"], uid + ".flac"
            )

    assert os.path.exists(wav_path)

    with torch.no_grad():
        # Load (and resample to cfg.preprocess.sample_rate) once; keep a torch
        # copy for spectral features and a numpy copy for F0/energy.
        wav_torch, _ = audio.load_audio_torch(wav_path, cfg.preprocess.sample_rate)
        wav = wav_torch.cpu().numpy()

        if cfg.preprocess.extract_duration:
            # Per-phone frame counts plus the start/end of the aligned speech.
            durations, phones, start, end = duration.get_duration(
                utt, wav, cfg.preprocess
            )
            save_feature(dataset_output, cfg.preprocess.duration_dir, uid, durations)
            save_txt(dataset_output, cfg.preprocess.lab_dir, uid, phones)
            # Trim to the aligned segment so later features match the durations.
            wav = wav[start:end].astype(np.float32)
            wav_torch = torch.from_numpy(wav).to(wav_torch.device)

        if cfg.preprocess.extract_linear_spec:
            # Local import kept from the original; shadows the module-level one.
            from utils.mel import extract_linear_features

            linear = extract_linear_features(wav_torch.unsqueeze(0), cfg.preprocess)
            save_feature(
                dataset_output, cfg.preprocess.linear_dir, uid, linear.cpu().numpy()
            )

        if cfg.preprocess.extract_mel:
            # Local import kept from the original; shadows the module-level one.
            from utils.mel import extract_mel_features

            if cfg.preprocess.mel_extract_mode == "taco":
                # Tacotron-style extraction through an explicit STFT module.
                _stft = TacotronSTFT(
                    sampling_rate=cfg.preprocess.sample_rate,
                    win_length=cfg.preprocess.win_size,
                    hop_length=cfg.preprocess.hop_size,
                    filter_length=cfg.preprocess.n_fft,
                    n_mel_channels=cfg.preprocess.n_mel,
                    mel_fmin=cfg.preprocess.fmin,
                    mel_fmax=cfg.preprocess.fmax,
                )
                # TTS variant of the mel extractor.
                mel = extract_mel_features_tts(
                    wav_torch.unsqueeze(0), cfg.preprocess, taco=True, _stft=_stft
                )
                if cfg.preprocess.extract_duration:
                    # Clip mel frames to the total phone duration.
                    mel = mel[:, : sum(durations)]
            else:
                mel = extract_mel_features(wav_torch.unsqueeze(0), cfg.preprocess)
            save_feature(dataset_output, cfg.preprocess.mel_dir, uid, mel.cpu().numpy())

        if cfg.preprocess.extract_energy:
            if (
                cfg.preprocess.energy_extract_mode == "from_mel"
                and cfg.preprocess.extract_mel
            ):
                # Frame energy = L2 norm across mel channels; exp() because the
                # mel here is log-scale.
                energy = (mel.exp() ** 2).sum(0).sqrt().cpu().numpy()
            elif cfg.preprocess.energy_extract_mode == "from_waveform":
                energy = audio.energy(wav, cfg.preprocess)
            elif cfg.preprocess.energy_extract_mode == "from_tacotron_stft":
                _stft = TacotronSTFT(
                    sampling_rate=cfg.preprocess.sample_rate,
                    win_length=cfg.preprocess.win_size,
                    hop_length=cfg.preprocess.hop_size,
                    filter_length=cfg.preprocess.n_fft,
                    n_mel_channels=cfg.preprocess.n_mel,
                    mel_fmin=cfg.preprocess.fmin,
                    mel_fmax=cfg.preprocess.fmax,
                )
                _, energy = audio.get_energy_from_tacotron(wav, _stft)
            else:
                assert cfg.preprocess.energy_extract_mode in [
                    "from_mel",
                    "from_waveform",
                    "from_tacotron_stft",
                ], f"{cfg.preprocess.energy_extract_mode} not in supported energy_extract_mode [from_mel, from_waveform, from_tacotron_stft]"
            if cfg.preprocess.extract_duration:
                energy = energy[: sum(durations)]
                # Also save the phone-averaged energy.
                phone_energy = avg_phone_feature(energy, durations)
                save_feature(
                    dataset_output, cfg.preprocess.phone_energy_dir, uid, phone_energy
                )

            save_feature(dataset_output, cfg.preprocess.energy_dir, uid, energy)

        if cfg.preprocess.extract_pitch:
            pitch = f0.get_f0(wav, cfg.preprocess)
            if cfg.preprocess.extract_duration:
                pitch = pitch[: sum(durations)]
                # Phone-level F0 with interpolation across unvoiced frames.
                phone_pitch = avg_phone_feature(pitch, durations, interpolation=True)
                save_feature(
                    dataset_output, cfg.preprocess.phone_pitch_dir, uid, phone_pitch
                )
            save_feature(dataset_output, cfg.preprocess.pitch_dir, uid, pitch)

        if cfg.preprocess.extract_uv:
            # NOTE(review): relies on extract_pitch also being enabled —
            # `pitch` is undefined otherwise.
            assert isinstance(pitch, np.ndarray)
            # F0 == 0 marks unvoiced frames.
            uv = pitch != 0
            save_feature(dataset_output, cfg.preprocess.uv_dir, uid, uv)

        if cfg.preprocess.extract_audio:
            # TTS path stores the (possibly trimmed) waveform as an audio file.
            save_torch_audio(
                dataset_output,
                cfg.preprocess.audio_dir,
                uid,
                wav_torch,
                cfg.preprocess.sample_rate,
            )

        if cfg.preprocess.extract_label:
            if cfg.preprocess.is_mu_law:
                # mu-law compress before quantizing into discrete labels.
                wav = compress(wav, cfg.preprocess.bits)
            label = audio_to_label(wav, cfg.preprocess.bits)
            save_feature(dataset_output, cfg.preprocess.label_dir, uid, label)

        if cfg.preprocess.extract_acoustic_token:
            if cfg.preprocess.acoustic_token_extractor == "Encodec":
                # Encodec reads the (untrimmed) file from disk directly.
                codes = extract_encodec_token(wav_path)
                save_feature(
                    dataset_output, cfg.preprocess.acoustic_token_dir, uid, codes
                )
|
|
|
|
|
def extract_utt_acoustic_features_svc(dataset_output, cfg, utt):
    """Extract SVC acoustic features for one utterance via the shared extractor."""
    __extract_utt_acoustic_features(dataset_output, cfg, utt)
|
|
|
|
|
def extract_utt_acoustic_features_tta(dataset_output, cfg, utt):
    """Extract TTA acoustic features for one utterance via the shared extractor."""
    __extract_utt_acoustic_features(dataset_output, cfg, utt)
|
|
|
|
|
def extract_utt_acoustic_features_vocoder(dataset_output, cfg, utt):
    """Extract and save acoustic features for one utterance (vocoder path).

    Unlike the SVC/TTS paths there is no duration alignment; features (mel,
    energy, pitch, uv mask, amplitude/phase spectra, raw audio, labels) are
    extracted from the full waveform as gated by ``cfg.preprocess``.

    Args:
        dataset_output (str): directory to store acoustic features
        cfg (dict): dictionary that stores configurations
        utt (dict): utterance info including dataset, singer, uid:{singer}_{song}_{index},
                    path to utternace, duration, utternace index
    """
    # Imported lazily so module import stays cheap; `world` and `duration`
    # appear unused here.
    from utils import audio, f0, world, duration

    uid = utt["Uid"]
    wav_path = utt["Path"]

    with torch.no_grad():
        # Load (and resample to cfg.preprocess.sample_rate) once; keep a torch
        # copy for spectral features and a numpy copy for F0/energy.
        wav_torch, _ = audio.load_audio_torch(wav_path, cfg.preprocess.sample_rate)
        wav = wav_torch.cpu().numpy()

        if cfg.preprocess.extract_mel:
            # Local import kept from the original; shadows the module-level one.
            from utils.mel import extract_mel_features

            mel = extract_mel_features(wav_torch.unsqueeze(0), cfg.preprocess)
            save_feature(dataset_output, cfg.preprocess.mel_dir, uid, mel.cpu().numpy())

        if cfg.preprocess.extract_energy:
            if (
                cfg.preprocess.energy_extract_mode == "from_mel"
                and cfg.preprocess.extract_mel
            ):
                # Frame energy = L2 norm across mel channels; exp() because the
                # mel here is log-scale.
                energy = (mel.exp() ** 2).sum(0).sqrt().cpu().numpy()
            elif cfg.preprocess.energy_extract_mode == "from_waveform":
                energy = audio.energy(wav, cfg.preprocess)
            else:
                # NOTE(review): unlike the other extractors, "from_tacotron_stft"
                # is not supported here even though the message mentions it.
                assert cfg.preprocess.energy_extract_mode in [
                    "from_mel",
                    "from_waveform",
                ], f"{cfg.preprocess.energy_extract_mode} not in supported energy_extract_mode [from_mel, from_waveform, from_tacotron_stft]"

            save_feature(dataset_output, cfg.preprocess.energy_dir, uid, energy)

        if cfg.preprocess.extract_pitch:
            pitch = f0.get_f0(wav, cfg.preprocess)
            save_feature(dataset_output, cfg.preprocess.pitch_dir, uid, pitch)

        if cfg.preprocess.extract_uv:
            # NOTE(review): relies on extract_pitch also being enabled —
            # `pitch` is undefined otherwise.
            assert isinstance(pitch, np.ndarray)
            # F0 == 0 marks unvoiced frames.
            uv = pitch != 0
            save_feature(dataset_output, cfg.preprocess.uv_dir, uid, uv)

        if cfg.preprocess.extract_amplitude_phase:
            # STFT decomposition used by amplitude/phase-based vocoders.
            from utils.mel import amplitude_phase_spectrum

            log_amplitude, phase, real, imaginary = amplitude_phase_spectrum(
                wav_torch.unsqueeze(0), cfg.preprocess
            )
            save_feature(
                dataset_output, cfg.preprocess.log_amplitude_dir, uid, log_amplitude
            )
            save_feature(dataset_output, cfg.preprocess.phase_dir, uid, phase)
            save_feature(dataset_output, cfg.preprocess.real_dir, uid, real)
            save_feature(dataset_output, cfg.preprocess.imaginary_dir, uid, imaginary)

        if cfg.preprocess.extract_audio:
            # Vocoder path stores the waveform as a .npy feature.
            save_feature(dataset_output, cfg.preprocess.audio_dir, uid, wav)

        if cfg.preprocess.extract_label:
            if cfg.preprocess.is_mu_law:
                # mu-law compress before quantizing into discrete labels.
                wav = compress(wav, cfg.preprocess.bits)
            label = audio_to_label(wav, cfg.preprocess.bits)
            save_feature(dataset_output, cfg.preprocess.label_dir, uid, label)
|
|
|
|
|
def cal_normalized_mel(mel, dataset_name, cfg):
    """Normalize a mel spectrogram to [-1, 1] using the dataset's saved extrema.

    Args:
        mel: (n_mels, T)
    """
    lower, upper = load_mel_extrema(cfg, dataset_name)
    return normalize_mel_channel(mel, lower, upper)
|
|
|
|
|
def cal_mel_min_max(dataset, output_path, cfg, metadata=None):
    """Compute per-channel mel min/max over a dataset and save them to disk.

    Scans every utterance's mel feature (loading metadata from the dataset's
    train/test json files when not supplied), takes per-channel extrema, and
    writes mel_min.npy / mel_max.npy under the configured stats directory.
    """
    dataset_output = os.path.join(output_path, dataset)

    if metadata is None:
        # "eval" datasets only ship a test split.
        splits = ["test"] if "eval" in dataset else ["train", "test"]
        metadata = []
        for split in splits:
            with open(os.path.join(dataset_output, f"{split}.json"), "r") as f:
                metadata.extend(json.load(f))

    per_utt_min = []
    per_utt_max = []
    for item in metadata:
        mel_path = os.path.join(
            dataset_output, cfg.preprocess.mel_dir, item["Uid"] + ".npy"
        )
        if not os.path.exists(mel_path):
            continue
        mel = np.load(mel_path)
        # Normalize orientation to (n_mels, T).
        if mel.shape[0] != cfg.preprocess.n_mel:
            mel = mel.T

        assert mel.shape[0] == cfg.preprocess.n_mel

        per_utt_min.append(np.min(mel, axis=-1))
        per_utt_max.append(np.max(mel, axis=-1))

    # Reduce the per-utterance extrema to dataset-wide per-channel extrema.
    mel_min = np.min(per_utt_min, axis=0)
    mel_max = np.max(per_utt_max, axis=0)

    stats_dir = os.path.join(dataset_output, cfg.preprocess.mel_min_max_stats_dir)
    os.makedirs(stats_dir, exist_ok=True)
    np.save(os.path.join(stats_dir, "mel_min.npy"), mel_min)
    np.save(os.path.join(stats_dir, "mel_max.npy"), mel_max)
|
|
|
|
|
def denorm_for_pred_mels(cfg, dataset_name, split, pred):
    """Denormalize predicted mels back to the dataset's original value range.

    Args:
        pred: a list whose every element is (frame_len, n_mels)
    Return:
        similar like pred
    """
    mel_min, mel_max = load_mel_extrema(cfg.preprocess, dataset_name)
    # Each mel is transposed to (n_mels, frame_len) for channel-wise
    # denormalization, then transposed back.
    return [
        denormalize_mel_channel(frame_mel.T, mel_min, mel_max).T for frame_mel in pred
    ]
|
|
|
|
|
def load_mel_extrema(cfg, dataset_name):
    """Load the per-channel mel min/max arrays saved by ``cal_mel_min_max``.

    Note: ``cfg`` here is the preprocess sub-config (it must expose
    ``processed_dir`` and ``mel_min_max_stats_dir``).
    """
    stats_dir = os.path.join(
        cfg.processed_dir, dataset_name, cfg.mel_min_max_stats_dir
    )
    mel_min = np.load(os.path.join(stats_dir, "mel_min.npy"))
    mel_max = np.load(os.path.join(stats_dir, "mel_max.npy"))
    return mel_min, mel_max
|
|
|
|
|
def denormalize_mel_channel(mel, mel_min, mel_max):
    """Map a per-channel-normalized mel from [-1, 1] back to [mel_min, mel_max].

    mel: (n_mels, T), values in [-1, 1]
    mel_min, mel_max: (n_mels)
    """
    lo = np.expand_dims(mel_min, -1)
    hi = np.expand_dims(mel_max, -1)
    # ZERO matches the epsilon used during normalization.
    return (mel + 1) / 2 * (hi - lo + ZERO) + lo
|
|
|
|
|
def normalize_mel_channel(mel, mel_min, mel_max):
    """Scale each mel channel into [-1, 1] given per-channel extrema.

    mel: (n_mels, T)
    mel_min, mel_max: (n_mels)
    """
    lo = np.expand_dims(mel_min, -1)
    hi = np.expand_dims(mel_max, -1)
    # ZERO keeps the denominator strictly positive for constant channels.
    return (mel - lo) / (hi - lo + ZERO) * 2 - 1
|
|
|
|
|
def normalize(dataset, feat_dir, cfg):
    """Accumulate statistics over every .npy file in a feature directory.

    Streams each feature file through a StandardScaler (so arbitrarily large
    datasets fit in memory), tracks the global min/max, and saves
    ``[min, max, mean, std]`` to ``{feat_dir}_stat.npy`` next to the feature
    directory (read back by ``load_normalized``).

    Args:
        dataset (str): dataset name under cfg.preprocess.processed_dir
        feat_dir (str): feature subdirectory (e.g. pitch or energy dir)
        cfg (dict): dictionary that stores configurations

    Returns:
        tuple: (mean, std, min_value, max_value)
    """
    dataset_output = os.path.join(cfg.preprocess.processed_dir, dataset)
    print(f"normalize {feat_dir}")

    max_value = np.finfo(np.float64).min
    min_value = np.finfo(np.float64).max

    scaler = StandardScaler()
    feat_files = os.listdir(os.path.join(dataset_output, feat_dir))

    for feat_file in tqdm(feat_files):
        # Skip non-feature files before touching the filesystem again.
        if not feat_file.endswith(".npy"):
            continue
        feat = np.load(os.path.join(dataset_output, feat_dir, feat_file))
        # ndarray.max()/min() instead of builtin max()/min(): the builtins
        # fail on 0-d arrays and compare row-wise on multi-dim arrays.
        max_value = max(max_value, feat.max())
        min_value = min(min_value, feat.min())
        scaler.partial_fit(feat.reshape((-1, 1)))
    mean = scaler.mean_[0]
    std = scaler.scale_[0]
    stat = np.array([min_value, max_value, mean, std])
    stat_npy = os.path.join(dataset_output, f"{feat_dir}_stat.npy")
    np.save(stat_npy, stat)
    return mean, std, min_value, max_value
|
|
|
|
|
def load_normalized(feat_dir, dataset_name, cfg):
    """Load the ``[min, max, mean, std]`` statistics saved by ``normalize``.

    Returns:
        tuple: (mean, std, min_value, max_value)
    """
    stat_path = os.path.join(
        cfg.preprocess.processed_dir, dataset_name, f"{feat_dir}_stat.npy"
    )
    # Stored order is [min, max, mean, std].
    min_value, max_value, mean, std = np.load(stat_path)
    return mean, std, min_value, max_value
|
|
|
|
|
def cal_pitch_statistics_svc(dataset, output_path, cfg, metadata=None):
    """Compute per-singer F0 statistics for an SVC dataset.

    For every singer, collects frame-level F0 over all utterances and writes
    mean/std/median/min/max — for voiced frames only and for all frames —
    to ``statistics.json`` under the pitch directory. Skips work entirely if
    the statistics file already exists.

    Args:
        dataset (str): dataset name (subdirectory of output_path)
        output_path (str): root of the processed datasets
        cfg (dict): dictionary that stores configurations
        metadata (list, optional): utterance dicts; loaded from the dataset's
            train/test json files when None.
    """
    dataset_dir = os.path.join(output_path, dataset)
    save_dir = os.path.join(dataset_dir, cfg.preprocess.pitch_dir)
    os.makedirs(save_dir, exist_ok=True)
    if has_existed(os.path.join(save_dir, "statistics.json")):
        return

    if metadata is None:
        # Load the singer -> spk-id mapping and all split metadata from disk.
        singers = json.load(open(os.path.join(dataset_dir, "singers.json"), "r"))

        metadata = []
        for dataset_type in ["train", "test"] if "eval" not in dataset else ["test"]:
            dataset_file = os.path.join(dataset_dir, "{}.json".format(dataset_type))
            with open(dataset_file, "r") as f:
                metadata.extend(json.load(f))
    else:
        # Build the mapping directly from the supplied metadata.
        singers = list(set([item["Singer"] for item in metadata]))
        singers = {
            "{}_{}".format(dataset, name): idx for idx, name in enumerate(singers)
        }

    # One accumulator per singer: voiced-only F0 and all-frame F0.
    pitch_scalers = [[] for _ in range(len(singers))]
    total_pitch_scalers = [[] for _ in range(len(singers))]

    for utt_info in tqdm(metadata, desc="Loading F0..."):
        singer = utt_info["Singer"]
        pitch_path = os.path.join(
            dataset_dir, cfg.preprocess.pitch_dir, utt_info["Uid"] + ".npy"
        )

        if not os.path.exists(pitch_path):
            continue
        total_pitch = np.load(pitch_path)
        assert len(total_pitch) > 0

        # F0 == 0 marks unvoiced frames; keep only voiced ones here.
        pitch = total_pitch[total_pitch != 0]
        spkid = singers[f"{replace_augment_name(dataset)}_{singer}"]

        pitch_scalers[spkid].extend(pitch.tolist())
        total_pitch_scalers[spkid].extend(total_pitch.tolist())

    sta_dict = {}
    for singer in tqdm(singers, desc="Singers statistics"):
        spkid = singers[singer]
        # Renamed locals avoid shadowing the builtins min/max.
        voiced = pitch_scalers[spkid]
        total = total_pitch_scalers[spkid]

        sta_dict[singer] = {
            "voiced_positions": {
                "mean": np.mean(voiced),
                "std": np.std(voiced),
                "median": np.median(voiced),
                "min": np.min(voiced),
                "max": np.max(voiced),
            },
            "total_positions": {
                "mean": np.mean(total),
                "std": np.std(total),
                "median": np.median(total),
                "min": np.min(total),
                "max": np.max(total),
            },
        }

    with open(os.path.join(save_dir, "statistics.json"), "w") as f:
        json.dump(sta_dict, f, indent=4, ensure_ascii=False)
|
|
|
|
|
def cal_pitch_statistics(dataset, output_path, cfg):
    """Compute per-singer pitch statistics and write them to statistics.json.

    Uses phone-level or frame-level pitch depending on
    ``cfg.preprocess.use_phone_pitch``. For each singer, records
    mean/std/median/min/max over the (optionally outlier-filtered) pitch
    values and over all raw values. Skips work if statistics.json exists.

    Args:
        dataset (str): dataset name (subdirectory of output_path)
        output_path (str): root of the processed datasets
        cfg (dict): dictionary that stores configurations
    """
    dataset_dir = os.path.join(output_path, dataset)
    if cfg.preprocess.use_phone_pitch:
        pitch_dir = cfg.preprocess.phone_pitch_dir
    else:
        pitch_dir = cfg.preprocess.pitch_dir
    save_dir = os.path.join(dataset_dir, pitch_dir)

    os.makedirs(save_dir, exist_ok=True)
    if has_existed(os.path.join(save_dir, "statistics.json")):
        return

    singers = json.load(open(os.path.join(dataset_dir, "singers.json"), "r"))

    metadata = []
    for dataset_type in ["train", "test"] if "eval" not in dataset else ["test"]:
        dataset_file = os.path.join(dataset_dir, "{}.json".format(dataset_type))
        with open(dataset_file, "r") as f:
            metadata.extend(json.load(f))

    # One accumulator per singer: filtered pitch and all-frame pitch.
    pitch_scalers = [[] for _ in range(len(singers))]
    total_pitch_scalers = [[] for _ in range(len(singers))]

    for utt_info in metadata:
        singer = utt_info["Singer"]
        pitch_path = os.path.join(dataset_dir, pitch_dir, utt_info["Uid"] + ".npy")

        if not os.path.exists(pitch_path):
            continue
        total_pitch = np.load(pitch_path)
        assert len(total_pitch) > 0

        # FIX: `pitch` used to be assigned only when pitch_remove_outlier was
        # enabled, leaving it undefined (or stale from the previous utterance)
        # otherwise. Fall back to the voiced frames, matching
        # cal_pitch_statistics_svc and the "voiced_positions" key below.
        if cfg.preprocess.pitch_remove_outlier:
            pitch = remove_outlier(total_pitch)
        else:
            pitch = total_pitch[total_pitch != 0]
        spkid = singers[f"{replace_augment_name(dataset)}_{singer}"]

        pitch_scalers[spkid].extend(pitch.tolist())
        total_pitch_scalers[spkid].extend(total_pitch.tolist())

    sta_dict = {}
    for singer in singers:
        spkid = singers[singer]
        # Renamed locals avoid shadowing the builtins min/max.
        voiced = pitch_scalers[spkid]
        total = total_pitch_scalers[spkid]

        sta_dict[singer] = {
            "voiced_positions": {
                "mean": np.mean(voiced),
                "std": np.std(voiced),
                "median": np.median(voiced),
                "min": np.min(voiced),
                "max": np.max(voiced),
            },
            "total_positions": {
                "mean": np.mean(total),
                "std": np.std(total),
                "median": np.median(total),
                "min": np.min(total),
                "max": np.max(total),
            },
        }

    with open(os.path.join(save_dir, "statistics.json"), "w") as f:
        json.dump(sta_dict, f, indent=4, ensure_ascii=False)
|
|
|
|
|
def cal_energy_statistics(dataset, output_path, cfg):
    """Compute per-singer energy statistics and write them to statistics.json.

    Uses phone-level or frame-level energy depending on
    ``cfg.preprocess.use_phone_energy``. For each singer, records
    mean/std/median/min/max over the (optionally outlier-filtered) energy
    values and over all raw values. Skips work if statistics.json exists.

    Args:
        dataset (str): dataset name (subdirectory of output_path)
        output_path (str): root of the processed datasets
        cfg (dict): dictionary that stores configurations
    """
    dataset_dir = os.path.join(output_path, dataset)
    if cfg.preprocess.use_phone_energy:
        energy_dir = cfg.preprocess.phone_energy_dir
    else:
        energy_dir = cfg.preprocess.energy_dir
    save_dir = os.path.join(dataset_dir, energy_dir)
    os.makedirs(save_dir, exist_ok=True)
    print(os.path.join(save_dir, "statistics.json"))
    if has_existed(os.path.join(save_dir, "statistics.json")):
        return

    singers = json.load(open(os.path.join(dataset_dir, "singers.json"), "r"))

    metadata = []
    for dataset_type in ["train", "test"] if "eval" not in dataset else ["test"]:
        dataset_file = os.path.join(dataset_dir, "{}.json".format(dataset_type))
        with open(dataset_file, "r") as f:
            metadata.extend(json.load(f))

    # One accumulator per singer: filtered energy and raw energy.
    energy_scalers = [[] for _ in range(len(singers))]
    total_energy_scalers = [[] for _ in range(len(singers))]

    for utt_info in metadata:
        singer = utt_info["Singer"]
        energy_path = os.path.join(dataset_dir, energy_dir, utt_info["Uid"] + ".npy")

        if not os.path.exists(energy_path):
            continue
        total_energy = np.load(energy_path)
        assert len(total_energy) > 0

        # FIX: `energy` used to be assigned only when energy_remove_outlier
        # was enabled, leaving it undefined (or stale from the previous
        # utterance) otherwise. Fall back to the raw values; NOTE(review):
        # unlike pitch, zeros carry no unvoiced meaning for energy.
        if cfg.preprocess.energy_remove_outlier:
            energy = remove_outlier(total_energy)
        else:
            energy = total_energy
        spkid = singers[f"{replace_augment_name(dataset)}_{singer}"]

        energy_scalers[spkid].extend(energy.tolist())
        total_energy_scalers[spkid].extend(total_energy.tolist())

    sta_dict = {}
    for singer in singers:
        spkid = singers[singer]
        # Renamed locals avoid shadowing the builtins min/max.
        filtered = energy_scalers[spkid]
        total = total_energy_scalers[spkid]

        sta_dict[singer] = {
            "voiced_positions": {
                "mean": np.mean(filtered),
                "std": np.std(filtered),
                "median": np.median(filtered),
                "min": np.min(filtered),
                "max": np.max(filtered),
            },
            "total_positions": {
                "mean": np.mean(total),
                "std": np.std(total),
                "median": np.median(total),
                "min": np.min(total),
                "max": np.max(total),
            },
        }

    with open(os.path.join(save_dir, "statistics.json"), "w") as f:
        json.dump(sta_dict, f, indent=4, ensure_ascii=False)
|
|
|
|
|
def _symlink_features(metadata, dataset_dir, src_dataset_dir, feat_dir, feat_name, ext=".npy"):
    """Symlink one feature directory from src_dataset_dir into dataset_dir.

    Skips entirely when the destination directory already has content
    (per ``has_existed``); otherwise links each utterance's feature file
    unless the destination file already exists.
    """
    dst_dir = os.path.join(dataset_dir, feat_dir)
    if has_existed(dst_dir):
        return
    os.makedirs(dst_dir, exist_ok=True)
    print(
        "Copying {} features from {} to {}...".format(
            feat_name, src_dataset_dir, dataset_dir
        )
    )
    for utt_info in tqdm(metadata):
        filename = utt_info["Uid"] + ext
        src_path = os.path.join(src_dataset_dir, feat_dir, filename)
        dst_path = os.path.join(dst_dir, filename)
        if not os.path.exists(dst_path):
            os.symlink(src_path, dst_path)


def copy_acoustic_features(metadata, dataset_dir, src_dataset_dir, cfg):
    """Copy acoustic features from src_dataset_dir to dataset_dir (via symlinks).

    One feature type per cfg.preprocess.extract_* switch: mel, energy, pitch,
    uv, audio, and label. Audio files use .wav for TTS and .npy otherwise.

    Args:
        metadata (dict): dictionary that stores data in train.json and test.json files
        dataset_dir (str): directory to store acoustic features
        src_dataset_dir (str): directory to store acoustic features
        cfg (dict): dictionary that stores configurations
    """
    if cfg.preprocess.extract_mel:
        _symlink_features(
            metadata, dataset_dir, src_dataset_dir, cfg.preprocess.mel_dir, "mel"
        )
    if cfg.preprocess.extract_energy:
        _symlink_features(
            metadata, dataset_dir, src_dataset_dir, cfg.preprocess.energy_dir, "energy"
        )
    if cfg.preprocess.extract_pitch:
        _symlink_features(
            metadata, dataset_dir, src_dataset_dir, cfg.preprocess.pitch_dir, "pitch"
        )
    if cfg.preprocess.extract_uv:
        _symlink_features(
            metadata, dataset_dir, src_dataset_dir, cfg.preprocess.uv_dir, "uv"
        )
    if cfg.preprocess.extract_audio:
        # TTS stores waveforms as .wav; other tasks store them as .npy.
        audio_ext = ".wav" if cfg.task_type == "tts" else ".npy"
        _symlink_features(
            metadata,
            dataset_dir,
            src_dataset_dir,
            cfg.preprocess.audio_dir,
            "audio",
            ext=audio_ext,
        )
    if cfg.preprocess.extract_label:
        _symlink_features(
            metadata, dataset_dir, src_dataset_dir, cfg.preprocess.label_dir, "label"
        )
|
|
|
|
|
def align_duration_mel(dataset, output_path, cfg):
    """Force each utterance's phone durations to sum to its mel frame count.

    For every utterance in the dataset's train/test metadata, loads the mel
    spectrogram and the duration array; when their lengths disagree by at most
    5 frames, adjusts durations from the tail (shrinking trailing phones when
    durations are too long, or growing the last phone when too short) and
    re-saves the duration file in place.

    Args:
        dataset (str): dataset name (subdirectory of output_path)
        output_path (str): root of the processed datasets
        cfg (dict): dictionary that stores configurations

    Returns:
        dict: "{dataset}_{uid}" -> aligned duration array
    """
    print("align the duration and mel")

    dataset_dir = os.path.join(output_path, dataset)
    metadata = []
    for dataset_type in ["train", "test"] if "eval" not in dataset else ["test"]:
        dataset_file = os.path.join(dataset_dir, "{}.json".format(dataset_type))
        with open(dataset_file, "r") as f:
            metadata.extend(json.load(f))

    utt2dur = {}
    for index in tqdm(range(len(metadata))):
        utt_info = metadata[index]
        # NOTE(review): rebinding `dataset` shadows the function argument for
        # the rest of the loop.
        dataset = utt_info["Dataset"]
        uid = utt_info["Uid"]
        utt = "{}_{}".format(dataset, uid)

        mel_path = os.path.join(dataset_dir, cfg.preprocess.mel_dir, uid + ".npy")
        # Transpose to (frames, n_mels) so shape[0] is the frame count.
        mel = np.load(mel_path).transpose(1, 0)
        duration_path = os.path.join(
            dataset_dir, cfg.preprocess.duration_dir, uid + ".npy"
        )
        duration = np.load(duration_path)
        if sum(duration) != mel.shape[0]:
            duration_sum = sum(duration)
            mel_len = mel.shape[0]
            mismatch = abs(duration_sum - mel_len)
            # Anything beyond a few frames indicates a real alignment problem.
            assert mismatch <= 5, "duration and mel length mismatch!"
            cloned = np.array(duration, copy=True)
            if duration_sum > mel_len:
                # Remove surplus frames from the tail phones (never the first):
                # zero out trailing phones until one phone can absorb the rest.
                for j in range(1, len(duration) - 1):
                    if mismatch == 0:
                        break
                    dur_val = cloned[-j]
                    if dur_val >= mismatch:
                        cloned[-j] -= mismatch
                        mismatch -= dur_val
                        break
                    else:
                        cloned[-j] = 0
                        mismatch -= dur_val

            elif duration_sum < mel_len:
                # Pad the whole deficit onto the last phone.
                cloned[-1] += mismatch
            duration = cloned
        utt2dur[utt] = duration
        # Persist the corrected durations in place.
        np.save(duration_path, duration)

    return utt2dur
|
|