|
import os |
|
import openai |
|
import sys |
|
import re |
|
|
|
import gradio as gr |
|
from IPython import get_ipython |
|
import json |
|
import requests |
|
from tenacity import retry, wait_random_exponential, stop_after_attempt |
|
from IPython import get_ipython |
|
|
|
import ast |
|
|
|
GPT_MODEL = "gpt-3.5-turbo-1106" |
|
|
|
openai.api_key = os.environ['OPENAI_API_KEY'] |
|
|
|
def exec_python(cell): |
|
|
|
print(cell) |
|
|
|
|
|
|
|
|
|
inputcode = cell |
|
print(inputcode) |
|
code = inputcode |
|
|
|
local_namespace = {} |
|
try: |
|
exec(code, globals(), local_namespace) |
|
except Exception as e: |
|
return "tidakada" |
|
print(local_namespace) |
|
if not local_namespace: |
|
return "tidakada" |
|
else: |
|
theanswers = local_namespace.values() |
|
print(theanswers) |
|
local_ans = list(theanswers)[-1] |
|
print(local_ans) |
|
return local_ans |
|
|
|
|
|
functions = [ |
|
{ |
|
"name": "exec_python", |
|
"description": "run python code and return the execution result.", |
|
"parameters": { |
|
"type": "object", |
|
"properties": { |
|
"cell": { |
|
"type": "string", |
|
"description": "Valid Python code to execute.", |
|
} |
|
}, |
|
"required": ["cell"], |
|
}, |
|
}, |
|
] |
|
|
|
|
|
functions_dict = { |
|
"exec_python": exec_python, |
|
} |
|
|
|
def openai_api_calculate_cost(usage,model): |
|
pricing = { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
'gpt-3.5-turbo-1106': { |
|
'prompt': 0.001, |
|
'completion': 0.002, |
|
}, |
|
'gpt-4-1106-preview': { |
|
'prompt': 0.01, |
|
'completion': 0.03, |
|
}, |
|
'gpt-4': { |
|
'prompt': 0.03, |
|
'completion': 0.06, |
|
}, |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
try: |
|
model_pricing = pricing[model] |
|
except KeyError: |
|
raise ValueError("Invalid model specified") |
|
|
|
prompt_cost = usage['prompt_tokens'] * model_pricing['prompt'] / 1000 |
|
completion_cost = usage['completion_tokens'] * model_pricing['completion'] / 1000 |
|
|
|
total_cost = prompt_cost + completion_cost |
|
print(f"\nTokens used: {usage['prompt_tokens']:,} prompt + {usage['completion_tokens']:,} completion = {usage['total_tokens']:,} tokens") |
|
print(f"Total cost for {model}: ${total_cost:.4f}\n") |
|
|
|
return total_cost |
|
|
|
|
|
@retry(wait=wait_random_exponential(min=1, max=40), stop=stop_after_attempt(3)) |
|
def chat_completion_request(messages, model, functions=None, function_call=None, temperature=0.2, top_p=0.1): |
|
""" |
|
This function sends a POST request to the OpenAI API to generate a chat completion. |
|
Parameters: |
|
- messages (list): A list of message objects. Each object should have a 'role' (either 'system', 'user', or 'assistant') and 'content' |
|
(the content of the message). |
|
- functions (list, optional): A list of function objects that describe the functions that the model can call. |
|
- function_call (str or dict, optional): If it's a string, it can be either 'auto' (the model decides whether to call a function) or 'none' |
|
(the model will not call a function). If it's a dict, it should describe the function to call. |
|
- model (str): The ID of the model to use. |
|
Returns: |
|
- response (requests.Response): The response from the OpenAI API. If the request was successful, the response's JSON will contain the chat completion. |
|
""" |
|
|
|
|
|
headers = { |
|
"Content-Type": "application/json", |
|
"Authorization": "Bearer " + openai.api_key, |
|
} |
|
|
|
|
|
|
|
|
|
json_data = {"model": model, "messages": messages, "temperature": temperature, "top_p":top_p} |
|
|
|
|
|
if functions is not None: |
|
json_data.update({"functions": functions}) |
|
|
|
|
|
if function_call is not None: |
|
json_data.update({"function_call": function_call}) |
|
|
|
print(json_data) |
|
|
|
|
|
try: |
|
response = requests.post( |
|
"https://api.openai.com/v1/chat/completions", |
|
headers=headers, |
|
json=json_data, |
|
) |
|
return response |
|
except Exception as e: |
|
print("Unable to generate ChatCompletion response") |
|
print(f"Exception: {e}") |
|
return e |
|
|
|
def first_call(init_prompt, user_input, input_temperature, input_top_p, model_dropdown_1): |
|
|
|
messages = [] |
|
messages.append({"role": "system", "content": init_prompt}) |
|
|
|
|
|
messages.append({"role": "user", "content": user_input}) |
|
|
|
|
|
chat_response = chat_completion_request( |
|
messages, model_dropdown_1, functions=functions, function_call='auto', temperature=float(input_temperature), top_p=float(input_top_p) |
|
) |
|
|
|
|
|
|
|
|
|
assistant_message = chat_response.json()["choices"][0]["message"] |
|
|
|
|
|
messages.append(assistant_message) |
|
|
|
usage = chat_response.json()['usage'] |
|
cost1 = openai_api_calculate_cost(usage,model_dropdown_1) |
|
|
|
finish_response_status = chat_response.json()["choices"][0]["finish_reason"] |
|
|
|
return assistant_message, cost1, messages, finish_response_status |
|
|
|
def is_valid_dict_string(s): |
|
try: |
|
ast.literal_eval(s) |
|
return True |
|
except (SyntaxError, ValueError): |
|
return False |
|
|
|
def function_call_process(assistant_message): |
|
if assistant_message.get("function_call") != None: |
|
|
|
|
|
function_name = assistant_message["function_call"]["name"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if is_valid_dict_string(assistant_message["function_call"]["arguments"])==True: |
|
arg_dict = json.loads(r"{jsonload}".format(jsonload=assistant_message["function_call"]["arguments"]), strict=False) |
|
arg_dict = arg_dict['cell'] |
|
print("arg_dict : " + arg_dict) |
|
else: |
|
arg_dict = assistant_message["function_call"]["arguments"] |
|
print(arg_dict) |
|
|
|
|
|
result = functions_dict[function_name](arg_dict) |
|
return result |
|
|
|
|
|
def custom_format(match): |
|
|
|
return "{" + match.group(1) + "}" if match.group(1) == "ans" else "{{" + match.group(1) + "}}" |
|
|
|
def second_prompt_build(prompt, log, prompt_cadangan): |
|
pattern = re.compile(r'\{([^}]*)\}') |
|
prompt_sub = pattern.sub(custom_format, prompt) |
|
if log == "tidakada": |
|
if "{ans}" in prompt: |
|
prompt_second = prompt_sub.format(ans = prompt_cadangan) |
|
else: |
|
prompt_second = prompt_cadangan |
|
else: |
|
prompt_second = prompt_sub.format(ans=log) |
|
return prompt_second |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def second_call(prompt, prompt_second, messages, input_temperature_2, input_top_p_2, model_dropdown_2, function_name = "exec_python"): |
|
|
|
messages.append({ |
|
"role": "function", |
|
"name": function_name, |
|
"content": str(prompt_second), |
|
}) |
|
|
|
|
|
chat_response = chat_completion_request( |
|
messages, model_dropdown_2, functions=functions, temperature=float(input_temperature_2), top_p=float(input_top_p_2) |
|
) |
|
print("second call : "+ str(chat_response.json())) |
|
assistant_message = chat_response.json()["choices"][0]["message"] |
|
messages.append(assistant_message) |
|
|
|
usage = chat_response.json()['usage'] |
|
cost2 = openai_api_calculate_cost(usage,model_dropdown_2) |
|
|
|
|
|
|
|
finish_response_status_2 = chat_response.json()["choices"][0]["finish_reason"] |
|
print("finish_response_status_2 : " + finish_response_status_2) |
|
return assistant_message, cost2, messages, finish_response_status_2 |
|
|
|
def format_math_in_sentence(sentence): |
|
|
|
math_pattern = re.compile(r'\\[a-zA-Z]+\{[^\}]+\}|\\frac\{[^\}]+\}\{[^\}]+\}') |
|
|
|
|
|
math_matches = re.findall(math_pattern, sentence) |
|
|
|
|
|
for math_match in math_matches: |
|
markdown_math = f"${math_match}$" |
|
sentence = sentence.replace(math_match, markdown_math) |
|
|
|
return sentence |
|
|
|
def format_mathjax_equation(input_str): |
|
|
|
formatted_str = re.sub(r'\\\(.*?\\\)', lambda x: f"${x.group(0)[2:-2]}$", input_str) |
|
|
|
|
|
formatted_str = re.sub(r'\\\[.*?\\\]', lambda x: f"$$\n{x.group(0)[2:-2]}\n$$", formatted_str) |
|
|
|
|
|
formatted_str = re.sub(r'\\frac\{(.*?)\}\{(.*?)\}', r'\\frac{\1}{\2}', formatted_str) |
|
|
|
|
|
formatted_str = re.sub(r'\\sqrt\{(.*?)\}', r'\\sqrt{\1}', formatted_str) |
|
formatted_str = re.sub(r'\\sum\{(.*?)\}', r'\\sum{\1}', formatted_str) |
|
formatted_str = re.sub(r'\\int\{(.*?)\}', r'\\int{\1}', formatted_str) |
|
|
|
|
|
return formatted_str |
|
|
|
def main_function(init_prompt, prompt, prompt_cadangan, user_input,input_temperature_1, input_top_p_1, input_temperature_2, input_top_p_2, model_dropdown_1, model_dropdown_2): |
|
first_call_result, cost1, messages, finish_response_status = first_call(init_prompt, user_input, input_temperature_1, input_top_p_1, model_dropdown_1) |
|
print("finish_response_status "+finish_response_status) |
|
print(messages) |
|
cost_list=[] |
|
if finish_response_status == 'stop': |
|
function_call_process_result = "Tidak dipanggil" |
|
second_prompt_build_result = "Tidak dipanggil" |
|
second_call_result = {'status':'Tidak dipanggil'} |
|
cost2 = 0 |
|
finalmessages = messages |
|
finalcostresult = cost1 |
|
cost_list.append(cost1) |
|
else: |
|
function_call_process_result = function_call_process(first_call_result) |
|
second_prompt_build_result = second_prompt_build(prompt, function_call_process_result, prompt_cadangan) |
|
second_call_result, cost2, finalmessages, finish_response_status_2 = second_call(function_call_process_result, second_prompt_build_result, messages, input_temperature_2, input_top_p_2, model_dropdown_2) |
|
cost_list.append(cost1) |
|
cost_list.append(cost2) |
|
finalcostresult = cost1 + cost2 |
|
while finish_response_status_2 != 'stop': |
|
function_call_process_result = function_call_process(second_call_result) |
|
second_prompt_build_result = second_prompt_build(prompt, function_call_process_result, prompt_cadangan) |
|
second_call_result, cost2, finalmessages, finish_response_status_2 = second_call(function_call_process_result, second_prompt_build_result, messages, input_temperature_2, input_top_p_2, model_dropdown_2) |
|
finalcostresult += cost2 |
|
cost_list.append(cost2) |
|
finalcostrpresult = finalcostresult * 15000 |
|
cost_dict = {f"Cost {i+1}": price for i, price in enumerate(cost_list)} |
|
cost_dict_rp = {f"Cost {i+1}": price*15000 for i, price in enumerate(cost_list)} |
|
veryfinaloutput = format_mathjax_equation(str(finalmessages[-1].get("content", ""))) |
|
return first_call_result, function_call_process_result, second_prompt_build_result, second_call_result, cost1, cost2, finalmessages, finalcostresult, finalcostrpresult, cost_dict, cost_dict_rp, veryfinaloutput |
|
|
|
def gradio_function(): |
|
init_prompt = gr.Textbox(label="init_prompt (for 1st call)",value="""You are a genius math tutor, Python code expert, and a helpful assistant. |
|
If this is a math subject or topic, you must create a code how to solve it accurately! The code must be worked and written in Python string and please use the easiest and the most correct way step by step to solve it! |
|
After that, show me the evaluated and verified answer correctly once! This is very important to your career, so please code it perfectly, answer correctly and the answer must be already verified! |
|
This problem is:""") |
|
prompt = gr.Textbox(label="prompt (for 2nd call)",value="""Here's the answer: {ans}. You must solve the problem step by step correctly and accurately! |
|
The problem: |
|
Rara memiliki tali sepanjang 3/5 meter. Lalu Dina menambahkan tali Rara sepanjang 3/6 meter. Total panjang tali yang dimiliki Rara menjadi ... meter |
|
A. 33/30 |
|
B. 34/30 |
|
C. 35/30 |
|
D. 36/30 |
|
The answer: {ans} |
|
You must solve the problem by explain it step by step, then if it doesn't match with the answer, please solve it by yourself!""") |
|
prompt_cadangan = gr.Textbox(label="Prompt Cadangan",value="""Please figure and solve it step by step with explanation by yourself. Remember, you must give the correct answer!""") |
|
user_input = gr.Textbox(label="User Input",value="""Rara memiliki tali sepanjang 3/5 meter. Lalu Dina menambahkan tali Rara sepanjang 3/6 meter. Total panjang tali yang dimiliki Rara menjadi ... meter |
|
A. 33/30 |
|
B. 34/30 |
|
C. 35/30 |
|
D. 36/30""") |
|
input_temperature_1 = gr.Textbox(label="temperature_1", value=0.2) |
|
input_top_p_1 = gr.Textbox(label="top_p_1", value=0.1) |
|
input_temperature_2 = gr.Textbox(label="temperature_2", value=0.2) |
|
input_top_p_2 = gr.Textbox(label="top_p_2", value=0.1) |
|
|
|
|
|
output_1st_call = gr.JSON(label="Assistant (output_1st_call)") |
|
output_fc_call = gr.Textbox(label="Function Call (exec_python) Result (output_fc_call)") |
|
output_fc_call_with_prompt = gr.Textbox(label="Building 2nd Prompt (output_fc_call_with_2nd_prompt)") |
|
output_2nd_call = gr.JSON(label="Assistant (output_2nd_call_buat_user)") |
|
cost = gr.Textbox(label="Cost 1") |
|
cost2 = gr.Textbox(label="Cost 2") |
|
finalcost = gr.Textbox(label="Final Cost ($)") |
|
finalcostrp = gr.Textbox(label="Final Cost (Rp)") |
|
finalmessages = gr.JSON(label="Final Messages") |
|
model_dropdown_1 = gr.Dropdown(["gpt-4", "gpt-4-1106-preview", "gpt-3.5-turbo-1106"], label="Model 1", info="Pilih model 1!", value="gpt-3.5-turbo-1106") |
|
model_dropdown_2 = gr.Dropdown(["gpt-4", "gpt-4-1106-preview", "gpt-3.5-turbo-1106"], label="Model 2", info="Pilih model 2!", value="gpt-3.5-turbo-1106") |
|
cost_list = gr.JSON(label="Cost List ($)") |
|
cost_list_rp = gr.JSON(label="Cost List (Rp)") |
|
prettieroutput = gr.Markdown(label="Last Output", latex_delimiters=[ |
|
{'left': "$$", 'right': "$$", 'display': True}, |
|
{'left': "$", 'right': "$", 'display': False}, |
|
{'left': "\\(", 'right': "\\)", 'display': False}, |
|
{'left': "\\[", 'right': "\\]", 'display': True} |
|
]) |
|
|
|
iface = gr.Interface( |
|
fn=main_function, |
|
inputs=[init_prompt, prompt, prompt_cadangan, user_input,input_temperature_1, input_top_p_1, input_temperature_2, input_top_p_2, model_dropdown_1, model_dropdown_2], |
|
outputs=[output_1st_call, output_fc_call, output_fc_call_with_prompt, output_2nd_call, cost, cost2, finalmessages, finalcost, finalcostrp, cost_list, cost_list_rp, prettieroutput], |
|
title="Test", |
|
description="Accuracy", |
|
) |
|
|
|
iface.launch(share=True, debug=True) |
|
|
|
if __name__ == "__main__": |
|
gradio_function() |