import os import json import requests import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from flask import Flask, request, jsonify, send_from_directory from openai import OpenAI from duckduckgo_search import DDGS app = Flask(__name__) API_KEY = "sk-909Q1F68ZRwTJfT4Bc81BeE1317c4986A5067f347a2f97C9" BASE_URL = "https://api.oaipro.com/v1" client = OpenAI(api_key=API_KEY, base_url=BASE_URL) FUNCTIONS = [ { "name": "search_duckduckgo", "description": "使用DuckDuckGo搜索引擎查询信息。可以搜索最新新闻、文章、博客等内容。", "parameters": { "type": "object", "properties": { "keywords": { "type": "array", "items": {"type": "string"}, "description": "搜索的关键词列表。例如:['Python', '机器学习', '最新进展']。" } }, "required": ["keywords"] } }, { "name": "search_papers", "description": "使用Crossref API搜索学术论文。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "搜索查询字符串。例如:'climate change'。" } }, "required": ["query"] } }, { "name": "send_email", "description": "发送电子邮件。", "parameters": { "type": "object", "properties": { "to": { "type": "string", "description": "收件人邮箱地址" }, "subject": { "type": "string", "description": "邮件主题" }, "content": { "type": "string", "description": "邮件内容" } }, "required": ["to", "subject", "content"] } } ] def search_duckduckgo(keywords): search_term = " ".join(keywords) with DDGS() as ddgs: return list(ddgs.text(keywords=search_term, region="cn-zh", safesearch="on", max_results=5)) def search_papers(query): url = f"https://api.crossref.org/works?query={query}" response = requests.get(url) if response.status_code == 200: data = response.json() papers = data['message']['items'] processed_papers = [] for paper in papers: processed_paper = { "标题": paper.get('title', [''])[0], "作者": ", ".join([f"{author.get('given', '')} {author.get('family', '')}" for author in paper.get('author', [])]), "DOI": paper.get('DOI', ''), "ISBN": ", ".join(paper.get('ISBN', [])), "摘要": paper.get('abstract', '').replace('

', '').replace('

', '').replace('', '').replace('', '') } processed_papers.append(processed_paper) return processed_papers else: return [] def send_email(to, subject, content): try: with smtplib.SMTP('106.15.184.28', 8025) as smtp: smtp.login("jwt", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhZGRyZXNzIjoiYWl4aWFvQGFpeGlhby54eXoiLCJhZGRyZXNzX2lkIjoiMSJ9.CidcEUdbixWPkI0wwWJtcnUzfHtTJE3WMIDLHNAIIxo") message = MIMEMultipart() message['From'] = "Me " message['To'] = to message['Subject'] = subject message.attach(MIMEText(content, 'html')) smtp.sendmail("aixiao@aixiao.xyz", to, message.as_string()) return True except Exception as e: print(f"发送邮件时出错: {str(e)}") return False def get_openai_response(messages, model="gpt-4o-mini", functions=None, function_call=None): try: response = client.chat.completions.create( model=model, messages=messages, functions=functions, function_call=function_call ) return response.choices[0].message except Exception as e: print(f"调用OpenAI API时出错: {str(e)}") return None def process_function_call(response_message): function_name = response_message.function_call.name function_args = json.loads(response_message.function_call.arguments) if function_name == "search_duckduckgo": keywords = function_args.get('keywords', []) if not keywords: return None return search_duckduckgo(keywords) elif function_name == "search_papers": query = function_args.get('query', '') if not query: return None return search_papers(query) elif function_name == "send_email": to = function_args.get('to', '') subject = function_args.get('subject', '') content = function_args.get('content', '') if not to or not subject or not content: return None success = send_email(to, subject, content) return { "success": success, "to": to, "subject": subject, "content": content } else: return None @app.route('/') def index(): return send_from_directory('.', 'index.html') @app.route('/chat', methods=['POST']) def chat(): data = request.json question = data['question'] history = data.get('history', []) messages = history + [{"role": "user", "content": question}] status_log = [] # First model: Decide whether to use search or send email status_log.append("次模型:正在判断是否需要进行搜索或发送邮件") decision_response = get_openai_response(messages, model="gpt-4o-mini", functions=FUNCTIONS, function_call="auto") search_results = None email_result = None if decision_response and decision_response.function_call: function_name = decision_response.function_call.name status_log.append(f"次模型:判断需要{function_name}") status_log.append(f"正在进行{function_name}") result = process_function_call(decision_response) if result: if function_name == "search_papers": search_results = result[:10] elif function_name == "search_duckduckgo": search_results = result elif function_name == "send_email": email_result = result status_log.append(f"{function_name}完成") else: status_log.append("次模型:判断不需要进行搜索或发送邮件") # Generate response based on the action taken if email_result: response = f"邮件{'已成功' if email_result['success'] else '未能成功'}发送到 {email_result['to']}。\n\n主题:{email_result['subject']}\n\n内容:\n{email_result['content']}" elif search_results: # Second model: Generate final response with search results status_log.append("主模型:正在生成回答") messages.append({ "role": "assistant", "content": decision_response.content if decision_response.content else f"I need to {function_name}." }) messages.append({ "role": "function", "name": function_name, "content": json.dumps(search_results, ensure_ascii=False) }) final_response = get_openai_response(messages, model="gpt-4o-mini") response = final_response.content if final_response else "Error occurred" status_log.append("主模型:回答生成完成") else: # Second model: Generate final response without search results status_log.append("主模型:正在生成回答") final_response = get_openai_response(messages, model="gpt-4o-mini") response = final_response.content if final_response else "Error occurred" status_log.append("主模型:回答生成完成") return jsonify({ "response": response, "search_used": bool(search_results), "search_results": search_results, "email_sent": email_result['success'] if email_result else False, "status_log": status_log }) @app.route('/settings', methods=['POST']) def update_settings(): data = request.json max_history = data.get('max_history', 10) # 在这里可以添加其他设置项 return jsonify({"status": "success", "max_history": max_history}) if __name__ == '__main__': app.run(host='0.0.0.0', port=7860, debug=True)