import chromadb import os import gradio as gr import json from huggingface_hub import InferenceClient import gspread from google.oauth2 import service_account from datetime import datetime import chromadb # Google Sheets setup scope = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"] key1 = os.getenv("key1") key2 = os.getenv("key2") key3 = os.getenv("key3") key4 = os.getenv("key4") key5 = os.getenv("key5") key6 = os.getenv("key6") key7 = os.getenv("key7") key8 = os.getenv("key8") key9 = os.getenv("key9") key10 = os.getenv("key10") key11 = os.getenv("key11") key12 = os.getenv("key12") key13 = os.getenv("key13") key14 = os.getenv("key14") key15 = os.getenv("key15") key16 = os.getenv("key16") key17 = os.getenv("key17") key18 = os.getenv("key18") key19 = os.getenv("key19") key20 = os.getenv("key20") key21 = os.getenv("key21") key22 = os.getenv("key22") key23 = os.getenv("key23") key24 = os.getenv("key24") key25 = os.getenv("key25") key26 = os.getenv("key26") key27 = os.getenv("key27") key28 = os.getenv("key28") key29 = os.getenv("key29") key30 = os.getenv("key30") key31 = os.getenv("key31") pkey="-----BEGIN PRIVATE KEY-----\n"+key2+"\n"+key3+"\n"+ key4+"\n"+key5+"\n"+ key6+"\n"+key7+"\n"+key8+"\n"+key9+"\n"+key10+"\n"+key11+"\n"+key12+"\n"+key13+"\n"+key14+"\n"+key15+"\n"+key16+"\n"+key17+"\n"+key18+"\n"+key19+"\n"+key20+"\n"+key21+"\n"+key22+"\n"+key24+"\n"+key25+"\n"+key26+"\n"+key27+"\n"+key28+"\n-----END PRIVATE KEY-----\n" json_data={ "type": "service_account", "project_id": "nestolechatbot", "private_key_id": key1, "private_key": pkey, "client_email": key29, "client_id": key30, "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_x509_cert_url": key31, "universe_domain": "googleapis.com" } creds = service_account.Credentials.from_service_account_info(json_data, scopes=scope) client = gspread.authorize(creds) sheet = client.open("nestolechatbot").sheet1 # Open the sheet def save_to_sheet(date, name, message, IP, dev, header): # Write user input to the Google Sheet sheet.append_row([date, name, message, IP, dev, header]) return f"Thanks {name}, your message has been saved!" path='/Users/thiloid/Desktop/LSKI/ole_nest/Chatbot/LLM/chroma' if(os.path.exists(path)==False): path="/home/user/app/chroma" print(path) client = chromadb.PersistentClient(path=path) print(client.heartbeat()) print(client.get_version()) print(client.list_collections()) from chromadb.utils import embedding_functions default_ef = embedding_functions.DefaultEmbeddingFunction() sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="T-Systems-onsite/cross-en-de-roberta-sentence-transformer")#"VAGOsolutions/SauerkrautLM-Mixtral-8x7B-Instruct") collection = client.get_collection(name="chromatsc", embedding_function=sentence_transformer_ef) client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1") from nltk.tokenize import word_tokenize import nltk nltk.download('punkt') nltk.download('punkt_tab') import pandas as pd import string import re osa= pd.read_excel("/home/user/app/OSA.xlsx", nrows=136) osa.loc[osa["Hochschule"] == "Übergreifend", "Hochschule"] = " " osa.loc[osa["OSA: Fach (original)"] == "Allgemein", "OSA: Fach (original)"] = " " # text that will be added to chatbot answer osa.loc[osa["Link_Studium_Allgemein"].notna(), "Link_Studium_Allgemein"] = "Basierend auf deiner Frage empfehele ich dir diesen Interessenstest: "+ osa["Link_Studium_Allgemein"] osa.loc[osa["Link_Uni_Allgemein"].notna(), "Link_Uni_Allgemein"] = "Basierend auf deinem Interesse an der "+osa["Hochschule"]+" empfehele ich dir diesen Interessenstest: "+ osa["Link_Uni_Allgemein"] osa.loc[osa["Link_Fach"].notna(), "Link_Fach"] = "Basierend auf deinem Interesse an der " +osa["Hochschule"]+ " "+osa["OSA: Fach (original)"]+" zu studieren, empfehle ich dir diesen Interessenstest: "+ osa["Link_Fach"] osa["chattext"]= osa["Link_Studium_Allgemein"].fillna('')+ osa["Link_Uni_Allgemein"].fillna('')+osa["Link_Fach"].fillna('') # Text to compare with user prompt osa["combi"]= osa["Hochschule"]+ " "+ osa["OSA: Fach (original)"] osalist= osa["combi"].tolist() osalist def simosa(prompt, osalist, osa): lcos = [] prompt = prompt.lower() p_list = word_tokenize(prompt) # Form a set containing keywords of the prompt sw = [",", "?"] p_set = {w for w in p_list if not w in sw} for val in osalist: val = val.lower() v_list = word_tokenize(val) # Form a set containing keywords of the current value v_set = {w for w in v_list if not w in sw} # Union of both sets rvector = p_set.union(v_set) # Create vectors l1 = [1 if w in p_set else 0 for w in rvector] l2 = [1 if w in v_set else 0 for w in rvector] # Compute cosine similarity dot_product = sum(l1[i] * l2[i] for i in range(len(rvector))) magnitude1 = sum(l1) magnitude2 = sum(l2) if magnitude1 == 0 or magnitude2 == 0: cosine = 0.0 else: cosine = dot_product / float((magnitude1 * magnitude2) ** 0.5) lcos.append(cosine) osa["testsim"]=lcos match=osa.loc[osa['testsim'].idxmax()]["testsim"] #print(match) if match >0.29: answer = str(osa.loc[osa['testsim'].idxmax()]["chattext"]) else: answer= "Wenn du dir unsicher bist, was du studieren könntest oder ob deine Fähigkeiten ausreichen, dann mach doch diesen Test (https://www.was-studiere-ich.de/) oder schau dir mal diese Seminare an (https://www.bw-best.de)." print("Done simosa") return answer def parse_for_nc(text): ''' Parses text for words relating to NC and Abiturnote :param text: a string :return: an automatic response in form of a string ''' nc_words = [" nc ", "abischnitt", "abiturschnitt", "abinote", "abiturnote", "ncschnitt", "abschlussnote", "abschlussdurchschnitt", "abschlussnote", "zulassungsbeschränkung", "numerus clausus", "noten"] response = "Wenn du dir unsicher bist, ob du die Zulassungsvoraussetzungen zu einem Studiengang erfüllst, schau am besten einmal auf der Website der Universität nach, was gefordert ist.\n Häufig entscheidet nicht allein die Abiturnote die Zulassung, sondern auch Faktoren wie praktische Erfahrung oder ein FSJ.\n Lass dich außerdem nicht von den NCs vergangener Jahre verunsichern.\n Der NC gibt nur an, was im vergangenen Jahr die schlechteste Note des regulären Prozesses war, mit der man noch zugelassen wurde.\n Der NC kann sich also von Jahr zu Jahr verändern und oft werden auch Leute zugelassen, die einen schlechteren Schnitt ab (bspw. durch Wartesemester).\n Wenn du dir hingegen unsicher bist, ob deine Fähigkeiten mit denen des Fachs übereinstimmen,\n dann mach doch vielleicht mal einen Test. Außerdem gibt es Aufbau- und Vorbereitungskurse mittels derer du Wissen und Fähigkeiten aufbauen kannst." # if the string is not empty if text: text = text.strip() # strip text = text.lower() # lower all letters text = text.translate(str.maketrans('', '', string.punctuation)) # remove punctuation text = " "+text+" " # add whitespaces so that nc can be found correctly for nc_word in nc_words: if nc_word in text: return response print("Done NC") return "No" def extract_ip_and_device(headers_obj): ip_address = None device_info = None # Access the raw headers list headers = headers_obj.raw for header in headers: if len(header) != 2: print(f"Unexpected header format: {header}") continue key, value = header if key == b'x-forwarded-for': ip_address = value.decode('utf-8') elif key == b'user-agent': device_info = value.decode('utf-8') return ip_address, device_info def format_prompt(message, history): prompt = "" if history: user_prompt, bot_response = history[-1] prompt += f"[INST] {user_prompt} [/INST] {bot_response} " prompt += f"[INST] {message} [/INST]" return prompt def format_promptc(message): prompt = "" #"" prompt += f"[INST] {message} [/INST]" return prompt def responsecritical( prompt, temperature=0.9, max_new_tokens=10, top_p=0.95, repetition_penalty=1.0, ): temperature = float(temperature) if temperature < 1e-2: temperature = 1e-2 top_p = float(top_p) generate_kwargs = dict( temperature=temperature, max_new_tokens=max_new_tokens, top_p=top_p, repetition_penalty=repetition_penalty, do_sample=True, seed=42, ) systemc="Bitte evaluiere ob die Frage soziokulturell oder allgemein problematisch oder auch sensibel oder politisch ist. Antworte ausschließlich mit Ja wenn sie soziokulturell oder allgemein problematisch ist, ansonsten nur mit Nein. Erkläre deine Entscheidung nicht.\n\nUser-Anliegen:" formatted_promptc = format_promptc(systemc+"\n"+prompt) streamc = client.text_generation(formatted_promptc, **generate_kwargs, stream=True, details=True, return_full_text=False) outputc = "" print(streamc) for responsec in streamc: outputc += responsec.token.text print("CRITICAL") print(outputc) sentence_lower = outputc.lower() print("Done critcial") # Check if the word 'nein' is in the sentence if 'ja' in sentence_lower: return False else: return True def get_value_after_question_mark(url): url=str(url) # Find the position of the question mark question_mark_index = url.find('?') # Extract the part after the question mark if question_mark_index != -1: value_after_question_mark = url[question_mark_index + 1:].split('=')[0] else: value_after_question_mark = "" return value_after_question_mark def response( request: gr.Request, prompt,history, temperature=0.9, max_new_tokens=500, top_p=0.95, repetition_penalty=1.0, ): temperature = float(0.9) if temperature < 1e-2: temperature = 1e-2 top_p = float(top_p) generate_kwargs = dict( temperature=temperature, max_new_tokens=max_new_tokens, top_p=top_p, repetition_penalty=repetition_penalty, do_sample=True, seed=42, ) #get ID full_url = request.url IDval= get_value_after_question_mark(full_url) print(full_url) headers = request.headers IP, dev = extract_ip_and_device(headers) if responsecritical(prompt)==False: now = str(datetime.now()) save_to_sheet(now, prompt, "Es scheint so, als sei dies keine Frage, die sich auf die Studienorientierung bezieht", IP, dev, str(headers)) yield "Es scheint so, als sei dies keine Frage, die sich auf die Studienorientierung bezieht" else: answernc=parse_for_nc(prompt) if answernc!="No": now = str(datetime.now()) save_to_sheet(now, prompt, answernc, IP, dev, str(headers)) yield answernc else: prompt = re.sub(r'\buni\b', 'Universität', prompt, flags=re.IGNORECASE) addon="" search_prompt = format_prompt(prompt, history) results=collection.query( query_texts=[search_prompt], n_results=60 ) dists=["
(relevance: "+str(round((1-d)*100)/100)+";" for d in results['distances'][0]] results=results['documents'][0] combination = zip(results,dists) combination = [' '.join(triplets) for triplets in combination] if(len(results)>1): addon=" Bitte berücksichtige bei deiner Antwort ausschießlich folgende Auszüge aus unserer Datenbank, sofern sie für die Antwort erforderlich sind. Beantworte die Frage knapp und präzise. Ignoriere unpassende Datenbank-Auszüge OHNE sie zu kommentieren, zu erwähnen oder aufzulisten:\n"+"\n".join(results) system="Du bist ein deutschsprachiges KI-basiertes Studienberater Assistenzsystem, das zu jedem Anliegen möglichst geeignete Studieninformationen empfiehlt."+addon+"\n\nUser-Anliegen:" formatted_prompt = format_prompt(system+"\n"+prompt, history) stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False) output = "" for response in stream: output += response.token.text print(output) osaanswer=simosa(prompt, osalist, osa) output=output[:-4]+"\n"+osaanswer now = str(datetime.now()) save_to_sheet(now, prompt, output, IP, dev, str(headers)) yield output gr.ChatInterface(response, chatbot=gr.Chatbot(value=[[None,"Herzlich willkommen! Ich bin Chätti ein KI-basiertes Studienassistenzsystem, das für jede Anfrage die besten Studieninformationen empfiehlt.
Erzähle mir was dich interessiert! Allgemein kann ich dir diesen Test (https://www.was-studiere-ich.de/) oder diese Seminare (https://www.bw-best.de) zur Studienfindung empfehlen."]],render_markdown=True),title="German Studyhelper Chätti").queue().launch(share=True) #False, server_name="0.0.0.0", server_port=7864) print("Interface up and running!")