Spaces:
Sleeping
Sleeping
import nest_asyncio | |
import gradio as gr | |
import tiktoken | |
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader | |
from llama_index.core.postprocessor import LLMRerank | |
import logging | |
import sys | |
from llama_index.embeddings.huggingface import HuggingFaceEmbedding | |
from llama_index.legacy.llms.huggingface import HuggingFaceInferenceAPI, HuggingFaceLLM | |
from llama_index.core import Settings | |
from llama_index.llms.huggingface import HuggingFaceLLM | |
import torch | |
from transformers import BitsAndBytesConfig | |
from llama_index.core.prompts import PromptTemplate | |
from llama_index.llms.openai import OpenAI | |
import os | |
import pandas as pd | |
from llama_index.core import Document | |
from llama_index.core.retrievers import VectorIndexRetriever | |
from llama_index.core import QueryBundle | |
import time | |
from huggingface_hub import login | |
from gradio import ChatMessage | |
nest_asyncio.apply() | |
hf_token = os.getenv('hf_token') | |
# Replace 'your_token_here' with your actual Hugging Face API token | |
login(token=hf_token) | |
# quantize to save memory | |
quantization_config = BitsAndBytesConfig( | |
load_in_4bit=True, | |
bnb_4bit_compute_dtype=torch.float16, | |
bnb_4bit_quant_type="nf4", | |
bnb_4bit_use_double_quant=True, | |
) | |
llm = HuggingFaceLLM( | |
model_name="kheopss/kheops_hermes-202k-e3-v0.14-bnb-16bit", | |
tokenizer_name="kheopss/kheops_hermes-202k-e3-v0.14-bnb-16bit", | |
context_window=3900, | |
max_new_tokens=2560, | |
model_kwargs={"quantization_config": quantization_config}, | |
generate_kwargs={"temperature": 0.01, "top_k": 0.95, "top_p": 0.95}, | |
device_map="cuda:0", | |
) | |
embed_model = HuggingFaceEmbedding( | |
model_name="kheopss/kheops_embedding_e5_v3", | |
) | |
Settings.llm=llm | |
Settings.embed_model=embed_model | |
# Replace 'file_path.json' with the path to your JSON file | |
file_path = 'response_metropo_cleaned.json' | |
data = pd.read_json(file_path) | |
documents = [Document(text=row['values'],metadata={"filename": row['file_name'], "description":row['file_description']},) for index, row in data.iterrows()] | |
index = VectorStoreIndex.from_documents(documents, show_progress=True) | |
def get_retrieved_nodes( | |
query_str, vector_top_k=10, reranker_top_n=3, with_reranker=False | |
): | |
query_bundle = QueryBundle(query_str) | |
# configure retriever | |
phase_01_start = time.time() | |
retriever = VectorIndexRetriever( | |
index=index, | |
similarity_top_k=vector_top_k, | |
) | |
retrieved_nodes = retriever.retrieve(query_bundle) | |
phase_01_end = time.time() | |
print(f"Phase 01 <RETRIEVING> took : {phase_01_end-phase_01_start}") | |
phase_02_start = time.time() | |
if with_reranker: | |
# configure reranker | |
reranker = LLMRerank( | |
choice_batch_size=5, | |
top_n=reranker_top_n, | |
) | |
retrieved_nodes = reranker.postprocess_nodes( | |
retrieved_nodes, query_bundle | |
) | |
phase_02_end = time.time() | |
print(f"Phase 02 <RERANKING> took : {phase_02_end-phase_02_start}") | |
return retrieved_nodes | |
def get_all_text(new_nodes): | |
texts = [] | |
for i, node in enumerate(new_nodes, 1): | |
texts.append(f"\nDocument {i} : {node.get_text()}") | |
return ' '.join(texts) | |
# Charger le tokenizer cl100k_base | |
encoding = tiktoken.get_encoding("cl100k_base") | |
def estimate_tokens(text): | |
# Encoder le texte pour obtenir les tokens | |
tokens = encoding.encode(text) | |
return len(tokens) | |
def process_final(user_prom,history): | |
import time | |
all_process_start = time.time() | |
system_p = ''' | |
You are an AI assistant supporting public agents in Nice by helping residents and citizens access appropriate services. You will respond to user inquiries based on the information provided in the context. Your responses should be accurate, relevant, and tailored to the user’s needs, always ensuring clarity and usefulness, and don't add extra information like phone numbers and links if doesn't provided in context. | |
You must assume that all numbers (such as phone numbers or service numbers) and addresses provided in the context are correct and should not question their validity. If the context does not provide sufficient information, or you are uncertain about other aspects, acknowledge this and offer to guide the user towards the next steps, such as contacting a relevant service or providing general advice based on common practices. | |
Always respond in French and ensure your answers are conversational, helpful, and aligned with the user's query and the provided documents''' | |
new_nodes = get_retrieved_nodes( | |
user_prom, | |
vector_top_k=5, | |
reranker_top_n=3, | |
with_reranker=True, | |
) | |
get_texts = get_all_text(new_nodes) | |
print("PHASE 03 passing to LLM\n") | |
sys_p = f"<|im_start|>system \n{system_p}\n DOCUMENTS {get_texts}\n<|im_end|>" | |
prompt_f="" | |
total_tokens = estimate_tokens(prompt_f) | |
for val in reversed(history): | |
if val[0]: | |
user_p = f" <|im_start|>user \n {val[0]}\n<|im_end|>" | |
if val[1]: | |
assistant_p = f" <|im_start|>assistant \n {val[1]}\n<|im_end|>" | |
current_tokens = estimate_tokens(user_p+assistant_p) | |
# Vérifier si l'ajout de cet historique dépasse la limite | |
if total_tokens + current_tokens > 3000: | |
break # Arrêter l'ajout si on dépasse la limite | |
else: | |
# Ajouter à `prompt_f` et mettre à jour le nombre total de tokens | |
prompt_f = user_p + assistant_p + prompt_f | |
total_tokens += current_tokens | |
prompt_f=f"{sys_p} {prompt_f} <|im_start|>user \n{user_prom} \n<|im_end|><|im_start|>assistant \n" | |
phase_03_start = time.time() | |
gen =llm.stream_complete(formatted=True, prompt=prompt_f) | |
print (f"le nombre TOTAL de tokens : {total_tokens}\n") | |
print("_"*100) | |
print(prompt_f) | |
print("o"*100) | |
for response in gen: | |
yield response.text | |
description = """ | |
<p> | |
<center> | |
<img src="https://www.nicecotedazur.org/wp-content/themes/mnca/images/logo-metropole-nca.png" alt="rick" width="250"/> | |
</center> | |
</p> | |
<p style="text-align:right"> Made by KHEOPS AI</p> | |
""" | |
demo = gr.ChatInterface( | |
fn=process_final, | |
title="METROPOLE CHATBOT", | |
description=description, | |
) | |
demo.launch(share=True, debug =True) |