bstraehle's picture
Update app.py
70bf8f7 verified
raw
history blame
11.9 kB
# TODO:
#
# 1. Gradio session / multi-user thread
# Reference:
#
# https://vimeo.com/990334325/56b552bc7a
# https://platform.openai.com/playground/assistants
# https://cookbook.openai.com/examples/assistants_api_overview_python
# https://platform.openai.com/docs/api-reference/assistants/createAssistant
# https://platform.openai.com/docs/assistants/tools
import gradio as gr
import pandas as pd
import yfinance as yf
import json, openai, os, time
from datetime import date
from openai import OpenAI
from tavily import TavilyClient
from typing import List
from utils import function_to_schema, show_json
openai_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
tavily_client = TavilyClient(api_key=os.environ.get("TAVILY_API_KEY"))
assistant_id = "asst_YWSKwQEiYVGApJdxxrTH0Cy8"
assistant, thread = None, None
def today_tool() -> str:
"""Returns today's date. Use this function for any questions related to knowing today's date.
There should be no input. This function always returns today's date."""
return str(date.today())
def yf_download_tool(tickers: List[str], start_date: date, end_date: date) -> pd.DataFrame:
"""Returns historical stock data for a list of given tickers from start date to end date
using the yfinance library download function.
Use this function for any questions related to getting historical stock data.
The input should be the tickers as a List of strings, a start date, and an end date.
This function always returns a pandas DataFrame."""
return yf.download(tickers, start=start_date, end=end_date)
def tavily_search_tool(query: str) -> str:
"""Searches the web for a given query and returns an answer, "
ready for use as context in a RAG application, using the Tavily API.
Use this function for any questions requiring knowledge not available to the model.
The input should be the query string. This function always returns an answer string."""
return tavily_client.get_search_context(query=query, max_results=5)
tools = {
"today_tool": today_tool,
"yf_download_tool": yf_download_tool,
"tavily_search_tool": tavily_search_tool,
}
def create_assistant(openai_client):
assistant = openai_client.beta.assistants.create(
name="Python Coding Assistant",
instructions=(
"You are a Python programming language expert that "
"generates Pylint-compliant code and explains it. "
"Execute code when explicitly asked to."
),
model="gpt-4o",
tools=[
{"type": "code_interpreter"},
{"type": "function", "function": function_to_schema(today_tool)},
{"type": "function", "function": function_to_schema(yf_download_tool)},
{"type": "function", "function": function_to_schema(tavily_search_tool)},
],
)
show_json("assistant", assistant)
return assistant
def load_assistant(openai_client):
assistant = openai_client.beta.assistants.retrieve(assistant_id)
show_json("assistant", assistant)
return assistant
def create_thread(openai_client):
thread = openai_client.beta.threads.create()
show_json("thread", thread)
return thread
def create_message(openai_client, thread, msg):
message = openai_client.beta.threads.messages.create(
role="user",
thread_id=thread.id,
content=msg,
)
show_json("message", message)
return message
def create_run(openai_client, assistant, thread):
run = openai_client.beta.threads.runs.create(
assistant_id=assistant.id,
thread_id=thread.id,
parallel_tool_calls=False,
)
show_json("run", run)
return run
def wait_on_run(openai_client, thread, run):
while run.status == "queued" or run.status == "in_progress":
run = openai_client.beta.threads.runs.retrieve(
thread_id=thread.id,
run_id=run.id,
)
time.sleep(1)
show_json("run", run)
if hasattr(run, "last_error") and run.last_error:
raise gr.Error(run.last_error)
return run
def get_run_steps(openai_client, thread, run):
run_steps = openai_client.beta.threads.runs.steps.list(
thread_id=thread.id,
run_id=run.id,
order="asc",
)
show_json("run_steps", run_steps)
return run_steps
def execute_tool_call(tool_call):
name = tool_call.function.name
args = {}
if len(tool_call.function.arguments) > 10:
args = json.loads(tool_call.function.arguments)
return tools[name](**args)
def execute_tool_calls(run_steps):
run_step_details = []
tool_call_ids = []
tool_call_results = []
for step in run_steps.data:
step_details = step.step_details
run_step_details.append(step_details)
show_json("step_details", step_details)
if hasattr(step_details, "tool_calls"):
for tool_call in step_details.tool_calls:
show_json("tool_call", tool_call)
if hasattr(tool_call, "function"):
tool_call_ids.append(tool_call.id)
tool_call_results.append(execute_tool_call(tool_call))
return tool_call_ids, tool_call_results
def get_messages(openai_client, thread):
messages = openai_client.beta.threads.messages.list(
thread_id=thread.id
)
show_json("messages", messages)
return messages
def extract_content_values(data):
text_values, image_values = [], []
for item in data.data:
for content in item.content:
if content.type == "text":
text_value = content.text.value
text_values.append(text_value)
if content.type == "image_file":
image_value = content.image_file.file_id
image_values.append(image_value)
return text_values, image_values
###
def generate_tool_outputs(tool_call_ids, tool_call_results):
tool_outputs = []
for tool_call_id, tool_call_result in zip(tool_call_ids, tool_call_results):
tool_output = {}
try:
tool_output = {
"tool_call_id": tool_call_id,
"output": tool_call_result.to_json()
}
print("###")
print(tool_call_id)
print(tool_call_result.to_json())
print("###")
except AttributeError:
tool_output = {
"tool_call_id": tool_call_id,
"output": tool_call_result
}
print("###")
print(tool_call_id)
print(tool_call_result)
print("###")
tool_outputs.append(tool_output)
return tool_outputs
###
def chat(message, history):
if not message:
raise gr.Error("Message is required.")
global assistant, thread
if assistant == None:
assistant = create_assistant(openai_client) # on first run, create assistant and update assistant_id
# see https://platform.openai.com/playground/assistants
#assistant = load_assistant(openai_client) # on subsequent runs, load assistant
if thread == None or len(history) == 0:
thread = create_thread(openai_client)
create_message(openai_client, thread, message)
run = create_run(openai_client, assistant, thread)
run = wait_on_run(openai_client, thread, run)
run_steps = get_run_steps(openai_client, thread, run)
### TODO
tool_call_ids, tool_call_results = execute_tool_calls(run_steps)
if len(tool_call_ids) > 0:
print("###")
print(len(tool_call_ids))
print(tool_call_ids)
print(tool_call_ids[0])
print(tool_call_results)
print(tool_call_results[0])
print("###")
# https://platform.openai.com/docs/api-reference/runs/submitToolOutputs
tool_output = {}
try:
tool_output = {
"tool_call_id": tool_call_ids[0],
"output": tool_call_results[0].to_json()
}
except AttributeError:
tool_output = {
"tool_call_id": tool_call_ids[0],
"output": tool_call_results[0]
}
run = openai_client.beta.threads.runs.submit_tool_outputs(
thread_id=thread.id,
run_id=run.id,
tool_outputs=[tool_output]
)
run = wait_on_run(openai_client, thread, run)
run_steps = get_run_steps(openai_client, thread, run)
###
tool_call_ids, tool_call_results = execute_tool_calls(run_steps)
if len(tool_call_ids) > 1:
print("###")
print(len(tool_call_ids))
print(tool_call_ids)
print(tool_call_ids[1])
print(tool_call_results)
print(tool_call_results[1])
print("###")
# https://platform.openai.com/docs/api-reference/runs/submitToolOutputs
tool_output = {}
try:
tool_output = {
"tool_call_id": tool_call_ids[1],
"output": tool_call_results[1].to_json()
}
except AttributeError:
tool_output = {
"tool_call_id": tool_call_ids[1],
"output": tool_call_results[1]
}
run = openai_client.beta.threads.runs.submit_tool_outputs(
thread_id=thread.id,
run_id=run.id,
tool_outputs=[tool_output]
)
run = wait_on_run(openai_client, thread, run)
run_steps = get_run_steps(openai_client, thread, run)
###
messages = get_messages(openai_client, thread)
text_values, image_values = extract_content_values(messages)
download_link = ""
if len(image_values) > 0:
download_link = f"<p>Download: https://platform.openai.com/storage/files/{image_values[0]}</p>"
#return f"{text_values[0]}{download_link}"
return f"{'<hr>'.join(list(reversed(text_values))[1:])}{download_link}"
gr.ChatInterface(
fn=chat,
chatbot=gr.Chatbot(height=350),
textbox=gr.Textbox(placeholder="Ask anything", container=False, scale=7),
title="Python Coding Assistant",
description=(
"The assistant can **generate, explain, fix, optimize, and document Python code, "
"create unit test cases, and answer general coding-related questions.** "
"It can also **execute code**. "
"The assistant has access to a <b>today tool</b> (get current date), to a "
"**yfinance download tool** (get stock data), and to a "
"**tavily search tool** (web search for RAG)."
),
clear_btn="Clear",
retry_btn=None,
undo_btn=None,
examples=[
["Generate: Python code to fine-tune model meta-llama/Meta-Llama-3.1-8B on dataset gretelai/synthetic_text_to_sql using QLoRA"],
["Explain: r\"^(?=.*[A-Z])(?=.*[a-z])(?=.*[0-9])(?=.*[\\W]).{8,}$\""],
["Fix: x = [5, 2, 1, 3, 4]; print(x.sort())"],
["Optimize: x = []; for i in range(0, 10000): x.append(i)"],
["Execute: First 25 Fibbonaci numbers"],
["Execute with tools: Create a plot showing stock gain QTD for NVDA and AMD, x-axis is \"Day\" and y-axis is \"Gain %\""],
["Execute with tools: Get key announcements from the latest OpenAI Dev Day"]
],
cache_examples=False,
).launch()