Spaces:
Build error
Build error
import re | |
import functools | |
import requests | |
import pandas as pd | |
import plotly.express as px | |
import torch | |
import gradio as gr | |
from transformers import pipeline, Wav2Vec2ProcessorWithLM | |
from pyannote.audio import Pipeline | |
from librosa import load, resample | |
import whisperx | |
import re | |
# Regex fragments used by split_into_sentences. They are raw strings so that
# "\s" reaches the regex engine instead of being an invalid string escape.
alphabets = r"([A-Za-z])"
prefixes = r"(Mr|St|Mrs|Ms|Dr)[.]"
suffixes = r"(Inc|Ltd|Jr|Sr|Co)"
starters = r"(Mr|Mrs|Ms|Dr|He\s|She\s|It\s|They\s|Their\s|Our\s|We\s|But\s|However\s|That\s|This\s|Wherever)"
acronyms = r"([A-Z][.][A-Z][.](?:[A-Z][.])?)"
websites = r"[.](com|net|org|io|gov)"


def split_into_sentences(text):
    """Split ``text`` into a list of stripped sentences.

    Periods that do not end a sentence (titles, initials, acronyms, company
    suffixes, website domains, "Ph.D.") are temporarily replaced by a
    ``<prd>`` marker, real sentence ends are tagged with ``<stop>``, and the
    text is split on ``<stop>``.

    A trailing fragment without terminal punctuation is returned as the last
    sentence; only an empty trailing fragment is discarded. Empty or
    whitespace-only input yields ``[]``.
    """
    text = " " + text + " "
    text = text.replace("\n", " ")

    # Protect periods that are part of a token rather than a sentence end.
    text = re.sub(prefixes, r"\1<prd>", text)
    text = re.sub(websites, r"<prd>\1", text)
    text = text.replace("Ph.D.", "Ph<prd>D<prd>")
    text = re.sub(r"\s" + alphabets + "[.] ", r" \1<prd> ", text)
    # An acronym followed by a sentence starter does end the sentence.
    text = re.sub(acronyms + " " + starters, r"\1<stop> \2", text)
    text = re.sub(
        alphabets + "[.]" + alphabets + "[.]" + alphabets + "[.]",
        r"\1<prd>\2<prd>\3<prd>",
        text,
    )
    text = re.sub(alphabets + "[.]" + alphabets + "[.]", r"\1<prd>\2<prd>", text)
    text = re.sub(" " + suffixes + "[.] " + starters, r" \1<stop> \2", text)
    text = re.sub(" " + suffixes + "[.]", r" \1<prd>", text)
    text = re.sub(" " + alphabets + "[.]", r" \1<prd>", text)

    # Move terminal punctuation outside a closing quote so the quote stays
    # attached to the sentence it closes.
    text = text.replace(".”", "”.")
    text = text.replace('."', '".')
    text = text.replace('!"', '"!')
    text = text.replace('?"', '"?')

    # Tag the real sentence ends, then restore the protected periods.
    text = text.replace(".", ".<stop>")
    text = text.replace("?", "?<stop>")
    text = text.replace("!", "!<stop>")
    text = text.replace("<prd>", ".")

    sentences = [fragment.strip() for fragment in text.split("<stop>")]
    # Text after the last <stop> is padding when the input ended with
    # punctuation, but a genuine sentence when it did not: drop it only
    # when it is empty.
    if sentences and not sentences[-1]:
        sentences.pop()
    return sentences
def summarize(diarized, check, summarization_pipeline):
    """Summarize the speech of the selected speakers.

    diarized: a list of tuples, each holding a display string and a
        highlighting label:
        [(speaker text, speaker id), (start time/end time, None)].
        The start/end time entries carry no label.
    check: a list of speaker ids whose speech gets summarized.
    summarization_pipeline: callable that takes the combined text and returns
        a sequence whose first item has a "summary_text" entry.

    Returns "" when ``check`` is empty, otherwise the "summary_text" of the
    first pipeline result.
    """
    if not check:
        return ""

    # Speaker ids are prepended only when exactly two ids are selected.
    prefix_with_speaker = len(check) == 2

    selected_lines = []
    for entry in diarized:
        speaker_id = entry[1]
        if speaker_id not in check:
            continue
        if prefix_with_speaker and speaker_id is not None:
            selected_lines.append(f"{speaker_id}: {entry[0]}")
        else:
            selected_lines.append(entry[0])

    combined_text = "\n".join(selected_lines)
    return summarization_pipeline(combined_text)[0]["summary_text"]
# Per-emotion score cut-offs: a sentence's emotion is reported only when the
# classifier score is strictly greater than the value listed for its label.
thresholds = {
    "joy": 0.99, "anger": 0.95, "surprise": 0.95,
    "sadness": 0.98, "fear": 0.95, "love": 0.99,
}

# Color name associated with each emotion label. Not referenced in the code
# visible here; presumably consumed by the UI layer -- confirm against caller.
color_map = {
    "joy": "green", "anger": "red", "surprise": "yellow",
    "sadness": "blue", "fear": "orange", "love": "purple",
}
def sentiment(diarized, emotion_pipeline):
    """Classify the customer's sentences by emotion and plot sentiment over time.

    Args:
        diarized: flat list that alternates ``(speech, speaker_id)`` entries
            with ``("from <start>-<end>", None)`` entries giving the turn's
            start and end time in seconds.
        emotion_pipeline: callable that takes a list of sentences and returns
            one mapping with ``"label"`` and ``"score"`` per sentence.

    Returns:
        A ``(customer_sentiments, fig)`` tuple. ``customer_sentiments`` is a
        list of ``(sentence + "(<time> s)", label)`` pairs for every customer
        sentence whose score exceeds the threshold of its label. ``fig`` is
        the figure produced by ``px.line`` with one positive/negative point
        per such sentence.

    Only turns whose speaker id contains "Customer" are classified. A turn
    whose speech yields no sentences is skipped instead of dividing by zero.
    """
    positive_labels = {"joy", "love", "surprise"}
    negative_labels = {"sadness", "anger", "fear"}

    # Start from +inf rather than a fixed number so that the axis is not
    # clamped when the first turn starts late in the recording.
    x_min, x_max = float("inf"), 0
    customer_sentiments, to_plot, plot_sentences = [], [], []

    for i in range(0, len(diarized), 2):
        speaker_speech, speaker_id = diarized[i]
        times, _ = diarized[i + 1]
        # times looks like "from 1.23-4.56"; drop the 5-character "from " prefix.
        start_time, end_time = map(float, times[5:].split("-"))
        x_min, x_max = min(x_min, start_time), max(x_max, end_time)

        if "Customer" not in speaker_id:
            continue

        sentences = split_into_sentences(speaker_speech)
        if not sentences:
            # Nothing to classify, and no interval size can be computed.
            continue

        # The turn's duration is spread evenly over its sentences to estimate
        # when each sentence was spoken.
        interval_size = (end_time - start_time) / len(sentences)
        outputs = emotion_pipeline(sentences)

        for idx, (output, sentence) in enumerate(zip(outputs, sentences)):
            label = output["label"]
            if output["score"] <= thresholds[label]:
                continue

            timestamp = idx * interval_size + start_time
            customer_sentiments.append((sentence + f"({round(timestamp, 1)} s)", label))

            if label in positive_labels:
                polarity = "positive"
            elif label in negative_labels:
                polarity = "negative"
            else:
                # Labels outside both groups are reported but not plotted.
                continue
            to_plot.append((timestamp, polarity))
            plot_sentences.append(sentence)

    if x_min == float("inf"):
        # No turns at all: fall back to a finite axis origin.
        x_min = 0

    plot_df = pd.DataFrame(
        data={
            "x": [x for x, _ in to_plot],
            "y": [y for _, y in to_plot],
            "sentence": plot_sentences,
        }
    )
    fig = px.line(
        plot_df,
        x="x",
        y="y",
        hover_data={"sentence": True, "x": True, "y": False},
        labels={"x": "time (seconds)", "y": "sentiment"},
        title="Customer sentiment over time",
        markers=True,
    )
    fig.update_yaxes(categoryorder="category ascending")
    fig.update_layout(font=dict(size=18), xaxis_range=[x_min - 5, x_max + 5])
    return customer_sentiments, fig
def speech_to_text(speech_file, speaker_segmentation, whisper, alignment_model, metadata, whisper_device):
    """Transcribe ``speech_file`` and group the words into speaker turns.

    Args:
        speech_file: audio input handed unchanged to ``speaker_segmentation``,
            ``whisper.transcribe`` and ``whisperx.align``.
        speaker_segmentation: callable returning an object whose
            ``itertracks(yield_label=True)`` yields ``(turn, _, _)`` triples,
            where ``turn`` has ``start`` and ``end`` attributes.
        whisper: object whose ``transcribe`` result has a ``"segments"`` entry.
        alignment_model, metadata, whisper_device: forwarded to
            ``whisperx.align``.

    Returns:
        A flat list that alternates ``(text, speaker)`` and
        ``("from <start>-<end>", None)`` tuples, one pair per turn that
        received at least one word. ``text`` is the turn's words, each
        followed by a single space.
    """
    speaker_output = speaker_segmentation(speech_file)
    result = whisper.transcribe(speech_file)
    chunks = whisperx.align(result["segments"], alignment_model, metadata, speech_file, whisper_device)["word_segments"]

    diarized_output = []
    # Absolute index of the first word not yet assigned to a turn. It must
    # only ever move forward; resetting it to a per-turn count would make
    # later turns re-read words that were already emitted.
    next_chunk = 0
    speaker_counter = 0
    for turn, _, _ in speaker_output.itertracks(yield_label=True):
        # NOTE(review): speakers alternate per non-empty turn and the
        # diarization label is ignored -- confirm this is intended.
        speaker = "Customer" if speaker_counter % 2 == 0 else "Support"

        # Consume every remaining word that ends before this turn does.
        words = []
        while next_chunk < len(chunks) and chunks[next_chunk]["end"] <= turn.end:
            words.append(chunks[next_chunk]["text"])
            next_chunk += 1

        diarized = "".join(word + " " for word in words)
        if diarized:
            diarized_output.extend([(diarized, speaker), (f"from {turn.start:.2f}-{turn.end:.2f}", None)])
            speaker_counter += 1
    return diarized_output