Spaces:
Runtime error
Runtime error
import gradio as gr | |
from gradio.components import Textbox, Checkbox | |
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, T5ForConditionalGeneration | |
from peft import PeftModel | |
import torch | |
import datasets | |
from sentence_transformers import CrossEncoder | |
import math | |
import re | |
from nltk import sent_tokenize, word_tokenize | |
import nltk | |
nltk.download('punkt') | |
# Load cross encoder | |
top_k = 10 | |
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2') | |
# Load your fine-tuned model and tokenizer | |
model_name = "google/flan-t5-large" | |
peft_name = "legacy107/flan-t5-large-ia3-covidqa" | |
tokenizer = AutoTokenizer.from_pretrained(model_name) | |
pretrained_model = T5ForConditionalGeneration.from_pretrained("google/flan-t5-large") | |
model = T5ForConditionalGeneration.from_pretrained("google/flan-t5-large") | |
model = PeftModel.from_pretrained(model, peft_name) | |
peft_name = "legacy107/flan-t5-large-ia3-bioasq-paraphrase" | |
paraphrase_model = AutoModelForSeq2SeqLM.from_pretrained(model_name) | |
paraphrase_model = PeftModel.from_pretrained(paraphrase_model, peft_name) | |
max_length = 512 | |
max_target_length = 200 | |
# Load your dataset | |
dataset = datasets.load_dataset("minh21/COVID-QA-Chunk-64-testset-biencoder-data-90_10", split="train") | |
# dataset = dataset.shuffle() | |
dataset = dataset.select([4, 16, 22, 154, 63, 19, 31, 97, 133, 183]) | |
# Context chunking | |
min_sentences_per_chunk = 3 | |
chunk_size = 64 | |
window_size = math.ceil(min_sentences_per_chunk * 0.25) | |
over_lap_chunk_size = chunk_size * 0.25 | |
def chunk_splitter(context): | |
sentences = sent_tokenize(context) | |
chunks = [] | |
current_chunk = [] | |
for sentence in sentences: | |
if len(current_chunk) < min_sentences_per_chunk: | |
current_chunk.append(sentence) | |
continue | |
elif len(word_tokenize(' '.join(current_chunk) + " " + sentence)) < chunk_size: | |
current_chunk.append(sentence) | |
continue | |
chunks.append(' '.join(current_chunk)) | |
new_chunk = current_chunk[-window_size:] | |
new_window = window_size | |
buffer_new_chunk = new_chunk | |
while len(word_tokenize(' '.join(new_chunk))) <= over_lap_chunk_size: | |
buffer_new_chunk = new_chunk | |
new_window += 1 | |
new_chunk = current_chunk[-new_window:] | |
if new_window >= len(current_chunk): | |
break | |
current_chunk = buffer_new_chunk | |
current_chunk.append(sentence) | |
if current_chunk: | |
chunks.append(' '.join(current_chunk)) | |
return chunks | |
def clean_data(text): | |
# Extract abstract content | |
index = text.find("\nAbstract: ") | |
if index != -1: | |
cleaned_text = text[index + len("\nAbstract: "):] | |
else: | |
cleaned_text = text # If "\nAbstract: " is not found, keep the original text | |
# Remove both http and https links using a regular expression | |
cleaned_text = re.sub(r'(http(s|)\/\/:( |)\S+)|(http(s|):\/\/( |)\S+)', '', cleaned_text) | |
# Remove DOI patterns like "doi:10.1371/journal.pone.0007211.s003" | |
cleaned_text = re.sub(r'doi:( |)\w+', '', cleaned_text) | |
# Remove the "(0.11 MB DOC)" pattern | |
cleaned_text = re.sub(r'\(0\.\d+ MB DOC\)', '', cleaned_text) | |
cleaned_text = re.sub(r'www\.\w+(.org|)', '', cleaned_text) | |
return cleaned_text | |
def paraphrase_answer(question, answer, use_pretrained=False): | |
# Combine question and context | |
input_text = f"question: {question}. Paraphrase the answer to make it more natural answer: {answer}" | |
# Tokenize the input text | |
input_ids = tokenizer( | |
input_text, | |
return_tensors="pt", | |
padding="max_length", | |
truncation=True, | |
max_length=max_length, | |
).input_ids | |
# Generate the answer | |
with torch.no_grad(): | |
if use_pretrained: | |
generated_ids = pretrained_model.generate(input_ids=input_ids, max_new_tokens=max_target_length) | |
else: | |
generated_ids = paraphrase_model.generate(input_ids=input_ids, max_new_tokens=max_target_length) | |
# Decode and return the generated answer | |
paraphrased_answer = tokenizer.decode(generated_ids[0], skip_special_tokens=True) | |
return paraphrased_answer | |
def retrieve_context(question, contexts): | |
# cross-encoder | |
hits = [{"corpus_id": i} for i in range(len(contexts))] | |
cross_inp = [[question, contexts[hit["corpus_id"]]] for hit in hits] | |
cross_scores = cross_encoder.predict(cross_inp, show_progress_bar=False) | |
for idx in range(len(cross_scores)): | |
hits[idx]["cross-score"] = cross_scores[idx] | |
hits = sorted(hits, key=lambda x: x["cross-score"], reverse=True) | |
return " ".join( | |
[contexts[hit["corpus_id"]] for hit in hits[0:top_k]] | |
).replace("\n", " ") | |
# Define your function to generate answers | |
def generate_answer(question, context, ground, do_pretrained, do_natural, do_pretrained_natural): | |
contexts = chunk_splitter(clean_data(context)) | |
retrieved_context = retrieve_context(question, contexts) | |
ground_in_context = (retrieved_context.find(ground) != -1) | |
# Combine question and context | |
input_text = f"question: {question} context: {retrieved_context}" | |
# Tokenize the input text | |
input_ids = tokenizer( | |
input_text, | |
return_tensors="pt", | |
padding="max_length", | |
truncation=True, | |
max_length=max_length, | |
).input_ids | |
# Generate the answer | |
with torch.no_grad(): | |
generated_ids = model.generate(input_ids=input_ids, max_new_tokens=max_target_length) | |
# Decode and return the generated answer | |
generated_answer = tokenizer.decode(generated_ids[0], skip_special_tokens=True) | |
# Paraphrase answer | |
paraphrased_answer = "" | |
if do_natural: | |
paraphrased_answer = paraphrase_answer(question, generated_answer) | |
# Get pretrained model's answer | |
pretrained_answer = "" | |
if do_pretrained: | |
with torch.no_grad(): | |
pretrained_generated_ids = pretrained_model.generate(input_ids=input_ids, max_new_tokens=max_target_length) | |
pretrained_answer = tokenizer.decode(pretrained_generated_ids[0], skip_special_tokens=True) | |
# Get pretrained model's natural answer | |
pretrained_paraphrased_answer = "" | |
if do_pretrained_natural: | |
pretrained_paraphrased_answer = paraphrase_answer(question, generated_answer, True) | |
return generated_answer, paraphrased_answer, ("Yes" if ground_in_context else "No"), pretrained_answer, pretrained_paraphrased_answer, retrieved_context | |
# Define a function to list examples from the dataset | |
def list_examples(): | |
examples = [] | |
for example in dataset: | |
context = example["context"] | |
question = example["question"] | |
answer = example["answer"] | |
examples.append([question, context, answer, True, True, True]) | |
return examples | |
# Create a Gradio interface | |
iface = gr.Interface( | |
fn=generate_answer, | |
inputs=[ | |
Textbox(label="Question"), | |
Textbox(label="Context"), | |
Textbox(label="Ground truth"), | |
Checkbox(label="Include pretrained model's result"), | |
Checkbox(label="Include natural answer"), | |
Checkbox(label="Include pretrained model's natural answer") | |
], | |
outputs=[ | |
Textbox(label="Generated Answer"), | |
Textbox(label="Natural Answer"), | |
Textbox(label="Ground truth in the retrieved context"), | |
Textbox(label="Pretrained Model's Answer"), | |
Textbox(label="Pretrained Model's Natural Answer"), | |
Textbox(label="Retrieved Context") | |
], | |
examples=list_examples(), | |
examples_per_page=1, | |
) | |
# Launch the Gradio interface | |
iface.launch() |