|
import os |
|
import shutil |
|
from os import listdir |
|
from colorama import Fore |
|
import os |
|
import shutil |
|
import numpy as np |
|
import faiss |
|
from pathlib import Path |
|
from sklearn.cluster import MiniBatchKMeans |
|
import traceback |
|
import gradio as gr |
|
|
|
|
|
def preprocess_data(model_name, dataset_folder):
    """Prepare the model's log directory and run the preprocessing script.

    If a log directory for the model already exists, everything in it except
    the ``D_*.pth`` / ``G_*.pth`` checkpoint files is removed so training can
    resume from those checkpoints. The dataset is then processed by
    ``infer/modules/train/preprocess.py`` and the script's log is inspected
    for the ``end preprocess`` marker.

    Args:
        model_name: Name of the model; used as the log sub-directory name.
        dataset_folder: Directory holding the training audio files.

    Returns:
        A status string that starts with ``"Success"`` or ``"Error"``.
    """
    # Imported locally so the block does not depend on the file's import list.
    import subprocess
    import sys

    # Validate the dataset *before* deleting anything: a typo in the dataset
    # path must not cost the user their previous training artefacts.
    if not dataset_folder or not os.path.isdir(dataset_folder):
        return "Error: Dataset folder not found."
    if not os.listdir(dataset_folder):
        return "Error: Dataset folder is empty."

    # NOTE(review): this absolute path and the relative ./logs path used below
    # name the same directory only when the working directory is /content/RVC.
    logs_path = f'/content/RVC/logs/{model_name}'

    if os.path.isdir(logs_path):
        print("Model already exists, This will be resume training.")
        # Prune in place instead of round-tripping the checkpoints through a
        # temporary folder: same end state, no large copies, and no risk of
        # picking up stale files left in the temporary folder by a failed run.
        for item in os.listdir(logs_path):
            item_path = os.path.join(logs_path, item)
            if os.path.isfile(item_path):
                is_checkpoint = item.startswith(('D_', 'G_')) and item.endswith('.pth')
                if not is_checkpoint:
                    os.remove(item_path)
            elif os.path.isdir(item_path):
                shutil.rmtree(item_path)

    log_dir = f'./logs/{model_name}'
    log_file = f'{log_dir}/preprocess.log'
    os.makedirs(log_dir, exist_ok=True)

    # Drop any log left by an earlier run so it cannot be mistaken for the
    # result of this run.
    if os.path.isfile(log_file):
        os.remove(log_file)

    # An argument list (no shell) keeps paths containing spaces or shell
    # metacharacters intact. The positional values are passed through exactly
    # as before; presumably sample rate, worker count, etc. - confirm against
    # preprocess.py.
    subprocess.run(
        [
            sys.executable or "python",
            "infer/modules/train/preprocess.py",
            str(dataset_folder),
            "32000",
            "2",
            log_dir,
            "False",
            "3.0",
        ],
        check=False,
    )

    try:
        with open(log_file, 'r') as f:
            log_content = f.read()
    except FileNotFoundError:
        # The script never got far enough to create its log.
        return "Error preprocessing data. Check your dataset folder."

    if 'end preprocess' in log_content:
        return "Success: Data preprocessing complete."
    return "Error preprocessing data. Check your dataset folder."
|
|
|
|
|
def extract_f0_feature(model_name, f0method):
    """Run the F0 extraction script for a model and report the outcome.

    ``extract_f0_rmvpe.py`` is used when ``f0method`` is ``"rmvpe_gpu"``;
    every other method is handed to ``extract_f0_print.py``. Afterwards
    ``./logs/<model_name>/extract_f0_feature.log`` is checked for the
    ``all-feature-done`` marker.

    Args:
        model_name: Name of the model whose ``./logs`` sub-directory is used.
        f0method: Name of the pitch-extraction method.

    Returns:
        A status string that starts with ``"Success"`` or ``"Error"``.
    """
    # Imported locally so the block does not depend on the file's import list.
    import subprocess
    import sys

    python = sys.executable or "python"
    log_dir = f'./logs/{model_name}'

    # Argument lists (no shell) keep model names with spaces or shell
    # metacharacters intact.
    if f0method != "rmvpe_gpu":
        cmd = [
            python,
            "infer/modules/train/extract/extract_f0_print.py",
            log_dir,
            "2",
            str(f0method),
        ]
    else:
        cmd = [
            python,
            "infer/modules/train/extract/extract_f0_rmvpe.py",
            "1",
            "0",
            "0",
            log_dir,
            "True",
        ]
    subprocess.run(cmd, check=False)

    # NOTE(review): only the F0 scripts are launched here, yet the marker that
    # signals success is 'all-feature-done' - confirm these scripts write it.
    try:
        with open(f'{log_dir}/extract_f0_feature.log', 'r') as f:
            log_content = f.read()
    except FileNotFoundError:
        # The script failed before creating its log.
        return "Error extracting F0 feature."

    if 'all-feature-done' in log_content:
        return "Success: F0 feature extraction complete."
    return "Error extracting F0 feature."
|
|
|
|
|
def train_index(exp_dir1, version19):
    """Build FAISS ``IVF…,Flat`` indexes from a model's extracted features.

    All ``.npy`` files in ``logs/<exp_dir1>/3_feature768`` (when ``version19``
    is ``"v2"``) or ``3_feature256`` (otherwise) are concatenated and shuffled.
    If there are more than 2e5 rows they are replaced by 10000
    ``MiniBatchKMeans`` cluster centres. The resulting matrix is saved as
    ``total_fea.npy``; a trained (empty) index and a populated index are
    written next to it.

    Args:
        exp_dir1: Model / experiment name (sub-directory of ``logs``).
        version19: Model version string; ``"v2"`` selects 768-dim features,
            anything else selects 256-dim features.

    Returns:
        A status string: either a request to run feature extraction first or
        a completion message naming the populated index file.
    """
    exp_dir = f"logs/{exp_dir1}"
    os.makedirs(exp_dir, exist_ok=True)

    feature_dim = 768 if version19 == "v2" else 256
    feature_dir = f"{exp_dir}/3_feature{feature_dim}"

    if not os.path.isdir(feature_dir):
        return "Please run feature extraction first."

    # Only load real feature files; a stray file (e.g. an editor backup or a
    # .DS_Store) would otherwise make np.load fail.
    feature_files = sorted(
        name for name in os.listdir(feature_dir) if name.endswith(".npy")
    )
    if not feature_files:
        return "Please run feature extraction first."

    npys = [np.load(f"{feature_dir}/{name}") for name in feature_files]
    big_npy = np.concatenate(npys, axis=0)
    if big_npy.shape[0] == 0:
        # Feature files exist but contain no rows - nothing to index.
        return "Please run feature extraction first."

    # Shuffle the rows via a shuffled index array.
    big_npy_idx = np.arange(big_npy.shape[0])
    np.random.shuffle(big_npy_idx)
    big_npy = big_npy[big_npy_idx]

    if big_npy.shape[0] > 2e5:
        # Too many rows: keep only 10000 cluster centres.
        big_npy = (
            MiniBatchKMeans(n_clusters=10000, batch_size=256, init="random")
            .fit(big_npy)
            .cluster_centers_
        )

    np.save(f"{exp_dir}/total_fea.npy", big_npy)

    n_rows = big_npy.shape[0]
    # With fewer than 39 rows the integer division yields 0, which would
    # produce the invalid factory string "IVF0,Flat"; always use >= 1 list.
    n_ivf = max(1, min(int(16 * np.sqrt(n_rows)), n_rows // 39))

    index_suffix = f"IVF{n_ivf}_Flat_nprobe_1_{exp_dir1}_{version19}.index"
    trained_path = f"{exp_dir}/trained_{index_suffix}"
    added_path = f"{exp_dir}/added_{index_suffix}"

    index = faiss.index_factory(feature_dim, f"IVF{n_ivf},Flat")
    index.train(big_npy)
    faiss.write_index(index, trained_path)

    # Add the vectors in fixed-size batches.
    batch_size_add = 8192
    for start in range(0, n_rows, batch_size_add):
        index.add(big_npy[start:start + batch_size_add])

    faiss.write_index(index, added_path)
    return f"Indexing completed. Index saved to {added_path}"
|
|
|
|
|
|
|
|
|
def run_inference(model_name, pitch, input_path, f0_method, save_as, index_rate, volume_normalization, consonant_protection):
    """Convert an audio file with a trained model via ``tools/cmd/infer_cli.py``.

    The first (alphabetically) ``added*.index`` file in ``logs/<model_name>``
    is used as the retrieval index. The ``index_root`` and ``weight_root``
    environment variables are set for the CLI, any previous output at
    ``save_as`` is removed, and the CLI is launched.

    Args:
        model_name: Model name; ``<model_name>.pth`` is passed as the weight
            file name and ``logs/<model_name>`` is searched for the index.
        pitch: Value passed as ``--f0up_key``.
        input_path: Path of the audio file to convert.
        f0_method: Value passed as ``--f0method``.
        save_as: Path the converted audio is written to.
        index_rate: Value passed as ``--index_rate``.
        volume_normalization: Value passed as ``--rms_mix_rate``.
        consonant_protection: Value passed as ``--protect``.

    Returns:
        ``(message, path)``. On success ``path`` is ``save_as``; if the CLI
        fails or produces no output file, ``path`` is ``None`` and the
        message describes the failure.

    Raises:
        ValueError: If no index file exists for the model or ``input_path``
            does not exist.
    """
    # Imported locally so the block does not depend on the file's import list.
    import subprocess
    import sys

    model_filename = model_name + '.pth'
    index_dir = os.path.join('logs', model_name)

    # Look the index up in place. Copying it to a scratch folder just to read
    # its name is slow for large indexes and can select a stale file that an
    # earlier run left in the scratch folder.
    try:
        index_candidates = sorted(
            name
            for name in os.listdir(index_dir)
            if name.startswith('added') and name.endswith('.index')
        )
    except FileNotFoundError:
        index_candidates = []
    if not index_candidates:
        raise ValueError("Index file not found.")
    index_filename = index_candidates[0]

    model_path = "assets/weights/" + model_filename
    index_path = os.path.join('logs', model_name, index_filename)

    # `not input_path` covers None / "" (e.g. no audio supplied in the UI).
    if not input_path or not os.path.exists(input_path):
        raise ValueError(f"{input_path} was not found.")

    # The CLI receives bare file names and resolves them against these roots.
    os.environ['index_root'] = os.path.dirname(index_path)
    os.environ['weight_root'] = os.path.dirname(model_path)

    # An argument list (no shell) keeps paths with spaces - common for
    # uploaded audio - and shell metacharacters intact.
    cmd = [
        sys.executable or "python",
        "tools/cmd/infer_cli.py",
        "--f0up_key", str(pitch),
        "--input_path", str(input_path),
        "--index_path", os.path.basename(index_path),
        "--f0method", str(f0_method),
        "--opt_path", str(save_as),
        "--model_name", os.path.basename(model_path),
        "--index_rate", str(index_rate),
        "--device", "cuda:0",
        "--is_half", "True",
        "--filter_radius", "3",
        "--resample_sr", "0",
        "--rms_mix_rate", str(volume_normalization),
        "--protect", str(consonant_protection),
    ]

    # Remove a previous result so a failed run cannot be mistaken for success.
    if os.path.isfile(save_as):
        os.remove(save_as)

    completed = subprocess.run(cmd, check=False)

    if completed.returncode != 0 or not os.path.exists(save_as):
        return (
            f"Inference failed (exit code {completed.returncode}). "
            "Check the console output for details.",
            None,
        )

    return f"Inference completed, output saved at {save_as}.", save_as
|
|
|
|
|
|
|
# Gradio front end: a single "Inference" tab that collects the arguments of
# run_inference and shows the status message and converted audio it returns.
with gr.Blocks() as demo:
    with gr.Row():
        gr.Markdown("# RVC V2 - EASY GUI")
    with gr.Row():
        with gr.Tab("Inference"):
            with gr.Row():
                model_name = gr.Textbox(label="Model Name For Inference")
            with gr.Row():
                input_path = gr.Audio(label="Input Audio Path", type="filepath")
            with gr.Row():
                with gr.Accordion("Inference Settings"):
                    pitch = gr.Slider(minimum=-12, maximum=12, step=1, label="Pitch", value=0)
                    f0_method = gr.Dropdown(choices=["rmvpe", "pm", "harvest"], label="f0 Method", value="rmvpe")
                    index_rate = gr.Slider(minimum=0, maximum=1, step=0.01, label="Index Rate", value=0.5)
                    volume_normalization = gr.Slider(minimum=0, maximum=1, step=0.01, label="Volume Normalization", value=0)
                    consonant_protection = gr.Slider(minimum=0, maximum=1, step=0.01, label="Consonant Protection", value=0.5)
            with gr.Row():
                save_as = gr.Textbox(value="/content/RVC/audios/output_audio.wav", label="Output Audio Path")

            run_btn = gr.Button("Run Inference")
            with gr.Row():
                output_message = gr.Textbox(label="Output Message",interactive=False)
                output_audio = gr.Audio(label="Output Audio",interactive=False)

            # Without this handler the button does nothing. The inputs are
            # listed in the positional order of run_inference's parameters and
            # the two outputs match its (message, audio path) return value.
            run_btn.click(
                run_inference,
                inputs=[
                    model_name,
                    pitch,
                    input_path,
                    f0_method,
                    save_as,
                    index_rate,
                    volume_normalization,
                    consonant_protection,
                ],
                outputs=[output_message, output_audio],
            )

demo.launch()
|
|