Spaces:
Sleeping
Sleeping
from io import BytesIO | |
import json | |
import os | |
import re | |
import struct | |
import warnings | |
from collections import OrderedDict | |
import librosa | |
import numpy as np | |
import parselmouth | |
import pyloudnorm as pyln | |
import resampy | |
import torch | |
import torchcrepe | |
import webrtcvad | |
from scipy.ndimage.morphology import binary_dilation | |
from skimage.transform import resize | |
import pyworld as world | |
from utils import audio | |
from utils.pitch_utils import f0_to_coarse | |
from utils.text_encoder import TokenTextEncoder | |
warnings.filterwarnings("ignore") | |
PUNCS = '!,.?;:' | |
int16_max = (2 ** 15) - 1 | |
def trim_long_silences(path, sr=None, return_raw_wav=False, norm=True, vad_max_silence_length=12): | |
""" | |
Ensures that segments without voice in the waveform remain no longer than a | |
threshold determined by the VAD parameters in params.py. | |
:param wav: the raw waveform as a numpy array of floats | |
:param vad_max_silence_length: Maximum number of consecutive silent frames a segment can have. | |
:return: the same waveform with silences trimmed away (length <= original wav length) | |
""" | |
## Voice Activation Detection | |
# Window size of the VAD. Must be either 10, 20 or 30 milliseconds. | |
# This sets the granularity of the VAD. Should not need to be changed. | |
sampling_rate = 16000 | |
wav_raw, sr = librosa.core.load(path, sr=sr) | |
if norm: | |
meter = pyln.Meter(sr) # create BS.1770 meter | |
loudness = meter.integrated_loudness(wav_raw) | |
wav_raw = pyln.normalize.loudness(wav_raw, loudness, -20.0) | |
if np.abs(wav_raw).max() > 1.0: | |
wav_raw = wav_raw / np.abs(wav_raw).max() | |
wav = librosa.resample(wav_raw, sr, sampling_rate, res_type='kaiser_best') | |
vad_window_length = 30 # In milliseconds | |
# Number of frames to average together when performing the moving average smoothing. | |
# The larger this value, the larger the VAD variations must be to not get smoothed out. | |
vad_moving_average_width = 8 | |
# Compute the voice detection window size | |
samples_per_window = (vad_window_length * sampling_rate) // 1000 | |
# Trim the end of the audio to have a multiple of the window size | |
wav = wav[:len(wav) - (len(wav) % samples_per_window)] | |
# Convert the float waveform to 16-bit mono PCM | |
pcm_wave = struct.pack("%dh" % len(wav), *(np.round(wav * int16_max)).astype(np.int16)) | |
# Perform voice activation detection | |
voice_flags = [] | |
vad = webrtcvad.Vad(mode=3) | |
for window_start in range(0, len(wav), samples_per_window): | |
window_end = window_start + samples_per_window | |
voice_flags.append(vad.is_speech(pcm_wave[window_start * 2:window_end * 2], | |
sample_rate=sampling_rate)) | |
voice_flags = np.array(voice_flags) | |
# Smooth the voice detection with a moving average | |
def moving_average(array, width): | |
array_padded = np.concatenate((np.zeros((width - 1) // 2), array, np.zeros(width // 2))) | |
ret = np.cumsum(array_padded, dtype=float) | |
ret[width:] = ret[width:] - ret[:-width] | |
return ret[width - 1:] / width | |
audio_mask = moving_average(voice_flags, vad_moving_average_width) | |
audio_mask = np.round(audio_mask).astype(np.bool) | |
# Dilate the voiced regions | |
audio_mask = binary_dilation(audio_mask, np.ones(vad_max_silence_length + 1)) | |
audio_mask = np.repeat(audio_mask, samples_per_window) | |
audio_mask = resize(audio_mask, (len(wav_raw),)) > 0 | |
if return_raw_wav: | |
return wav_raw, audio_mask, sr | |
return wav_raw[audio_mask], audio_mask, sr | |
def process_utterance(wav_path, | |
fft_size=1024, | |
hop_size=256, | |
win_length=1024, | |
window="hann", | |
num_mels=80, | |
fmin=80, | |
fmax=7600, | |
eps=1e-6, | |
sample_rate=22050, | |
loud_norm=False, | |
min_level_db=-100, | |
return_linear=False, | |
trim_long_sil=False, vocoder='pwg'): | |
if isinstance(wav_path, str) or isinstance(wav_path, BytesIO): | |
if trim_long_sil: | |
wav, _, _ = trim_long_silences(wav_path, sample_rate) | |
else: | |
wav, _ = librosa.core.load(wav_path, sr=sample_rate) | |
else: | |
wav = wav_path | |
if loud_norm: | |
meter = pyln.Meter(sample_rate) # create BS.1770 meter | |
loudness = meter.integrated_loudness(wav) | |
wav = pyln.normalize.loudness(wav, loudness, -22.0) | |
if np.abs(wav).max() > 1: | |
wav = wav / np.abs(wav).max() | |
# get amplitude spectrogram | |
x_stft = librosa.stft(wav, n_fft=fft_size, hop_length=hop_size, | |
win_length=win_length, window=window, pad_mode="constant") | |
spc = np.abs(x_stft) # (n_bins, T) | |
# get mel basis | |
fmin = 0 if fmin == -1 else fmin | |
fmax = sample_rate / 2 if fmax == -1 else fmax | |
mel_basis = librosa.filters.mel(sample_rate, fft_size, num_mels, fmin, fmax) | |
mel = mel_basis @ spc | |
if vocoder == 'pwg': | |
mel = np.log10(np.maximum(eps, mel)) # (n_mel_bins, T) | |
else: | |
assert False, f'"{vocoder}" is not in ["pwg"].' | |
l_pad, r_pad = audio.librosa_pad_lr(wav, fft_size, hop_size, 1) | |
wav = np.pad(wav, (l_pad, r_pad), mode='constant', constant_values=0.0) | |
wav = wav[:mel.shape[1] * hop_size] | |
if not return_linear: | |
return wav, mel | |
else: | |
spc = audio.amp_to_db(spc) | |
spc = audio.normalize(spc, {'min_level_db': min_level_db}) | |
return wav, mel, spc | |
def get_pitch_parselmouth(wav_data, mel, hparams): | |
""" | |
:param wav_data: [T] | |
:param mel: [T, 80] | |
:param hparams: | |
:return: | |
""" | |
# time_step = hparams['hop_size'] / hparams['audio_sample_rate'] | |
# f0_min = hparams['f0_min'] | |
# f0_max = hparams['f0_max'] | |
# if hparams['hop_size'] == 128: | |
# pad_size = 4 | |
# elif hparams['hop_size'] == 256: | |
# pad_size = 2 | |
# else: | |
# assert False | |
# f0 = parselmouth.Sound(wav_data, hparams['audio_sample_rate']).to_pitch_ac( | |
# time_step=time_step, voicing_threshold=0.6, | |
# pitch_floor=f0_min, pitch_ceiling=f0_max).selected_array['frequency'] | |
# lpad = pad_size * 2 | |
# rpad = len(mel) - len(f0) - lpad | |
# f0 = np.pad(f0, [[lpad, rpad]], mode='constant') | |
# # mel and f0 are extracted by 2 different libraries. we should force them to have the same length. | |
# # Attention: we find that new version of some libraries could cause ``rpad'' to be a negetive value... | |
# # Just to be sure, we recommend users to set up the same environments as them in requirements_auto.txt (by Anaconda) | |
# delta_l = len(mel) - len(f0) | |
# assert np.abs(delta_l) <= 8 | |
# if delta_l > 0: | |
# f0 = np.concatenate([f0, [f0[-1]] * delta_l], 0) | |
# f0 = f0[:len(mel)] | |
# pad_size=(int(len(wav_data) // hparams['hop_size']) - len(f0) + 1) // 2 | |
# f0 = np.pad(f0,[[pad_size,len(mel) - len(f0) - pad_size]], mode='constant') | |
# pitch_coarse = f0_to_coarse(f0, hparams) | |
# return f0, pitch_coarse | |
# Bye bye Parselmouth ! | |
return get_pitch_world(wav_data, mel, hparams) | |
def get_pitch_world(wav_data, mel, hparams): | |
""" | |
:param wav_data: [T] | |
:param mel: [T, 80] | |
:param hparams: | |
:return: | |
""" | |
time_step = 1000 * hparams['hop_size'] / hparams['audio_sample_rate'] | |
f0_min = hparams['f0_min'] | |
f0_max = hparams['f0_max'] | |
# Here's to hoping it uses numpy stuff ! | |
f0, _ = world.harvest(wav_data.astype(np.double), hparams['audio_sample_rate'], f0_min, f0_max, time_step) | |
# Change padding | |
len_diff = len(mel) - len(f0) | |
if len_diff > 0: | |
pad_len = (len_diff + 1) // 2 | |
f0 = np.pad(f0, [[pad_len, len_diff - pad_len]]) | |
else: | |
pad_len = (1 - len_diff) // 2 | |
rpad = pad_len + len_diff | |
if rpad != 0: | |
f0 = f0[pad_len:rpad] | |
f0 = f0[pad_len:] | |
pitch_coarse = f0_to_coarse(f0, hparams) | |
return f0, pitch_coarse | |
def get_pitch_crepe(wav_data, mel, hparams, threshold=0.05): | |
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
device = torch.device("cuda") | |
# crepe只支持16khz采样率,需要重采样 | |
wav16k = resampy.resample(wav_data, hparams['audio_sample_rate'], 16000) | |
wav16k_torch = torch.FloatTensor(wav16k).unsqueeze(0).to(device) | |
# 频率范围 | |
f0_min = hparams['f0_min'] | |
f0_max = hparams['f0_max'] | |
# 重采样后按照hopsize=80,也就是5ms一帧分析f0 | |
f0, pd = torchcrepe.predict(wav16k_torch, 16000, 80, f0_min, f0_max, pad=True, model='full', batch_size=1024, | |
device=device, return_periodicity=True) | |
# 滤波,去掉静音,设置uv阈值,参考原仓库readme | |
pd = torchcrepe.filter.median(pd, 3) | |
pd = torchcrepe.threshold.Silence(-60.)(pd, wav16k_torch, 16000, 80) | |
f0 = torchcrepe.threshold.At(threshold)(f0, pd) | |
f0 = torchcrepe.filter.mean(f0, 3) | |
# 将nan频率(uv部分)转换为0频率 | |
f0 = torch.where(torch.isnan(f0), torch.full_like(f0, 0), f0) | |
''' | |
np.savetxt('问棋-crepe.csv',np.array([0.005*np.arange(len(f0[0])),f0[0].cpu().numpy()]).transpose(),delimiter=',') | |
''' | |
# 去掉0频率,并线性插值 | |
nzindex = torch.nonzero(f0[0]).squeeze() | |
f0 = torch.index_select(f0[0], dim=0, index=nzindex).cpu().numpy() | |
time_org = 0.005 * nzindex.cpu().numpy() | |
time_frame = np.arange(len(mel)) * hparams['hop_size'] / hparams['audio_sample_rate'] | |
if f0.shape[0] == 0: | |
f0 = torch.FloatTensor(time_frame.shape[0]).fill_(0) | |
print('f0 all zero!') | |
else: | |
f0 = np.interp(time_frame, time_org, f0, left=f0[0], right=f0[-1]) | |
pitch_coarse = f0_to_coarse(f0, hparams) | |
return f0, pitch_coarse | |
def remove_empty_lines(text): | |
"""remove empty lines""" | |
assert (len(text) > 0) | |
assert (isinstance(text, list)) | |
text = [t.strip() for t in text] | |
if "" in text: | |
text.remove("") | |
return text | |
class TextGrid(object): | |
def __init__(self, text): | |
text = remove_empty_lines(text) | |
self.text = text | |
self.line_count = 0 | |
self._get_type() | |
self._get_time_intval() | |
self._get_size() | |
self.tier_list = [] | |
self._get_item_list() | |
def _extract_pattern(self, pattern, inc): | |
""" | |
Parameters | |
---------- | |
pattern : regex to extract pattern | |
inc : increment of line count after extraction | |
Returns | |
------- | |
group : extracted info | |
""" | |
try: | |
group = re.match(pattern, self.text[self.line_count]).group(1) | |
self.line_count += inc | |
except AttributeError: | |
raise ValueError("File format error at line %d:%s" % (self.line_count, self.text[self.line_count])) | |
return group | |
def _get_type(self): | |
self.file_type = self._extract_pattern(r"File type = \"(.*)\"", 2) | |
def _get_time_intval(self): | |
self.xmin = self._extract_pattern(r"xmin = (.*)", 1) | |
self.xmax = self._extract_pattern(r"xmax = (.*)", 2) | |
def _get_size(self): | |
self.size = int(self._extract_pattern(r"size = (.*)", 2)) | |
def _get_item_list(self): | |
"""Only supports IntervalTier currently""" | |
for itemIdx in range(1, self.size + 1): | |
tier = OrderedDict() | |
item_list = [] | |
tier_idx = self._extract_pattern(r"item \[(.*)\]:", 1) | |
tier_class = self._extract_pattern(r"class = \"(.*)\"", 1) | |
if tier_class != "IntervalTier": | |
raise NotImplementedError("Only IntervalTier class is supported currently") | |
tier_name = self._extract_pattern(r"name = \"(.*)\"", 1) | |
tier_xmin = self._extract_pattern(r"xmin = (.*)", 1) | |
tier_xmax = self._extract_pattern(r"xmax = (.*)", 1) | |
tier_size = self._extract_pattern(r"intervals: size = (.*)", 1) | |
for i in range(int(tier_size)): | |
item = OrderedDict() | |
item["idx"] = self._extract_pattern(r"intervals \[(.*)\]", 1) | |
item["xmin"] = self._extract_pattern(r"xmin = (.*)", 1) | |
item["xmax"] = self._extract_pattern(r"xmax = (.*)", 1) | |
item["text"] = self._extract_pattern(r"text = \"(.*)\"", 1) | |
item_list.append(item) | |
tier["idx"] = tier_idx | |
tier["class"] = tier_class | |
tier["name"] = tier_name | |
tier["xmin"] = tier_xmin | |
tier["xmax"] = tier_xmax | |
tier["size"] = tier_size | |
tier["items"] = item_list | |
self.tier_list.append(tier) | |
def toJson(self): | |
_json = OrderedDict() | |
_json["file_type"] = self.file_type | |
_json["xmin"] = self.xmin | |
_json["xmax"] = self.xmax | |
_json["size"] = self.size | |
_json["tiers"] = self.tier_list | |
return json.dumps(_json, ensure_ascii=False, indent=2) | |
def get_mel2ph(tg_fn, ph, mel, hparams): | |
ph_list = ph.split(" ") | |
with open(tg_fn, "r", encoding='utf-8') as f: | |
tg = f.readlines() | |
tg = remove_empty_lines(tg) | |
tg = TextGrid(tg) | |
tg = json.loads(tg.toJson()) | |
split = np.ones(len(ph_list) + 1, np.float) * -1 | |
tg_idx = 0 | |
ph_idx = 0 | |
tg_align = [x for x in tg['tiers'][-1]['items']] | |
tg_align_ = [] | |
for x in tg_align: | |
x['xmin'] = float(x['xmin']) | |
x['xmax'] = float(x['xmax']) | |
if x['text'] in ['sil', 'sp', '', 'SIL', 'PUNC']: | |
x['text'] = '' | |
if len(tg_align_) > 0 and tg_align_[-1]['text'] == '': | |
tg_align_[-1]['xmax'] = x['xmax'] | |
continue | |
tg_align_.append(x) | |
tg_align = tg_align_ | |
tg_len = len([x for x in tg_align if x['text'] != '']) | |
ph_len = len([x for x in ph_list if not is_sil_phoneme(x)]) | |
assert tg_len == ph_len, (tg_len, ph_len, tg_align, ph_list, tg_fn) | |
while tg_idx < len(tg_align) or ph_idx < len(ph_list): | |
if tg_idx == len(tg_align) and is_sil_phoneme(ph_list[ph_idx]): | |
split[ph_idx] = 1e8 | |
ph_idx += 1 | |
continue | |
x = tg_align[tg_idx] | |
if x['text'] == '' and ph_idx == len(ph_list): | |
tg_idx += 1 | |
continue | |
assert ph_idx < len(ph_list), (tg_len, ph_len, tg_align, ph_list, tg_fn) | |
ph = ph_list[ph_idx] | |
if x['text'] == '' and not is_sil_phoneme(ph): | |
assert False, (ph_list, tg_align) | |
if x['text'] != '' and is_sil_phoneme(ph): | |
ph_idx += 1 | |
else: | |
assert (x['text'] == '' and is_sil_phoneme(ph)) \ | |
or x['text'].lower() == ph.lower() \ | |
or x['text'].lower() == 'sil', (x['text'], ph) | |
split[ph_idx] = x['xmin'] | |
if ph_idx > 0 and split[ph_idx - 1] == -1 and is_sil_phoneme(ph_list[ph_idx - 1]): | |
split[ph_idx - 1] = split[ph_idx] | |
ph_idx += 1 | |
tg_idx += 1 | |
assert tg_idx == len(tg_align), (tg_idx, [x['text'] for x in tg_align]) | |
assert ph_idx >= len(ph_list) - 1, (ph_idx, ph_list, len(ph_list), [x['text'] for x in tg_align], tg_fn) | |
mel2ph = np.zeros([mel.shape[0]], np.int) | |
split[0] = 0 | |
split[-1] = 1e8 | |
for i in range(len(split) - 1): | |
assert split[i] != -1 and split[i] <= split[i + 1], (split[:-1],) | |
split = [int(s * hparams['audio_sample_rate'] / hparams['hop_size'] + 0.5) for s in split] | |
for ph_idx in range(len(ph_list)): | |
mel2ph[split[ph_idx]:split[ph_idx + 1]] = ph_idx + 1 | |
mel2ph_torch = torch.from_numpy(mel2ph) | |
T_t = len(ph_list) | |
dur = mel2ph_torch.new_zeros([T_t + 1]).scatter_add(0, mel2ph_torch, torch.ones_like(mel2ph_torch)) | |
dur = dur[1:].numpy() | |
return mel2ph, dur | |
def build_phone_encoder(data_dir): | |
phone_list_file = os.path.join(data_dir, 'phone_set.json') | |
phone_list = json.load(open(phone_list_file, encoding='utf-8')) | |
return TokenTextEncoder(None, vocab_list=phone_list, replace_oov=',') | |
def is_sil_phoneme(p): | |
return not p[0].isalpha() | |