#!/usr/bin/env python
# -*- coding: utf-8 -*-
from flask import Flask, request, make_response
import hashlib
import time
import xml.etree.ElementTree as ET
import os
import json
from openai import OpenAI
from dotenv import load_dotenv
from duckduckgo_search import DDGS
import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# Load environment variables
load_dotenv()
app = Flask(__name__)
# Configuration
TOKEN = os.getenv('WECHAT_TOKEN')
API_KEY = os.getenv("OPENAI_API_KEY")
BASE_URL = os.getenv("OPENAI_BASE_URL")
emailkey = os.getenv("EMAIL_KEY")
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
# List of selectable models
AVAILABLE_MODELS = {
    'gpt-3.5-turbo': 'GPT-3.5 Turbo',
    'gpt-4': 'GPT-4',
    'gpt-4-turbo': 'GPT-4 Turbo',
}
# Per-user session state (model choice, message history, pending long replies)
user_sessions = {}
# Tool implementations
def search_duckduckgo(keywords):
    search_term = " ".join(keywords)
    with DDGS() as ddgs:
        results = list(ddgs.text(keywords=search_term, region="cn-zh", safesearch="on", max_results=5))
    return [{"title": result['title'], "body": result['body'].replace('\n', ' ')} for result in results]
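# Illustrative result shape (placeholder values, not real search output):
# search_duckduckgo(["Python", "机器学习"]) -> [{"title": "...", "body": "..."}, ...]  # up to 5 hits, newlines stripped from body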
def search_papers(query):
    url = f"https://api.crossref.org/works?query={query}"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        papers = data['message']['items']
        processed_papers = []
        for paper in papers:
            processed_paper = {
                "标题": paper.get('title', [''])[0],
                "作者": ", ".join([f"{author.get('given', '')} {author.get('family', '')}" for author in paper.get('author', [])]),
                "DOI": paper.get('DOI', ''),
                "摘要": paper.get('abstract', '').replace('<p>', '').replace('</p>', '').replace('<italic>', '*').replace('</italic>', '*')
            }
            processed_papers.append(processed_paper)
        return processed_papers
    else:
        return []
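# Illustrative return value (placeholder fields): search_papers("climate change") ->
# [{"标题": "...", "作者": "Jane Doe, John Smith", "DOI": "10.xxxx/xxxxx", "摘要": "..."}, ...]
# Crossref's default page size applies, so up to 20 works may come back per query.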
def send_email(to, subject, content):
    try:
        with smtplib.SMTP('106.15.184.28', 8025) as smtp:
            smtp.login("jwt", emailkey)
            message = MIMEMultipart()
            message['From'] = "Me <[email protected]>"
            message['To'] = to
            message['Subject'] = subject
            message.attach(MIMEText(content, 'html'))
            smtp.sendmail("[email protected]", to, message.as_string())
        return True
    except Exception as e:
        print(f"Failed to send email: {str(e)}")
        return False
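# Note: send_email assumes an SMTP relay at 106.15.184.28:8025 that authenticates the "jwt"
# account with EMAIL_KEY; the body is attached as HTML via MIMEText(content, 'html').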
# Function schemas exposed to the model for OpenAI function calling
FUNCTIONS = [
    {
        "name": "search_duckduckgo",
        "description": "使用DuckDuckGo搜索引擎查询信息。可以搜索最新新闻、文章、博客等内容。",
        "parameters": {
            "type": "object",
            "properties": {
                "keywords": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "搜索的关键词列表。例如:['Python', '机器学习', '最新进展']。"
                }
            },
            "required": ["keywords"]
        }
    },
    {
        "name": "search_papers",
        "description": "使用Crossref API搜索学术论文。",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "搜索查询字符串。例如:'climate change'。"
                }
            },
            "required": ["query"]
        }
    },
    {
        "name": "send_email",
        "description": "发送电子邮件。",
        "parameters": {
            "type": "object",
            "properties": {
                "to": {
                    "type": "string",
                    "description": "收件人邮箱地址"
                },
                "subject": {
                    "type": "string",
                    "description": "邮件主题"
                },
                "content": {
                    "type": "string",
                    "description": "邮件内容"
                }
            },
            "required": ["to", "subject", "content"]
        }
    }
]
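# These schemas use the OpenAI "functions" format: when the model chooses a tool, the returned
# message carries function_call.name plus JSON-encoded function_call.arguments, e.g. (illustrative)
# name "search_papers" with arguments '{"query": "climate change"}'. process_function_call() below
# dispatches that name to the matching local implementation.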
def verify_wechat(request):
    # Read the verification parameters sent by the WeChat server
    data = request.args
    signature = data.get('signature')
    timestamp = data.get('timestamp')
    nonce = data.get('nonce')
    echostr = data.get('echostr')
    if not all([signature, timestamp, nonce, echostr]):
        return 'error', 403
    # Sort the parameters lexicographically and concatenate them
    temp = [timestamp, nonce, TOKEN]
    temp.sort()
    temp = ''.join(temp)
    # Compare the SHA-1 digest against the provided signature
    if hashlib.sha1(temp.encode('utf8')).hexdigest() == signature:
        return echostr
    else:
        return 'error', 403
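# WeChat URL verification: sort [token, timestamp, nonce] lexicographically, concatenate, SHA-1 the
# result, and compare it with the "signature" query parameter. Worked example with made-up values:
# token="abc", timestamp="123", nonce="xyz" -> sorted/joined to "123abcxyz" -> sha1("123abcxyz") must equal signature.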
def getUserMessageContentFromXML(xml_content):
    # Parse the XML payload pushed by the WeChat server
    root = ET.fromstring(xml_content)
    # Extract the message text and the sender/receiver identifiers
    content = root.findtext('Content', default='')
    from_user_name = root.findtext('FromUserName')
    to_user_name = root.findtext('ToUserName')
    return content, from_user_name, to_user_name
def generate_response_xml(from_user_name, to_user_name, output_content):
    output_xml = '''
    <xml>
        <ToUserName><![CDATA[%s]]></ToUserName>
        <FromUserName><![CDATA[%s]]></FromUserName>
        <CreateTime>%s</CreateTime>
        <MsgType><![CDATA[text]]></MsgType>
        <Content><![CDATA[%s]]></Content>
    </xml>'''
    response = make_response(output_xml % (from_user_name, to_user_name, str(int(time.time())), output_content))
    response.content_type = 'application/xml'
    return response
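# Example reply produced by generate_response_xml (placeholder OpenIDs and timestamp):
# <xml>
#   <ToUserName><![CDATA[user_openid]]></ToUserName>
#   <FromUserName><![CDATA[account_openid]]></FromUserName>
#   <CreateTime>1700000000</CreateTime>
#   <MsgType><![CDATA[text]]></MsgType>
#   <Content><![CDATA[reply text]]></Content>
# </xml>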
def get_openai_response(messages, model="gpt-3.5-turbo", functions=None, function_call=None):
    try:
        # Only pass the function-calling parameters when they are actually provided
        kwargs = {"model": model, "messages": messages}
        if functions is not None:
            kwargs["functions"] = functions
        if function_call is not None:
            kwargs["function_call"] = function_call
        response = client.chat.completions.create(**kwargs)
        return response.choices[0].message
    except Exception as e:
        print(f"Error calling the OpenAI API: {str(e)}")
        return None
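# get_openai_response returns a ChatCompletionMessage on success and None on any API error, so
# callers must check for None before reading .content or .function_call (the route below does this).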
def process_function_call(function_name, function_args):
    if function_name == "search_duckduckgo":
        keywords = function_args.get('keywords', [])
        if not keywords:
            return "搜索关键词为空,无法执行搜索。"
        return search_duckduckgo(keywords)
    elif function_name == "search_papers":
        query = function_args.get('query', '')
        if not query:
            return "搜索查询为空,无法执行论文搜索。"
        return search_papers(query)
    elif function_name == "send_email":
        to = function_args.get('to', '')
        subject = function_args.get('subject', '')
        content = function_args.get('content', '')
        if not to or not subject or not content:
            return "邮件信息不完整,无法发送邮件。"
        success = send_email(to, subject, content)
        return {
            "success": success,
            "message": "邮件发送成功" if success else "邮件发送失败",
            "to": to,
            "subject": subject,
            "content": content,
            "is_email": True
        }
    else:
        return "未知的函数调用。"
def split_message(message, max_length=500):
    return [message[i:i+max_length] for i in range(0, len(message), max_length)]
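# Example: a 1200-character reply is split into 500 + 500 + 200 character chunks; the first chunk is
# sent immediately and the rest are queued in session['pending_response'] until the user replies "继续".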
@app.route('/api/wx', methods=['GET', 'POST'])
def wechatai():
    if request.method == 'GET':
        return verify_wechat(request)
    else:
        # Handle POST: an incoming message pushed by the WeChat server
        xml_str = request.data
        if not xml_str:
            return ""
        user_message_content, from_user_name, to_user_name = getUserMessageContentFromXML(xml_str)
        if from_user_name not in user_sessions:
            user_sessions[from_user_name] = {'model': 'gpt-3.5-turbo', 'messages': [], 'pending_response': []}
        session = user_sessions[from_user_name]
        if user_message_content.lower() == '/models':
            response_content = f"可用的模型列表:\n{list_available_models()}\n\n使用 /model 模型名称 来切换模型"
            return generate_response_xml(from_user_name, to_user_name, response_content)
        elif user_message_content.lower().startswith('/model'):
            parts = user_message_content.split(' ', 1)
            model = parts[1].strip() if len(parts) > 1 else ''
            if model in AVAILABLE_MODELS:
                session['model'] = model
                response_content = f'模型已切换为 {AVAILABLE_MODELS[model]}'
            else:
                response_content = f'无效的模型名称。可用的模型有:\n{list_available_models()}'
            return generate_response_xml(from_user_name, to_user_name, response_content)
        elif user_message_content.lower() == '继续':
            if session['pending_response']:
                response_content = session['pending_response'].pop(0)
                if session['pending_response']:
                    response_content += '\n\n回复"继续"获取下一部分。'
                else:
                    response_content += '\n\n回复结束。'
            else:
                response_content = "没有待发送的消息。"
            return generate_response_xml(from_user_name, to_user_name, response_content)
        session['messages'].append({"role": "user", "content": user_message_content})
        # Call the OpenAI API with function calling enabled
        ai_response = get_openai_response(session['messages'], model=session['model'], functions=FUNCTIONS, function_call="auto")
        if ai_response is None:
            return generate_response_xml(from_user_name, to_user_name, "AI 服务暂时不可用,请稍后再试。")
        if ai_response.function_call:
            function_name = ai_response.function_call.name
            function_args = json.loads(ai_response.function_call.arguments)
            function_result = process_function_call(function_name, function_args)
            session['messages'].append(ai_response.model_dump())
            session['messages'].append({
                "role": "function",
                "name": function_name,
                "content": json.dumps(function_result, ensure_ascii=False)
            })
            final_response = get_openai_response(session['messages'], model=session['model'])
            response_content = final_response.content if final_response else "AI 服务暂时不可用,请稍后再试。"
        else:
            response_content = ai_response.content
        session['messages'].append({"role": "assistant", "content": response_content})
        # Split long replies and queue the remainder for the "继续" command
        response_parts = split_message(response_content)
        if len(response_parts) > 1:
            session['pending_response'] = response_parts[1:]
            response_content = response_parts[0] + '\n\n回复"继续"获取下一部分。'
        return generate_response_xml(from_user_name, to_user_name, response_content)
def list_available_models():
    return "\n".join([f"{key}: {value}" for key, value in AVAILABLE_MODELS.items()])
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860, debug=True)