|
import chromadb |
|
import os |
|
import gradio as gr |
|
import json |
|
from huggingface_hub import InferenceClient |
|
import gspread |
|
from google.oauth2 import service_account |
|
from datetime import datetime |
|
import chromadb |
|
|
|
|
|
scope = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"] |
|
key1 = os.getenv("key1") |
|
key2 = os.getenv("key2") |
|
key3 = os.getenv("key3") |
|
key4 = os.getenv("key4") |
|
key5 = os.getenv("key5") |
|
key6 = os.getenv("key6") |
|
key7 = os.getenv("key7") |
|
key8 = os.getenv("key8") |
|
key9 = os.getenv("key9") |
|
key10 = os.getenv("key10") |
|
key11 = os.getenv("key11") |
|
key12 = os.getenv("key12") |
|
key13 = os.getenv("key13") |
|
key14 = os.getenv("key14") |
|
key15 = os.getenv("key15") |
|
key16 = os.getenv("key16") |
|
key17 = os.getenv("key17") |
|
key18 = os.getenv("key18") |
|
key19 = os.getenv("key19") |
|
key20 = os.getenv("key20") |
|
key21 = os.getenv("key21") |
|
key22 = os.getenv("key22") |
|
key23 = os.getenv("key23") |
|
key24 = os.getenv("key24") |
|
key25 = os.getenv("key25") |
|
key26 = os.getenv("key26") |
|
key27 = os.getenv("key27") |
|
key28 = os.getenv("key28") |
|
pkey="-----BEGIN PRIVATE KEY-----\n"+key2+"\n"+key3+"\n"+ key4+"\n"+key5+"\n"+ key6+"\n"+key7+"\n"+key8+"\n"+key9+"\n"+key10+"\n"+key11+"\n"+key12+"\n"+key13+"\n"+key14+"\n"+key15+"\n"+key16+"\n"+key17+"\n"+key18+"\n"+key19+"\n"+key20+"\n"+key21+"\n"+key22+"\n"+key24+"\n"+key25+"\n"+key26+"\n"+key27+"\n"+key28+"\n-----END PRIVATE KEY-----\n" |
|
print(pkey) |
|
json_data={ |
|
"type": "service_account", |
|
"project_id": "nestolechatbot", |
|
"private_key_id": key1, |
|
"private_key": pkey, |
|
"client_email": key29, |
|
"client_email": key29, |
|
"client_id": key30, |
|
"auth_uri": "https://accounts.google.com/o/oauth2/auth", |
|
"token_uri": "https://oauth2.googleapis.com/token", |
|
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", |
|
"client_x509_cert_url": key31, |
|
"universe_domain": "googleapis.com" |
|
} |
|
creds = service_account.Credentials.from_service_account_info(json_data, scopes=scope) |
|
|
|
client = gspread.authorize(creds) |
|
sheet = client.open("nestolechatbot").sheet1 |
|
|
|
def save_to_sheet(date, name, message, IP, dev, header): |
|
|
|
sheet.append_row([date, name, message, IP, dev, header]) |
|
return f"Thanks {name}, your message has been saved!" |
|
|
|
path='/Users/thiloid/Desktop/LSKI/ole_nest/Chatbot/LLM/chroma' |
|
if(os.path.exists(path)==False): path="/home/user/app/chroma" |
|
|
|
print(path) |
|
client = chromadb.PersistentClient(path=path) |
|
print(client.heartbeat()) |
|
print(client.get_version()) |
|
print(client.list_collections()) |
|
from chromadb.utils import embedding_functions |
|
default_ef = embedding_functions.DefaultEmbeddingFunction() |
|
sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="T-Systems-onsite/cross-en-de-roberta-sentence-transformer") |
|
|
|
collection = client.get_collection(name="chromatsc", embedding_function=sentence_transformer_ef) |
|
|
|
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1") |
|
|
|
from nltk.tokenize import word_tokenize |
|
import nltk |
|
nltk.download('punkt') |
|
import pandas as pd |
|
import string |
|
import re |
|
osa= pd.read_excel("/home/user/app/OSA.xlsx", nrows=136) |
|
osa.loc[osa["Hochschule"] == "Übergreifend", "Hochschule"] = " " |
|
osa.loc[osa["OSA: Fach (original)"] == "Allgemein", "OSA: Fach (original)"] = " " |
|
|
|
|
|
osa.loc[osa["Link_Studium_Allgemein"].notna(), "Link_Studium_Allgemein"] = "Basierend auf deiner Frage empfehele ich dir diesen Interessenstest: "+ osa["Link_Studium_Allgemein"] |
|
osa.loc[osa["Link_Uni_Allgemein"].notna(), "Link_Uni_Allgemein"] = "Basierend auf deinem Interessa an der "+osa["Hochschule"]+" empfehele ich dir diesen Interessenstest: "+ osa["Link_Uni_Allgemein"] |
|
osa.loc[osa["Link_Fach"].notna(), "Link_Fach"] = "Basierend auf deinem Interesse an der " +osa["Hochschule"]+ " "+osa["OSA: Fach (original)"]+" zu studieren, empfehle ich dir diesen Interessenstest: "+ osa["Link_Fach"] |
|
|
|
osa["chattext"]= osa["Link_Studium_Allgemein"].fillna('')+ osa["Link_Uni_Allgemein"].fillna('')+osa["Link_Fach"].fillna('') |
|
|
|
|
|
osa["combi"]= osa["Hochschule"]+ " "+ osa["OSA: Fach (original)"] |
|
osalist= osa["combi"].tolist() |
|
osalist |
|
|
|
def simosa(prompt, osalist, osa): |
|
lcos = [] |
|
prompt = prompt.lower() |
|
p_list = word_tokenize(prompt) |
|
|
|
|
|
sw = [",", "?"] |
|
p_set = {w for w in p_list if not w in sw} |
|
|
|
for val in osalist: |
|
val = val.lower() |
|
v_list = word_tokenize(val) |
|
|
|
|
|
v_set = {w for w in v_list if not w in sw} |
|
|
|
|
|
rvector = p_set.union(v_set) |
|
|
|
|
|
l1 = [1 if w in p_set else 0 for w in rvector] |
|
l2 = [1 if w in v_set else 0 for w in rvector] |
|
|
|
|
|
dot_product = sum(l1[i] * l2[i] for i in range(len(rvector))) |
|
magnitude1 = sum(l1) |
|
magnitude2 = sum(l2) |
|
|
|
if magnitude1 == 0 or magnitude2 == 0: |
|
cosine = 0.0 |
|
else: |
|
cosine = dot_product / float((magnitude1 * magnitude2) ** 0.5) |
|
|
|
lcos.append(cosine) |
|
osa["testsim"]=lcos |
|
match=osa.loc[osa['testsim'].idxmax()]["testsim"] |
|
|
|
if match >0.29: |
|
answer = str(osa.loc[osa['testsim'].idxmax()]["chattext"]) |
|
else: |
|
answer= "Wenn du dir unsicher bist, was du studieren könntest oder ob deine Fähigkeiten ausreichen, dann mach doch diesen Test (https://www.was-studiere-ich.de/) oder schau dir mal diese Seminare an (https://www.bw-best.de)." |
|
print("Done simosa") |
|
return answer |
|
|
|
def parse_for_nc(text): |
|
|
|
''' |
|
Parses text for words relating to NC and Abiturnote |
|
:param text: a string |
|
:return: an automatic response in form of a string |
|
''' |
|
nc_words = [" nc ", "abischnitt", "abiturschnitt", "abinote", "abiturnote", "ncschnitt", "abschlussnote", "abschlussdurchschnitt", "abschlussnote", "zulassungsbeschränkung", "numerus clausus", "noten"] |
|
response = "Wenn du dir unsicher bist, ob du die Zulassungsvoraussetzungen zu einem Studiengang erfüllst, schau am besten einmal auf der Website der Universität nach, was gefordert ist.\n Häufig entscheidet nicht allein die Abiturnote die Zulassung, sondern auch Faktoren wie praktische Erfahrung oder ein FSJ.\n Lass dich außerdem nicht von den NCs vergangener Jahre verunsichern.\n Der NC gibt nur an, was im vergangenen Jahr die schlechteste Note des regulären Prozesses war, mit der man noch zugelassen wurde.\n Der NC kann sich also von Jahr zu Jahr verändern und oft werden auch Leute zugelassen, die einen schlechteren Schnitt ab (bspw. durch Wartesemester).\n Wenn du dir hingegen unsicher bist, ob deine Fähigkeiten mit denen des Fachs übereinstimmen,\n dann mach doch vielleicht mal einen Test. Außerdem gibt es Aufbau- und Vorbereitungskurse mittels derer du Wissen und Fähigkeiten aufbauen kannst." |
|
|
|
|
|
if text: |
|
text = text.strip() |
|
text = text.lower() |
|
text = text.translate(str.maketrans('', '', string.punctuation)) |
|
text = " "+text+" " |
|
for nc_word in nc_words: |
|
if nc_word in text: |
|
return response |
|
print("Done NC") |
|
return "No" |
|
|
|
|
|
def extract_ip_and_device(headers_obj): |
|
ip_address = None |
|
device_info = None |
|
|
|
|
|
headers = headers_obj.raw |
|
|
|
for header in headers: |
|
if len(header) != 2: |
|
print(f"Unexpected header format: {header}") |
|
continue |
|
|
|
key, value = header |
|
|
|
if key == b'x-forwarded-for': |
|
ip_address = value.decode('utf-8') |
|
elif key == b'user-agent': |
|
device_info = value.decode('utf-8') |
|
|
|
return ip_address, device_info |
|
|
|
def format_prompt(message, history): |
|
print("HISTORY") |
|
print(history) |
|
prompt = "" |
|
if history: |
|
user_prompt, bot_response = history[-1] |
|
prompt += f"[INST] {user_prompt} [/INST] {bot_response}</s> " |
|
prompt += f"[INST] {message} [/INST]" |
|
print("Final P") |
|
print(prompt) |
|
return prompt |
|
|
|
def format_promptc(message): |
|
prompt = "" |
|
prompt += f"[INST] {message} [/INST]" |
|
return prompt |
|
|
|
def responsecritical( |
|
prompt, temperature=0.9, max_new_tokens=200, top_p=0.95, repetition_penalty=1.0, |
|
): |
|
temperature = float(temperature) |
|
if temperature < 1e-2: temperature = 1e-2 |
|
top_p = float(top_p) |
|
generate_kwargs = dict( |
|
temperature=temperature, |
|
max_new_tokens=max_new_tokens, |
|
top_p=top_p, |
|
repetition_penalty=repetition_penalty, |
|
do_sample=True, |
|
seed=42, |
|
) |
|
|
|
systemc="Bitte evaluiere ob die Frage soziokulturell oder allgemein problematisch oder auch sensibel oder politisch ist. Antworte ausschließlich mit Ja wenn sie soziokulturell oder allgemein problematisch ist, ansonsten nur mit Nein. Erkläre deine Entscheidung nicht.\n\nUser-Anliegen:" |
|
formatted_promptc = format_promptc(systemc+"\n"+prompt) |
|
streamc = client.text_generation(formatted_promptc, **generate_kwargs, stream=True, details=True, return_full_text=False) |
|
outputc = "" |
|
print(streamc) |
|
for responsec in streamc: |
|
outputc += responsec.token.text |
|
sentence_lower = outputc.lower() |
|
print("Done critcial") |
|
|
|
if 'nein' in sentence_lower: |
|
return True |
|
else: |
|
return False |
|
|
|
|
|
def response( |
|
request: gr.Request, prompt,history, temperature=0.9, max_new_tokens=500, top_p=0.95, repetition_penalty=1.0, |
|
): |
|
temperature = float(0.9) |
|
if temperature < 1e-2: temperature = 1e-2 |
|
top_p = float(top_p) |
|
generate_kwargs = dict( |
|
temperature=temperature, |
|
max_new_tokens=max_new_tokens, |
|
top_p=top_p, |
|
repetition_penalty=repetition_penalty, |
|
do_sample=True, |
|
seed=42, |
|
) |
|
|
|
headers = request.headers |
|
IP, dev = extract_ip_and_device(headers) |
|
|
|
if responsecritical(prompt)==False: |
|
now = str(datetime.now()) |
|
save_to_sheet(now, prompt, "Es scheint so, als sei dies keine Frage, die sich auf die Studienorientierung bezieht", IP, dev, str(headers)) |
|
yield "Es scheint so, als sei dies keine Frage, die sich auf die Studienorientierung bezieht" |
|
else: |
|
answernc=parse_for_nc(prompt) |
|
if answernc!="No": |
|
now = str(datetime.now()) |
|
save_to_sheet(now, prompt, answernc, IP, dev, str(headers)) |
|
yield answernc |
|
else: |
|
prompt = re.sub(r'\buni\b', 'Universität', prompt, flags=re.IGNORECASE) |
|
addon="" |
|
search_prompt = format_prompt(prompt, history) |
|
results=collection.query( |
|
query_texts=[search_prompt], |
|
n_results=60 |
|
) |
|
|
|
dists=["<br><small>(relevance: "+str(round((1-d)*100)/100)+";" for d in results['distances'][0]] |
|
results=results['documents'][0] |
|
combination = zip(results,dists) |
|
combination = [' '.join(triplets) for triplets in combination] |
|
if(len(results)>1): |
|
addon=" Bitte berücksichtige bei deiner Antwort ausschießlich folgende Auszüge aus unserer Datenbank, sofern sie für die Antwort erforderlich sind. Beantworte die Frage knapp und präzise. Ignoriere unpassende Datenbank-Auszüge OHNE sie zu kommentieren, zu erwähnen oder aufzulisten:\n"+"\n".join(results) |
|
system="Du bist ein deutschsprachiges KI-basiertes Studienberater Assistenzsystem, das zu jedem Anliegen möglichst geeignete Studieninformationen empfiehlt."+addon+"\n\nUser-Anliegen:" |
|
formatted_prompt = format_prompt(system+"\n"+prompt, history) |
|
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False) |
|
output = "" |
|
for response in stream: |
|
output += response.token.text |
|
osaanswer=simosa(prompt, osalist, osa) |
|
output=output[:-4]+"\n"+osaanswer |
|
now = str(datetime.now()) |
|
save_to_sheet(now, prompt, output, IP, dev, str(headers)) |
|
yield output |
|
|
|
gr.ChatInterface(response, chatbot=gr.Chatbot(value=[[None,"Herzlich willkommen! Ich bin Chätti ein KI-basiertes Studienassistenzsystem, das für jede Anfrage die besten Studieninformationen empfiehlt.<br>Erzähle mir was dich interessiert! Allgemein kann ich dir diesen Test (https://www.was-studiere-ich.de/) oder diese Seminare (https://www.bw-best.de) zur Studienfindung empfehlen."]],render_markdown=True),title="German Studyhelper Chätti").queue().launch(share=True) |
|
print("Interface up and running!") |
|
|
|
|
|
|