functioncall / app.py
mistpe's picture
Upload 4 files
fe4533e verified
raw
history blame
8.68 kB
import os
import json
import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from flask import Flask, request, jsonify, send_from_directory
from openai import OpenAI
from duckduckgo_search import DDGS
# Flask application object; the route handlers below are registered on it.
app = Flask(__name__)

# Credentials must not live in source control, so the API key is taken from
# the environment. BASE_URL keeps the previous endpoint as its default and can
# be overridden with OPENAI_BASE_URL.
# NOTE(review): when OPENAI_API_KEY is unset, API_KEY is None and the OpenAI
# client is expected to refuse construction — confirm against the installed
# openai version and make sure the deployment sets the variable.
API_KEY = os.environ.get("OPENAI_API_KEY")
BASE_URL = os.environ.get("OPENAI_BASE_URL", "https://api.oaipro.com/v1")

# Shared client used by get_openai_response for every completion request.
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
def _function_schema(name, description, properties, required):
    """Build one function-calling schema entry.

    Every entry shares the same layout: a name, a description, and an
    "object" parameter block holding the property definitions and the list
    of required property names.
    """
    return {
        "name": name,
        "description": description,
        "parameters": {
            "type": "object",
            "properties": properties,
            "required": required,
        },
    }


# Function schemas passed to the model as `functions=` in the /chat handler:
# a web search, an academic paper search, and an email sender.
FUNCTIONS = [
    _function_schema(
        "search_duckduckgo",
        "使用DuckDuckGo搜索引擎查询信息。可以搜索最新新闻、文章、博客等内容。",
        {
            "keywords": {
                "type": "array",
                "items": {"type": "string"},
                "description": "搜索的关键词列表。例如:['Python', '机器学习', '最新进展']。",
            },
        },
        ["keywords"],
    ),
    _function_schema(
        "search_papers",
        "使用Crossref API搜索学术论文。",
        {
            "query": {
                "type": "string",
                "description": "搜索查询字符串。例如:'climate change'。",
            },
        },
        ["query"],
    ),
    _function_schema(
        "send_email",
        "发送电子邮件。",
        {
            "to": {"type": "string", "description": "收件人邮箱地址"},
            "subject": {"type": "string", "description": "邮件主题"},
            "content": {"type": "string", "description": "邮件内容"},
        },
        ["to", "subject", "content"],
    ),
]
def search_duckduckgo(keywords):
    """Run a DuckDuckGo text search for the given keywords.

    The keywords are joined with single spaces into one search phrase. The
    search is issued with region "cn-zh", safe search "on", and at most five
    results, and the results are returned as a list.
    """
    phrase = " ".join(keywords)
    search_options = {"region": "cn-zh", "safesearch": "on", "max_results": 5}
    with DDGS() as engine:
        # Materialize the results while the search session is still open.
        hits = engine.text(keywords=phrase, **search_options)
        return list(hits)
def _format_paper(paper):
    """Convert one Crossref work record into the dict shape returned to the model."""
    # Local import keeps this block self-contained; the module does not
    # import `re` at file level.
    import re

    # `title` may be missing or an empty list; fall back to an empty title
    # instead of raising IndexError.
    titles = paper.get('title') or ['']
    authors = ", ".join(
        # strip() avoids a stray leading/trailing space when only one of the
        # two name parts is present.
        f"{author.get('given', '')} {author.get('family', '')}".strip()
        for author in paper.get('author') or []
    )
    # Remove any markup tags from the abstract rather than a fixed list of
    # four literal tags.
    # NOTE(review): assumes abstracts are XML snippets in which a literal '<'
    # in text is escaped — confirm against the Crossref response format.
    abstract = re.sub(r"<[^>]+>", "", paper.get('abstract') or '')
    return {
        "标题": titles[0],
        "作者": authors,
        "DOI": paper.get('DOI', ''),
        "ISBN": ", ".join(paper.get('ISBN') or []),
        "摘要": abstract,
    }


def search_papers(query):
    """Search academic papers through the Crossref works API.

    Args:
        query: Free-text search string.

    Returns:
        A list of dicts with the keys "标题" (title), "作者" (authors),
        "DOI", "ISBN" and "摘要" (abstract). An empty list is returned when
        the request fails, the status code is not 200, or the response body
        does not have the expected shape.
    """
    try:
        # Let requests URL-encode the query (spaces, '&', '#', non-ASCII) and
        # bound the wait so a stalled connection cannot hang the request.
        response = requests.get(
            "https://api.crossref.org/works",
            params={"query": query},
            timeout=15,
        )
    except requests.RequestException as e:
        print(f"搜索论文时出错: {str(e)}")
        return []

    if response.status_code != 200:
        return []

    try:
        papers = response.json()['message']['items']
    except (ValueError, KeyError, TypeError) as e:
        # Body was not JSON or lacked the message/items structure.
        print(f"搜索论文时出错: {str(e)}")
        return []

    return [_format_paper(paper) for paper in papers]
def send_email(to, subject, content):
    """Send an HTML email through the SMTP relay.

    Connection settings are read from the environment at call time:
    SMTP_HOST, SMTP_PORT, SMTP_USER and SMTP_FROM fall back to the values
    previously hard-coded here; SMTP_PASSWORD has no default because the
    credential must not be stored in source control.

    Args:
        to: Recipient address.
        subject: Subject line.
        content: Message body, sent as HTML.

    Returns:
        True when the message was handed to the server, False on any failure
        (the reason is printed).
    """
    # `to` and `subject` originate from model output and are written into
    # message headers; reject CR/LF so they cannot inject extra headers.
    if any(ch in str(value) for value in (to, subject) for ch in "\r\n"):
        print("发送邮件时出错: 收件人或主题包含换行符")
        return False

    password = os.environ.get("SMTP_PASSWORD")
    if not password:
        print("发送邮件时出错: 未配置 SMTP_PASSWORD")
        return False

    try:
        host = os.environ.get("SMTP_HOST", "106.15.184.28")
        port = int(os.environ.get("SMTP_PORT", "8025"))
        user = os.environ.get("SMTP_USER", "jwt")
        # NOTE(review): this default looks like an email-obfuscation
        # placeholder rather than a real address — confirm and set SMTP_FROM.
        sender = os.environ.get("SMTP_FROM", "[email protected]")

        # The timeout keeps an unreachable relay from blocking the request
        # indefinitely.
        with smtplib.SMTP(host, port, timeout=30) as smtp:
            smtp.login(user, password)
            message = MIMEMultipart()
            message['From'] = f"Me <{sender}>"
            message['To'] = to
            message['Subject'] = subject
            message.attach(MIMEText(content, 'html'))
            smtp.sendmail(sender, to, message.as_string())
        return True
    except Exception as e:
        # Best-effort boundary: callers only need success/failure.
        print(f"发送邮件时出错: {str(e)}")
        return False
def get_openai_response(messages, model="gpt-4o-mini", functions=None, function_call=None):
    """Request a chat completion and return the first choice's message.

    Args:
        messages: Conversation messages to send.
        model: Model name.
        functions: Optional list of function schemas to offer the model.
        function_call: Optional function-call mode (e.g. "auto").

    Returns:
        The message object of the first choice, or None when the API call
        fails (the error is printed).
    """
    request_kwargs = {"model": model, "messages": messages}
    # Forward the optional function-calling arguments only when the caller
    # supplied them, so a plain completion request does not carry explicit
    # None values for them.
    # NOTE(review): presumably the client serializes None as JSON null rather
    # than omitting the field — confirm against the installed openai version.
    if functions is not None:
        request_kwargs["functions"] = functions
    if function_call is not None:
        request_kwargs["function_call"] = function_call

    try:
        response = client.chat.completions.create(**request_kwargs)
        return response.choices[0].message
    except Exception as e:
        # Boundary with the remote API: report and let callers handle None.
        print(f"调用OpenAI API时出错: {str(e)}")
        return None
def process_function_call(response_message):
    """Execute the function call requested by the model.

    Args:
        response_message: A model message whose `function_call` attribute
            carries `name` and a JSON-encoded `arguments` string.

    Returns:
        The list returned by search_duckduckgo or search_papers for the two
        search functions; for send_email, a dict with "success", "to",
        "subject" and "content". None is returned when the function name is
        unknown, a required argument is missing or empty, or the arguments
        are not a valid JSON object.
    """
    call = response_message.function_call
    function_name = call.name

    # The arguments string is produced by the model and may be malformed;
    # treat that like any other unusable call instead of raising.
    try:
        function_args = json.loads(call.arguments)
    except (TypeError, ValueError) as e:
        print(f"解析函数参数时出错: {str(e)}")
        return None
    if not isinstance(function_args, dict):
        return None

    if function_name == "search_duckduckgo":
        keywords = function_args.get('keywords', [])
        # A bare string would otherwise be joined character by character.
        if isinstance(keywords, str):
            keywords = [keywords]
        if not keywords:
            return None
        return search_duckduckgo(keywords)

    if function_name == "search_papers":
        query = function_args.get('query', '')
        if not query:
            return None
        return search_papers(query)

    if function_name == "send_email":
        to = function_args.get('to', '')
        subject = function_args.get('subject', '')
        content = function_args.get('content', '')
        if not to or not subject or not content:
            return None
        success = send_email(to, subject, content)
        return {
            "success": success,
            "to": to,
            "subject": subject,
            "content": content,
        }

    return None
@app.route('/')
def index():
    """Serve the front-end page, index.html, from the '.' directory."""
    page_dir, page_name = '.', 'index.html'
    return send_from_directory(page_dir, page_name)
@app.route('/chat', methods=['POST'])
def chat():
    """Answer a user question, optionally running a search or sending an email.

    Expects a JSON body with "question" and an optional "history" list of
    prior messages. A first model call decides whether one of FUNCTIONS
    should run; when a search ran, its results are fed to a second model
    call that writes the answer. When an email was attempted, the reply is a
    fixed summary of that attempt and no second model call is made.

    Returns:
        JSON with "response", "search_used", "search_results", "email_sent"
        and "status_log"; or a 400 JSON error when the body is not a JSON
        object, lacks "question", or has a non-list "history".
    """
    # silent=True yields None for a missing or invalid JSON body, so bad
    # input becomes a 400 below instead of an unhandled exception.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'question' not in data:
        return jsonify({"error": "Request body must be a JSON object with a 'question' field"}), 400
    question = data['question']
    history = data.get('history') or []
    if not isinstance(history, list):
        return jsonify({"error": "'history' must be a list of messages"}), 400

    messages = history + [{"role": "user", "content": question}]
    status_log = []

    # First model call: decide whether to search or send an email.
    status_log.append("次模型:正在判断是否需要进行搜索或发送邮件")
    decision_response = get_openai_response(messages, model="gpt-4o-mini", functions=FUNCTIONS, function_call="auto")

    search_results = None
    email_result = None
    function_name = None
    if decision_response and decision_response.function_call:
        function_name = decision_response.function_call.name
        status_log.append(f"次模型:判断需要{function_name}")
        status_log.append(f"正在进行{function_name}")
        try:
            result = process_function_call(decision_response)
        except Exception as e:
            # A failing tool (network error, bad arguments) must not turn
            # the whole request into a 500; fall back to a plain answer.
            print(f"执行{function_name}时出错: {str(e)}")
            status_log.append(f"{function_name}失败")
            result = None
        if result:
            if function_name == "search_papers":
                # Cap the number of papers passed on to the second call.
                search_results = result[:10]
            elif function_name == "search_duckduckgo":
                search_results = result
            elif function_name == "send_email":
                email_result = result
            status_log.append(f"{function_name}完成")
    else:
        status_log.append("次模型:判断不需要进行搜索或发送邮件")

    if email_result:
        # Email path: report the outcome directly.
        response = f"邮件{'已成功' if email_result['success'] else '未能成功'}发送到 {email_result['to']}。\n\n主题:{email_result['subject']}\n\n内容:\n{email_result['content']}"
    else:
        # Second model call: write the final answer, with the search results
        # appended to the conversation when a search produced any.
        status_log.append("主模型:正在生成回答")
        if search_results:
            messages.append({
                "role": "assistant",
                "content": decision_response.content if decision_response.content else f"I need to {function_name}."
            })
            messages.append({
                "role": "function",
                "name": function_name,
                "content": json.dumps(search_results, ensure_ascii=False)
            })
        final_response = get_openai_response(messages, model="gpt-4o-mini")
        response = final_response.content if final_response else "Error occurred"
        status_log.append("主模型:回答生成完成")

    return jsonify({
        "response": response,
        "search_used": bool(search_results),
        "search_results": search_results,
        "email_sent": email_result['success'] if email_result else False,
        "status_log": status_log
    })
@app.route('/settings', methods=['POST'])
def update_settings():
    """Echo back the requested history limit.

    Reads "max_history" from the JSON request body, defaulting to 10, and
    returns it together with a success status. Nothing is stored.
    """
    payload = request.json
    history_limit = payload.get('max_history', 10)
    # Additional settings can be handled here.
    reply = {"status": "success", "max_history": history_limit}
    return jsonify(reply)
if __name__ == '__main__':
    # The server listens on all interfaces, and Flask's debug mode enables an
    # interactive debugger that can execute arbitrary code. Keep it off
    # unless explicitly requested with FLASK_DEBUG=1/true/yes/on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes", "on")
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)