wewe / app-切换模型,预备删除切换模型.py
mistpe's picture
Rename app.py to app-切换模型,预备删除切换模型.py
9e8762e verified
raw
history blame
5.28 kB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from flask import Flask, request, make_response
import hashlib
import time
import xml.etree.ElementTree as ET
import os
from openai import OpenAI
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
app = Flask(__name__)
# 配置
TOKEN = os.getenv('TOKEN')
API_KEY = os.getenv("API_KEY")
BASE_URL = os.getenv("OPENAI_BASE_URL")
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
# 定义可用的模型列表
AVAILABLE_MODELS = {
'gpt-3.5-turbo': 'GPT-3.5 Turbo',
'gpt-4o': 'GPT-4o',
'gpt-4o-mini': 'GPT-4o-mini',
}
# 存储用户会话信息
user_sessions = {}
def verify_wechat(request):
# 获取微信服务器发送过来的参数
data = request.args
signature = data.get('signature')
timestamp = data.get('timestamp')
nonce = data.get('nonce')
echostr = data.get('echostr')
# 对参数进行字典排序,拼接字符串
temp = [timestamp, nonce, TOKEN]
temp.sort()
temp = ''.join(temp)
# 加密
if (hashlib.sha1(temp.encode('utf8')).hexdigest() == signature):
return echostr
else:
return 'error', 403
def getUserMessageContentFromXML(xml_content):
# 解析XML字符串
root = ET.fromstring(xml_content)
# 提取数据
content = root.find('Content').text
from_user_name = root.find('FromUserName').text
to_user_name = root.find('ToUserName').text
return content, from_user_name, to_user_name
def generate_response_xml(from_user_name, to_user_name, output_content):
output_xml = '''
<xml>
<ToUserName><![CDATA[%s]]></ToUserName>
<FromUserName><![CDATA[%s]]></FromUserName>
<CreateTime>%s</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[%s]]></Content>
</xml>'''
response = make_response(output_xml % (from_user_name, to_user_name, str(int(time.time())), output_content))
response.content_type = 'application/xml'
return response
def get_openai_response(messages, model="gpt-4o-mini"):
try:
response = client.chat.completions.create(
model=model,
messages=messages
)
return response.choices[0].message.content
except Exception as e:
print(f"调用OpenAI API时出错: {str(e)}")
return "抱歉,我遇到了一些问题,无法回答您的问题。"
def split_message(message, max_length=500):
return [message[i:i+max_length] for i in range(0, len(message), max_length)]
def list_available_models():
return "\n".join([f"{key}: {value}" for key, value in AVAILABLE_MODELS.items()])
@app.route('/api/wx', methods=['GET', 'POST'])
def wechatai():
if request.method == 'GET':
return verify_wechat(request)
else:
# 处理POST请求
print("user request data: ", request.data)
user_message_content, from_user_name, to_user_name = getUserMessageContentFromXML(request.data)
print("user message content: ", user_message_content)
if user_message_content.lower() == '/models':
response_content = f"可用的模型列表:\n{list_available_models()}\n\n使用 /model 模型名称 来切换模型"
elif user_message_content.lower().startswith('/model'):
model = user_message_content.split(' ')[1]
if model in AVAILABLE_MODELS:
user_sessions[from_user_name] = {'model': model, 'messages': [], 'pending_response': []}
response_content = f'模型已切换为 {AVAILABLE_MODELS[model]}'
else:
response_content = f'无效的模型名称。可用的模型有:\n{list_available_models()}'
elif user_message_content.lower() == '继续':
if from_user_name in user_sessions and user_sessions[from_user_name]['pending_response']:
response_content = user_sessions[from_user_name]['pending_response'].pop(0)
if user_sessions[from_user_name]['pending_response']:
response_content += '\n\n回复"继续"获取下一部分。'
else:
response_content += '\n\n回复结束。'
else:
response_content = "没有待发送的消息。"
else:
if from_user_name not in user_sessions:
user_sessions[from_user_name] = {'model': 'gpt-4o-mini', 'messages': [], 'pending_response': []}
session = user_sessions[from_user_name]
session['messages'].append({"role": "user", "content": user_message_content})
gpt_response = get_openai_response(session['messages'], session['model'])
session['messages'].append({"role": "assistant", "content": gpt_response})
response_parts = split_message(gpt_response)
if len(response_parts) > 1:
response_content = response_parts[0] + '\n\n回复"继续"获取下一部分。'
session['pending_response'] = response_parts[1:]
else:
response_content = response_parts[0]
return generate_response_xml(from_user_name, to_user_name, response_content)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860, debug=True)