import os import json import uuid import asyncio from aiohttp import web from socketio import AsyncClient from aiohttp_socks import ProxyConnector # 从环境变量中获取 API 密钥和 Cookie API_KEY = os.environ.get("PPLX_KEY") PPLX_COOKIE = os.environ.get("PPLX_COOKIE") USER_AGENT = os.environ.get("USER_AGENT") # 代理设置(如果需要) PROXY_URL = os.environ.get("PROXY_URL") # 例如: socks5://user:pass@host:port connector = ProxyConnector.from_url(PROXY_URL) if PROXY_URL else None # Socket.IO 客户端选项 opts = { "auth": {"jwt": "anonymous-ask-user"}, "reconnection": False, "transports": ["websocket"], "path": "/socket.io", "hostname": "www.perplexity.ai", "secure": True, "port": "443", "extra_headers": { "Cookie": PPLX_COOKIE, "User-Agent": USER_AGENT, "Accept": "*/*", "priority": "u=1, i", "Referer": "https://www.perplexity.ai/", }, } # 创建 aiohttp 应用 app = web.Application() # API 密钥验证中间件 @web.middleware async def validate_api_key(request, handler): api_key = request.headers.get("x-api-key") if api_key != API_KEY: log_request(request, 401) return web.json_response({"error": "无效的 API 密钥"}, status=401) return await handler(request) # 日志记录函数 def log_request(request, status): timestamp = datetime.now().isoformat() ip = request.remote or request.transport.get_extra_info("peername")[0] route = request.path print(f"{timestamp} - {ip} - {route} - {status}") # 根路由处理 async def root_handler(request): log_request(request, 200) return web.json_response( { "message": "欢迎使用 Perplexity AI 代理 API", "endpoints": { "/ai/v1/messages": { "method": "POST", "description": "向 AI 发送消息", "headers": { "x-api-key": "你的 API 密钥(必需)", "Content-Type": "application/json", }, "body": { "messages": "消息对象数组", "stream": "布尔值(true 表示流式响应)", # 其他可能的参数... }, } }, } ) # 创建事件流数据 def create_event(event, data): if isinstance(data, dict): data = json.dumps(data) return f"event: {event}\ndata: {data}\n\n" # AI 消息处理 @web.post("/ai/v1/messages") @validate_api_key async def ai_messages_handler(request): try: json_body = await request.json() if json_body.get("stream") is False: log_request(request, 200) return web.json_response( { "id": str(uuid.uuid4()), "content": [ {"text": "请打开流式传输。"}, {"id": "string", "name": "string", "input": {}}, ], "model": "string", "stop_reason": "end_turn", "stop_sequence": "string", "usage": {"input_tokens": 0, "output_tokens": 0}, } ) elif json_body.get("stream") is True: # 处理消息... (与 JavaScript 版本类似) previous_messages = "\n\n".join([msg.get("content", "") for msg in json_body.get("messages", [])]) msgid = str(uuid.uuid4()) response = web.StreamResponse( status=200, reason="OK", headers={"Content-Type": "text/event-stream;charset=utf-8"}, ) await response.prepare(request) await response.write( create_event( "message_start", { "type": "message_start", "message": { "id": msgid, # ... (其余部分与 JavaScript 版本类似) }, }, ).encode() ) # 使用 AsyncClient 连接到 Socket.IO 服务器 async with AsyncClient(connector=connector) as socket: await socket.connect("wss://www.perplexity.ai/", **opts) # 发送 perplexity_ask 事件 (需要根据Perplexity.ai的API调整) await socket.emit("perplexity_ask", previous_messages) # 处理 Socket.IO 事件... (与 JavaScript 版本类似) # ...处理 query_progress, disconnect, error, connect_error 等事件,并写入 response @socket.on("query_progress") async def on_query_progress(data): if data.get("text"): text = json.loads(data["text"]) chunk = text["chunks"][-1] if text.get("chunks") else None if chunk: await response.write(create_event("content_block_delta", { # ...}).encode()) # ... other event handlers return response else: raise ValueError("无效的请求") except (json.JSONDecodeError, ValueError, Exception) as e: console.log(e) # debug log_request(request, 400) return web.json_response({"error": str(e)}, status=400) # 处理其他路由 async def not_found_handler(request): log_request(request, 404) return web.Response(text="未找到", status=404) # 错误处理中间件 @web.middleware async def error_middleware(request, handler): try: return await handler(request) except Exception as err: print(err, traceback.format_exc()) # debug log_request(request, 500) return web.Response(text="服务器错误", status=500) # 添加路由和中间件 app.add_routes([web.get("/", root_handler), web.post("/ai/v1/messages", ai_messages_handler)]) app.add_routes([web.static("/static", "static")]) app.middleware.append(error_middleware) app.router.add_route("*", "/{path_info:.*}", not_found_handler) # 启动服务器 if __name__ == "__main__": port = int(os.environ.get("PORT", 8081)) if not API_KEY: print("警告:未设置 PPLX_KEY 环境变量。API 密钥验证将失败。") web.run_app(app, port=port)