import os
import json
import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from flask import Flask, request, jsonify, send_from_directory
from openai import OpenAI
from duckduckgo_search import DDGS
app = Flask(__name__)
API_KEY = os.getenv("OPENAI_API_KEY")
BASE_URL = os.getenv("OPENAI_BASE_URL")
emailkey = os.getenv("EMAIL_KEY")
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
FUNCTIONS = [
{
"name": "search_duckduckgo",
"description": "使用DuckDuckGo搜索引擎查询信息。可以搜索最新新闻、文章、博客等内容。",
"parameters": {
"type": "object",
"properties": {
"keywords": {
"type": "array",
"items": {"type": "string"},
"description": "搜索的关键词列表。例如:['Python', '机器学习', '最新进展']。"
}
},
"required": ["keywords"]
}
},
{
"name": "search_papers",
"description": "使用Crossref API搜索学术论文。",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索查询字符串。例如:'climate change'。"
}
},
"required": ["query"]
}
},
{
"name": "send_email",
"description": "发送电子邮件。",
"parameters": {
"type": "object",
"properties": {
"to": {
"type": "string",
"description": "收件人邮箱地址"
},
"subject": {
"type": "string",
"description": "邮件主题"
},
"content": {
"type": "string",
"description": "邮件内容"
}
},
"required": ["to", "subject", "content"]
}
}
]
def search_duckduckgo(keywords):
search_term = " ".join(keywords)
with DDGS() as ddgs:
return list(ddgs.text(keywords=search_term, region="cn-zh", safesearch="on", max_results=5))
def search_papers(query):
url = f"https://api.crossref.org/works?query={query}"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
papers = data['message']['items']
processed_papers = []
for paper in papers:
processed_paper = {
"标题": paper.get('title', [''])[0],
"作者": ", ".join([f"{author.get('given', '')} {author.get('family', '')}" for author in paper.get('author', [])]),
"DOI": paper.get('DOI', ''),
"ISBN": ", ".join(paper.get('ISBN', [])),
"摘要": paper.get('abstract', '').replace('
', '').replace('
', '').replace('', '').replace('', '')
}
processed_papers.append(processed_paper)
return processed_papers
else:
return []
def send_email(to, subject, content):
try:
with smtplib.SMTP('106.15.184.28', 8025) as smtp:
smtp.login("jwt", emailkey)
message = MIMEMultipart()
message['From'] = "Me "
message['To'] = to
message['Subject'] = subject
message.attach(MIMEText(content, 'html'))
smtp.sendmail("aixiao@aixiao.xyz", to, message.as_string())
return True
except Exception as e:
print(f"发送邮件时出错: {str(e)}")
return False
def get_openai_response(messages, model="gpt-4o-mini", functions=None, function_call=None):
try:
response = client.chat.completions.create(
model=model,
messages=messages,
functions=functions,
function_call=function_call
)
return response.choices[0].message
except Exception as e:
print(f"调用OpenAI API时出错: {str(e)}")
return None
def process_function_call(response_message):
function_name = response_message.function_call.name
function_args = json.loads(response_message.function_call.arguments)
if function_name == "search_duckduckgo":
keywords = function_args.get('keywords', [])
if not keywords:
return None
return search_duckduckgo(keywords)
elif function_name == "search_papers":
query = function_args.get('query', '')
if not query:
return None
return search_papers(query)
elif function_name == "send_email":
to = function_args.get('to', '')
subject = function_args.get('subject', '')
content = function_args.get('content', '')
if not to or not subject or not content:
return None
success = send_email(to, subject, content)
return {
"success": success,
"to": to,
"subject": subject,
"content": content
}
else:
return None
@app.route('/')
def index():
return send_from_directory('.', 'index.html')
@app.route('/chat', methods=['POST'])
def chat():
data = request.json
question = data['question']
history = data.get('history', [])
messages = history + [{"role": "user", "content": question}]
status_log = []
# First model: Decide whether to use search or send email
status_log.append("次模型:正在判断是否需要进行搜索或发送邮件")
decision_response = get_openai_response(messages, model="gpt-4o-mini", functions=FUNCTIONS, function_call="auto")
search_results = None
email_result = None
if decision_response and decision_response.function_call:
function_name = decision_response.function_call.name
status_log.append(f"次模型:判断需要{function_name}")
status_log.append(f"正在进行{function_name}")
result = process_function_call(decision_response)
if result:
if function_name == "search_papers":
search_results = result[:10]
elif function_name == "search_duckduckgo":
search_results = result
elif function_name == "send_email":
email_result = result
status_log.append(f"{function_name}完成")
else:
status_log.append("次模型:判断不需要进行搜索或发送邮件")
# Generate response based on the action taken
if email_result:
response = f"邮件{'已成功' if email_result['success'] else '未能成功'}发送到 {email_result['to']}。\n\n主题:{email_result['subject']}\n\n内容:\n{email_result['content']}"
elif search_results:
# Second model: Generate final response with search results
status_log.append("主模型:正在生成回答")
messages.append({
"role": "assistant",
"content": decision_response.content if decision_response.content else f"I need to {function_name}."
})
messages.append({
"role": "function",
"name": function_name,
"content": json.dumps(search_results, ensure_ascii=False)
})
final_response = get_openai_response(messages, model="gpt-4o-mini")
response = final_response.content if final_response else "Error occurred"
status_log.append("主模型:回答生成完成")
else:
# Second model: Generate final response without search results
status_log.append("主模型:正在生成回答")
final_response = get_openai_response(messages, model="gpt-4o-mini")
response = final_response.content if final_response else "Error occurred"
status_log.append("主模型:回答生成完成")
return jsonify({
"response": response,
"search_used": bool(search_results),
"search_results": search_results,
"email_sent": email_result['success'] if email_result else False,
"status_log": status_log
})
@app.route('/settings', methods=['POST'])
def update_settings():
data = request.json
max_history = data.get('max_history', 10)
# 在这里可以添加其他设置项
return jsonify({"status": "success", "max_history": max_history})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860, debug=True)