import hashlib
import hmac
import json
import os
import smtplib
import time
import xml.etree.ElementTree as ET
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import requests
from dotenv import load_dotenv
from duckduckgo_search import DDGS
from flask import Flask, request, make_response
from openai import OpenAI


# Run load_dotenv() first so the environment lookups below can see any
# values it provides.
load_dotenv()

app = Flask(__name__)

# Secrets and endpoints, all read from environment variables.
# TOKEN is the shared secret verify_wechat() hashes into the signature.
TOKEN = os.environ.get('WECHAT_TOKEN')
API_KEY = os.environ.get("OPENAI_API_KEY")
BASE_URL = os.environ.get("OPENAI_BASE_URL")
# Password handed to smtp.login() in send_email().
emailkey = os.environ.get("EMAIL_KEY")

# Single OpenAI client shared by every request.
client = OpenAI(base_url=BASE_URL, api_key=API_KEY)

# Model id (as sent to the API) -> human-readable name shown to the user.
AVAILABLE_MODELS = {
    'gpt-3.5-turbo': 'GPT-3.5 Turbo',
    'gpt-4': 'GPT-4',
    'gpt-4-turbo': 'GPT-4 Turbo',
}

# In-memory chat state, keyed by the sender name taken from the incoming
# message. Each value holds 'model', 'messages' and 'pending_response'.
user_sessions = {}


def search_duckduckgo(keywords):
    """Run a DuckDuckGo text search and return up to five results.

    Args:
        keywords: A list of search terms; they are joined with spaces to
            form the query. A single string is also accepted and is used
            as the query unchanged.

    Returns:
        A list of ``{"title": ..., "body": ...}`` dicts. Newlines in each
        body are replaced by spaces; a missing title or body becomes "".
    """
    # Joining a bare string would split it character by character
    # ("abc" -> "a b c"), so treat a string as an already-built query.
    if isinstance(keywords, str):
        search_term = keywords
    else:
        search_term = " ".join(str(keyword) for keyword in keywords)

    with DDGS() as ddgs:
        # Materialize inside the ``with`` so the results are fully fetched
        # before the search session is closed.
        results = list(ddgs.text(keywords=search_term, region="cn-zh", safesearch="on", max_results=5))

    return [
        {
            "title": result.get('title') or '',
            "body": (result.get('body') or '').replace('\n', ' '),
        }
        for result in results
    ]


def search_papers(query):
    """Search the Crossref works API for papers matching *query*.

    Args:
        query: Free-text search string.

    Returns:
        A list of dicts with the keys "标题" (title), "作者" (authors),
        "DOI" and "摘要" (abstract). An empty list is returned when the
        request fails, the status code is not 200, or the payload does not
        have the expected ``message.items`` structure.
    """
    try:
        # Pass the query through ``params`` so spaces, '&', '#' and
        # non-ASCII text are percent-encoded instead of corrupting the URL.
        # The timeout keeps a stalled connection from hanging the caller.
        response = requests.get(
            "https://api.crossref.org/works",
            params={"query": query},
            timeout=10,
        )
    except requests.RequestException as e:
        print(f"搜索论文时出错: {str(e)}")
        return []

    if response.status_code != 200:
        return []

    try:
        papers = response.json()['message']['items']
    except (ValueError, KeyError, TypeError):
        return []

    processed_papers = []
    for paper in papers:
        # Guards against a title list that is present but empty, where
        # indexing [0] directly would raise IndexError.
        titles = paper.get('title') or ['']
        authors = ", ".join(
            f"{author.get('given', '')} {author.get('family', '')}"
            for author in paper.get('author', [])
        )
        abstract = paper.get('abstract', '')
        # Drop paragraph tags and turn italic tags into '*' markers.
        for tag, replacement in (('<p>', ''), ('</p>', ''), ('<italic>', '*'), ('</italic>', '*')):
            abstract = abstract.replace(tag, replacement)

        processed_papers.append({
            "标题": titles[0],
            "作者": authors,
            "DOI": paper.get('DOI', ''),
            "摘要": abstract,
        })
    return processed_papers


def send_email(to, subject, content):
    """Send an HTML e-mail through the hard-coded SMTP server.

    Args:
        to: Recipient address; used both as the "To" header and as the
            envelope recipient.
        subject: Subject line.
        content: Message body, attached as an HTML part.

    Returns:
        True when every step completed without raising, False otherwise
        (the error is printed).
    """
    # NOTE(review): both sender strings look like an e-mail-obfuscation
    # placeholder rather than a real address — confirm the intended value.
    from_header = "Me <[email protected]>"
    envelope_sender = "[email protected]"

    try:
        with smtplib.SMTP('106.15.184.28', 8025) as server:
            server.login("jwt", emailkey)

            mail = MIMEMultipart()
            mail['From'] = from_header
            mail['To'] = to
            mail['Subject'] = subject
            mail.attach(MIMEText(content, 'html'))

            server.sendmail(envelope_sender, to, mail.as_string())
    except Exception as exc:
        print(f"发送邮件时出错: {str(exc)}")
        return False
    return True


# Function-calling schemas passed to the chat completion request in
# wechatai(). Each "name" matches a branch in process_function_call().
FUNCTIONS = [
    # Web search.
    {
        "name": "search_duckduckgo",
        "description": "使用DuckDuckGo搜索引擎查询信息。可以搜索最新新闻、文章、博客等内容。",
        "parameters": {
            "type": "object",
            "properties": {
                "keywords": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "搜索的关键词列表。例如:['Python', '机器学习', '最新进展']。",
                },
            },
            "required": ["keywords"],
        },
    },
    # Academic paper search.
    {
        "name": "search_papers",
        "description": "使用Crossref API搜索学术论文。",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "搜索查询字符串。例如:'climate change'。",
                },
            },
            "required": ["query"],
        },
    },
    # Outgoing e-mail.
    {
        "name": "send_email",
        "description": "发送电子邮件。",
        "parameters": {
            "type": "object",
            "properties": {
                "to": {"type": "string", "description": "收件人邮箱地址"},
                "subject": {"type": "string", "description": "邮件主题"},
                "content": {"type": "string", "description": "邮件内容"},
            },
            "required": ["to", "subject", "content"],
        },
    },
]


def verify_wechat(request):
    """Check the signature on a WeChat verification (GET) request.

    The expected signature is the SHA-1 hex digest of the timestamp, the
    nonce and the configured TOKEN, sorted and concatenated.

    Args:
        request: Object whose ``args`` mapping carries the query
            parameters ``signature``, ``timestamp``, ``nonce`` and
            ``echostr``.

    Returns:
        The ``echostr`` value (or "" if it is absent) when the signature
        matches; otherwise the tuple ``('error', 403)``.
    """
    params = request.args
    signature = params.get('signature')
    timestamp = params.get('timestamp')
    nonce = params.get('nonce')
    echostr = params.get('echostr')

    # A missing parameter would put None into the list below, and
    # sorting/joining None with str raises TypeError. Reject it instead.
    if not signature or not timestamp or not nonce:
        return 'error', 403
    # Same failure mode when the WECHAT_TOKEN environment variable is unset.
    if not TOKEN:
        return 'error', 403

    joined = ''.join(sorted([timestamp, nonce, TOKEN]))
    expected = hashlib.sha1(joined.encode('utf8')).hexdigest()

    # Compare as bytes in constant time; compare_digest rejects non-ASCII
    # str arguments, and the signature comes straight from the query string.
    if hmac.compare_digest(expected.encode('utf8'), signature.encode('utf8')):
        # Returning None from a Flask view is an error, so fall back to "".
        return echostr or ''
    return 'error', 403


def getUserMessageContentFromXML(xml_content):
    """Pull the text, sender and recipient out of an incoming message XML.

    Args:
        xml_content: The XML document as ``str`` or ``bytes``.

    Returns:
        A ``(content, from_user_name, to_user_name)`` tuple of strings.
        An element that is missing or empty yields "" — for example a
        message with no ``Content`` element.

    Raises:
        xml.etree.ElementTree.ParseError: If *xml_content* is not
            well-formed XML.
    """
    # NOTE(review): this parses data received from the network; the stdlib
    # documentation warns that ElementTree is not hardened against
    # maliciously constructed XML — confirm this is acceptable here.
    root = ET.fromstring(xml_content)

    # findtext() returns the default for an absent element and "" for an
    # empty one, where ``find(...).text`` would raise AttributeError or
    # hand back None.
    content = root.findtext('Content', default='')
    from_user_name = root.findtext('FromUserName', default='')
    to_user_name = root.findtext('ToUserName', default='')
    return content, from_user_name, to_user_name


def generate_response_xml(from_user_name, to_user_name, output_content):
    """Build the XML text reply for an incoming message.

    The names are swapped relative to the incoming message: the original
    sender becomes the reply's ``ToUserName`` and the original recipient
    becomes its ``FromUserName``.

    Args:
        from_user_name: Sender of the incoming message.
        to_user_name: Recipient of the incoming message.
        output_content: Text to place in the reply's ``Content`` element.

    Returns:
        A Flask response whose body is the XML document and whose content
        type is ``application/xml``.
    """
    def cdata_safe(value):
        # "]]>" ends a CDATA section, so a value containing it would
        # produce malformed XML. Split the sequence across two adjacent
        # CDATA sections; a parser reads them back as the original text.
        return str(value).replace(']]>', ']]]]><![CDATA[>')

    output_xml = '''
<xml>
    <ToUserName><![CDATA[%s]]></ToUserName>
    <FromUserName><![CDATA[%s]]></FromUserName>
    <CreateTime>%s</CreateTime>
    <MsgType><![CDATA[text]]></MsgType>
    <Content><![CDATA[%s]]></Content>
</xml>'''

    body = output_xml % (
        cdata_safe(from_user_name),
        cdata_safe(to_user_name),
        str(int(time.time())),
        cdata_safe(output_content),
    )
    response = make_response(body)
    response.content_type = 'application/xml'
    return response


def get_openai_response(messages, model="gpt-3.5-turbo", functions=None, function_call=None):
    """Request a chat completion and return the first choice's message.

    Args:
        messages: Conversation history to send.
        model: Model identifier.
        functions: Optional list of function schemas the model may call.
        function_call: Optional function-calling mode, e.g. "auto".

    Returns:
        The message object of the first choice, or None if the request
        raised (the error is printed).
    """
    request_kwargs = {"model": model, "messages": messages}
    # Leave the function-calling fields out entirely when the caller did
    # not supply them. Passing an explicit None is not the same as
    # omitting the argument and may be rejected by the API.
    if functions is not None:
        request_kwargs["functions"] = functions
    if function_call is not None:
        request_kwargs["function_call"] = function_call

    try:
        response = client.chat.completions.create(**request_kwargs)
        return response.choices[0].message
    except Exception as e:
        print(f"调用OpenAI API时出错: {str(e)}")
        return None


def process_function_call(function_name, function_args):
    """Run the local function the model asked for and return its result.

    Args:
        function_name: One of "search_duckduckgo", "search_papers" or
            "send_email".
        function_args: Dict of arguments decoded from the model's JSON.

    Returns:
        The search results list, a status dict for "send_email", or a
        message string explaining why nothing was executed (unknown
        function, missing/invalid arguments, or a failed search).
    """
    # The arguments are decoded from model-generated JSON, which can be
    # something other than an object; ``.get`` below needs a dict.
    if not isinstance(function_args, dict):
        return "函数参数格式错误,无法执行。"

    if function_name == "search_duckduckgo":
        keywords = function_args.get('keywords', [])
        if not keywords:
            return "搜索关键词为空,无法执行搜索。"
        try:
            return search_duckduckgo(keywords)
        except Exception as e:
            # Report the failure as the function result rather than letting
            # it escape, matching how send_email() reports errors.
            print(f"搜索时出错: {str(e)}")
            return f"搜索失败: {str(e)}"

    if function_name == "search_papers":
        query = function_args.get('query', '')
        if not query:
            return "搜索查询为空,无法执行论文搜索。"
        try:
            return search_papers(query)
        except Exception as e:
            print(f"搜索论文时出错: {str(e)}")
            return f"论文搜索失败: {str(e)}"

    if function_name == "send_email":
        to = function_args.get('to', '')
        subject = function_args.get('subject', '')
        content = function_args.get('content', '')
        if not to or not subject or not content:
            return "邮件信息不完整,无法发送邮件。"
        success = send_email(to, subject, content)
        return {
            "success": success,
            "message": "邮件发送成功" if success else "邮件发送失败",
            "to": to,
            "subject": subject,
            "content": content,
            "is_email": True,
        }

    return "未知的函数调用。"


def split_message(message, max_length=500):
    """Split *message* into consecutive chunks of at most *max_length* characters.

    Args:
        message: The text to split.
        max_length: Maximum number of characters per chunk; must be >= 1.

    Returns:
        The chunks in order. An empty message yields an empty list.

    Raises:
        ValueError: If *max_length* is less than 1.
    """
    # A negative step makes range() empty, which would silently drop the
    # whole message; zero fails with an unrelated-looking range() error.
    if max_length < 1:
        raise ValueError("max_length must be at least 1")
    return [message[i:i + max_length] for i in range(0, len(message), max_length)]


def _handle_command(session, text):
    """Return the reply for a built-in command, or None if *text* is a normal chat message."""
    command = text.strip().lower()

    if command == '/models':
        return f"可用的模型列表:\n{list_available_models()}\n\n使用 /model 模型名称 来切换模型"

    if command.startswith('/model'):
        # "/model" with no argument previously raised IndexError on [1].
        # The name is taken from the original text because the keys of
        # AVAILABLE_MODELS are case-sensitive.
        pieces = text.split(maxsplit=1)
        model = pieces[1].strip() if len(pieces) > 1 else ''
        if model in AVAILABLE_MODELS:
            session['model'] = model
            return f'模型已切换为 {AVAILABLE_MODELS[model]}'
        return f'无效的模型名称。可用的模型有:\n{list_available_models()}'

    if command == '继续':
        pending = session['pending_response']
        if not pending:
            return "没有待发送的消息。"
        part = pending.pop(0)
        suffix = '\n\n回复"继续"获取下一部分。' if pending else '\n\n回复结束。'
        return part + suffix

    return None


def _run_chat_turn(session, user_text):
    """Send *user_text* to the model (running one function call if requested) and return the reply text, or None on failure."""
    history = session['messages']
    # Remember where this turn starts so a failed turn can be rolled back
    # and does not leave a half-finished exchange in the history.
    checkpoint = len(history)
    history.append({"role": "user", "content": user_text})

    reply = get_openai_response(history, model=session['model'], functions=FUNCTIONS, function_call="auto")

    if reply is not None and reply.function_call:
        function_name = reply.function_call.name
        raw_arguments = reply.function_call.arguments
        try:
            function_args = json.loads(raw_arguments)
        except (TypeError, ValueError):
            # Malformed JSON from the model: continue with no arguments so
            # the function reports what is missing.
            function_args = {}
        function_result = process_function_call(function_name, function_args)

        # Store only the fields of the assistant's function-call message
        # that are sent back, rather than a full dump of the response
        # object with its unset fields.
        history.append({
            "role": "assistant",
            "content": reply.content,
            "function_call": {"name": function_name, "arguments": raw_arguments},
        })
        history.append({
            "role": "function",
            "name": function_name,
            "content": json.dumps(function_result, ensure_ascii=False),
        })

        reply = get_openai_response(history, model=session['model'])

    # get_openai_response() returns None when the API call failed, and the
    # content can be empty; neither can be split or sent to the user.
    if reply is None or not reply.content:
        del history[checkpoint:]
        return None

    history.append({"role": "assistant", "content": reply.content})
    return reply.content


@app.route('/api/wx', methods=['GET', 'POST'])
def wechatai():
    """Webhook endpoint: verification on GET, chat handling on POST.

    GET requests are delegated to verify_wechat(). POST requests carry a
    message as XML; the text is either handled as a command ("/models",
    "/model <name>", "继续") or forwarded to the model using the sender's
    session. Replies longer than one chunk are queued in the session and
    delivered one chunk at a time when the user sends "继续".

    Returns:
        The verification result for GET; for POST, an XML reply, or an
        empty body when there is nothing to answer.
    """
    if request.method == 'GET':
        return verify_wechat(request)

    # NOTE(review): POST bodies are processed without checking a request
    # signature — confirm whether verification is needed here as well.
    xml_str = request.data
    if not xml_str:
        return ""

    try:
        user_message_content, from_user_name, to_user_name = getUserMessageContentFromXML(xml_str)
    except (ET.ParseError, AttributeError):
        # Malformed XML, or a message missing one of the expected elements:
        # answer with the same empty body used for an empty request.
        return ""
    if not user_message_content:
        return ""

    session = user_sessions.setdefault(
        from_user_name,
        {'model': 'gpt-3.5-turbo', 'messages': [], 'pending_response': []},
    )

    response_content = _handle_command(session, user_message_content)
    if response_content is None:
        reply_text = _run_chat_turn(session, user_message_content)
        if reply_text is None:
            response_content = "抱歉,AI服务暂时不可用,请稍后再试。"
        else:
            response_parts = split_message(reply_text)
            # Always replace the queue, so chunks left over from an earlier
            # long answer are not served by a later "继续".
            session['pending_response'] = response_parts[1:]
            response_content = response_parts[0]
            if session['pending_response']:
                response_content += '\n\n回复"继续"获取下一部分。'

    return generate_response_xml(from_user_name, to_user_name, response_content)


def list_available_models():
    """Format AVAILABLE_MODELS as one "id: display name" line per model."""
    lines = []
    for model_id, display_name in AVAILABLE_MODELS.items():
        lines.append(f"{model_id}: {display_name}")
    return "\n".join(lines)


if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which lets a client run
    # arbitrary code; with host 0.0.0.0 that is reachable from the network.
    # Keep it off unless explicitly requested through FLASK_DEBUG.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)