#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""WeChat official-account webhook that relays text messages to an OpenAI model.

The Flask app exposes a single endpoint, ``/api/wx``:

* ``GET``  answers the WeChat server-verification handshake.
* ``POST`` receives a user message as XML, optionally lets the model call one
  of the helper tools defined here (web search, paper search, e-mail), and
  replies with a passive text-reply XML document.

Per-user state (selected model, chat history, queued reply parts) is kept in
the module-level ``user_sessions`` dict.
"""
import hashlib
import hmac
import json
import os
import smtplib
import time
import xml.etree.ElementTree as ET
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr

import requests
from dotenv import load_dotenv
from duckduckgo_search import DDGS
from flask import Flask, request, make_response
from openai import OpenAI

# Load environment variables (e.g. from a local .env file).
load_dotenv()

app = Flask(__name__)

# Configuration
TOKEN = os.getenv('WECHAT_TOKEN')
API_KEY = os.getenv("OPENAI_API_KEY")
BASE_URL = os.getenv("OPENAI_BASE_URL")
emailkey = os.getenv("EMAIL_KEY")

client = OpenAI(api_key=API_KEY, base_url=BASE_URL)

# Models a user may switch to: API model id -> display name.
AVAILABLE_MODELS = {
    'gpt-3.5-turbo': 'GPT-3.5 Turbo',
    'gpt-4': 'GPT-4',
    'gpt-4-turbo': 'GPT-4 Turbo',
}

# Per-user session data, keyed by the sender id taken from the incoming XML.
user_sessions = {}

# Outbound mail settings used by send_email().
SMTP_HOST = '106.15.184.28'
SMTP_PORT = 8025
SMTP_USER = "jwt"
SENDER_ADDRESS = "aixiao@aixiao.xyz"

# Upper bound, in seconds, for the Crossref HTTP request.
HTTP_TIMEOUT = 10

# Reply used when the model call fails or yields no text.
AI_UNAVAILABLE_REPLY = "抱歉，AI 服务暂时不可用，请稍后再试。"

# Passive text-reply document. Placeholders, in order: recipient (the user),
# sender (the official account), creation time, text content.
REPLY_XML_TEMPLATE = '''<xml>
<ToUserName><![CDATA[%s]]></ToUserName>
<FromUserName><![CDATA[%s]]></FromUserName>
<CreateTime>%s</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[%s]]></Content>
</xml>'''


# Tool implementations
def search_duckduckgo(keywords):
    """Search DuckDuckGo and return up to five results.

    Args:
        keywords: List of keyword strings; a single string is also accepted.

    Returns:
        A list of ``{"title": ..., "body": ...}`` dicts, with newlines in the
        body replaced by spaces.
    """
    # A bare string would otherwise be joined character by character.
    if isinstance(keywords, str):
        keywords = [keywords]
    search_term = " ".join(keywords)
    with DDGS() as ddgs:
        results = list(ddgs.text(keywords=search_term, region="cn-zh", safesearch="on", max_results=5))
    return [
        {"title": result['title'], "body": result['body'].replace('\n', ' ')}
        for result in results
    ]


def search_papers(query):
    """Search academic papers through the Crossref ``/works`` endpoint.

    Args:
        query: Free-text query string.

    Returns:
        A list of dicts with the keys ``标题`` (title), ``作者`` (authors),
        ``DOI`` and ``摘要`` (abstract). An empty list is returned when the
        request fails or the status code is not 200.
    """
    url = "https://api.crossref.org/works"
    try:
        # Passing the query via ``params`` lets requests URL-encode it.
        response = requests.get(url, params={"query": query}, timeout=HTTP_TIMEOUT)
    except requests.RequestException as e:
        print(f"搜索论文时出错: {str(e)}")
        return []
    if response.status_code != 200:
        return []

    data = response.json()
    papers = data['message']['items']
    processed_papers = []
    for paper in papers:
        # ``title`` may be missing or an empty list.
        titles = paper.get('title') or ['']
        authors = ", ".join(
            f"{author.get('given', '')} {author.get('family', '')}"
            for author in paper.get('author', [])
        )
        # Abstracts carry JATS markup: drop paragraph tags and render italics
        # as asterisks.
        abstract = (
            paper.get('abstract', '')
            .replace('<jats:p>', '')
            .replace('</jats:p>', '')
            .replace('<jats:italic>', '*')
            .replace('</jats:italic>', '*')
        )
        processed_papers.append({
            "标题": titles[0],
            "作者": authors,
            "DOI": paper.get('DOI', ''),
            "摘要": abstract,
        })
    return processed_papers


def send_email(to, subject, content):
    """Send an HTML e-mail through the configured SMTP server.

    Args:
        to: Recipient address.
        subject: Subject line.
        content: Message body, sent as ``text/html``.

    Returns:
        True when the message was handed to the server, False on any error
        (the error is printed).
    """
    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
            smtp.login(SMTP_USER, emailkey)
            message = MIMEMultipart()
            # Display name plus the same address that is used as envelope sender.
            message['From'] = formataddr(("Me", SENDER_ADDRESS))
            message['To'] = to
            message['Subject'] = subject
            message.attach(MIMEText(content, 'html'))
            smtp.sendmail(SENDER_ADDRESS, to, message.as_string())
        return True
    except Exception as e:
        # Best-effort: the caller reports the failure back to the model.
        print(f"发送邮件时出错: {str(e)}")
        return False


# Tool schemas advertised to the model.
FUNCTIONS = [
    {
        "name": "search_duckduckgo",
        "description": "使用DuckDuckGo搜索引擎查询信息。可以搜索最新新闻、文章、博客等内容。",
        "parameters": {
            "type": "object",
            "properties": {
                "keywords": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "搜索的关键词列表。例如:['Python', '机器学习', '最新进展']。"
                }
            },
            "required": ["keywords"]
        }
    },
    {
        "name": "search_papers",
        "description": "使用Crossref API搜索学术论文。",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "搜索查询字符串。例如:'climate change'。"
                }
            },
            "required": ["query"]
        }
    },
    {
        "name": "send_email",
        "description": "发送电子邮件。",
        "parameters": {
            "type": "object",
            "properties": {
                "to": {
                    "type": "string",
                    "description": "收件人邮箱地址"
                },
                "subject": {
                    "type": "string",
                    "description": "邮件主题"
                },
                "content": {
                    "type": "string",
                    "description": "邮件内容"
                }
            },
            "required": ["to", "subject", "content"]
        }
    }
]


def verify_wechat(request):
    """Answer the WeChat server-verification handshake.

    The SHA-1 of the lexicographically sorted ``timestamp``, ``nonce`` and
    ``TOKEN`` values, concatenated, must equal the ``signature`` parameter.

    Args:
        request: The Flask request carrying the query parameters.

    Returns:
        ``echostr`` when the signature matches, otherwise ``('error', 403)``.
    """
    data = request.args
    signature = data.get('signature')
    timestamp = data.get('timestamp')
    nonce = data.get('nonce')
    echostr = data.get('echostr')

    # Missing parameters (or an unset token) can never verify; reject them
    # instead of failing while sorting/joining None values.
    if not all((signature, timestamp, nonce, echostr, TOKEN)):
        return 'error', 403

    # Sort the three values, concatenate them and hash the result.
    joined = ''.join(sorted([timestamp, nonce, TOKEN]))
    digest = hashlib.sha1(joined.encode('utf8')).hexdigest()

    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    if hmac.compare_digest(digest.encode('utf8'), signature.encode('utf8')):
        return echostr
    return 'error', 403


def getUserMessageContentFromXML(xml_content):
    """Extract the text, sender and recipient from an incoming message.

    Args:
        xml_content: The raw XML request body.

    Returns:
        A ``(content, from_user_name, to_user_name)`` tuple. An element that
        is absent or empty yields an empty string.

    Raises:
        xml.etree.ElementTree.ParseError: If the body is not well-formed XML.
    """
    # NOTE(review): this parses an externally supplied document with the
    # stdlib parser; consider a hardened parser if entity-expansion attacks
    # are a concern.
    root = ET.fromstring(xml_content)
    content = root.findtext('Content') or ''
    from_user_name = root.findtext('FromUserName') or ''
    to_user_name = root.findtext('ToUserName') or ''
    return content, from_user_name, to_user_name


def _cdata_safe(text):
    """Return *text* with any ``]]>`` split so it cannot end a CDATA section."""
    return str(text).replace(']]>', ']]]]><![CDATA[>')


def generate_response_xml(from_user_name, to_user_name, output_content):
    """Build the passive text reply for one incoming message.

    Args:
        from_user_name: Sender of the incoming message; becomes the recipient.
        to_user_name: Recipient of the incoming message; becomes the sender.
        output_content: Text to send back.

    Returns:
        A Flask response with content type ``application/xml``.
    """
    body = REPLY_XML_TEMPLATE % (
        _cdata_safe(from_user_name),
        _cdata_safe(to_user_name),
        str(int(time.time())),
        _cdata_safe(output_content),
    )
    response = make_response(body)
    response.content_type = 'application/xml'
    return response


def get_openai_response(messages, model="gpt-3.5-turbo", functions=None, function_call=None):
    """Request one chat completion and return its first message.

    Args:
        messages: Chat history in the API's message format.
        model: Model id to use.
        functions: Optional list of function schemas to offer.
        function_call: Optional function-call mode; only sent together with
            ``functions``.

    Returns:
        The first choice's message object, or None when the call fails (the
        error is printed).
    """
    request_kwargs = {"model": model, "messages": messages}
    # Only include the optional fields when they are set; an explicit None
    # would be sent as a value instead of being left out of the request.
    if functions:
        request_kwargs["functions"] = functions
        if function_call is not None:
            request_kwargs["function_call"] = function_call
    try:
        response = client.chat.completions.create(**request_kwargs)
        return response.choices[0].message
    except Exception as e:
        print(f"调用OpenAI API时出错: {str(e)}")
        return None


def process_function_call(function_name, function_args):
    """Run the tool requested by the model.

    Args:
        function_name: Name of the requested function.
        function_args: Decoded argument dict.

    Returns:
        The tool's result: a list for the two search tools, a status dict for
        ``send_email``, or an explanatory string when the arguments are
        incomplete or the function is unknown.
    """
    function_args = function_args or {}

    if function_name == "search_duckduckgo":
        keywords = function_args.get('keywords', [])
        if not keywords:
            return "搜索关键词为空,无法执行搜索。"
        return search_duckduckgo(keywords)

    if function_name == "search_papers":
        query = function_args.get('query', '')
        if not query:
            return "搜索查询为空,无法执行论文搜索。"
        return search_papers(query)

    if function_name == "send_email":
        to = function_args.get('to', '')
        subject = function_args.get('subject', '')
        content = function_args.get('content', '')
        if not to or not subject or not content:
            return "邮件信息不完整,无法发送邮件。"
        success = send_email(to, subject, content)
        return {
            "success": success,
            "message": "邮件发送成功" if success else "邮件发送失败",
            "to": to,
            "subject": subject,
            "content": content,
            "is_email": True
        }

    return "未知的函数调用。"


def split_message(message, max_length=500):
    """Split *message* into consecutive slices of at most *max_length* characters."""
    return [message[i:i + max_length] for i in range(0, len(message), max_length)]


def _handle_command(session, text):
    """Return the reply for a chat command, or None if *text* is not a command."""
    lowered = text.lower()

    if lowered == '/models':
        return f"可用的模型列表:\n{list_available_models()}\n\n使用 /model 模型名称 来切换模型"

    if lowered.startswith('/model'):
        parts = text.split()
        # A bare "/model" has no argument; treat it as an invalid model name.
        model = parts[1] if len(parts) > 1 else ''
        if model in AVAILABLE_MODELS:
            session['model'] = model
            return f'模型已切换为 {AVAILABLE_MODELS[model]}'
        return f'无效的模型名称。可用的模型有:\n{list_available_models()}'

    if lowered == '继续':
        if not session['pending_response']:
            return "没有待发送的消息。"
        reply = session['pending_response'].pop(0)
        if session['pending_response']:
            return reply + '\n\n回复"继续"获取下一部分。'
        return reply + '\n\n回复结束。'

    return None


def _run_chat_turn(session, user_text):
    """Append *user_text* to the history and return the model's reply text."""
    session['messages'].append({"role": "user", "content": user_text})

    ai_response = get_openai_response(
        session['messages'],
        model=session['model'],
        functions=FUNCTIONS,
        function_call="auto",
    )
    if ai_response is None:
        return AI_UNAVAILABLE_REPLY

    if ai_response.function_call:
        function_name = ai_response.function_call.name
        raw_arguments = ai_response.function_call.arguments
        try:
            function_args = json.loads(raw_arguments or '{}')
        except json.JSONDecodeError:
            function_args = {}
        if not isinstance(function_args, dict):
            function_args = {}

        try:
            function_result = process_function_call(function_name, function_args)
        except Exception as e:
            # Tool boundary: report the failure to the model instead of
            # failing the whole webhook request.
            print(f"执行函数 {function_name} 时出错: {str(e)}")
            function_result = f"函数执行出错: {str(e)}"

        # Record the assistant's function call and the tool output so the
        # follow-up completion can see both.
        session['messages'].append({
            "role": "assistant",
            "content": ai_response.content,
            "function_call": {"name": function_name, "arguments": raw_arguments},
        })
        session['messages'].append({
            "role": "function",
            "name": function_name,
            "content": json.dumps(function_result, ensure_ascii=False)
        })

        final_response = get_openai_response(session['messages'], model=session['model'])
        if final_response is None:
            return AI_UNAVAILABLE_REPLY
        response_content = final_response.content
    else:
        response_content = ai_response.content

    if not response_content:
        return AI_UNAVAILABLE_REPLY

    session['messages'].append({"role": "assistant", "content": response_content})
    return response_content


@app.route('/api/wx', methods=['GET', 'POST'])
def wechatai():
    """Handle the WeChat webhook.

    ``GET`` requests are answered by ``verify_wechat``. ``POST`` requests
    carry one user message as XML; the reply is a passive text message that
    contains a command result or the model's answer. Answers longer than one
    part are queued, and the user fetches the rest by sending ``继续``.
    """
    if request.method == 'GET':
        return verify_wechat(request)

    # POST: an incoming user message.
    # NOTE(review): the request signature is only checked on GET; consider
    # verifying it here as well.
    xml_str = request.data
    if not xml_str:
        return ""

    try:
        user_message_content, from_user_name, to_user_name = getUserMessageContentFromXML(xml_str)
    except ET.ParseError as e:
        print(f"解析XML时出错: {str(e)}")
        return 'error', 400

    user_text = user_message_content.strip()
    if not (user_text and from_user_name and to_user_name):
        # Nothing to answer (e.g. a message without text content);
        # just acknowledge receipt.
        return "success"

    if from_user_name not in user_sessions:
        user_sessions[from_user_name] = {'model': 'gpt-3.5-turbo', 'messages': [], 'pending_response': []}
    session = user_sessions[from_user_name]

    command_reply = _handle_command(session, user_text)
    if command_reply is not None:
        return generate_response_xml(from_user_name, to_user_name, command_reply)

    response_content = _run_chat_turn(session, user_message_content)

    # Long answers are delivered in parts; replace any parts left over from
    # an earlier answer so "继续" never returns stale text.
    response_parts = split_message(response_content)
    session['pending_response'] = response_parts[1:]
    if session['pending_response']:
        response_content = response_parts[0] + '\n\n回复"继续"获取下一部分。'

    return generate_response_xml(from_user_name, to_user_name, response_content)


def list_available_models():
    """Return one ``id: display name`` line per entry in AVAILABLE_MODELS."""
    return "\n".join([f"{key}: {value}" for key, value in AVAILABLE_MODELS.items()])


if __name__ == '__main__':
    # The interactive debugger must not be reachable on a public interface,
    # so debug mode is opt-in via the FLASK_DEBUG environment variable.
    debug_enabled = os.getenv('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)