import nest_asyncio
import gradio as gr
import tiktoken
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.postprocessor import LLMRerank
import logging
import sys
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.legacy.llms.huggingface import HuggingFaceInferenceAPI, HuggingFaceLLM
from llama_index.core import Settings
from llama_index.llms.huggingface import HuggingFaceLLM
import torch
from transformers import BitsAndBytesConfig
from llama_index.core.prompts import PromptTemplate
from llama_index.llms.openai import OpenAI
import os
import pandas as pd
from llama_index.core import Document
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core import QueryBundle
import time
from huggingface_hub import login
from gradio import ChatMessage
# Patch asyncio so nested event loops are allowed.
nest_asyncio.apply()

# The Hugging Face API token is read from the `hf_token` environment
# variable (None when the variable is not set) and handed to the Hub login.
hf_token = os.environ.get("hf_token")
login(token=hf_token)
# Quantize the model weights to 4-bit NF4 (with double quantization) to save
# memory; the compute dtype is float16.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
)

# Generation LLM, loaded on the first CUDA device with the quantization above.
llm = HuggingFaceLLM(
    model_name="kheopss/kheops_hermes-202k-e3-v0.14-bnb-16bit",
    tokenizer_name="kheopss/kheops_hermes-202k-e3-v0.14-bnb-16bit",
    context_window=3900,
    max_new_tokens=2560,
    model_kwargs={"quantization_config": quantization_config},
    # `top_k` is a number of candidate tokens and must be a positive integer;
    # the previous value (0.95) was a probability, which only makes sense for
    # `top_p`. 50 is the transformers default, so this stays neutral.
    generate_kwargs={"temperature": 0.01, "top_k": 50, "top_p": 0.95},
    device_map="cuda:0",
)

# Embedding model used to vectorize documents and queries.
embed_model = HuggingFaceEmbedding(
    model_name="kheopss/kheops_embedding_e5_v3",
)

# Register both models as the LlamaIndex global defaults.
Settings.llm = llm
Settings.embed_model = embed_model
# Path of the JSON file holding the knowledge base; point this at your own file.
file_path = 'response_metropo_cleaned.json'
data = pd.read_json(file_path)

# One Document per row: the `values` column is the text, while `file_name`
# and `file_description` are carried along as metadata.
documents = [
    Document(
        text=row['values'],
        metadata={
            "filename": row['file_name'],
            "description": row['file_description'],
        },
    )
    for _, row in data.iterrows()
]

# Embed every document and build the in-memory vector index.
index = VectorStoreIndex.from_documents(documents, show_progress=True)
def get_retrieved_nodes(
    query_str, vector_top_k=10, reranker_top_n=3, with_reranker=False
):
    """Retrieve nodes for a query from the module-level ``index``.

    Args:
        query_str: The user query.
        vector_top_k: Number of nodes requested from the vector retriever.
        reranker_top_n: Number of nodes kept by the reranker.
        with_reranker: When true, the retrieved nodes are passed through
            ``LLMRerank`` before being returned.

    Returns:
        The retrieved (and optionally reranked) nodes.

    The wall-clock duration of each phase is printed to stdout.
    """
    bundle = QueryBundle(query_str)

    # Phase 01: vector similarity retrieval.
    retrieval_started = time.time()
    nodes = VectorIndexRetriever(
        index=index,
        similarity_top_k=vector_top_k,
    ).retrieve(bundle)
    retrieval_finished = time.time()
    print(f"Phase 01 took : {retrieval_finished-retrieval_started}")

    # Phase 02: optional reranking (timed and reported even when skipped).
    rerank_started = time.time()
    if with_reranker:
        nodes = LLMRerank(
            choice_batch_size=5,
            top_n=reranker_top_n,
        ).postprocess_nodes(nodes, bundle)
    rerank_finished = time.time()
    print(f"Phase 02 took : {rerank_finished-rerank_started}")

    return nodes
def get_all_text(new_nodes):
    """Concatenate the text of every node into one numbered context string.

    Each node contributes ``"\\nDocument <i> : <text>"`` with ``i`` starting
    at 1; the pieces are joined with a single space. An empty iterable gives
    an empty string.
    """
    return ' '.join(
        f"\nDocument {position} : {item.get_text()}"
        for position, item in enumerate(new_nodes, 1)
    )
# Load the cl100k_base tokenizer once, at import time.
encoding = tiktoken.get_encoding("cl100k_base")


def estimate_tokens(text):
    """Return the number of cl100k_base tokens in ``text``.

    Args:
        text: The string to measure.

    Returns:
        The token count as an int.

    NOTE(review): the LLM is loaded with its own tokenizer, so this is
    presumably only an approximation of what the model sees - confirm.
    """
    # By default tiktoken raises ValueError when the text contains one of the
    # encoding's special tokens (e.g. "<|endoftext|>"). The text measured here
    # comes from user chat history, so such sequences are encoded as ordinary
    # text instead of crashing the request.
    return len(encoding.encode(text, disallowed_special=()))
# System instructions placed at the start of every prompt.
_SYSTEM_PROMPT = '''
You are an AI assistant supporting public agents in Nice by helping residents and citizens access appropriate services. You will respond to user inquiries based on the information provided in the context. Your responses should be accurate, relevant, and tailored to the user’s needs, always ensuring clarity and usefulness, and don't add extra information like phone numbers and links if doesn't provided in context.
You must assume that all numbers (such as phone numbers or service numbers) and addresses provided in the context are correct and should not question their validity. If the context does not provide sufficient information, or you are uncertain about other aspects, acknowledge this and offer to guide the user towards the next steps, such as contacting a relevant service or providing general advice based on common practices.
Always respond in French and ensure your answers are conversational, helpful, and aligned with the user's query and the provided documents'''


def _build_history_prompt(history, token_budget=3000):
    """Render the most recent chat turns that fit within ``token_budget``.

    Args:
        history: Sequence of turns, each indexable as
            ``(user_message, assistant_message)``; either side may be empty
            or None.
        token_budget: Maximum estimated token count for the rendered history.

    Returns:
        A ``(history_prompt, used_tokens)`` tuple: the rendered turns in
        chronological order and their estimated token count.
    """
    history_prompt = ""
    used_tokens = 0
    # Walk from the newest turn to the oldest so the latest context wins.
    for turn in reversed(history or []):
        # A turn may lack either side. Each side falls back to an empty
        # string so nothing is left unbound or carried over from another turn.
        user_p = f" <|im_start|>user \n {turn[0]}\n<|im_end|>" if turn[0] else ""
        assistant_p = (
            f" <|im_start|>assistant \n {turn[1]}\n<|im_end|>" if turn[1] else ""
        )
        turn_prompt = user_p + assistant_p
        if not turn_prompt:
            continue
        turn_tokens = estimate_tokens(turn_prompt)
        # Stop as soon as adding this turn would exceed the budget.
        if used_tokens + turn_tokens > token_budget:
            break
        # Prepend, so the final text stays in chronological order.
        history_prompt = turn_prompt + history_prompt
        used_tokens += turn_tokens
    return history_prompt, used_tokens


def process_final(user_prom, history):
    """Answer a chat message using retrieved documents and stream the reply.

    Retrieves and reranks documents for ``user_prom``, builds a ChatML-style
    prompt (system instructions + documents, then as much recent history as
    fits the history budget, then the new user message) and streams the LLM
    completion.

    Args:
        user_prom: The new user message.
        history: Previous turns, each indexable as
            ``(user_message, assistant_message)``.

    Yields:
        The ``text`` of each streamed completion response.

    NOTE(review): the 3000-token budget applies to the history only; the
    system prompt, documents and new message are not counted against it.
    """
    new_nodes = get_retrieved_nodes(
        user_prom,
        vector_top_k=5,
        reranker_top_n=3,
        with_reranker=True,
    )
    get_texts = get_all_text(new_nodes)
    print("PHASE 03 passing to LLM\n")

    sys_p = f"<|im_start|>system \n{_SYSTEM_PROMPT}\n DOCUMENTS {get_texts}\n<|im_end|>"
    history_prompt, total_tokens = _build_history_prompt(history)
    prompt_f = f"{sys_p} {history_prompt} <|im_start|>user \n{user_prom} \n<|im_end|><|im_start|>assistant \n"

    # The prompt is already fully formatted, hence formatted=True.
    gen = llm.stream_complete(formatted=True, prompt=prompt_f)

    # Debug output: history token count followed by the full prompt.
    print(f"le nombre TOTAL de tokens : {total_tokens}\n")
    print("_" * 100)
    print(prompt_f)
    print("o" * 100)

    for response in gen:
        yield response.text
# Text shown under the title of the chat UI.
description = "\nMade by KHEOPS AI\n"

# Chat UI wired to the RAG pipeline; `process_final` streams the answer.
demo = gr.ChatInterface(
    fn=process_final,
    title="METROPOLE CHATBOT",
    description=description,
)

# Start the app with a public share link and debug mode enabled.
demo.launch(share=True, debug=True)