Spaces:
Running
Running
# TODO: Gradio session / multi-user thread | |
# Reference: | |
# | |
# https://vimeo.com/990334325/56b552bc7a | |
# https://platform.openai.com/playground/assistants | |
# https://cookbook.openai.com/examples/assistants_api_overview_python | |
# https://platform.openai.com/docs/api-reference/assistants/createAssistant | |
# https://platform.openai.com/docs/assistants/tools | |
import gradio as gr | |
import pandas as pd | |
import yfinance as yf | |
import json, openai, os, time | |
from datetime import date | |
from openai import OpenAI | |
from tavily import TavilyClient | |
from typing import List | |
from utils import function_to_schema, show_json | |
openai_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY")) | |
tavily_client = TavilyClient(api_key=os.environ.get("TAVILY_API_KEY")) | |
assistant_id = "asst_DbCpNsJ0vHSSdl6ePlkKZ8wG" | |
assistant, thread = None, None | |
def today_tool() -> str: | |
"""Returns today's date. Use this function for any questions related to knowing today's date. | |
There should be no input. This function always returns today's date.""" | |
return str(date.today()) | |
def yf_download_tool(tickers: List[str], start_date: date, end_date: date) -> pd.DataFrame: | |
"""Returns historical stock data for a list of given tickers from start date to end date | |
using the yfinance library download function. | |
Use this function for any questions related to getting historical stock data. | |
The input should be the tickers as a List of strings, a start date, and an end date. | |
This function always returns a pandas DataFrame.""" | |
return yf.download(tickers, start=start_date, end=end_date) | |
def tavily_search_tool(query: str) -> str: | |
"""Searches the web for a given query and returns an answer, " | |
ready for use as context in a RAG application, using the Tavily API. | |
Use this function for any questions requiring knowledge not available to the model. | |
The input should be the query string. This function always returns an answer string.""" | |
return tavily_client.get_search_context(query=query, max_results=5) | |
tools = { | |
"today_tool": today_tool, | |
"yf_download_tool": yf_download_tool, | |
"tavily_search_tool": tavily_search_tool, | |
} | |
def create_assistant(openai_client): | |
assistant = openai_client.beta.assistants.create( | |
name="Python Coding Assistant", | |
instructions=( | |
"You are a Python programming language expert that " | |
"generates Pylint-compliant code and explains it. " | |
"Execute code when explicitly asked to." | |
), | |
model="gpt-4o", | |
tools=[ | |
{"type": "code_interpreter"}, | |
{"type": "function", "function": function_to_schema(today_tool)}, | |
{"type": "function", "function": function_to_schema(yf_download_tool)}, | |
{"type": "function", "function": function_to_schema(tavily_search_tool)}, | |
], | |
) | |
show_json("assistant", assistant) | |
return assistant | |
def load_assistant(openai_client): | |
assistant = openai_client.beta.assistants.retrieve(assistant_id) | |
show_json("assistant", assistant) | |
return assistant | |
def create_thread(openai_client): | |
thread = openai_client.beta.threads.create() | |
show_json("thread", thread) | |
return thread | |
def create_message(openai_client, thread, msg): | |
message = openai_client.beta.threads.messages.create( | |
role="user", | |
thread_id=thread.id, | |
content=msg, | |
) | |
show_json("message", message) | |
return message | |
def create_run(openai_client, assistant, thread): | |
run = openai_client.beta.threads.runs.create( | |
assistant_id=assistant.id, | |
thread_id=thread.id, | |
parallel_tool_calls=False, | |
) | |
show_json("run", run) | |
return run | |
def wait_on_run(openai_client, thread, run): | |
while run.status == "queued" or run.status == "in_progress": | |
run = openai_client.beta.threads.runs.retrieve( | |
thread_id=thread.id, | |
run_id=run.id, | |
) | |
time.sleep(1) | |
show_json("run", run) | |
if hasattr(run, "last_error") and run.last_error: | |
raise gr.Error(run.last_error) | |
return run | |
def get_run_steps(openai_client, thread, run): | |
run_steps = openai_client.beta.threads.runs.steps.list( | |
thread_id=thread.id, | |
run_id=run.id, | |
order="asc", | |
) | |
show_json("run_steps", run_steps) | |
return run_steps | |
def execute_tool_call(tool_call): | |
name = tool_call.function.name | |
args = {} | |
if len(tool_call.function.arguments) > 10: | |
args = json.loads(tool_call.function.arguments) | |
return tools[name](**args) | |
def execute_tool_calls(run_steps): | |
run_step_details = [] | |
tool_call_ids = [] | |
tool_call_results = [] | |
for step in run_steps.data: | |
step_details = step.step_details | |
run_step_details.append(step_details) | |
show_json("step_details", step_details) | |
if hasattr(step_details, "tool_calls"): | |
for tool_call in step_details.tool_calls: | |
show_json("tool_call", tool_call) | |
if hasattr(tool_call, "function"): | |
tool_call_ids.append(tool_call.id) | |
tool_call_results.append(execute_tool_call(tool_call)) | |
return tool_call_ids, tool_call_results | |
def get_messages(openai_client, thread): | |
messages = openai_client.beta.threads.messages.list( | |
thread_id=thread.id | |
) | |
show_json("messages", messages) | |
return messages | |
def extract_content_values(data): | |
text_values, image_values = [], [] | |
for item in data.data: | |
for content in item.content: | |
if content.type == "text": | |
text_value = content.text.value | |
text_values.append(text_value) | |
if content.type == "image_file": | |
image_value = content.image_file.file_id | |
image_values.append(image_value) | |
return text_values, image_values | |
### | |
def generate_tool_outputs(tool_call_ids, tool_call_results): | |
tool_outputs = [] | |
for tool_call_id, tool_call_result in zip(tool_call_ids, tool_call_results): | |
tool_output = {} | |
try: | |
tool_output = { | |
"tool_call_id": tool_call_id, | |
"output": tool_call_result.to_json() | |
} | |
except AttributeError: | |
tool_output = { | |
"tool_call_id": tool_call_id, | |
"output": tool_call_result | |
} | |
tool_outputs.append(tool_output) | |
return tool_outputs | |
### | |
def chat(message, history): | |
if not message: | |
raise gr.Error("Message is required.") | |
global assistant, thread | |
if assistant == None: | |
#assistant = create_assistant(openai_client) # on first run, create assistant and update assistant_id | |
# see https://platform.openai.com/playground/assistants | |
assistant = load_assistant(openai_client) # on subsequent runs, load assistant | |
if thread == None or len(history) == 0: | |
thread = create_thread(openai_client) | |
create_message(openai_client, thread, message) | |
run = create_run(openai_client, assistant, thread) | |
run = wait_on_run(openai_client, thread, run) | |
run_steps = get_run_steps(openai_client, thread, run) | |
### TODO | |
tool_call_ids, tool_call_results = execute_tool_calls(run_steps) | |
if len(tool_call_ids) > 0: | |
# https://platform.openai.com/docs/api-reference/runs/submitToolOutputs | |
tool_output = {} | |
try: | |
tool_output = { | |
"tool_call_id": tool_call_ids[0], | |
"output": tool_call_results[0].to_json() | |
} | |
except AttributeError: | |
tool_output = { | |
"tool_call_id": tool_call_ids[0], | |
"output": tool_call_results[0] | |
} | |
run = openai_client.beta.threads.runs.submit_tool_outputs( | |
thread_id=thread.id, | |
run_id=run.id, | |
tool_outputs=[tool_output] | |
) | |
run = wait_on_run(openai_client, thread, run) | |
run_steps = get_run_steps(openai_client, thread, run) | |
### | |
tool_call_ids, tool_call_results = execute_tool_calls(run_steps) | |
if len(tool_call_ids) > 1: | |
# https://platform.openai.com/docs/api-reference/runs/submitToolOutputs | |
tool_output = {} | |
try: | |
tool_output = { | |
"tool_call_id": tool_call_ids[1], | |
"output": tool_call_results[1].to_json() | |
} | |
except AttributeError: | |
tool_output = { | |
"tool_call_id": tool_call_ids[1], | |
"output": tool_call_results[1] | |
} | |
run = openai_client.beta.threads.runs.submit_tool_outputs( | |
thread_id=thread.id, | |
run_id=run.id, | |
tool_outputs=[tool_output] | |
) | |
run = wait_on_run(openai_client, thread, run) | |
run_steps = get_run_steps(openai_client, thread, run) | |
### | |
messages = get_messages(openai_client, thread) | |
text_values, image_values = extract_content_values(messages) | |
download_link = "" | |
if len(image_values) > 0: | |
download_link = f"<p>Download: https://platform.openai.com/storage/files/{image_values[0]}</p>" | |
return f"{'<hr>'.join(list(reversed(text_values))[1:])}{download_link}" | |
gr.ChatInterface( | |
fn=chat, | |
chatbot=gr.Chatbot(height=350), | |
textbox=gr.Textbox(placeholder="Ask anything", container=False, scale=7), | |
title="Python Coding Assistant", | |
description=( | |
"The assistant can **generate, explain, fix, optimize,** and **document Python code, " | |
"create unit test cases,** and **answer general coding-related questions.** " | |
"It can also **execute code**. " | |
"The assistant has access to a <b>today tool</b> (get current date), to a " | |
"**yfinance download tool** (get stock data), and to a " | |
"**tavily search tool** (web search)." | |
), | |
clear_btn="Clear", | |
retry_btn=None, | |
undo_btn=None, | |
examples=[ | |
["Generate: Python code to fine-tune model meta-llama/Meta-Llama-3.1-8B on dataset gretelai/synthetic_text_to_sql using QLoRA"], | |
["Explain: r\"^(?=.*[A-Z])(?=.*[a-z])(?=.*[0-9])(?=.*[\\W]).{8,}$\""], | |
["Fix: x = [5, 2, 1, 3, 4]; print(x.sort())"], | |
["Optimize: x = []; for i in range(0, 10000): x.append(i)"], | |
["Execute: First 25 Fibbonaci numbers"], | |
["Execute with tools: Create a plot showing stock gain QTD for NVDA and AMD, x-axis is \"Day\" and y-axis is \"Gain %\""], | |
["Execute with tools: Get key announcements from the latest OpenAI Dev Day"] | |
], | |
cache_examples=False, | |
).launch() |