# functioncall / app.py
# mistpe's picture
# Update app.py
# 3d3f22c verified
import json
import os
import random
import re
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import requests
from bs4 import BeautifulSoup
from flask import Flask, request, jsonify, send_from_directory
from openai import OpenAI

from functions import FUNCTIONS_GROUP_1, FUNCTIONS_GROUP_2, get_function_descriptions
# Flask application object; the route handlers below are registered on it.
app = Flask(__name__)
# API credentials and endpoint are read from the environment.
# os.getenv returns None when a variable is unset.
API_KEY = os.getenv("OPENAI_API_KEY")
BASE_URL = os.getenv("OPENAI_BASE_URL")
# Password passed to smtp.login() in send_email.
emailkey = os.getenv("EMAIL_KEY")
# Shared OpenAI client used by get_openai_response.
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
def search_duckduckgo(keywords, timeout=10):
    """Search the web for *keywords* and return up to five result summaries.

    Despite the name (kept so existing callers and function-calling schemas
    keep working), the request is sent to Bing's HTML search page and the
    results are scraped from the returned markup.

    Args:
        keywords: A list of search terms (joined with spaces) or a single
            string used as-is.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A list of dicts with the keys ``"title"``, ``"href"`` and ``"body"``.
        The list is empty when the request fails, the status is not 200, or
        no result could be parsed.
    """
    # A bare string would otherwise be joined character by character.
    if isinstance(keywords, str):
        search_term = keywords
    else:
        search_term = " ".join(str(word) for word in keywords)

    url = "https://www.bing.com/search"
    user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
    ]
    headers = {"User-Agent": random.choice(user_agents)}
    params = {"q": search_term, "setlang": "zh-CN"}

    try:
        # Without a timeout a stalled connection would block the request
        # handler indefinitely.
        response = requests.get(url, params=params, headers=headers, timeout=timeout)
    except requests.RequestException as e:
        print(f"搜索请求出错: {str(e)}")
        return []

    results = []
    if response.status_code != 200:
        return results

    soup = BeautifulSoup(response.text, 'html.parser')
    for item in soup.select('.b_algo')[:5]:  # Limit to 5 results
        title_elem = item.select_one('h2 a')
        snippet_elem = item.select_one('.b_caption p')
        if not (title_elem and snippet_elem):
            continue
        # An anchor without an href would raise KeyError on subscript access.
        href = title_elem.get('href')
        if not href:
            continue
        results.append({
            "title": title_elem.text,
            "href": href,
            "body": snippet_elem.text,
        })
    return results
# Matches any opening or closing markup tag, including namespaced ones
# such as <jats:p>.
_MARKUP_TAG_RE = re.compile(r"</?[A-Za-z][^>]*>")


def search_papers(query, timeout=10):
    """Query the Crossref works API and return simplified paper records.

    Args:
        query: Free-text search string. It is passed as a request parameter
            so that spaces and reserved characters are URL-encoded.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A list of dicts with the keys ``"标题"`` (title), ``"作者"`` (authors),
        ``"DOI"``, ``"ISBN"`` and ``"摘要"`` (abstract with markup tags
        removed). The list is empty when the request fails, the status is
        not 200, or the payload does not have the expected structure.
    """
    try:
        response = requests.get(
            "https://api.crossref.org/works",
            params={"query": query},
            timeout=timeout,
        )
    except requests.RequestException as e:
        print(f"论文搜索请求出错: {str(e)}")
        return []

    if response.status_code != 200:
        return []

    try:
        papers = response.json()['message']['items']
    except (ValueError, KeyError, TypeError) as e:
        print(f"论文搜索结果解析出错: {str(e)}")
        return []

    processed_papers = []
    for paper in papers:
        # 'title' may be present but empty, so guard before indexing.
        titles = paper.get('title') or ['']
        authors = [
            f"{author.get('given', '')} {author.get('family', '')}".strip()
            for author in paper.get('author', [])
        ]
        processed_papers.append({
            "标题": titles[0],
            "作者": ", ".join(authors),
            "DOI": paper.get('DOI', ''),
            "ISBN": ", ".join(paper.get('ISBN', [])),
            "摘要": _MARKUP_TAG_RE.sub('', paper.get('abstract', '')),
        })
    return processed_papers
def send_email(to, subject, content, timeout=30):
    """Send an HTML email through the configured SMTP relay.

    Args:
        to: Recipient address.
        subject: Subject line.
        content: Message body, attached as ``text/html``.
        timeout: Seconds allowed for the SMTP connection and its blocking
            operations, so an unreachable relay cannot hang the caller.

    Returns:
        True when the message was handed to the server, False on any error
        (the error is printed, not raised).
    """
    try:
        # NOTE(review): the sender addresses below look like redaction
        # placeholders left by a web scrape — confirm the real values.
        message = MIMEMultipart()
        message['From'] = "Me <[email protected]>"
        message['To'] = to
        message['Subject'] = subject
        message.attach(MIMEText(content, 'html'))

        with smtplib.SMTP('106.15.184.28', 8025, timeout=timeout) as smtp:
            smtp.login("jwt", emailkey)
            smtp.sendmail("[email protected]", to, message.as_string())
        return True
    except Exception as e:
        # Best-effort boundary: callers only need a success flag.
        print(f"发送邮件时出错: {str(e)}")
        return False
def get_openai_response(messages, model="gpt-4o-mini", functions=None, function_call=None):
    """Run one chat completion and return the first choice's message.

    Args:
        messages: Conversation messages to send.
        model: Model name to use.
        functions: Optional function definitions offered to the model.
        function_call: Optional function-call mode (e.g. ``"auto"``).

    Returns:
        The message object of the first choice, or None when the API call
        fails (the error is printed, not raised).
    """
    request_kwargs = {"model": model, "messages": messages}
    # Only forward the optional parameters when the caller supplied them,
    # instead of passing explicit None values to the client.
    if functions is not None:
        request_kwargs["functions"] = functions
    if function_call is not None:
        request_kwargs["function_call"] = function_call

    try:
        response = client.chat.completions.create(**request_kwargs)
        return response.choices[0].message
    except Exception as e:
        # Best-effort boundary: callers treat None as "no answer".
        print(f"调用OpenAI API时出错: {str(e)}")
        return None
def process_function_call(function_name, function_args):
    """Dispatch a model-requested function call to its implementation.

    Args:
        function_name: Name of the function the model asked for.
        function_args: Dict of arguments decoded from the model's call.

    Returns:
        The list returned by a search function, a dict describing the email
        attempt (flagged with ``"is_email": True``), or a message string when
        required arguments are missing or the function name is unknown.
    """
    if function_name == "search_duckduckgo":
        terms = function_args.get('keywords', [])
        if terms:
            return search_duckduckgo(terms)
        return "搜索关键词为空,无法执行搜索。"

    if function_name == "search_papers":
        paper_query = function_args.get('query', '')
        if paper_query:
            return search_papers(paper_query)
        return "搜索查询为空,无法执行论文搜索。"

    if function_name == "send_email":
        recipient = function_args.get('to', '')
        mail_subject = function_args.get('subject', '')
        mail_body = function_args.get('content', '')
        if not (recipient and mail_subject and mail_body):
            return "邮件信息不完整,无法发送邮件。"

        delivered = send_email(recipient, mail_subject, mail_body)
        if delivered:
            outcome_text = "邮件发送成功"
        else:
            outcome_text = "邮件发送失败"
        return {
            "success": delivered,
            "message": outcome_text,
            "to": recipient,
            "subject": mail_subject,
            "content": mail_body,
            "is_email": True,
        }

    return "未知的函数调用。"
@app.route('/')
def index():
    """Serve index.html from the '.' directory as the landing page."""
    page_dir = '.'
    page_name = 'index.html'
    return send_from_directory(page_dir, page_name)
def _proposed_function_call(model_message):
    """Return the function call proposed in a model message, or None."""
    return getattr(model_message, "function_call", None) or None


def _arbitrate_function_call(messages, function_call_1, function_call_2, status_log):
    """Ask the arbitration model which of two proposed calls should run."""
    status_log.append("裁决模型:正在决定使用哪个函数调用")
    arbitration_messages = messages + [
        {"role": "system", "content": "两个次级模型都建议使用函数。请决定使用哪个函数更合适。"},
        {"role": "assistant", "content": f"次级模型1建议使用函数:{function_call_1.name}"},
        {"role": "assistant", "content": f"次级模型2建议使用函数:{function_call_2.name}"}
    ]
    arbitration_response = get_openai_response(arbitration_messages, model="gpt-4o-mini")
    # The helper returns None on API errors and the content itself may be
    # None, so normalise to a string before searching it.
    verdict = (arbitration_response.content if arbitration_response else None) or ""

    if not verdict:
        # Without a verdict, prefer group 1 (search) over group 2 (email
        # sending), since a search has no outward side effect.
        status_log.append("裁决模型:未返回有效结果,默认采用次级模型1的建议")
        chosen = function_call_1
    elif "模型1" in verdict or function_call_1.name in verdict:
        chosen = function_call_1
    else:
        chosen = function_call_2
    status_log.append(f"裁决模型:决定使用函数 {chosen.name}")
    return chosen


@app.route('/chat', methods=['POST'])
def chat():
    """Answer a chat question, optionally running one model-selected function.

    Expects a JSON body with ``question`` and an optional ``history`` list of
    prior messages. Two sub-model calls decide whether a function from
    FUNCTIONS_GROUP_1 or FUNCTIONS_GROUP_2 should run; when both propose one,
    an arbitration call picks between them. Email results are reported
    directly; any other result is fed back to the main model for the final
    answer.

    Returns:
        JSON with ``response``, ``status_log``, ``search_results``,
        ``search_used`` and ``email_sent``; or a 400 error when the body is
        not JSON or lacks ``question``.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'question' not in data:
        return jsonify({"error": "Request body must be JSON with a 'question' field."}), 400

    question = data['question']
    history = data.get('history') or []  # tolerate an explicit null
    messages = history + [{"role": "user", "content": question}]
    status_log = []

    # Sub-model 1: handles the search-related functions.
    status_log.append("次级模型1:正在判断是否需要选调第一组函数")
    sub_model_1_response = get_openai_response(messages, model="gpt-4o-mini", functions=FUNCTIONS_GROUP_1, function_call="auto")
    # Sub-model 2: handles the email-sending functions.
    status_log.append("次级模型2:正在判断是否需要选调第二组函数")
    sub_model_2_response = get_openai_response(messages, model="gpt-4o-mini", functions=FUNCTIONS_GROUP_2, function_call="auto")

    function_call_1 = _proposed_function_call(sub_model_1_response)
    function_call_2 = _proposed_function_call(sub_model_2_response)
    if not function_call_1:
        status_log.append("次级模型1:判断不需要选调第一组函数")
    if not function_call_2:
        status_log.append("次级模型2:判断不需要选调第二组函数")

    final_function_call = None
    response = None
    search_results = None
    email_sent = False

    if function_call_1 and function_call_2:
        # Arbitration model: decide which proposed call to use.
        final_function_call = _arbitrate_function_call(
            messages, function_call_1, function_call_2, status_log
        )
    elif function_call_1:
        final_function_call = function_call_1
        status_log.append(f"次级模型1:决定使用函数 {function_call_1.name}")
    elif function_call_2:
        final_function_call = function_call_2
        status_log.append(f"次级模型2:决定使用函数 {function_call_2.name}")
    else:
        status_log.append("所有次级模型:判断不需要进行任何函数调用")

    if final_function_call:
        function_name = final_function_call.name
        # The arguments are model-generated text and may not be valid JSON.
        try:
            function_args = json.loads(final_function_call.arguments)
        except (TypeError, ValueError):
            function_args = None
        if not isinstance(function_args, dict):
            status_log.append(f"函数 {function_name} 的参数无法解析,按空参数处理")
            function_args = {}

        status_log.append(f"正在执行函数 {function_name}")
        result = process_function_call(function_name, function_args)
        status_log.append(f"函数 {function_name} 执行完成")

        if isinstance(result, dict) and result.get("is_email", False):
            response = f"邮件{'已成功' if result['success'] else '未能成功'}发送到 {result['to']}。\n\n主题:{result['subject']}\n\n内容:\n{result['content']}"
            email_sent = result['success']
        elif isinstance(result, list):
            search_results = result
            messages.append({
                "role": "function",
                "name": function_name,
                "content": json.dumps(result, ensure_ascii=False)
            })
        else:
            messages.append({
                "role": "function",
                "name": function_name,
                "content": str(result)
            })

    # Call the main model only when no email result already forms the reply.
    if not response:
        status_log.append("主模型:正在生成回答")
        final_response = get_openai_response(messages, model="gpt-4o-mini")
        response = final_response.content if final_response else "Error occurred"
        status_log.append("主模型:回答生成完成")

    return jsonify({
        "response": response,
        "status_log": status_log,
        "search_results": search_results,
        "search_used": bool(search_results),
        "email_sent": email_sent
    })
@app.route('/settings', methods=['POST'])
def update_settings():
    """Echo back the requested ``max_history`` setting.

    Reads ``max_history`` from the JSON body (default 10) and returns it with
    a success status. Nothing is stored by this handler. A missing or
    non-object body is treated as empty rather than raising.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    max_history = data.get('max_history', 10)
    return jsonify({"status": "success", "max_history": max_history})
if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger must
    # not be on by default: it allows arbitrary code execution to anyone who
    # can reach the port. Set FLASK_DEBUG=1 to enable it for local work.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)
# from flask import Flask, request, jsonify, send_from_directory
# import requests
# from bs4 import BeautifulSoup
# import random
# import time
# app = Flask(__name__)
# def perform_bing_search(keywords):
# search_term = " ".join(keywords)
# url = "https://www.bing.com/search"
# user_agents = [
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
# "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
# ]
# headers = {"User-Agent": random.choice(user_agents)}
# params = {"q": search_term, "setlang": "zh-CN"}
# response = requests.get(url, params=params, headers=headers)
# if response.status_code == 200:
# soup = BeautifulSoup(response.text, 'html.parser')
# results = soup.select('.b_algo')
# search_results = []
# for result in results[:5]: # keep only the first 5 results
# title = result.select_one('h2 a')
# snippet = result.select_one('.b_caption p')
# if title and snippet:
# search_results.append({
# "title": title.text,
# "url": title['href'],
# "snippet": snippet.text
# })
# return search_results
# return []
# @app.route('/')
# def index():
# return send_from_directory('.', 'we.html')
# @app.route('/start_test', methods=['POST'])
# def start_test():
# data = request.json
# keywords = data['keywords'].split()
# interval = int(data['interval'])
# first_search = perform_bing_search(keywords)
# time.sleep(interval)
# second_search = perform_bing_search(keywords)
# success = len(first_search) > 0 and len(second_search) > 0
# return jsonify({
# "success": success,
# "first_search": first_search,
# "second_search": second_search
# })
# if __name__ == '__main__':
# app.run(host='0.0.0.0', port=7860, debug=True)