import json
import os
import time

import gradio as gr
import openai
from openai import OpenAI

from utils import function_to_schema, show_json

# Tools
#
# NOTE: the docstrings of the tool functions below are presumably what
# `function_to_schema` sends to the model as the tool description (verify in
# utils), so they are kept verbatim; extra notes go in `#` comments instead.

# Populated near the bottom of the module, once the OpenAI client exists.
# The hand-off tools read these names at call time, not at definition time.
triage_agent, sales_agent, issues_repairs_agent = None, None, None
triage_thread, sales_thread, issues_repairs_thread = None, None, None


def transfer_to_triage_agent():
    """Call this if the user brings up a topic outside of your purview, including escalating to human."""
    print("=> transfer_to_triage_agent")
    set_current_agent(triage_agent)
    set_current_thread(triage_thread)


def transfer_to_sales_agent():
    """Use for anything sales or buying related."""
    print("=> transfer_to_sales_agent")
    set_current_agent(sales_agent)
    set_current_thread(sales_thread)


def transfer_to_issues_repairs_agent():
    """Use for issues, repairs, or refunds."""
    print("=> transfer_to_issues_repairs_agent")
    set_current_agent(issues_repairs_agent)
    set_current_thread(issues_repairs_thread)


def escalate_to_human(summary):
    """Only call this if explicitly asked to."""
    # Only logs the summary; the process deliberately keeps running.
    print(f"=> escalate_to_human: summary: {summary}")


def execute_order(product, price: int):
    """Price should be in USD."""
    # Prints an order summary and asks for confirmation on the server console
    # (stdin), not in the chat UI. Returns a status string for the model.
    print("\n\n=== Order Summary ===")
    print(f"Product: {product}")
    print(f"Price: ${price}")
    print("=================\n")
    confirm = input("Confirm order? y/n: ").strip().lower()
    if confirm == "y":
        print("Order execution successful!")
        return "Success"
    # The original called an undefined `color()` helper here, which raised
    # NameError whenever an order was cancelled.
    print("Order cancelled!")
    return "User cancelled order."


def look_up_item(search_query):
    """Use to find item ID.
    Search query can be a description or keywords."""
    # Demo stub: ignores the query and always returns the same item ID.
    item_id = "item_13261293"
    print("Found item:", item_id)
    return item_id


def execute_refund(item_id, reason="not provided"):
    # Demo stub: prints a refund summary and always reports success.
    # (Documented with a comment rather than a docstring so the tool schema
    # generated from this function stays as it was.)
    print("\n\n=== Refund Summary ===")
    print(f"Item ID: {item_id}")
    print(f"Reason: {reason}")
    print("=================\n")
    print("Refund execution successful!")
    return "Success"


# Maps the tool name the model asks for to the Python callable implementing it.
tools = {
    "transfer_to_triage_agent": transfer_to_triage_agent,
    "transfer_to_sales_agent": transfer_to_sales_agent,
    "transfer_to_issues_repairs_agent": transfer_to_issues_repairs_agent,
    "escalate_to_human": escalate_to_human,
    "execute_order": execute_order,
    "look_up_item": look_up_item,
    "execute_refund": execute_refund,
}

# Agents

MODEL = "gpt-4o-mini"


def _function_tool(func):
    """Wrap *func* as an Assistants API "function" tool definition."""
    return {"type": "function", "function": function_to_schema(func)}


def create_triage_agent(client):
    """Create and return the triage assistant, which can hand off or escalate."""
    return client.beta.assistants.create(
        name="Triage Agent",
        instructions=(
            "You are a customer service bot for ACME Inc. "
            "Introduce yourself. Always be very brief. "
            "Gather information to direct the customer to the right department. "
            "But make your questions subtle and natural."
        ),
        model=MODEL,
        tools=[
            _function_tool(transfer_to_sales_agent),
            _function_tool(transfer_to_issues_repairs_agent),
            _function_tool(escalate_to_human),
        ],
    )


def create_sales_agent(client):
    """Create and return the sales assistant, which can place orders or hand back to triage."""
    return client.beta.assistants.create(
        name="Sales Agent",
        instructions=(
            # Trailing spaces added: the sentences were previously glued
            # together ("Inc.Always ...") by implicit string concatenation.
            "You are a sales agent for ACME Inc. "
            "Always answer in a sentence or less. "
            "Follow the following routine with the user: "
            "1. Ask them about any problems in their life related to catching roadrunners.\n"
            "2. Casually mention one of ACME's crazy made-up products can help.\n"
            " - Don't mention price.\n"
            "3. Once the user is bought in, drop a ridiculous price.\n"
            "4. Only after everything, and if the user says yes, "
            "tell them a crazy caveat and execute their order.\n"
            ""
        ),
        model=MODEL,
        tools=[
            _function_tool(execute_order),
            _function_tool(transfer_to_triage_agent),
        ],
    )


def create_issues_repairs_agent(client):
    """Create and return the issues/repairs assistant, which can look up items, refund, or hand back to triage."""
    return client.beta.assistants.create(
        name="Issues and Repairs Agent",
        instructions=(
            "You are a customer support agent for ACME Inc. "
            "Always answer in a sentence or less. "
            "Follow the following routine with the user: "
            "1. First, ask probing questions and understand the user's problem deeper.\n"
            " - unless the user has already provided a reason.\n"
            "2. Propose a fix (make one up).\n"
            "3. ONLY if not satesfied, offer a refund.\n"
            "4. If accepted, search for the ID and then execute refund."
            ""
        ),
        model=MODEL,
        tools=[
            _function_tool(look_up_item),
            _function_tool(execute_refund),
            _function_tool(transfer_to_triage_agent),
        ],
    )


# Thin wrappers around the Assistants API


def create_thread(client):
    """Create and return a new, empty conversation thread."""
    return client.beta.threads.create()


def create_message(client, thread, msg):
    """Append *msg* to *thread* as a user message and return the created message."""
    return client.beta.threads.messages.create(
        role="user",
        thread_id=thread.id,
        content=msg,
    )


def create_run(client, assistant, thread):
    """Start a run of *assistant* on *thread* and return the run object."""
    return client.beta.threads.runs.create(
        assistant_id=assistant.id,
        thread_id=thread.id,
    )


def wait_on_run(client, thread, run):
    """Poll *run* until it is no longer queued or in progress, then return it.

    The returned run may be in any other status (for example
    ``requires_action``); callers must inspect ``run.status`` themselves.
    """
    while run.status in ("queued", "in_progress"):
        time.sleep(0.25)
        run = client.beta.threads.runs.retrieve(
            thread_id=thread.id,
            run_id=run.id,
        )
    return run


def execute_tool_call(tool_call):
    """Run the local function named by *tool_call* and return its result.

    The arguments arrive as a JSON object string and are passed as keyword
    arguments. Raises KeyError if the tool name is not in ``tools``.
    """
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)
    return tools[name](**args)


def submit_tool_outputs(client, thread, run):
    """Execute the tool calls *run* is waiting on and report their outputs.

    Must only be called while ``run.status == "requires_action"``. Returns
    the run object produced by submitting the outputs.
    """
    outputs = []
    for tool_call in run.required_action.submit_tool_outputs.tool_calls:
        try:
            result = execute_tool_call(tool_call)
        except Exception as exc:
            # Report the failure to the model instead of raising: an
            # unanswered tool call leaves the run active, and the thread then
            # rejects new messages until the run expires.
            print(f"=> tool {tool_call.function.name} failed: {exc}")
            result = f"Error: {exc}"
        outputs.append(
            {
                "tool_call_id": tool_call.id,
                # Tool outputs are sent as strings; hand-off tools return None.
                "output": "OK" if result is None else str(result),
            }
        )
    return client.beta.threads.runs.submit_tool_outputs(
        thread_id=thread.id,
        run_id=run.id,
        tool_outputs=outputs,
    )


def list_run_steps(client, thread, run):
    """List the steps of *run* (oldest first), executing any tool calls found.

    Kept for backward compatibility; ``chat`` no longer uses it because it
    executes tools locally without submitting their outputs to the run.
    Returns the run-steps listing.
    """
    run_steps = client.beta.threads.runs.steps.list(
        thread_id=thread.id,
        run_id=run.id,
        order="asc",
    )
    for step in run_steps.data:
        step_details = step.step_details
        if not hasattr(step_details, "tool_calls"):
            # Skip steps without tool calls (was `break`, which also dropped
            # every tool-call step that came after such a step).
            continue
        for tool_call in step_details.tool_calls:
            execute_tool_call(tool_call)
    return run_steps


def list_messages(client, thread):
    """Return the message listing for *thread* using the API's default ordering."""
    return client.beta.threads.messages.list(thread_id=thread.id)


def extract_content_values(data):
    """Return the text of every ``text`` content part in *data*, in listing order."""
    return [
        content.text.value
        for item in data.data
        for content in item.content
        if content.type == "text"
    ]


# Currently active agent/thread; the hand-off tools swap these between turns.
current_agent, current_thread = None, None


def set_current_agent(agent):
    """Make *agent* the one that answers the next user message."""
    global current_agent
    current_agent = agent


def set_current_thread(thread):
    """Make *thread* the one the next user message is appended to."""
    global current_thread
    current_thread = thread


def get_current_agent():
    """Return the currently active agent."""
    return current_agent


def get_current_thread():
    """Return the currently active thread."""
    return current_thread


# Module-level setup: one client, three assistants, one thread per assistant.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

triage_agent = create_triage_agent(client)
sales_agent = create_sales_agent(client)
issues_repairs_agent = create_issues_repairs_agent(client)
set_current_agent(triage_agent)

triage_thread = create_thread(client)
sales_thread = create_thread(client)
issues_repairs_thread = create_thread(client)
set_current_thread(triage_thread)


def chat(message, history, openai_api_key=None):
    """Handle one chat turn and return the assistant's reply text.

    Args:
        message: The user's new message.
        history: Prior turns supplied by Gradio; unused, because the
            conversation state lives in the Assistants thread.
        openai_api_key: Unused. Defaults to None so the function also works
            while the ``additional_inputs`` textbox below is disabled and
            Gradio passes only ``message`` and ``history``.

    Returns:
        The first text value of the thread's message listing (expected to be
        the newest message under the API's default ordering), or an empty
        string if the listing has no text content.
    """
    # Capture agent/thread up front: a hand-off tool may switch the current
    # ones mid-turn, which must only affect the *next* user message.
    assistant = get_current_agent()
    show_json("Current Agent", assistant)
    thread = get_current_thread()
    show_json("Current Thread", thread)

    create_message(client, thread, message)
    run = wait_on_run(client, thread, create_run(client, assistant, thread))

    # A run may request tools several times in a row (e.g. look up an item,
    # then refund it); answer each round until the run leaves requires_action.
    while run.status == "requires_action":
        run = wait_on_run(client, thread, submit_tool_outputs(client, thread, run))

    replies = extract_content_values(list_messages(client, thread))
    return replies[0] if replies else ""


gr.ChatInterface(
    chat,
    chatbot=gr.Chatbot(height=500),
    textbox=gr.Textbox(placeholder="Type here", container=False, scale=7),
    title="Multi-Agent Orchestration",
    description="Demo using hand-off pattern: triage agent, sales agent, and issues & repairs agent",
    clear_btn=None,
    retry_btn=None,
    undo_btn=None,
    #examples=[["Generate the first 10 Fibbonaci numbers with code.", "sk-"]],
    #cache_examples=False,
    #additional_inputs=[
    #    gr.Textbox("sk-", label="OpenAI API Key", type = "password"),
    #],
).launch()