import streamlit as st
import pandas as pd
import numpy as np
from nltk.tokenize import sent_tokenize


# Split the text into sentences. Necessary for NLI models,
# which were trained on sentence pairs.
def split_sentences(text):
    return sent_tokenize(text)
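# Note: sent_tokenize() relies on NLTK's "punkt" tokenizer data. A minimal setup
# sketch (assumption: the deployed Space downloads it at build or startup time):
#
#   import nltk
#   nltk.download("punkt")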
###### Prompting

def query_model_prompting(model, text, prompt_with_mask, top_k, targets):
    """Query the prompting model.

    :param model: Prompting model object
    :type model: Huggingface pipeline object
    :param text: Event description (context)
    :type text: str
    :param prompt_with_mask: Prompt with a mask
    :type prompt_with_mask: str
    :param top_k: Number of tokens to output
    :type top_k: int
    :param targets: Restrict the answer to these possible tokens (None for no restriction)
    :type targets: list
    :return: Results of the prompting model
    :rtype: list of dict
    """
    sequence = text + prompt_with_mask
    output_tokens = model(sequence, top_k=top_k, targets=targets)
    return output_tokens
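# Usage sketch (illustrative only; the checkpoint name is an assumption, the Space
# may load a different masked-language model):
#
#   from transformers import pipeline
#   model_prompting = pipeline("fill-mask", model="distilbert-base-uncased")
#   prompt = " People were {} in this event."
#   prompt_with_mask = prompt.format(model_prompting.tokenizer.mask_token)
#   candidates = query_model_prompting(model_prompting,
#                                      "A bomb exploded near the market.",
#                                      prompt_with_mask, top_k=10, targets=None)
#   # Each entry is a dict with 'token_str' and 'score' keys.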
def do_sentence_entailment(sentence, hypothesis, model):
    """Concatenate context and hypothesis then perform entailment.

    :param sentence: Event description (context), 1 sentence
    :type sentence: str
    :param hypothesis: Mask filled with a token
    :type hypothesis: str
    :param model: NLI model
    :type model: Huggingface pipeline
    :return: DataFrame containing the result of the entailment
    :rtype: pandas DataFrame
    """
    text = sentence + '</s></s>' + hypothesis
    res = model(text, return_all_scores=True)
    df_res = pd.DataFrame(res[0])
    df_res['label'] = df_res['label'].apply(lambda x: x.lower())
    df_res.columns = ["Label", "Score"]
    return df_res
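# Usage sketch (illustrative only; "roberta-large-mnli" is an assumption for an NLI
# checkpoint whose labels lowercase to "entailment"/"neutral"/"contradiction"):
#
#   from transformers import pipeline
#   nli_model = pipeline("text-classification", model="roberta-large-mnli")
#   df = do_sentence_entailment("A bomb exploded near the market.",
#                               "People were killed in this event.", nli_model)
#   # df has one row per label, with columns "Label" and "Score".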
def softmax(x):
    """Compute softmax values for each set of scores in x."""
    return np.exp(x) / np.sum(np.exp(x), axis=0)
######### NLI + PROMPTING

def do_text_entailment(text, hypothesis, model):
    """
    Do entailment for each sentence of the event description, as the
    model was trained on sentence pairs.

    :param text: Event description (context)
    :type text: str
    :param hypothesis: Mask filled with a token
    :type hypothesis: str
    :param model: NLI model
    :type model: Huggingface pipeline
    :return: List of entailment results for each sentence of the text
    :rtype: list
    """
    text_entailment_results = []
    for sentence in split_sentences(text):
        df_score = do_sentence_entailment(sentence, hypothesis, model)
        text_entailment_results.append((sentence, hypothesis, df_score))
    return text_entailment_results
def get_true_entailment(text_entailment_results, nli_limit):
    """
    From the result of each sentence entailment, extract the maximum entailment
    score and check whether it is higher than the entailment threshold.

    :param text_entailment_results: Output of do_text_entailment()
    :type text_entailment_results: list of (sentence, hypothesis, DataFrame) tuples
    :param nli_limit: Entailment threshold
    :type nli_limit: float
    :return: List containing the (hypothesis, rounded score) pair if entailed, else empty
    :rtype: list
    """
    true_hypothesis_list = []
    max_score = 0
    for sentence_entailment in text_entailment_results:
        df_score = sentence_entailment[2]
        score = df_score[df_score["Label"] == 'entailment']["Score"].values.max()
        if score > max_score:
            max_score = score
    # The hypothesis is the same for every sentence, so it is appended at most once,
    # with the best entailment score across sentences.
    if max_score > nli_limit:
        true_hypothesis_list.append((sentence_entailment[1], np.round(max_score, 2)))
    return list(set(true_hypothesis_list))
def prompt_to_nli(text, prompt, model_prompting, nli_model, nlp, top_k=10, nli_limit=0.5, remove_lemma=False):
    """
    Apply the PR-ENT pipeline.

    :param text: Event description
    :type text: str
    :param prompt: Prompt with mask
    :type prompt: str
    :param model_prompting: Prompting model
    :type model_prompting: Huggingface pipeline
    :param nli_model: NLI model
    :type nli_model: Huggingface pipeline
    :param nlp: spaCy language model used for lemmatization
    :type nlp: spacy.language.Language
    :param top_k: Number of words output by the prompting model
    :type top_k: int
    :param nli_limit: Entailment threshold
    :type nli_limit: float
    :param remove_lemma: If True, drop candidate tokens that share the same lemma
    :type remove_lemma: bool
    :return: Results of the pipeline
    :rtype: list
    """
    prompt_masked = prompt.format(model_prompting.tokenizer.mask_token)
    label = []
    output_prompting = query_model_prompting(model_prompting, text, prompt_masked, top_k, targets=None)
    if remove_lemma:
        output_prompting = filter_prompt_output_by_lemma(prompt, output_prompting, nlp)
    for token in output_prompting:
        hypothesis = prompt.format(token['token_str'])
        text_entailment_results = do_text_entailment(text, hypothesis, nli_model)
        true_hypothesis_list = get_true_entailment(text_entailment_results, nli_limit)
        if len(true_hypothesis_list) > 0:
            label.append(((token['token_str'], token['score']), true_hypothesis_list[0]))
    return label
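# End-to-end usage sketch (illustrative only; all checkpoint names are assumptions):
#
#   import spacy
#   from transformers import pipeline
#   model_prompting = pipeline("fill-mask", model="distilbert-base-uncased")
#   nli_model = pipeline("text-classification", model="roberta-large-mnli")
#   nlp = spacy.load("en_core_web_sm")
#   results = prompt_to_nli("A bomb exploded near the market, killing three people.",
#                           " People were {} in this event.",
#                           model_prompting, nli_model, nlp,
#                           top_k=10, nli_limit=0.5, remove_lemma=True)
#   # -> list of ((token, prompting_score), (hypothesis, entailment_score)) tuples.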
def display_nli_pr_results_as_list(title, list_results):
    """
    Display the list of entailment results as a Streamlit multiselect widget.
    """
    st.markdown(
        """
        <style>
        span[data-baseweb="tag"] {
            background-color: red !important;
        }
        </style>
        """,
        unsafe_allow_html=True,
    )
    prompt_list = st.multiselect(
        title,
        list_results,
        list_results,
        key='results_mix')
##### QA

def question_answering(model, text, questions_list, to_print=True):
    """
    Apply the question answering model to each question in the list.

    :param model: QA model
    :type model: Huggingface pipeline
    :param text: Event description (context)
    :type text: str
    :param questions_list: Questions to answer
    :type questions_list: list of str
    :param to_print: If True, write each question/answer pair to the Streamlit app
    :type to_print: bool
    :return: Tuple containing the answer and the confidence score of the last question
    :rtype: tuple
    """
    for question in questions_list:
        QA_input = {
            'question': question,
            'context': text}
        res = model(QA_input, handle_impossible_answer=False)
        if to_print:
            st.write("Question: {}".format(question))
            st.write("Answer: {}".format(res["answer"]))
    return res["answer"], res["score"]
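# Usage sketch (illustrative only; the checkpoint name is an assumption):
#
#   from transformers import pipeline
#   model_qa = pipeline("question-answering", model="deepset/roberta-base-squad2")
#   answer, score = question_answering(model_qa,
#                                       "A bomb exploded near the market.",
#                                       ["What exploded near the market?"],
#                                       to_print=False)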
### Prompt + NLI + QA

def get_who_what_whom_qa(text, tokens, model_qa):
    """
    Build an Actor / Action / Target triple for each entailed token by asking
    the QA model who did the action and who was targeted by it.
    """
    who_what_whom = []
    if not tokens:
        res_dict = {"Actor": '', "Action": '', "Target": ''}
        st.write("No entailed tokens.")
    else:
        for token in tokens:
            res_dict = {"Actor": '', "Action": token, "Target": ''}
            # Phrase the "who did it" question according to the token form.
            if token[-3:] == 'ing':
                perp, score_p = question_answering(model_qa, text, ["Who was {}?".format(token)], to_print=False)
            else:
                perp, score_p = question_answering(model_qa, text, ["Who {} people?".format(token)], to_print=False)
            if perp:
                res_dict["Actor"] = perp + ' [' + str(np.round(score_p * 100, 1)) + '%]'
            else:
                res_dict["Actor"] = 'N/A' + ' [' + str(np.round(score_p * 100, 1)) + '%]'
            victim, score_v = question_answering(model_qa, text, ["Who was {}?".format(token)], to_print=False)
            if victim:
                res_dict["Target"] = victim + ' [' + str(np.round(score_v * 100, 1)) + '%]'
            else:
                res_dict["Target"] = 'N/A' + ' [' + str(np.round(score_v * 100, 1)) + '%]'
            who_what_whom.append(res_dict)
    return who_what_whom
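# Usage sketch (illustrative only): "tokens" is a list of entailed token strings
# coming out of prompt_to_nli, and "model_qa" a question-answering pipeline:
#
#   triples = get_who_what_whom_qa("A bomb exploded near the market, killing three people.",
#                                  ["killed"], model_qa)
#   # -> [{"Actor": "<answer> [score%]", "Action": "killed", "Target": "<answer> [score%]"}]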
def remove_similar_lemma_from_list(prompt, list_words, nlp):
    """
    Deduplicate candidate tokens that share the same lemma (e.g. "protest", "protests").
    """
    ## Compute a dictionary with the lemma for all tokens.
    ## If several tokens share a lemma, the dictionary value is the list of those tokens.
    lemma_dict = {}
    for each in list_words:
        mask_filled = nlp(prompt.strip('.').format(each))
        lemma_dict.setdefault([x.lemma_ for x in mask_filled][-1], []).append(each)
    ## Get back the list of tokens.
    ## If multiple tokens are available for a lemma, keep the shortest one.
    new_token_list = []
    for key in lemma_dict.keys():
        if len(lemma_dict[key]) >= 1:
            new_token_list.append(min(lemma_dict[key], key=len))
        else:
            raise ValueError("Lemma dict has 0 corresponding words")
    return new_token_list
def filter_prompt_output_by_lemma(prompt, output_prompting, nlp):
    """
    Remove all similar lemmas from the prompt output (e.g. "protest", "protests").
    """
    list_words = [x['token_str'] for x in output_prompting]
    new_token_list = remove_similar_lemma_from_list(prompt, list_words, nlp)
    return [x for x in output_prompting if x['token_str'] in new_token_list]
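# Usage sketch (illustrative only; "en_core_web_sm" is an assumption for the spaCy model,
# and "candidates" stands for the output of query_model_prompting):
#
#   import spacy
#   nlp = spacy.load("en_core_web_sm")
#   filtered = filter_prompt_output_by_lemma(" People were {} in this event.",
#                                            candidates, nlp)
#   # Candidates sharing a lemma (e.g. "protest"/"protests") collapse to the shortest form.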