Spaces:
Running
Running
import os | |
import json | |
import requests | |
import smtplib | |
from email.mime.text import MIMEText | |
from email.mime.multipart import MIMEMultipart | |
from flask import Flask, request, jsonify, send_from_directory | |
from openai import OpenAI | |
from duckduckgo_search import DDGS | |
app = Flask(__name__) | |
API_KEY = os.getenv("OPENAI_API_KEY") | |
BASE_URL = os.getenv("OPENAI_BASE_URL") | |
emailkey = os.getenv("EMAIL_KEY") | |
client = OpenAI(api_key=API_KEY, base_url=BASE_URL) | |
FUNCTIONS = [ | |
{ | |
"name": "search_duckduckgo", | |
"description": "使用DuckDuckGo搜索引擎查询信息。可以搜索最新新闻、文章、博客等内容。", | |
"parameters": { | |
"type": "object", | |
"properties": { | |
"keywords": { | |
"type": "array", | |
"items": {"type": "string"}, | |
"description": "搜索的关键词列表。例如:['Python', '机器学习', '最新进展']。" | |
} | |
}, | |
"required": ["keywords"] | |
} | |
}, | |
{ | |
"name": "search_papers", | |
"description": "使用Crossref API搜索学术论文。", | |
"parameters": { | |
"type": "object", | |
"properties": { | |
"query": { | |
"type": "string", | |
"description": "搜索查询字符串。例如:'climate change'。" | |
} | |
}, | |
"required": ["query"] | |
} | |
}, | |
{ | |
"name": "send_email", | |
"description": "发送电子邮件。", | |
"parameters": { | |
"type": "object", | |
"properties": { | |
"to": { | |
"type": "string", | |
"description": "收件人邮箱地址" | |
}, | |
"subject": { | |
"type": "string", | |
"description": "邮件主题" | |
}, | |
"content": { | |
"type": "string", | |
"description": "邮件内容" | |
} | |
}, | |
"required": ["to", "subject", "content"] | |
} | |
} | |
] | |
def search_duckduckgo(keywords): | |
search_term = " ".join(keywords) | |
with DDGS() as ddgs: | |
return list(ddgs.text(keywords=search_term, region="cn-zh", safesearch="on", max_results=5)) | |
def search_papers(query): | |
url = f"https://api.crossref.org/works?query={query}" | |
response = requests.get(url) | |
if response.status_code == 200: | |
data = response.json() | |
papers = data['message']['items'] | |
processed_papers = [] | |
for paper in papers: | |
processed_paper = { | |
"标题": paper.get('title', [''])[0], | |
"作者": ", ".join([f"{author.get('given', '')} {author.get('family', '')}" for author in paper.get('author', [])]), | |
"DOI": paper.get('DOI', ''), | |
"ISBN": ", ".join(paper.get('ISBN', [])), | |
"摘要": paper.get('abstract', '').replace('<p>', '').replace('</p>', '').replace('<italic>', '').replace('</italic>', '') | |
} | |
processed_papers.append(processed_paper) | |
return processed_papers | |
else: | |
return [] | |
def send_email(to, subject, content): | |
try: | |
with smtplib.SMTP('106.15.184.28', 8025) as smtp: | |
smtp.login("jwt", emailkey) | |
message = MIMEMultipart() | |
message['From'] = "Me <[email protected]>" | |
message['To'] = to | |
message['Subject'] = subject | |
message.attach(MIMEText(content, 'html')) | |
smtp.sendmail("[email protected]", to, message.as_string()) | |
return True | |
except Exception as e: | |
print(f"发送邮件时出错: {str(e)}") | |
return False | |
def get_openai_response(messages, model="gpt-4o-mini", functions=None, function_call=None): | |
try: | |
response = client.chat.completions.create( | |
model=model, | |
messages=messages, | |
functions=functions, | |
function_call=function_call | |
) | |
return response.choices[0].message | |
except Exception as e: | |
print(f"调用OpenAI API时出错: {str(e)}") | |
return None | |
def process_function_call(response_message): | |
function_name = response_message.function_call.name | |
function_args = json.loads(response_message.function_call.arguments) | |
if function_name == "search_duckduckgo": | |
keywords = function_args.get('keywords', []) | |
if not keywords: | |
return None | |
return search_duckduckgo(keywords) | |
elif function_name == "search_papers": | |
query = function_args.get('query', '') | |
if not query: | |
return None | |
return search_papers(query) | |
elif function_name == "send_email": | |
to = function_args.get('to', '') | |
subject = function_args.get('subject', '') | |
content = function_args.get('content', '') | |
if not to or not subject or not content: | |
return None | |
success = send_email(to, subject, content) | |
return { | |
"success": success, | |
"to": to, | |
"subject": subject, | |
"content": content | |
} | |
else: | |
return None | |
def index(): | |
return send_from_directory('.', 'index.html') | |
def chat(): | |
data = request.json | |
question = data['question'] | |
history = data.get('history', []) | |
messages = history + [{"role": "user", "content": question}] | |
status_log = [] | |
# First model: Decide whether to use search or send email | |
status_log.append("次模型:正在判断是否需要进行搜索或发送邮件") | |
decision_response = get_openai_response(messages, model="gpt-4o-mini", functions=FUNCTIONS, function_call="auto") | |
search_results = None | |
email_result = None | |
if decision_response and decision_response.function_call: | |
function_name = decision_response.function_call.name | |
status_log.append(f"次模型:判断需要{function_name}") | |
status_log.append(f"正在进行{function_name}") | |
result = process_function_call(decision_response) | |
if result: | |
if function_name == "search_papers": | |
search_results = result[:10] | |
elif function_name == "search_duckduckgo": | |
search_results = result | |
elif function_name == "send_email": | |
email_result = result | |
status_log.append(f"{function_name}完成") | |
else: | |
status_log.append("次模型:判断不需要进行搜索或发送邮件") | |
# Generate response based on the action taken | |
if email_result: | |
response = f"邮件{'已成功' if email_result['success'] else '未能成功'}发送到 {email_result['to']}。\n\n主题:{email_result['subject']}\n\n内容:\n{email_result['content']}" | |
elif search_results: | |
# Second model: Generate final response with search results | |
status_log.append("主模型:正在生成回答") | |
messages.append({ | |
"role": "assistant", | |
"content": decision_response.content if decision_response.content else f"I need to {function_name}." | |
}) | |
messages.append({ | |
"role": "function", | |
"name": function_name, | |
"content": json.dumps(search_results, ensure_ascii=False) | |
}) | |
final_response = get_openai_response(messages, model="gpt-4o-mini") | |
response = final_response.content if final_response else "Error occurred" | |
status_log.append("主模型:回答生成完成") | |
else: | |
# Second model: Generate final response without search results | |
status_log.append("主模型:正在生成回答") | |
final_response = get_openai_response(messages, model="gpt-4o-mini") | |
response = final_response.content if final_response else "Error occurred" | |
status_log.append("主模型:回答生成完成") | |
return jsonify({ | |
"response": response, | |
"search_used": bool(search_results), | |
"search_results": search_results, | |
"email_sent": email_result['success'] if email_result else False, | |
"status_log": status_log | |
}) | |
def update_settings(): | |
data = request.json | |
max_history = data.get('max_history', 10) | |
# 在这里可以添加其他设置项 | |
return jsonify({"status": "success", "max_history": max_history}) | |
if __name__ == '__main__': | |
app.run(host='0.0.0.0', port=7860, debug=True) |