import os import json import requests import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from flask import Flask, request, jsonify, send_from_directory from openai import OpenAI from duckduckgo_search import DDGS from functions import FUNCTIONS_GROUP_1, FUNCTIONS_GROUP_2, get_function_descriptions app = Flask(__name__) API_KEY = os.getenv("OPENAI_API_KEY") BASE_URL = os.getenv("OPENAI_BASE_URL") emailkey = os.getenv("EMAIL_KEY") client = OpenAI(api_key=API_KEY, base_url=BASE_URL) def search_duckduckgo(keywords): search_term = " ".join(keywords) with DDGS() as ddgs: return list(ddgs.text(keywords=search_term, region="cn-zh", safesearch="on", max_results=5)) def search_papers(query): url = f"https://api.crossref.org/works?query={query}" response = requests.get(url) if response.status_code == 200: data = response.json() papers = data['message']['items'] processed_papers = [] for paper in papers: processed_paper = { "标题": paper.get('title', [''])[0], "作者": ", ".join([f"{author.get('given', '')} {author.get('family', '')}" for author in paper.get('author', [])]), "DOI": paper.get('DOI', ''), "ISBN": ", ".join(paper.get('ISBN', [])), "摘要": paper.get('abstract', '').replace('

', '').replace('

', '').replace('', '').replace('', '') } processed_papers.append(processed_paper) return processed_papers else: return [] def send_email(to, subject, content): try: with smtplib.SMTP('106.15.184.28', 8025) as smtp: smtp.login("jwt", emailkey) message = MIMEMultipart() message['From'] = "Me " message['To'] = to message['Subject'] = subject message.attach(MIMEText(content, 'html')) smtp.sendmail("aixiao@aixiao.xyz", to, message.as_string()) return True except Exception as e: print(f"发送邮件时出错: {str(e)}") return False def get_openai_response(messages, model="gpt-4o-mini", functions=None, function_call=None): try: response = client.chat.completions.create( model=model, messages=messages, functions=functions, function_call=function_call ) return response.choices[0].message except Exception as e: print(f"调用OpenAI API时出错: {str(e)}") return None def process_function_call(function_name, function_args): if function_name == "search_duckduckgo": keywords = function_args.get('keywords', []) if not keywords: return "搜索关键词为空,无法执行搜索。" return json.dumps(search_duckduckgo(keywords), ensure_ascii=False) elif function_name == "search_papers": query = function_args.get('query', '') if not query: return "搜索查询为空,无法执行论文搜索。" return json.dumps(search_papers(query), ensure_ascii=False) elif function_name == "send_email": to = function_args.get('to', '') subject = function_args.get('subject', '') content = function_args.get('content', '') if not to or not subject or not content: return "邮件信息不完整,无法发送邮件。" success = send_email(to, subject, content) return json.dumps({ "success": success, "message": "邮件发送成功" if success else "邮件发送失败", "to": to, "subject": subject, "content": content, "is_email": True # 添加这个标记来识别邮件发送功能 }, ensure_ascii=False) else: return "未知的函数调用。" @app.route('/') def index(): return send_from_directory('.', 'index.html') # @app.route('/chat', methods=['POST']) # def chat(): # data = request.json # question = data['question'] # history = data.get('history', []) # messages = history + [{"role": "user", "content": question}] # status_log = [] # # 次级模型1: 处理搜索相关函数 # status_log.append("次级模型1:正在判断是否需要选调第一组函数") # sub_model_1_response = get_openai_response(messages, model="gpt-4o-mini", functions=FUNCTIONS_GROUP_1, function_call="auto") # # 次级模型2: 处理邮件发送相关函数 # status_log.append("次级模型2:正在判断是否需要选调第二组函数") # sub_model_2_response = get_openai_response(messages, model="gpt-4o-mini", functions=FUNCTIONS_GROUP_2, function_call="auto") # function_call_1 = sub_model_1_response.function_call if sub_model_1_response and sub_model_1_response.function_call else None # function_call_2 = sub_model_2_response.function_call if sub_model_2_response and sub_model_2_response.function_call else None # final_function_call = None # response = None # if function_call_1 and function_call_2: # # 裁决模型: 决定使用哪个函数调用 # status_log.append("裁决模型:正在决定使用哪个函数调用") # arbitration_messages = messages + [ # {"role": "system", "content": "两个次级模型都建议使用函数。请决定使用哪个函数更合适。"}, # {"role": "assistant", "content": f"次级模型1建议使用函数:{function_call_1.name}"}, # {"role": "assistant", "content": f"次级模型2建议使用函数:{function_call_2.name}"} # ] # arbitration_response = get_openai_response(arbitration_messages, model="gpt-4o-mini") # if "模型1" in arbitration_response.content or function_call_1.name in arbitration_response.content: # final_function_call = function_call_1 # status_log.append(f"裁决模型:决定使用函数 {function_call_1.name}") # else: # final_function_call = function_call_2 # status_log.append(f"裁决模型:决定使用函数 {function_call_2.name}") # elif function_call_1: # final_function_call = function_call_1 # status_log.append(f"次级模型1:决定使用函数 {function_call_1.name}") # elif function_call_2: # final_function_call = function_call_2 # status_log.append(f"次级模型2:决定使用函数 {function_call_2.name}") # else: # status_log.append("次级模型:判断不需要进行搜索或发送邮件") # if final_function_call: # function_name = final_function_call.name # function_args = json.loads(final_function_call.arguments) # status_log.append(f"正在执行函数 {function_name}") # result = process_function_call(function_name, function_args) # status_log.append(f"函数 {function_name} 执行完成") # # 检查是否为邮件发送功能 # result_dict = json.loads(result) # if result_dict.get("is_email", False): # response = f"邮件{'已成功' if result_dict['success'] else '未能成功'}发送到 {result_dict['to']}。\n\n主题:{result_dict['subject']}\n\n内容:\n{result_dict['content']}" # else: # messages.append({ # "role": "function", # "name": function_name, # "content": result # }) # # 只有在没有邮件发送结果时才调用主模型 # if not response: # status_log.append("主模型:正在生成回答") # final_response = get_openai_response(messages, model="gpt-4o-mini") # response = final_response.content if final_response else "Error occurred" # status_log.append("主模型:回答生成完成") # return jsonify({ # "response": response, # "status_log": status_log # }) @app.route('/chat', methods=['POST']) def chat(): data = request.json question = data['question'] history = data.get('history', []) messages = history + [{"role": "user", "content": question}] status_log = [] # 次级模型1: 处理搜索相关函数 status_log.append("次级模型1:正在判断是否需要选调第一组函数") sub_model_1_response = get_openai_response(messages, model="gpt-4o-mini", functions=FUNCTIONS_GROUP_1, function_call="auto") # 次级模型2: 处理邮件发送相关函数 status_log.append("次级模型2:正在判断是否需要选调第二组函数") sub_model_2_response = get_openai_response(messages, model="gpt-4o-mini", functions=FUNCTIONS_GROUP_2, function_call="auto") function_call_1 = sub_model_1_response.function_call if sub_model_1_response and sub_model_1_response.function_call else None function_call_2 = sub_model_2_response.function_call if sub_model_2_response and sub_model_2_response.function_call else None if not function_call_1: status_log.append("次级模型1:判断不需要选调第一组函数") if not function_call_2: status_log.append("次级模型2:判断不需要选调第二组函数") final_function_call = None response = None if function_call_1 and function_call_2: # 裁决模型: 决定使用哪个函数调用 status_log.append("裁决模型:正在决定使用哪个函数调用") arbitration_messages = messages + [ {"role": "system", "content": "两个次级模型都建议使用函数。请决定使用哪个函数更合适。"}, {"role": "assistant", "content": f"次级模型1建议使用函数:{function_call_1.name}"}, {"role": "assistant", "content": f"次级模型2建议使用函数:{function_call_2.name}"} ] arbitration_response = get_openai_response(arbitration_messages, model="gpt-4o-mini") if "模型1" in arbitration_response.content or function_call_1.name in arbitration_response.content: final_function_call = function_call_1 status_log.append(f"裁决模型:决定使用函数 {function_call_1.name}") else: final_function_call = function_call_2 status_log.append(f"裁决模型:决定使用函数 {function_call_2.name}") elif function_call_1: final_function_call = function_call_1 status_log.append(f"次级模型1:决定使用函数 {function_call_1.name}") elif function_call_2: final_function_call = function_call_2 status_log.append(f"次级模型2:决定使用函数 {function_call_2.name}") else: status_log.append("所有次级模型:判断不需要进行任何函数调用") if final_function_call: function_name = final_function_call.name function_args = json.loads(final_function_call.arguments) status_log.append(f"正在执行函数 {function_name}") result = process_function_call(function_name, function_args) status_log.append(f"函数 {function_name} 执行完成") # 检查是否为邮件发送功能 result_dict = json.loads(result) if result_dict.get("is_email", False): response = f"邮件{'已成功' if result_dict['success'] else '未能成功'}发送到 {result_dict['to']}。\n\n主题:{result_dict['subject']}\n\n内容:\n{result_dict['content']}" else: messages.append({ "role": "function", "name": function_name, "content": result }) # 只有在没有邮件发送结果时才调用主模型 if not response: status_log.append("主模型:正在生成回答") final_response = get_openai_response(messages, model="gpt-4o-mini") response = final_response.content if final_response else "Error occurred" status_log.append("主模型:回答生成完成") return jsonify({ "response": response, "status_log": status_log }) @app.route('/settings', methods=['POST']) def update_settings(): data = request.json max_history = data.get('max_history', 10) return jsonify({"status": "success", "max_history": max_history}) if __name__ == '__main__': app.run(host='0.0.0.0', port=7860, debug=True)