import os
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_hub as hub  # needed by load_model_hub() below
import tensorflow_io as tfio
import csv
from scipy.io import wavfile
import scipy
import librosa
import soundfile as sf
import time
import gradio as gr
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from transformers import AutoProcessor
from transformers import BarkModel
from optimum.bettertransformer import BetterTransformer
import torch
from nemo.collections.tts.models import FastPitchModel
from nemo.collections.tts.models import HifiGanModel
from deep_translator import GoogleTranslator
from haystack.document_stores import InMemoryDocumentStore
from haystack.nodes import EmbeddingRetriever
# --- Load models ---

# Load a model from TensorFlow Hub
def load_model_hub(model_url):
    model = hub.load(model_url)
    return model

# Load a TFLite model from the project folder
def load_model_file(model_path):
    interpreter = tf.lite.Interpreter(model_path=model_path)
    interpreter.allocate_tensors()
    return interpreter
# --- Initialize models ---

def initialize_text_to_speech_model():
    spec_generator = FastPitchModel.from_pretrained("nvidia/tts_en_fastpitch")
    # Load vocoder
    model = HifiGanModel.from_pretrained(model_name="nvidia/tts_hifigan")
    return spec_generator, model

def initialize_tt5_model():
    from transformers import SpeechT5ForTextToSpeech, SpeechT5Processor, SpeechT5HifiGan
    from datasets import load_dataset

    dataset = load_dataset("pedropauletti/librispeech-portuguese")
    model = SpeechT5ForTextToSpeech.from_pretrained("pedropauletti/speecht5_finetuned_librispeech_pt")
    processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
    vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")
    example = dataset["test"][100]
    speaker_embeddings = torch.tensor(example["speaker_embeddings"]).unsqueeze(0)
    return model, processor, vocoder, speaker_embeddings
def load_qa_model():
    document_store = InMemoryDocumentStore()
    retriever = EmbeddingRetriever(
        document_store=document_store,
        embedding_model="sentence-transformers/all-MiniLM-L6-v2",
        use_gpu=False,
        scale_score=False,
    )
    # Get dataframe with columns "question", "answer" and some custom metadata
    df = pd.read_csv('content/social-faq.csv', on_bad_lines='skip', delimiter=';')
    # Minimal cleaning
    df.fillna(value="", inplace=True)
    df["question"] = df["question"].apply(lambda x: x.strip())
    questions = list(df["question"].values)
    df["embedding"] = retriever.embed_queries(queries=questions).tolist()
    df = df.rename(columns={"question": "content"})
    # Convert Dataframe to list of dicts and index them in our DocumentStore
    docs_to_index = df.to_dict(orient="records")
    document_store.write_documents(docs_to_index)
    return retriever
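# Expected layout of content/social-faq.csv (illustrative example row; the real file's contents
# are not shown here): a semicolon-delimited file with at least "question" and "answer" columns, e.g.
#   question;answer
#   How do I reset my password?;Open the account settings and choose "Reset password".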
# --- Audio pre-processing ---

# Utility functions for loading audio files and making sure the sample rate is correct.

def load_wav_16k_mono(filename):
    """Load a WAV file, convert it to a float tensor, and resample to 16 kHz single-channel audio."""
    file_contents = tf.io.read_file(filename)
    wav, sample_rate = tf.audio.decode_wav(
        file_contents,
        desired_channels=1)
    wav = tf.squeeze(wav, axis=-1)
    sample_rate = tf.cast(sample_rate, dtype=tf.int64)
    wav = tfio.audio.resample(wav, rate_in=sample_rate, rate_out=16000)
    return wav

def load_wav_16k_mono_librosa(filename):
    """Load a WAV file as a floating-point waveform resampled to 16 kHz mono using librosa."""
    wav, sample_rate = librosa.load(filename, sr=16000, mono=True)
    return wav

def load_wav_16k_mono_soundfile(filename):
    """Load a WAV file as a floating-point waveform and resample to 16 kHz mono using soundfile."""
    wav, sample_rate = sf.read(filename, dtype='float32')
    # Resample to 16 kHz if necessary
    if sample_rate != 16000:
        wav = librosa.resample(wav, orig_sr=sample_rate, target_sr=16000)
    return wav
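# Illustrative usage (hypothetical path): each loader returns a 16 kHz mono waveform;
# classify() below uses the librosa variant.
#   wav = load_wav_16k_mono_librosa("content/example.wav")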
# --- History ---

def updateHistory():
    global history
    return history

def clearHistory():
    global history
    history = ""
    return history

def clear():
    return None
# --- Output Format ---

def format_dictionary(dictionary):
    result = []
    for key, value in dictionary.items():
        percentage = int(value * 100)
        result.append(f"{key}: {percentage}%")
    return ', '.join(result)

def format_json(json_data):
    confidence_strings = [f"{item['label']}: {round(item['confidence']*100)}%" for item in json_data['confidences']]
    return ', '.join(confidence_strings)

def format_json_pt(json_data):
    from unidecode import unidecode
    confidence_strings = [f"{item['label']}... " for item in json_data['confidences']]
    return unidecode(', '.join(confidence_strings))
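# Illustrative examples (hypothetical values) of the formatting helpers above:
#   format_dictionary({"Speech": 0.87, "Music": 0.10})                        -> "Speech: 87%, Music: 10%"
#   format_json({"confidences": [{"label": "Speech", "confidence": 0.87}]})   -> "Speech: 87%"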
# --- Classification ---

def load_label_mapping(csv_path):
    label_mapping = {}
    with open(csv_path, newline='', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            label_mapping[int(row['index'])] = row['display_name']
    return label_mapping

def predict_yamnet(interpreter, waveform, input_details, output_details, label_mapping):
    # Pre-process the waveform to match the model's input requirements
    input_shape = input_details[0]['shape']
    input_data = np.array(waveform, dtype=np.float32)
    if input_data.shape != input_shape:
        # Pad or truncate the waveform to match the expected length
        if input_data.shape[0] < input_shape[0]:
            # Pad the waveform with zeros (float32, so the tensor dtype stays correct)
            padding = np.zeros((input_shape[0] - input_data.shape[0],), dtype=np.float32)
            input_data = np.concatenate((input_data, padding))
        elif input_data.shape[0] > input_shape[0]:
            # Truncate the waveform
            input_data = input_data[:input_shape[0]]
        input_data = np.reshape(input_data, input_shape)
    # Run inference
    interpreter.set_tensor(input_details[0]['index'], input_data)
    interpreter.invoke()
    # Get the inference results
    output_data = interpreter.get_tensor(output_details[0]['index'])
    # Keep the top-3 scores and map their indices to display names
    top_labels_indices = np.argsort(output_data[0])[::-1][:3]
    results = []
    for i in top_labels_indices:
        label_name = label_mapping.get(i, "Unknown Label")
        probability = float(output_data[0][i])  # convert to a plain float
        results.append({'label': label_name, 'probability': str(probability)})
    return results  # list with the top-3 {'label', 'probability'} entries
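# Illustrative return value of predict_yamnet (hypothetical labels and scores):
#   [{'label': 'Speech', 'probability': '0.87'}, {'label': 'Music', 'probability': '0.10'},
#    {'label': 'Silence', 'probability': '0.02'}]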
def classify(audio, language="en-us"):
    # Preprocess audio
    wav_data = load_wav_16k_mono_librosa(audio)
    # Label mapping
    if language == "pt-br":
        label_mapping = load_label_mapping('content/yamnet_class_map_ptbr.csv')
    else:
        label_mapping = load_label_mapping('content/yamnet_class_map.csv')
    # Load model from file
    model = load_model_file('content/yamnet_classification.tflite')
    input_details = model.get_input_details()
    output_details = model.get_output_details()
    # Classification
    result = predict_yamnet(model, wav_data, input_details, output_details, label_mapping)
    return result
def classify_realtime(language, audio, state):
    # Preprocess audio
    wav_data = load_wav_16k_mono_librosa(audio)
    # Label mapping
    if language == "pt-br":
        label_mapping = load_label_mapping('content/yamnet_class_map_ptbr.csv')
    else:
        label_mapping = load_label_mapping('content/yamnet_class_map.csv')
    # Load model from file
    model = load_model_file('content/yamnet_classification.tflite')
    input_details = model.get_input_details()
    output_details = model.get_output_details()
    # Classification
    result = predict_yamnet(model, wav_data, input_details, output_details, label_mapping)
    # predict_yamnet returns a list of dicts, so append only the top label to the running state
    state += result[0]['label'] + " "
    return result, state
# --- TTS ---

def generate_audio(spec_generator, model, input_text):
    parsed = spec_generator.parse(input_text)
    spectrogram = spec_generator.generate_spectrogram(tokens=parsed)
    audio = model.convert_spectrogram_to_audio(spec=spectrogram)
    return 22050, audio.cpu().detach().numpy().squeeze()

def generate_audio_tt5(model, processor, vocoder, speaker_embeddings, text):
    inputs = processor(text=text, return_tensors="pt")
    audio = model.generate_speech(inputs["input_ids"], speaker_embeddings, vocoder=vocoder)
    return 16000, audio.cpu().detach().numpy().squeeze()

def TTS(json_input, language):
    global spec_generator, model_nvidia, history
    global model_tt5, processor, vocoder, speaker_embeddings
    if language == 'en-us':
        sr, generatedAudio = generate_audio(spec_generator, model_nvidia, format_json(json_input))
    else:
        sr, generatedAudio = generate_audio_tt5(model_tt5, processor, vocoder, speaker_embeddings, format_json_pt(json_input))
    return (sr, generatedAudio)

def TTS_ASR(json_input, language):
    global spec_generator, model_nvidia, history
    global model_tt5, processor, vocoder, speaker_embeddings
    if language == 'en-us':
        sr, generatedAudio = generate_audio(spec_generator, model_nvidia, json_input['label'])
    else:
        sr, generatedAudio = generate_audio_tt5(model_tt5, processor, vocoder, speaker_embeddings, json_input['label'])
    return (sr, generatedAudio)

def TTS_chatbot(language):
    global spec_generator, model_nvidia, history
    global model_tt5, processor, vocoder, speaker_embeddings
    global last_answer
    if language == 'en-us':
        sr, generatedAudio = generate_audio(spec_generator, model_nvidia, last_answer)
    else:
        sr, generatedAudio = generate_audio_tt5(model_tt5, processor, vocoder, speaker_embeddings, last_answer)
    return (sr, generatedAudio)
# --- ASR ---

def transcribe_speech(filepath, language):
    print(filepath)
    if language == "pt-br":
        output = pipe(
            filepath,
            max_new_tokens=256,
            generate_kwargs={
                "task": "transcribe",
                "language": "portuguese",
            },
            chunk_length_s=30,
            batch_size=8,
        )
    else:
        output = pipe_en(
            filepath,
            max_new_tokens=256,
            generate_kwargs={
                "task": "transcribe",
                "language": "english",
            },
            chunk_length_s=30,
            batch_size=8,
        )
    return output["text"]

def transcribe_speech_realtime(filepath, state):
    output = pipe(
        filepath,
        max_new_tokens=256,
        generate_kwargs={
            "task": "transcribe",
            "language": "english",
        },
        chunk_length_s=30,
        batch_size=8,
    )
    state += output["text"] + " "
    return output["text"], state

def transcribe_realtime(new_chunk, stream):
    sr, y = new_chunk
    y = y.astype(np.float32)
    y /= np.max(np.abs(y))
    if stream is not None:
        stream = np.concatenate([stream, y])
    else:
        stream = y
    return stream, pipe_en({"sampling_rate": sr, "raw": stream})["text"]
# --- Translation ---

def translate_enpt(text):
    # NOTE: enpt_pipeline is assumed to be initialized elsewhere (e.g. a text2text-generation
    # pipeline with an English-to-Portuguese checkpoint); it is not created in this section.
    global enpt_pipeline
    translation = enpt_pipeline(f"translate English to Portuguese: {text}")
    return translation[0]['generated_text']
# --- Gradio Interface ---

def interface(language, audio):
    global classificationResult
    # classify() expects (audio, language)
    result = classify(audio, language)
    dic = {result[0]['label']: float(result[0]['probability']),
           result[1]['label']: float(result[1]['probability']),
           result[2]['label']: float(result[2]['probability'])
           }
    # history += result[0]['label'] + '\n'
    classificationResult = dic
    return dic

def interface_realtime(language, audio):
    global history
    result = classify(audio, language)
    dic = {result[0]['label']: float(result[0]['probability']),
           result[1]['label']: float(result[1]['probability']),
           result[2]['label']: float(result[2]['probability'])
           }
    history = result[0]['label'] + '\n' + history
    return dic
# --- QA Model ---

def get_answers(retriever, query):
    from haystack.pipelines import FAQPipeline
    pipe = FAQPipeline(retriever=retriever)
    # Run any question and change top_k to see more or fewer answers
    prediction = pipe.run(query=query, params={"Retriever": {"top_k": 1}})
    answers = prediction['answers']
    if answers:
        return answers[0].answer
    else:
        return "I don't have an answer to that question"

def add_text(chat_history, text):
    # Use a list (not a tuple) for the new entry so chatbot_response can fill in the answer in place
    chat_history = chat_history + [[text, None]]
    return chat_history, gr.Textbox(value="", interactive=False)

def chatbot_response(chat_history, language):
    chat_history[-1][1] = ""
    global retriever
    global last_answer
    if language == 'pt-br':
        response = get_answers(retriever, GoogleTranslator(source='pt', target='en').translate(chat_history[-1][0]))
        response = GoogleTranslator(source='en', target='pt').translate(response)
    else:
        response = get_answers(retriever, chat_history[-1][0])
    last_answer = response
    # Stream the answer character by character for a typing effect
    for character in response:
        chat_history[-1][1] += character
        time.sleep(0.01)
        yield chat_history
# --- Global state and model initialization ---
history = ""
classificationResult = {}
last_answer = ""

retriever = load_qa_model()
spec_generator, model_nvidia = initialize_text_to_speech_model()
model_tt5, processor, vocoder, speaker_embeddings = initialize_tt5_model()
pipe = pipeline("automatic-speech-recognition", model="pedropauletti/whisper-small-pt")
pipe_en = pipeline("automatic-speech-recognition", model="openai/whisper-small")
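
# --- Example UI wiring (illustrative) ---
# A minimal sketch of how the handlers above could be wired into a Gradio app. The component
# names and layout below are assumptions for demonstration, not the original Space's interface.
if __name__ == "__main__":
    with gr.Blocks() as demo:
        language = gr.Radio(["en-us", "pt-br"], value="en-us", label="Language")
        audio_in = gr.Audio(type="filepath", label="Audio input")
        label_out = gr.Label(label="Classification")
        classify_btn = gr.Button("Classify")
        # interface() expects (language, audio) and returns a {label: probability} dict for gr.Label
        classify_btn.click(interface, inputs=[language, audio_in], outputs=label_out)
    demo.launch()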