import os
import pickle
import torch
import numpy as np
from torch.utils import data
from torch.utils.data.sampler import Sampler
from multiprocessing import Process, Manager


class Utterances(data.Dataset):
    """Dataset class for the Utterances dataset."""

    def __init__(self, hparams):
        """Initialize and preprocess the Utterances dataset."""
        self.meta_file = hparams.meta_file
        self.feat_dir_1 = hparams.feat_dir_1
        self.feat_dir_2 = hparams.feat_dir_2
        self.feat_dir_3 = hparams.feat_dir_3
        self.step = 4
        self.split = 0
        self.max_len_pad = hparams.max_len_pad

        meta = pickle.load(open(self.meta_file, "rb"))

        # Manager lists can be shared between processes, so the metadata can
        # be loaded by several workers in parallel (self.step entries each).
        manager = Manager()
        meta = manager.list(meta)
        dataset = manager.list(len(meta)*[None])
        processes = []
        for i in range(0, len(meta), self.step):
            p = Process(target=self.load_data,
                        args=(meta[i:i+self.step], dataset, i))
            p.start()
            processes.append(p)
        for p in processes:
            p.join()

        # Very important: convert the Manager proxy back into a plain list,
        # otherwise every __getitem__ would go through the Manager process.
        self.train_dataset = list(dataset)
        self.num_tokens = len(self.train_dataset)

        print('Finished loading the {} Utterances training dataset...'.format(self.num_tokens))

    def load_data(self, submeta, dataset, idx_offset):
        for k, sbmt in enumerate(submeta):
            uttrs = len(sbmt)*[None]
            for j, tmp in enumerate(sbmt):
                if j < 2:
                    # fill in speaker name and embedding
                    uttrs[j] = tmp
                else:
                    # fill in data
                    sp_tmp = np.load(os.path.join(self.feat_dir_1, tmp))
                    cep_tmp = np.load(os.path.join(self.feat_dir_2, tmp))[:, 0:14]
                    cd_tmp = np.load(os.path.join(self.feat_dir_3, tmp))
                    assert len(sp_tmp) == len(cep_tmp) == len(cd_tmp)
                    uttrs[j] = (np.clip(sp_tmp, 0, 1), cep_tmp, cd_tmp)
            dataset[idx_offset+k] = uttrs
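
    # A minimal sketch of the metadata layout that load_data expects; the
    # file names and the embedding size are hypothetical, not taken from the
    # project. Each entry lists the speaker name, the speaker embedding, and
    # then one relative .npy path per utterance, valid in all three feature
    # directories:
    #
    #   meta = [
    #       ['p225',                           # speaker name
    #        np.zeros(82, dtype=np.float32),   # speaker embedding (size is a guess)
    #        'p225/p225_001.npy',              # utterance feature files
    #        'p225/p225_002.npy'],
    #       ...
    #   ]
    #   with open('train_meta.pkl', 'wb') as f:
    #       pickle.dump(meta, f)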

    def segment_np(self, cd_long, tau=2):
        # Cosine self-similarity between every pair of frames.
        cd_norm = np.sqrt((cd_long ** 2).sum(axis=-1, keepdims=True))
        G = (cd_long @ cd_long.T) / (cd_norm @ cd_norm.T)

        L = G.shape[0]
        num_rep = []
        num_rep_sync = []
        prev_boundary = 0
        # Random tempo factor for this utterance; the per-step threshold
        # quantile q is drawn around it.
        rate = np.random.uniform(0.8, 1.3)

        for t in range(1, L+1):
            if t == L:
                # Close the final segment.
                num_rep.append(t - prev_boundary)
                num_rep_sync.append(t - prev_boundary)
                prev_boundary = t
            if t < L:
                q = np.random.uniform(rate-0.1, rate)
                # Similarities between the segment start and its neighborhood.
                tmp = G[prev_boundary, max(prev_boundary-20, 0):min(prev_boundary+20, L)]
                if q <= 1:
                    epsilon = np.quantile(tmp, q)
                    # Close the segment once the next tau frames all drop
                    # below the similarity threshold.
                    if np.all(G[prev_boundary, t:min(t+tau, L)] < epsilon):
                        num_rep.append(t - prev_boundary)
                        num_rep_sync.append(t - prev_boundary)
                        prev_boundary = t
                else:
                    epsilon = np.quantile(tmp, 2-q)
                    if np.all(G[prev_boundary, t:min(t+tau, L)] < epsilon):
                        num_rep.append(t - prev_boundary)
                    else:
                        # Split off a fractional repetition; the two entries
                        # still sum to the segment length.
                        num_rep.extend([t-prev_boundary-0.5, 0.5])
                    num_rep_sync.append(t - prev_boundary)
                    prev_boundary = t

        num_rep = np.array(num_rep)
        num_rep_sync = np.array(num_rep_sync)
        return num_rep, num_rep_sync
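
    # Sanity check (illustrative): cd_long is assumed to be a (frames, dims)
    # float array. Fractional splits preserve segment length, so both outputs
    # always sum to the number of input frames:
    #
    #   cd = np.random.rand(120, 14)
    #   num_rep, num_rep_sync = ds.segment_np(cd)   # ds: a Utterances instance
    #   assert num_rep.sum() == num_rep_sync.sum() == 120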

    def __getitem__(self, index):
        """Return one randomly chosen utterance for one speaker."""
        dataset = self.train_dataset
        list_uttrs = dataset[index]
        emb_org = list_uttrs[1]

        # Entries 0 and 1 hold the speaker name and embedding; the rest are
        # utterances.
        uttr = np.random.randint(2, len(list_uttrs))
        melsp, melcep, cd_real = list_uttrs[uttr]

        num_rep, num_rep_sync = self.segment_np(cd_real)

        return melsp, melcep, cd_real, num_rep, num_rep_sync, len(melsp), len(num_rep), len(num_rep_sync), emb_org

    def __len__(self):
        """Return the number of speakers."""
        return self.num_tokens


class MyCollator(object):
    def __init__(self, hparams):
        self.max_len_pad = hparams.max_len_pad

    def __call__(self, batch):
        new_batch = []
        l_short_max = 0
        l_short_sync_max = 0
        l_real_max = 0
        for token in batch:
            sp_real, cep_real, cd_real, rep, rep_sync, l_real, l_short, l_short_sync, emb = token
            if l_short > l_short_max:
                l_short_max = l_short
            if l_short_sync > l_short_sync_max:
                l_short_sync_max = l_short_sync
            if l_real > l_real_max:
                l_real_max = l_real
            # Pad everything out to max_len_pad first; the stacked batch is
            # trimmed back to the longest item (+1) below.
            sp_real_pad = np.pad(sp_real, ((0, self.max_len_pad-l_real), (0, 0)), 'constant')
            cep_real_pad = np.pad(cep_real, ((0, self.max_len_pad-l_real), (0, 0)), 'constant')
            cd_real_pad = np.pad(cd_real, ((0, self.max_len_pad-l_real), (0, 0)), 'constant')
            rep_pad = np.pad(rep, (0, self.max_len_pad-l_short), 'constant')
            rep_sync_pad = np.pad(rep_sync, (0, self.max_len_pad-l_short_sync), 'constant')
            new_batch.append((sp_real_pad, cep_real_pad, cd_real_pad, rep_pad, rep_sync_pad, l_real, l_short, l_short_sync, emb))

        batch = new_batch
        a, b, c, d, e, f, g, h, i = zip(*batch)
        sp_real = torch.from_numpy(np.stack(a, axis=0))[:, :l_real_max+1, :]
        cep_real = torch.from_numpy(np.stack(b, axis=0))[:, :l_real_max+1, :]
        cd_real = torch.from_numpy(np.stack(c, axis=0))[:, :l_real_max+1, :]
        num_rep = torch.from_numpy(np.stack(d, axis=0))[:, :l_short_max+1]
        num_rep_sync = torch.from_numpy(np.stack(e, axis=0))[:, :l_short_sync_max+1]
        len_real = torch.from_numpy(np.stack(f, axis=0))
        len_short = torch.from_numpy(np.stack(g, axis=0))
        len_short_sync = torch.from_numpy(np.stack(h, axis=0))
        spk_emb = torch.from_numpy(np.stack(i, axis=0))

        return sp_real, cep_real, cd_real, num_rep, num_rep_sync, len_real, len_short, len_short_sync, spk_emb
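
# Shape sketch (illustrative, feature dims are placeholders): collating four
# dataset items trims each tensor to the longest item in the batch plus one
# padded frame:
#
#   collate = MyCollator(hparams)
#   out = collate([dataset[i] for i in range(4)])
#   out[0].shape   # sp_real: (4, l_real_max+1, feature_dim)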


class MultiSampler(Sampler):
    """Samples elements more than once in a single pass through the data."""

    def __init__(self, num_samples, n_repeats, shuffle=False):
        self.num_samples = num_samples
        self.n_repeats = n_repeats
        self.shuffle = shuffle

    def gen_sample_array(self):
        self.sample_idx_array = torch.arange(self.num_samples, dtype=torch.int64).repeat(self.n_repeats)
        if self.shuffle:
            self.sample_idx_array = self.sample_idx_array[torch.randperm(len(self.sample_idx_array))]
        return self.sample_idx_array

    def __iter__(self):
        return iter(self.gen_sample_array())

    def __len__(self):
        # The length is known without materializing the index array (which
        # only exists after __iter__ has been called).
        return self.num_samples * self.n_repeats
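
# Example: three items repeated twice yield each index two times; with
# shuffle=True the order is permuted across the whole pass:
#
#   sampler = MultiSampler(3, 2, shuffle=False)
#   [int(i) for i in sampler]   # -> [0, 1, 2, 0, 1, 2]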


def worker_init_fn(x):
    # Seed NumPy independently in each DataLoader worker from its torch seed,
    # so workers do not draw identical random numbers.
    np.random.seed(torch.initial_seed() % (2**32))


def get_loader(hparams):
    """Build and return a data loader."""
    dataset = Utterances(hparams)
    my_collator = MyCollator(hparams)
    # hparams.samplier holds the number of passes over the data per epoch.
    sampler = MultiSampler(len(dataset), hparams.samplier, shuffle=hparams.shuffle)

    data_loader = data.DataLoader(dataset=dataset,
                                  batch_size=hparams.batch_size,
                                  sampler=sampler,
                                  num_workers=hparams.num_workers,
                                  drop_last=True,
                                  pin_memory=False,
                                  worker_init_fn=worker_init_fn,
                                  collate_fn=my_collator)
    return data_loader
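
# A minimal usage sketch; the hparams values below are illustrative
# placeholders, not the project's defaults:
#
#   from types import SimpleNamespace
#   hparams = SimpleNamespace(
#       meta_file='assets/train_meta.pkl',
#       feat_dir_1='assets/sp',
#       feat_dir_2='assets/cep',
#       feat_dir_3='assets/cd',
#       max_len_pad=2048, samplier=2, shuffle=True,
#       batch_size=4, num_workers=0)
#   loader = get_loader(hparams)
#   sp, cep, cd, rep, rep_sync, l_real, l_short, l_short_sync, emb = next(iter(loader))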