Spaces:
Running
Running
File size: 6,592 Bytes
073d6c4 4a90510 073d6c4 4a90510 073d6c4 4a90510 073d6c4 4a90510 073d6c4 4a90510 073d6c4 4a90510 073d6c4 4a90510 073d6c4 1812acf 51df0df 9d6cba0 51df0df 4a90510 51df0df 4a90510 51df0df 9d6cba0 33f6bea 4a90510 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
import pandas as pd
import yfinance as yf
import json, openai, os, time
from datetime import date
from openai import OpenAI
from tavily import TavilyClient
from typing import List
from utils import function_to_schema, show_json
# API clients are created once, at import time. A missing environment
# variable results in api_key=None being passed to the client constructor.
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

# Identifier of the assistant that load_assistant() retrieves.
assistant_id = "asst_DbCpNsJ0vHSSdl6ePlkKZ8wG"

# Module-level placeholders; both start out unset.
assistant = None
thread = None
def today_tool() -> str:
    """Returns today's date. Use this function for any questions related to knowing today's date.
    There should be no input. This function always returns today's date."""
    # NOTE(review): the docstring above is presumably forwarded to the model
    # as the tool description by function_to_schema -- confirm before rewording.
    # date.isoformat() yields the same YYYY-MM-DD text as str(date).
    return date.today().isoformat()
def yf_download_tool(tickers: List[str], start_date: date, end_date: date) -> pd.DataFrame:
    """Returns historical stock data for a list of given tickers from start date to end date
    using the yfinance library download function.
    Use this function for any questions related to getting historical stock data.
    The input should be the tickers as a List of strings, a start date, and an end date.
    This function always returns a pandas DataFrame."""
    # NOTE(review): the signature and docstring are presumably turned into the
    # tool schema by function_to_schema -- keep them stable unless that is intended.
    date_range = {"start": start_date, "end": end_date}
    history = yf.download(tickers, **date_range)
    return history
def tavily_search_tool(query: str) -> str:
    """Searches the web for a given query and returns an answer,
    ready for use as context in a RAG application, using the Tavily API.
    Use this function for any questions requiring knowledge not available to the model.
    The input should be the query string. This function always returns an answer string."""
    # The first docstring line used to end in a stray `, "` fragment; it was
    # removed so that it cannot leak into any text derived from this docstring.
    # NOTE(review): the docstring is presumably used as the tool description
    # by function_to_schema -- confirm before rewording it further.
    return tavily_client.get_search_context(query=query, max_results=5)
# Registry used by execute_tool_call() to look up a local tool by the
# function name carried in a tool call. Each key is the function's own name.
tools = {
    tool_fn.__name__: tool_fn
    for tool_fn in (today_tool, yf_download_tool, tavily_search_tool)
}
def create_assistant():
    """Create a new "Python Coding Assistant" and return it.

    The assistant is registered with the code interpreter plus one function
    tool for each of today_tool, yf_download_tool and tavily_search_tool.
    The created object is passed to show_json before being returned.
    """
    instructions = (
        "You are a Python programming language expert that "
        "generates Pylint-compliant code and explains it. "
        "Execute code when explicitly asked to."
    )

    # Built-in code interpreter first, then the locally implemented tools.
    tool_specs = [{"type": "code_interpreter"}]
    for tool_fn in (today_tool, yf_download_tool, tavily_search_tool):
        tool_specs.append(
            {"type": "function", "function": function_to_schema(tool_fn)}
        )

    created = openai_client.beta.assistants.create(
        name="Python Coding Assistant",
        instructions=instructions,
        model="gpt-4o",
        tools=tool_specs,
    )
    show_json("assistant", created)
    return created
def load_assistant():
    """Retrieve the assistant identified by the module-level assistant_id.

    The retrieved object is passed to show_json and then returned.
    """
    assistants_api = openai_client.beta.assistants
    loaded = assistants_api.retrieve(assistant_id)
    show_json("assistant", loaded)
    return loaded
def create_thread():
    """Create a new, empty thread, pass it to show_json, and return it."""
    threads_api = openai_client.beta.threads
    new_thread = threads_api.create()
    show_json("thread", new_thread)
    return new_thread
def create_message(thread, msg):
    """Append a user message to a thread and return the created message.

    Args:
        thread: Object whose ``id`` identifies the target thread.
        msg: Content of the message, sent with the role "user".

    Returns:
        The message object returned by the API (also passed to show_json).
    """
    messages_api = openai_client.beta.threads.messages
    created = messages_api.create(
        thread_id=thread.id,
        role="user",
        content=msg,
    )
    show_json("message", created)
    return created
def create_run(assistant, thread):
    """Start a run of ``assistant`` on ``thread`` and return the run object.

    Parallel tool calls are disabled for the run. The run returned by the
    API is passed to show_json before being returned.
    """
    run_options = {
        "assistant_id": assistant.id,
        "thread_id": thread.id,
        "parallel_tool_calls": False,
    }
    started = openai_client.beta.threads.runs.create(**run_options)
    show_json("run", started)
    return started
def wait_on_run(thread, run):
    """Poll a run until it is no longer ``queued`` or ``in_progress``.

    Args:
        thread: Object whose ``id`` identifies the thread the run belongs to.
        run: Run object to wait on; it is re-fetched once per second.

    Returns:
        The most recently fetched run object (the input object itself if it
        was not in a waiting state to begin with).

    Raises:
        gradio.Error: If the run carries a ``last_error`` and gradio can be
            imported.
        RuntimeError: If the run carries a ``last_error`` and gradio is not
            installed.
    """
    while run.status in ("queued", "in_progress"):
        # Sleep before polling so no extra second is spent after the run has
        # already left the waiting states.
        time.sleep(1)
        run = openai_client.beta.threads.runs.retrieve(
            thread_id=thread.id,
            run_id=run.id,
        )

    show_json("run", run)

    last_error = getattr(run, "last_error", None)
    if last_error:
        # Surface the human-readable message when the error object has one.
        message = getattr(last_error, "message", None) or str(last_error)
        # This module never imported `gr`, so the original raise ended in a
        # NameError. Import gradio lazily and fall back to a built-in error.
        try:
            import gradio as gr
        except ImportError:
            raise RuntimeError(message) from None
        raise gr.Error(message)

    return run
def get_run_steps(thread, run):
    """List the steps of a run in ascending order and return the listing.

    Args:
        thread: Object whose ``id`` identifies the thread.
        run: Object whose ``id`` identifies the run.

    Returns:
        The step listing returned by the API (also passed to show_json).
    """
    steps_api = openai_client.beta.threads.runs.steps
    listing = steps_api.list(thread_id=thread.id, run_id=run.id, order="asc")
    show_json("run_steps", listing)
    return listing
def execute_tool_call(tool_call):
    """Run the local tool named by a function tool call and return its result.

    Args:
        tool_call: Object exposing ``function.name`` (a key of the ``tools``
            registry) and ``function.arguments`` (a JSON-encoded object, or
            an empty value when the tool takes no arguments).

    Returns:
        Whatever the selected tool function returns.

    Raises:
        KeyError: If no tool is registered under the requested name.
        json.JSONDecodeError: If the arguments are not valid JSON.
    """
    name = tool_call.function.name
    raw_arguments = tool_call.function.arguments

    # Parse whenever there is something to parse. The previous length-based
    # check (> 10 characters) silently dropped short but valid payloads such
    # as '{"q":"a"}', so the tool was called without its arguments.
    if raw_arguments and raw_arguments.strip():
        arguments = json.loads(raw_arguments) or {}
    else:
        arguments = {}

    return tools[name](**arguments)
def execute_tool_calls(run_steps):
    """Execute every function tool call found in a listing of run steps.

    Steps are visited in the order given by ``run_steps.data``. Tool calls
    without a ``function`` attribute are logged but not executed.

    Args:
        run_steps: Object whose ``data`` is an iterable of steps, each with a
            ``step_details`` attribute.

    Returns:
        A ``(tool_call_ids, tool_call_results)`` pair of parallel lists: the
        id of each executed function tool call and the value returned by
        execute_tool_call for it.
    """
    tool_call_ids = []
    tool_call_results = []

    for step in run_steps.data:
        step_details = step.step_details
        show_json("step_details", step_details)

        # Steps without tool calls (or with an empty/None value) are skipped.
        for tool_call in getattr(step_details, "tool_calls", None) or ():
            show_json("tool_call", tool_call)

            if hasattr(tool_call, "function"):
                tool_call_ids.append(tool_call.id)
                tool_call_results.append(execute_tool_call(tool_call))

    return tool_call_ids, tool_call_results
def _collect_function_tool_calls(run_steps):
    """Return, in step order, every tool call that carries a ``function``."""
    function_calls = []
    for step in run_steps.data:
        step_details = step.step_details
        show_json("step_details", step_details)
        for tool_call in getattr(step_details, "tool_calls", None) or ():
            show_json("tool_call", tool_call)
            if hasattr(tool_call, "function"):
                function_calls.append(tool_call)
    return function_calls


def _serialize_tool_output(result):
    """Convert a tool result into the string submitted as tool output."""
    if isinstance(result, str):
        return result
    to_json = getattr(result, "to_json", None)
    if callable(to_json):
        # e.g. the pandas DataFrame returned by yf_download_tool.
        return to_json()
    # Other result types are rendered as JSON, falling back to str() for
    # values json cannot encode, so the submitted output is always a string.
    return json.dumps(result, default=str)


def recurse_execute_tool_calls(thread, run, run_steps, iteration):
    """Answer the run's function tool calls one at a time until none remain.

    ``iteration`` is the index, among all function tool calls found in
    ``run_steps``, of the next call to answer. That call is executed, its
    output is submitted, the run is awaited, its steps are re-listed, and the
    process repeats with the next index.

    Only the call at the current index is executed on each pass. Earlier
    calls are not run again, which avoids repeating their work and side
    effects. The loop replaces the former self-recursion without changing
    the contract.

    Args:
        thread: Object whose ``id`` identifies the thread.
        run: Run object whose tool calls are being answered.
        run_steps: Step listing for ``run`` (see get_run_steps).
        iteration: Zero-based index of the next function tool call to answer.

    Returns:
        None.
    """
    while True:
        function_calls = _collect_function_tool_calls(run_steps)
        if len(function_calls) <= iteration:
            return

        tool_call = function_calls[iteration]
        tool_output = {
            "tool_call_id": tool_call.id,
            "output": _serialize_tool_output(execute_tool_call(tool_call)),
        }

        # https://platform.openai.com/docs/api-reference/runs/submitToolOutputs
        run = openai_client.beta.threads.runs.submit_tool_outputs(
            thread_id=thread.id,
            run_id=run.id,
            tool_outputs=[tool_output],
        )
        run = wait_on_run(thread, run)
        run_steps = get_run_steps(thread, run)
        iteration += 1
def get_messages(thread):
    """List the messages of a thread, pass them to show_json, and return them.

    Args:
        thread: Object whose ``id`` identifies the thread.
    """
    messages_api = openai_client.beta.threads.messages
    listing = messages_api.list(thread_id=thread.id)
    show_json("messages", listing)
    return listing
def extract_content_values(data):
    """Split message content into text values and image file ids.

    Args:
        data: Object whose ``data`` is an iterable of items, each with a
            ``content`` iterable of parts exposing a ``type`` attribute.

    Returns:
        A ``(text_values, image_values)`` pair: the ``text.value`` of every
        "text" part and the ``image_file.file_id`` of every "image_file"
        part, each in encounter order. Parts of other types are ignored.
    """
    texts = []
    image_ids = []

    for message in data.data:
        for part in message.content:
            # TODO: Handle other file types
            kind = part.type
            if kind == "text":
                texts.append(part.text.value)
            elif kind == "image_file":
                image_ids.append(part.image_file.file_id)

    return texts, image_ids