Chaettim2 / run.py
nestole's picture
Update run.py
b384d54 verified
raw
history blame
13.6 kB
import chromadb
import os
import gradio as gr
import json
from huggingface_hub import InferenceClient
import gspread
from google.oauth2 import service_account
from datetime import datetime
import chromadb
# Google Sheets setup
scope = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"]
key1 = os.getenv("key1")
key2 = os.getenv("key2")
key3 = os.getenv("key3")
key4 = os.getenv("key4")
key5 = os.getenv("key5")
key6 = os.getenv("key6")
key7 = os.getenv("key7")
key8 = os.getenv("key8")
key9 = os.getenv("key9")
key10 = os.getenv("key10")
key11 = os.getenv("key11")
key12 = os.getenv("key12")
key13 = os.getenv("key13")
key14 = os.getenv("key14")
key15 = os.getenv("key15")
key16 = os.getenv("key16")
key17 = os.getenv("key17")
key18 = os.getenv("key18")
key19 = os.getenv("key19")
key20 = os.getenv("key20")
key21 = os.getenv("key21")
key22 = os.getenv("key22")
key23 = os.getenv("key23")
key24 = os.getenv("key24")
key25 = os.getenv("key25")
key26 = os.getenv("key26")
key27 = os.getenv("key27")
key28 = os.getenv("key28")
key29 = os.getenv("key29")
key30 = os.getenv("key30")
key31 = os.getenv("key31")
pkey="-----BEGIN PRIVATE KEY-----\n"+key2+"\n"+key3+"\n"+ key4+"\n"+key5+"\n"+ key6+"\n"+key7+"\n"+key8+"\n"+key9+"\n"+key10+"\n"+key11+"\n"+key12+"\n"+key13+"\n"+key14+"\n"+key15+"\n"+key16+"\n"+key17+"\n"+key18+"\n"+key19+"\n"+key20+"\n"+key21+"\n"+key22+"\n"+key24+"\n"+key25+"\n"+key26+"\n"+key27+"\n"+key28+"\n-----END PRIVATE KEY-----\n"
json_data={
"type": "service_account",
"project_id": "nestolechatbot",
"private_key_id": key1,
"private_key": pkey,
"client_email": key29,
"client_id": key30,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": key31,
"universe_domain": "googleapis.com"
}
creds = service_account.Credentials.from_service_account_info(json_data, scopes=scope)
client = gspread.authorize(creds)
sheet = client.open("nestolechatbot").sheet1 # Open the sheet
def save_to_sheet(date, name, message, IP, dev, header):
# Write user input to the Google Sheet
sheet.append_row([date, name, message, IP, dev, header])
return f"Thanks {name}, your message has been saved!"
path='/Users/thiloid/Desktop/LSKI/ole_nest/Chatbot/LLM/chroma'
if(os.path.exists(path)==False): path="/home/user/app/chroma"
print(path)
client = chromadb.PersistentClient(path=path)
print(client.heartbeat())
print(client.get_version())
print(client.list_collections())
from chromadb.utils import embedding_functions
default_ef = embedding_functions.DefaultEmbeddingFunction()
sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="T-Systems-onsite/cross-en-de-roberta-sentence-transformer")#"VAGOsolutions/SauerkrautLM-Mixtral-8x7B-Instruct")
collection = client.get_collection(name="chromatsc", embedding_function=sentence_transformer_ef)
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")
from nltk.tokenize import word_tokenize
import nltk
nltk.download('punkt')
nltk.download('punkt_tab')
import pandas as pd
import string
import re
osa= pd.read_excel("/home/user/app/OSA.xlsx", nrows=136)
osa.loc[osa["Hochschule"] == "Übergreifend", "Hochschule"] = " "
osa.loc[osa["OSA: Fach (original)"] == "Allgemein", "OSA: Fach (original)"] = " "
# text that will be added to chatbot answer
osa.loc[osa["Link_Studium_Allgemein"].notna(), "Link_Studium_Allgemein"] = "Basierend auf deiner Frage empfehele ich dir diesen Interessenstest: "+ osa["Link_Studium_Allgemein"]
osa.loc[osa["Link_Uni_Allgemein"].notna(), "Link_Uni_Allgemein"] = "Basierend auf deinem Interesse an der "+osa["Hochschule"]+" empfehele ich dir diesen Interessenstest: "+ osa["Link_Uni_Allgemein"]
osa.loc[osa["Link_Fach"].notna(), "Link_Fach"] = "Basierend auf deinem Interesse an der " +osa["Hochschule"]+ " "+osa["OSA: Fach (original)"]+" zu studieren, empfehle ich dir diesen Interessenstest: "+ osa["Link_Fach"]
osa["chattext"]= osa["Link_Studium_Allgemein"].fillna('')+ osa["Link_Uni_Allgemein"].fillna('')+osa["Link_Fach"].fillna('')
# Text to compare with user prompt
osa["combi"]= osa["Hochschule"]+ " "+ osa["OSA: Fach (original)"]
osalist= osa["combi"].tolist()
osalist
def simosa(prompt, osalist, osa):
lcos = []
prompt = prompt.lower()
p_list = word_tokenize(prompt)
# Form a set containing keywords of the prompt
sw = [",", "?"]
p_set = {w for w in p_list if not w in sw}
for val in osalist:
val = val.lower()
v_list = word_tokenize(val)
# Form a set containing keywords of the current value
v_set = {w for w in v_list if not w in sw}
# Union of both sets
rvector = p_set.union(v_set)
# Create vectors
l1 = [1 if w in p_set else 0 for w in rvector]
l2 = [1 if w in v_set else 0 for w in rvector]
# Compute cosine similarity
dot_product = sum(l1[i] * l2[i] for i in range(len(rvector)))
magnitude1 = sum(l1)
magnitude2 = sum(l2)
if magnitude1 == 0 or magnitude2 == 0:
cosine = 0.0
else:
cosine = dot_product / float((magnitude1 * magnitude2) ** 0.5)
lcos.append(cosine)
osa["testsim"]=lcos
match=osa.loc[osa['testsim'].idxmax()]["testsim"]
#print(match)
if match >0.29:
answer = str(osa.loc[osa['testsim'].idxmax()]["chattext"])
else:
answer= "Wenn du dir unsicher bist, was du studieren könntest oder ob deine Fähigkeiten ausreichen, dann mach doch diesen Test (https://www.was-studiere-ich.de/) oder schau dir mal diese Seminare an (https://www.bw-best.de)."
print("Done simosa")
return answer
def parse_for_nc(text):
'''
Parses text for words relating to NC and Abiturnote
:param text: a string
:return: an automatic response in form of a string
'''
nc_words = [" nc ", "abischnitt", "abiturschnitt", "abinote", "abiturnote", "ncschnitt", "abschlussnote", "abschlussdurchschnitt", "abschlussnote", "zulassungsbeschränkung", "numerus clausus", "noten"]
response = "Wenn du dir unsicher bist, ob du die Zulassungsvoraussetzungen zu einem Studiengang erfüllst, schau am besten einmal auf der Website der Universität nach, was gefordert ist.\n Häufig entscheidet nicht allein die Abiturnote die Zulassung, sondern auch Faktoren wie praktische Erfahrung oder ein FSJ.\n Lass dich außerdem nicht von den NCs vergangener Jahre verunsichern.\n Der NC gibt nur an, was im vergangenen Jahr die schlechteste Note des regulären Prozesses war, mit der man noch zugelassen wurde.\n Der NC kann sich also von Jahr zu Jahr verändern und oft werden auch Leute zugelassen, die einen schlechteren Schnitt ab (bspw. durch Wartesemester).\n Wenn du dir hingegen unsicher bist, ob deine Fähigkeiten mit denen des Fachs übereinstimmen,\n dann mach doch vielleicht mal einen Test. Außerdem gibt es Aufbau- und Vorbereitungskurse mittels derer du Wissen und Fähigkeiten aufbauen kannst."
# if the string is not empty
if text:
text = text.strip() # strip
text = text.lower() # lower all letters
text = text.translate(str.maketrans('', '', string.punctuation)) # remove punctuation
text = " "+text+" " # add whitespaces so that nc can be found correctly
for nc_word in nc_words:
if nc_word in text:
return response
print("Done NC")
return "No"
def extract_ip_and_device(headers_obj):
ip_address = None
device_info = None
# Access the raw headers list
headers = headers_obj.raw
for header in headers:
if len(header) != 2:
print(f"Unexpected header format: {header}")
continue
key, value = header
if key == b'x-forwarded-for':
ip_address = value.decode('utf-8')
elif key == b'user-agent':
device_info = value.decode('utf-8')
return ip_address, device_info
def format_prompt(message, history):
prompt = ""
if history:
user_prompt, bot_response = history[-1]
prompt += f"[INST] {user_prompt} [/INST] {bot_response}</s> "
prompt += f"[INST] {message} [/INST]"
return prompt
def format_promptc(message):
prompt = "" #"<s>"
prompt += f"[INST] {message} [/INST]"
return prompt
def responsecritical(
prompt, temperature=0.9, max_new_tokens=10, top_p=0.95, repetition_penalty=1.0,
):
temperature = float(temperature)
if temperature < 1e-2: temperature = 1e-2
top_p = float(top_p)
generate_kwargs = dict(
temperature=temperature,
max_new_tokens=max_new_tokens,
top_p=top_p,
repetition_penalty=repetition_penalty,
do_sample=True,
seed=42,
)
systemc="Bitte evaluiere ob die Frage soziokulturell oder allgemein problematisch oder auch sensibel oder politisch ist. Antworte ausschließlich mit Ja wenn sie soziokulturell oder allgemein problematisch ist, ansonsten nur mit Nein. Erkläre deine Entscheidung nicht.\n\nUser-Anliegen:"
formatted_promptc = format_promptc(systemc+"\n"+prompt)
streamc = client.text_generation(formatted_promptc, **generate_kwargs, stream=True, details=True, return_full_text=False)
outputc = ""
print(streamc)
for responsec in streamc:
outputc += responsec.token.text
print("CRITICAL")
print(outputc)
sentence_lower = outputc.lower()
print("Done critcial")
# Check if the word 'nein' is in the sentence
if 'ja' in sentence_lower:
return False
else:
return True
def get_value_after_question_mark(url):
url=str(url)
# Find the position of the question mark
question_mark_index = url.find('?')
# Extract the part after the question mark
if question_mark_index != -1:
value_after_question_mark = url[question_mark_index + 1:].split('=')[0]
else:
value_after_question_mark = ""
return value_after_question_mark
def response(
request: gr.Request, prompt,history, temperature=0.9, max_new_tokens=500, top_p=0.95, repetition_penalty=1.0,
):
temperature = float(0.9)
if temperature < 1e-2: temperature = 1e-2
top_p = float(top_p)
generate_kwargs = dict(
temperature=temperature,
max_new_tokens=max_new_tokens,
top_p=top_p,
repetition_penalty=repetition_penalty,
do_sample=True,
seed=42,
)
#get ID
full_url = request.url
IDval= get_value_after_question_mark(full_url)
print(full_url)
headers = request.headers
IP, dev = extract_ip_and_device(headers)
if responsecritical(prompt)==False:
now = str(datetime.now())
save_to_sheet(now, prompt, "Es scheint so, als sei dies keine Frage, die sich auf die Studienorientierung bezieht", IP, dev, str(headers))
yield "Es scheint so, als sei dies keine Frage, die sich auf die Studienorientierung bezieht"
else:
answernc=parse_for_nc(prompt)
if answernc!="No":
now = str(datetime.now())
save_to_sheet(now, prompt, answernc, IP, dev, str(headers))
yield answernc
else:
prompt = re.sub(r'\buni\b', 'Universität', prompt, flags=re.IGNORECASE)
addon=""
search_prompt = format_prompt(prompt, history)
results=collection.query(
query_texts=[search_prompt],
n_results=60
)
dists=["<br><small>(relevance: "+str(round((1-d)*100)/100)+";" for d in results['distances'][0]]
results=results['documents'][0]
combination = zip(results,dists)
combination = [' '.join(triplets) for triplets in combination]
if(len(results)>1):
addon=" Bitte berücksichtige bei deiner Antwort ausschießlich folgende Auszüge aus unserer Datenbank, sofern sie für die Antwort erforderlich sind. Beantworte die Frage knapp und präzise. Ignoriere unpassende Datenbank-Auszüge OHNE sie zu kommentieren, zu erwähnen oder aufzulisten:\n"+"\n".join(results)
system="Du bist ein deutschsprachiges KI-basiertes Studienberater Assistenzsystem, das zu jedem Anliegen möglichst geeignete Studieninformationen empfiehlt."+addon+"\n\nUser-Anliegen:"
formatted_prompt = format_prompt(system+"\n"+prompt, history)
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False)
output = ""
for response in stream:
output += response.token.text
print(output)
osaanswer=simosa(prompt, osalist, osa)
output=output[:-4]+"\n"+osaanswer
now = str(datetime.now())
save_to_sheet(now, prompt, output, IP, dev, str(headers))
yield output
gr.ChatInterface(response, chatbot=gr.Chatbot(value=[[None,"Herzlich willkommen! Ich bin Chätti ein KI-basiertes Studienassistenzsystem, das für jede Anfrage die besten Studieninformationen empfiehlt.<br>Erzähle mir was dich interessiert! Allgemein kann ich dir diesen Test (https://www.was-studiere-ich.de/) oder diese Seminare (https://www.bw-best.de) zur Studienfindung empfehlen."]],render_markdown=True),title="German Studyhelper Chätti").queue().launch(share=True) #False, server_name="0.0.0.0", server_port=7864)
print("Interface up and running!")