Spaces:
Running
Running
import gradio as gr | |
import openai, os, time | |
from openai import OpenAI | |
from utils import function_to_schema, show_json | |
# Tools | |
triage_agent, sales_agent, issues_repairs_agent = None, None, None | |
def transfer_to_triage_agent(): | |
"""Call this if the user brings up a topic outside of your purview, | |
including escalating to human.""" | |
set_current_agent(triage_agent) | |
def transfer_to_sales_agent(): | |
"""Use for anything sales or buying related.""" | |
set_current_agent(sales_agent) | |
def transfer_to_issues_repairs_agent(): | |
"""Use for issues, repairs, or refunds.""" | |
set_current_agent(issues_repairs_agent) | |
# | |
def escalate_to_human(summary): | |
"""Only call this if explicitly asked to.""" | |
print("Escalating to human agent...") | |
print("\n=== Escalation Report ===") | |
print(f"Summary: {summary}") | |
print("=========================\n") | |
exit() | |
# | |
def execute_order(product, price: int): | |
"""Price should be in USD.""" | |
print("\n\n=== Order Summary ===") | |
print(f"Product: {product}") | |
print(f"Price: ${price}") | |
print("=================\n") | |
confirm = input("Confirm order? y/n: ").strip().lower() | |
if confirm == "y": | |
print("Order execution successful!") | |
return "Success" | |
else: | |
print(color("Order cancelled!", "red")) | |
return "User cancelled order." | |
def look_up_item(search_query): | |
"""Use to find item ID. | |
Search query can be a description or keywords.""" | |
item_id = "item_132612938" | |
print("Found item:", item_id) | |
return item_id | |
def execute_refund(item_id, reason="not provided"): | |
print("\n\n=== Refund Summary ===") | |
print(f"Item ID: {item_id}") | |
print(f"Reason: {reason}") | |
print("=================\n") | |
print("Refund execution successful!") | |
return "Success" | |
# Agents | |
MODEL = "gpt-4o-mini" | |
def create_triage_agent(client): | |
return client.beta.assistants.create( | |
name="Triage Agent", | |
instructions=( | |
"You are a customer service bot for ACME Inc. " | |
"Introduce yourself. Always be very brief. " | |
"Gather information to direct the customer to the right department. " | |
"But make your questions subtle and natural." | |
), | |
model=MODEL, | |
tools=[{"type": "function", "function": function_to_schema(transfer_to_sales_agent)}, | |
{"type": "function", "function": function_to_schema(transfer_to_issues_repairs_agent)}, | |
{"type": "function", "function": function_to_schema(escalate_to_human)}], | |
) | |
def create_sales_agent(client): | |
return client.beta.assistants.create( | |
name="Sales Agent", | |
instructions=( | |
"You are a sales agent for ACME Inc." | |
"Always answer in a sentence or less." | |
"Follow the following routine with the user:" | |
"1. Ask them about any problems in their life related to catching roadrunners.\n" | |
"2. Casually mention one of ACME's crazy made-up products can help.\n" | |
" - Don't mention price.\n" | |
"3. Once the user is bought in, drop a ridiculous price.\n" | |
"4. Only after everything, and if the user says yes, " | |
"tell them a crazy caveat and execute their order.\n" | |
"" | |
), | |
model=MODEL, | |
tools=[{"type": "function", "function": function_to_schema(execute_order)}, | |
{"type": "function", "function": function_to_schema(transfer_to_triage_agent)}], | |
) | |
def create_issues_repairs_agent(client): | |
return client.beta.assistants.create( | |
name="Issues and Repairs Agent", | |
instructions=( | |
"You are a customer support agent for ACME Inc." | |
"Always answer in a sentence or less." | |
"Follow the following routine with the user:" | |
"1. First, ask probing questions and understand the user's problem deeper.\n" | |
" - unless the user has already provided a reason.\n" | |
"2. Propose a fix (make one up).\n" | |
"3. ONLY if not satesfied, offer a refund.\n" | |
"4. If accepted, search for the ID and then execute refund." | |
"" | |
), | |
model=MODEL, | |
tools=[{"type": "function", "function": function_to_schema(look_up_item)}, | |
{"type": "function", "function": function_to_schema(execute_refund)}, | |
{"type": "function", "function": function_to_schema(transfer_to_triage_agent)}], | |
) | |
# | |
def create_thread(client): | |
thread = client.beta.threads.create() | |
#show_json("thread", thread) | |
return thread | |
def create_message(client, thread, msg): | |
message = client.beta.threads.messages.create( | |
role="user", | |
thread_id=thread.id, | |
content=msg, | |
) | |
#show_json("message", message) | |
return message | |
def create_run(client, assistant, thread): | |
run = client.beta.threads.runs.create( | |
assistant_id=assistant.id, | |
thread_id=thread.id, | |
) | |
#show_json("run", run) | |
return run | |
def wait_on_run(client, thread, run): | |
while run.status == "queued" or run.status == "in_progress": | |
run = client.beta.threads.runs.retrieve( | |
thread_id=thread.id, | |
run_id=run.id, | |
) | |
time.sleep(0.25) | |
#show_json("run", run) | |
return run | |
### | |
def execute_tool_call(tool_call): | |
name = tool_call.function.name | |
args = json.loads(tool_call.function.arguments) | |
print(f"{name}, {args}") | |
#return tools[name](**args) | |
### | |
def list_run_steps(client, thread, run): | |
run_steps = client.beta.threads.runs.steps.list( | |
thread_id=thread.id, | |
run_id=run.id, | |
order="asc", | |
) | |
for step in run_steps.data: | |
step_details = step.step_details | |
show_json("step_details", step_details) | |
if not step_details.tool_calls: | |
break | |
for tool_call in step_details.tool_calls: | |
result = execute_tool_call(tool_call) | |
return run_steps | |
def list_messages(client, thread): | |
messages = client.beta.threads.messages.list( | |
thread_id=thread.id | |
) | |
#show_json("messages", messages) | |
return messages | |
def extract_content_values(data): | |
content_values = [] | |
for item in data.data: | |
for content in item.content: | |
if content.type == 'text': | |
content_values.append(content.text.value) | |
return content_values | |
# | |
current_agent, current_thread = None, None | |
def set_current_agent(agent): | |
global current_agent | |
current_agent = agent | |
def set_current_thread(thread): | |
global current_thread | |
current_thread = thread | |
def get_current_agent(): | |
global current_agent | |
return current_agent | |
def get_current_thread(): | |
global current_thread | |
return current_thread | |
# | |
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY")) | |
triage_agent = create_triage_agent(client) | |
sales_agent = create_sales_agent(client) | |
issues_repairs_agent = create_issues_repairs_agent(client) | |
set_current_agent(triage_agent) | |
triage_thread = create_thread(client) | |
sales_thread = create_thread(client) | |
issues_repairs_thread = create_thread(client) | |
set_current_thread(triage_thread) | |
def chat(message, history, openai_api_key): | |
global client | |
assistant = get_current_agent() | |
show_json("Current Agent", assistant) | |
thread = get_current_thread() | |
#show_json("Current Thread", thread) | |
create_message(client, thread, message) | |
# async | |
run = create_run(client, assistant, thread) | |
run = wait_on_run(client, thread, run) | |
list_run_steps(client, thread, run) | |
messages = list_messages(client, thread) | |
return extract_content_values(messages)[0] | |
gr.ChatInterface( | |
chat, | |
chatbot=gr.Chatbot(height=300), | |
textbox=gr.Textbox(placeholder="Question", container=False, scale=7), | |
title="Multi-Agent Orchestration", | |
description="Demo using hand-off pattern: triage agent, sales agent, and issues & repairs agent", | |
clear_btn="Clear", | |
retry_btn=None, | |
undo_btn=None, | |
#examples=[["Generate the first 10 Fibbonaci numbers with code.", "sk-<BringYourOwn>"]], | |
#cache_examples=False, | |
#additional_inputs=[ | |
# gr.Textbox("sk-", label="OpenAI API Key", type = "password"), | |
#], | |
).launch() |