import gradio as gr
from gradio_pdf import PDF
from gradio_client import Client, handle_file
import os

# Define your Hugging Face token (make sure to set it as an environment variable)
HF_TOKEN = os.getenv("HF_TOKEN")  # Replace with your actual token if not using an environment variable
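# Hedged sketch, not part of the original app: warn early if the token is
# missing, since the Client created below needs it to reach a gated Space.
# (Assumption: the Space requires authentication; remove this if it is public.)
if HF_TOKEN is None:
    print("Warning: HF_TOKEN is not set; calls to the remote Space may be rejected.")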
# Initialize the Gradio Client for the specified API
client = Client("mangoesai/Elections_Comparing_Agent", hf_token=HF_TOKEN)

client_name = ['2016 Election', '2024 Election']
def stream_chat_with_rag(
    message: str,
    history: list,
    client_name: str,
    system_prompt: str,
    num_retrieved_docs: int = 10,
    num_docs_final: int = 9,
    temperature: float = 0,
    max_new_tokens: int = 1024,
    top_p: float = 1.0,
    top_k: int = 20,
    penalty: float = 1.2,
):
    print(f"Message: {message}")
    print(f"History: {history}")

    # Build the conversation prompt including system prompt and history
    conversation = f"{system_prompt}\n\nFor Client: {client_name}\n"

    # Add previous conversation history
    for user_input, assistant_response in history:
        conversation += f"User: {user_input}\nAssistant: {assistant_response}\n"

    # Add the current user message
    conversation += f"User: {message}\nAssistant:"
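    # For illustration (not in the original code): with the default system prompt,
    # client_name="2016 Election" and one prior turn, `conversation` looks like:
    #   You are an expert assistant
    #
    #   For Client: 2016 Election
    #   User: <previous question>
    #   Assistant: <previous answer>
    #   User: <current message>
    #   Assistant: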
    # Call the API's /process_query endpoint with the user's question
    # (note: the assembled `conversation` is not sent; context is passed empty)
    question = message
    #answer = client.predict(question=question, api_name="/run_graph")
    answer = client.predict(
        query=message,
        context="",
        election_year=client_name,
        api_name="/process_query"
    )

    # Debugging: Print the raw response
    print("Raw answer from API:")
    print(answer)

    # # Format the assistant's answer and the relevant documents separately
    # formatted_answer = format_answer_string(answer)

    # # Update the conversation history with the new message and answer
    # history.append((message, formatted_answer))

    # Return the formatted answer
    return answer
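# Hedged sketch, not part of the original app: a direct call to the same
# /process_query endpoint, useful as a quick smoke test outside the Gradio UI.
# Parameter names simply mirror the client.predict() call above; the endpoint
# itself belongs to the remote Space and is assumed unchanged.
def _smoke_test(question: str = "What were the most discussed election topics?") -> None:
    result = client.predict(
        query=question,
        context="",
        election_year="2016 Election",
        api_name="/process_query",
    )
    print(result)

# Uncomment to run the smoke test at startup:
# _smoke_test()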
# def format_answer_string(answer: str):
#     """
#     This function extracts and formats the assistant's response before document metadata.
#     Anything after the marker `[(` (where documents are listed) is ignored.
#     """
#     # Step 1: Split the response at the start of the document metadata
#     split_marker = "[("
#     if split_marker in answer:
#         # Everything before the marker is the relevant answer
#         answer_before_docs = answer.split(split_marker)[0]
#     else:
#         # If no documents metadata, return the entire answer
#         answer_before_docs = answer
#
#     # Step 2: Clean up formatting by replacing escaped newline characters
#     formatted_answer = answer_before_docs.replace("\\n", "\n").strip()
#
#     # Step 3: Remove potential starting and ending artifacts like (' and ,) if present
#     if formatted_answer.startswith("(\"") and formatted_answer.endswith("\","):
#         formatted_answer = formatted_answer[2:-2].strip()
#
#     # Optional: Add a prefix for clarity
#     formatted_answer = "Co-Pilot: " + formatted_answer
#     return formatted_answer


# def format_relevant_documents(relevant_docs: list):
#     """
#     This function formats the relevant document metadata and content for readable output.
#     It extracts the heading, page number, and a snippet of the content from each document.
#     """
#     formatted_docs = "Relevant Documents:\n\n"
#     for idx, (doc, score) in enumerate(relevant_docs):
#         # Extract the relevant metadata
#         heading = doc.metadata.get('heading', 'Unnamed Document')
#         page_number = int(doc.metadata.get('page_number', -1))
#         source = doc.metadata.get('source', 'Unknown Source')
#         confidence = round(score, 4)  # Rounding the score for cleaner output
#
#         # Add the formatted details to the output string
#         formatted_docs += f"Document {idx + 1}:\n"
#         formatted_docs += f"  - Heading: {heading}\n"
#         formatted_docs += f"  - Page Number: {page_number}\n"
#         formatted_docs += f"  - Source: {source}\n"
#         formatted_docs += f"  - Confidence Score: {confidence}\n"
#
#         # Optionally include a snippet from the content
#         content_snippet = doc.page_content[:200]  # Take the first 200 characters for preview
#         formatted_docs += f"  - Content Snippet: {content_snippet}...\n\n"
#
#     return formatted_docs.strip()
# CSS for custom styling
CSS = """
#chat-container {
    height: 100vh;
}
"""

# Title for the application
TITLE = "<h1 style='text-align:center;'>Reddit Election Q&A agent v0.1</h1>"
# Create the Gradio Blocks interface
with gr.Blocks(css=CSS) as demo:
    gr.HTML(TITLE)
    with gr.Tab("Chat"):
        chatbot = gr.Chatbot(elem_id="chat-container")  # Create a chatbot interface (elem_id matches the CSS above)
        chat_interface = gr.ChatInterface(
            fn=stream_chat_with_rag,
            chatbot=chatbot,
            additional_inputs_accordion=gr.Accordion(label="⚙️ Parameters", open=False, render=False),
            additional_inputs=[
                gr.Dropdown(client_name, value="2016 Election", label="Select Election year", render=False, allow_custom_value=True),
                gr.Textbox(
                    value="You are an expert assistant",
                    label="System Prompt",
                    render=False,
                ),
                gr.Slider(
                    minimum=1,
                    maximum=10,
                    step=1,
                    value=10,
                    label="Number of Initial Documents to Retrieve",
                    render=False,
                ),
                gr.Slider(
                    minimum=1,
                    maximum=10,
                    step=1,
                    value=9,
                    label="Number of Final Documents to Retrieve",
                    render=False,
                ),
                gr.Slider(
                    minimum=0.0,
                    maximum=1.0,
                    step=0.1,
                    value=0,
                    label="Temperature",
                    render=False,
                ),
                gr.Slider(
                    minimum=128,
                    maximum=8192,
                    step=1,
                    value=1024,
                    label="Max new tokens",
                    render=False,
                ),
                gr.Slider(
                    minimum=0.0,
                    maximum=1.0,
                    step=0.1,
                    value=1.0,
                    label="Top P",
                    render=False,
                ),
                gr.Slider(
                    minimum=1,
                    maximum=20,
                    step=1,
                    value=20,
                    label="Top K",
                    render=False,
                ),
                gr.Slider(
                    minimum=0.0,
                    maximum=2.0,
                    step=0.1,
                    value=1.2,
                    label="Repetition Penalty",
                    render=False,
                ),
            ],
        )
    # with gr.Tab("Process PDF"):
    #     pdf_input = gr.File(label="Upload PDF File")
    #     #select_client_dropdown = gr.Dropdown(client_name, value="rosariarossi", label="Select or Type Client", allow_custom_value=True)
    #     pdf_output = gr.Textbox(label="PDF Result", interactive=False)
    #     pdf_button = gr.Button("Process PDF")
    #     pdf_button.click(
    #         process_pdf,
    #         inputs=[pdf_input],  # Passing the client name alongside the PDF is not required
    #         outputs=pdf_output
    #     )

    # with gr.Tab("Answer with RAG"):
    #     question_input = gr.Textbox(label="Enter Question for RAG")
    #     answer_with_rag_select_client_dropdown = gr.Dropdown(client_name, value="primo", label="Select or Type Client", allow_custom_value=True)
    #     rag_output = gr.Textbox(label="RAG Answer Result", interactive=False)
    #     rag_button = gr.Button("Get Answer")
    #     rag_button.click(
    #         rag_api,
    #         inputs=[question_input, answer_with_rag_select_client_dropdown],
    #         outputs=rag_output
    #     )

    # with gr.Tab(label="Manage Files"):
    #     with gr.Column():
    #         delete_index_button = gr.Button("Delete All Files")
    #         delete_index_textout = gr.Textbox(label="Deleted Files and Refresh Result")
    #         delete_index_button.click(fn=delete_index, inputs=[], outputs=[delete_index_textout])
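# Optional (hedged, not in the original app): enable Gradio's request queue so
# long-running RAG calls do not block concurrent users. queue() is a standard
# gr.Blocks method; uncomment if the Space sees parallel traffic.
# demo.queue()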
# Launch the app
if __name__ == "__main__":
    demo.launch()