#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""WeChat Official Account webhook that relays text messages to an OpenAI chat model.

The single endpoint ``/api/wx`` serves two purposes:

* ``GET``  - answers WeChat's URL-verification handshake.
* ``POST`` - receives a user message as XML, asks the chat model for a reply
  (letting it call the search / e-mail tools declared in ``FUNCTIONS``) and
  returns the reply as a passive text response.

Long replies are cut into parts; the user sends "继续" to fetch the next part.
"""

import hashlib
import hmac
import json
import logging
import os
import smtplib
import time
import xml.etree.ElementTree as ET
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr

import requests
from dotenv import load_dotenv
from duckduckgo_search import DDGS
from flask import Flask, request, make_response
from openai import OpenAI

# Load environment variables (from a .env file when present).
load_dotenv()

logger = logging.getLogger(__name__)

app = Flask(__name__)

# Configuration
TOKEN = os.getenv('TOKEN')
API_KEY = os.getenv("API_KEY")
BASE_URL = os.getenv("OPENAI_BASE_URL")
emailkey = os.getenv("EMAIL_KEY")

client = OpenAI(api_key=API_KEY, base_url=BASE_URL)

# Per-user session state, keyed by the FromUserName of the incoming message.
# Each value is {'messages': [...chat history...], 'pending_response': [...parts...]}.
# NOTE(review): this lives in process memory and is never trimmed - it is lost
# on restart and grows with every message; confirm that is acceptable.
user_sessions = {}

# Upper bound on "model asks for tools -> we run them -> ask again" cycles
# per incoming message, so a misbehaving model cannot loop forever.
MAX_TOOL_ROUNDS = 3

# Timeout (seconds) applied to outbound HTTP and SMTP connections.
NETWORK_TIMEOUT_SECONDS = 10

# Envelope sender / display name used for outgoing mail.
SENDER_NAME = "Me"
SENDER_ADDRESS = "aixiao@aixiao.xyz"

# Reply shown when the model cannot produce an answer.
ERROR_REPLY = "抱歉，暂时无法处理您的请求，请稍后再试。"

# Passive text-reply envelope expected by WeChat. Placeholders, in order:
# ToUserName, FromUserName, CreateTime, Content.
REPLY_TEMPLATE = (
    "<xml>"
    "<ToUserName><![CDATA[%s]]></ToUserName>"
    "<FromUserName><![CDATA[%s]]></FromUserName>"
    "<CreateTime>%s</CreateTime>"
    "<MsgType><![CDATA[text]]></MsgType>"
    "<Content><![CDATA[%s]]></Content>"
    "</xml>"
)

FUNCTIONS = [
    {
        "name": "search_duckduckgo",
        "description": "使用DuckDuckGo搜索引擎查询信息。可以搜索最新新闻、文章、博客等内容。",
        "parameters": {
            "type": "object",
            "properties": {
                "keywords": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "搜索的关键词列表。例如:['Python', '机器学习', '最新进展']。"
                }
            },
            "required": ["keywords"]
        }
    },
    {
        "name": "search_papers",
        "description": "使用Crossref API搜索学术论文。",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "搜索查询字符串。例如:'climate change'。"
                }
            },
            "required": ["query"]
        }
    },
    {
        "name": "send_email",
        "description": "发送电子邮件。",
        "parameters": {
            "type": "object",
            "properties": {
                "to": {
                    "type": "string",
                    "description": "收件人邮箱地址"
                },
                "subject": {
                    "type": "string",
                    "description": "邮件主题"
                },
                "content": {
                    "type": "string",
                    "description": "邮件内容"
                }
            },
            "required": ["to", "subject", "content"]
        }
    }
]


def _signature_is_valid(args):
    """Return True when the query parameters carry a valid WeChat signature.

    The expected signature is the SHA-1 hex digest of ``timestamp``, ``nonce``
    and ``TOKEN`` sorted lexicographically and concatenated.

    Args:
        args: Mapping of query parameters (``signature``, ``timestamp``, ``nonce``).

    Returns:
        False when any parameter or ``TOKEN`` is missing, or the digest differs.
    """
    signature = args.get('signature')
    timestamp = args.get('timestamp')
    nonce = args.get('nonce')
    if not (TOKEN and signature and timestamp and nonce):
        return False
    expected = hashlib.sha1(
        ''.join(sorted([timestamp, nonce, TOKEN])).encode('utf8')
    ).hexdigest()
    # Compare as bytes: compare_digest rejects non-ASCII str, and the
    # signature is attacker-controlled.
    return hmac.compare_digest(expected.encode('utf8'), signature.encode('utf8'))


def verify_wechat(request):
    """Answer the URL-verification handshake.

    Args:
        request: The Flask request whose query string holds ``signature``,
            ``timestamp``, ``nonce`` and ``echostr``.

    Returns:
        ``echostr`` (empty string when absent) if the signature is valid,
        otherwise the tuple ``('error', 403)``.
    """
    if _signature_is_valid(request.args):
        return request.args.get('echostr') or ''
    return 'error', 403


def getUserMessageContentFromXML(xml_content):
    """Extract the text content and routing names from an incoming message.

    Args:
        xml_content: Raw XML body (bytes or str).

    Returns:
        ``(content, from_user_name, to_user_name)``. Any element missing from
        the XML is returned as ``None`` (e.g. ``content`` for messages that
        carry no ``Content`` element).

    Raises:
        xml.etree.ElementTree.ParseError: If the body is not well-formed XML.
    """
    # NOTE(review): xml.etree does not defend against entity-expansion
    # attacks; the caller verifies the request signature before parsing.
    root = ET.fromstring(xml_content)
    return (
        root.findtext('Content'),
        root.findtext('FromUserName'),
        root.findtext('ToUserName'),
    )


def _cdata_safe(value):
    """Return *value* as text that cannot terminate an enclosing CDATA section."""
    return str(value).replace(']]>', ']]]]><![CDATA[>')


def generate_response_xml(from_user_name, to_user_name, output_content):
    """Build the passive text reply for a received message.

    Args:
        from_user_name: ``FromUserName`` of the received message; becomes the
            reply's ``ToUserName``.
        to_user_name: ``ToUserName`` of the received message; becomes the
            reply's ``FromUserName``.
        output_content: Text to send back.

    Returns:
        A Flask response with content type ``application/xml``.
    """
    body = REPLY_TEMPLATE % (
        _cdata_safe(from_user_name),
        _cdata_safe(to_user_name),
        str(int(time.time())),
        _cdata_safe(output_content),
    )
    response = make_response(body)
    response.content_type = 'application/xml'
    return response


def get_openai_response(messages):
    """Request one chat completion with the tools in ``FUNCTIONS`` enabled.

    Args:
        messages: Chat history in the Chat Completions message format.

    Returns:
        The first choice's message object, or ``None`` if the API call failed
        (the error is logged).
    """
    try:
        tools = [{"type": "function", "function": func} for func in FUNCTIONS]
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            tools=tools,
            tool_choice="auto"
        )
        return response.choices[0].message
    except Exception as e:  # boundary: any SDK/network failure becomes None
        logger.exception("调用OpenAI API时出错: %s", e)
        return None


def split_message(message, max_length=500):
    """Cut *message* into consecutive chunks of at most *max_length* characters.

    Returns an empty list for an empty message.

    Raises:
        ValueError: If *max_length* is not positive.
    """
    if max_length <= 0:
        raise ValueError("max_length must be positive")
    return [message[i:i + max_length] for i in range(0, len(message), max_length)]


def search_duckduckgo(keywords):
    """Run a DuckDuckGo text search.

    Args:
        keywords: List of keyword strings (a single string is also accepted).

    Returns:
        Up to five results as returned by ``DDGS.text``; an empty list when
        there is nothing to search for.
    """
    if isinstance(keywords, str):
        # Joining a bare string would interleave spaces between its characters.
        keywords = [keywords]
    search_term = " ".join(str(keyword) for keyword in keywords).strip()
    if not search_term:
        return []
    with DDGS() as ddgs:
        return list(ddgs.text(keywords=search_term, region="cn-zh", safesearch="on", max_results=5))


def search_papers(query):
    """Search Crossref for works matching *query*.

    Returns:
        Up to five ``{"title": ..., "DOI": ...}`` dicts; an empty list on a
        non-200 response, a network error or an unparseable body.
    """
    try:
        response = requests.get(
            "https://api.crossref.org/works",
            # Passing params lets requests URL-encode the query.
            params={"query": query, "rows": 5},
            timeout=NETWORK_TIMEOUT_SECONDS,
        )
        if response.status_code != 200:
            return []
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        logger.exception("查询Crossref时出错: %s", e)
        return []

    papers = (data.get('message') or {}).get('items') or []
    return [
        {
            # 'title' may be absent or an empty list.
            "title": (paper.get('title') or [''])[0],
            "DOI": paper.get('DOI', ''),
        }
        for paper in papers[:5]  # limit the number of results
    ]


def send_email(to, subject, content):
    """Send an HTML e-mail through the configured SMTP relay.

    Args:
        to: Recipient address.
        subject: Subject line.
        content: HTML body.

    Returns:
        True on success, False if anything failed (the error is logged).
    """
    try:
        with smtplib.SMTP('106.15.184.28', 8025, timeout=NETWORK_TIMEOUT_SECONDS) as smtp:
            smtp.login("jwt", emailkey)
            message = MIMEMultipart()
            # A From header needs an address, not just a display name.
            message['From'] = formataddr((SENDER_NAME, SENDER_ADDRESS))
            message['To'] = to
            message['Subject'] = subject
            message.attach(MIMEText(content, 'html'))
            smtp.sendmail(SENDER_ADDRESS, to, message.as_string())
        return True
    except Exception as e:  # best-effort: report failure to the caller
        logger.exception("发送邮件时出错: %s", e)
        return False


def process_function_call(tool_call):
    """Execute the tool requested by the model.

    Args:
        tool_call: Tool-call object exposing ``function.name`` and
            ``function.arguments`` (a JSON string).

    Returns:
        The tool's result, ``None`` for an unknown tool name, or
        ``{"error": ...}`` when the arguments are invalid or the tool raised.
    """
    handlers = {
        "search_duckduckgo": lambda args: search_duckduckgo(args.get('keywords', [])),
        "search_papers": lambda args: search_papers(args.get('query', '')),
        "send_email": lambda args: send_email(
            args.get('to', ''), args.get('subject', ''), args.get('content', '')
        ),
    }
    function_name = tool_call.function.name
    handler = handlers.get(function_name)
    if handler is None:
        return None

    try:
        function_args = json.loads(tool_call.function.arguments or "{}")
    except json.JSONDecodeError as e:
        return {"error": f"invalid arguments: {e}"}
    if not isinstance(function_args, dict):
        return {"error": "invalid arguments: expected a JSON object"}

    try:
        return handler(function_args)
    except Exception as e:  # boundary: let the model see the failure
        logger.exception("执行工具 %s 时出错: %s", function_name, e)
        return {"error": str(e)}


def _run_conversation(messages):
    """Drive the model until it produces a text answer.

    Appends the assistant's tool requests and the tool results to *messages*
    in the order the Chat Completions API requires.

    Returns:
        The answer text, or ``None`` if the API failed, the answer was empty,
        or the tool-round budget was exhausted.
    """
    for round_index in range(MAX_TOOL_ROUNDS + 1):
        reply = get_openai_response(messages)
        if reply is None:
            return None
        if not reply.tool_calls:
            return reply.content or None
        if round_index == MAX_TOOL_ROUNDS:
            # Budget exhausted: do not run (possibly side-effecting) tools again.
            break

        # The assistant message carrying tool_calls must precede the results.
        messages.append({
            "role": "assistant",
            "content": reply.content,
            "tool_calls": [
                {
                    "id": tool_call.id,
                    "type": "function",
                    "function": {
                        "name": tool_call.function.name,
                        "arguments": tool_call.function.arguments,
                    },
                }
                for tool_call in reply.tool_calls
            ],
        })
        for tool_call in reply.tool_calls:
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": json.dumps(process_function_call(tool_call), ensure_ascii=False),
            })
    return None


def _next_pending_part(session):
    """Pop and return the next queued reply part, with its continuation hint."""
    pending = session['pending_response']
    if not pending:
        return "没有待发送的消息。"
    part = pending.pop(0)
    if pending:
        return part + '\n\n回复"继续"获取下一部分。'
    return part + '\n\n回复结束。'


def _answer_new_message(session, user_text):
    """Get the model's answer to *user_text* and queue any overflow parts."""
    messages = session['messages']
    checkpoint = len(messages)
    messages.append({"role": "user", "content": user_text})

    answer = _run_conversation(messages)
    if not answer:
        # Drop the failed exchange so it cannot poison later requests.
        del messages[checkpoint:]
        return ERROR_REPLY

    messages.append({"role": "assistant", "content": answer})
    first_part, *remaining_parts = split_message(answer)
    # Always replace the queue so "继续" never serves parts of an older answer.
    session['pending_response'] = remaining_parts
    if remaining_parts:
        first_part += '\n\n回复"继续"获取下一部分。'
    return first_part


@app.route('/api/wx', methods=['GET', 'POST'])
def wechatai():
    """Webhook entry point: verification on GET, message handling on POST.

    Returns:
        GET: the result of ``verify_wechat``.
        POST: an XML text reply; ``('error', 403)`` for a bad signature;
        ``('error', 400)`` for an unparseable body or missing sender/receiver;
        ``'success'`` (acknowledge without replying) for messages that carry
        no text content.
    """
    if request.method == 'GET':
        return verify_wechat(request)

    # Message pushes carry the same signature parameters as the handshake.
    if not _signature_is_valid(request.args):
        return 'error', 403

    try:
        user_message_content, from_user_name, to_user_name = \
            getUserMessageContentFromXML(request.data)
    except ET.ParseError:
        return 'error', 400
    if not from_user_name or not to_user_name:
        return 'error', 400
    if not user_message_content:
        # Non-text message (no Content element): acknowledge, send nothing.
        return 'success'

    session = user_sessions.setdefault(
        from_user_name, {'messages': [], 'pending_response': []}
    )

    if user_message_content.strip().lower() == '继续':
        response_content = _next_pending_part(session)
    else:
        response_content = _answer_new_message(session, user_message_content)

    return generate_response_xml(from_user_name, to_user_name, response_content)


if __name__ == '__main__':
    # The interactive debugger allows arbitrary code execution, so it must be
    # opted into explicitly rather than enabled on a public bind address.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)