pplx2api / app.py
smgc's picture
Update app.py
2a6dcad verified
raw
history blame
6.5 kB
import os
import json
import uuid
import asyncio
from aiohttp import web
from socketio import AsyncClient
from aiohttp_socks import ProxyConnector
# 从环境变量中获取 API 密钥和 Cookie
API_KEY = os.environ.get("PPLX_KEY")
PPLX_COOKIE = os.environ.get("PPLX_COOKIE")
USER_AGENT = os.environ.get("USER_AGENT")
# 代理设置(如果需要)
PROXY_URL = os.environ.get("PROXY_URL") # 例如: socks5://user:pass@host:port
connector = ProxyConnector.from_url(PROXY_URL) if PROXY_URL else None
# Socket.IO 客户端选项
opts = {
"auth": {"jwt": "anonymous-ask-user"},
"reconnection": False,
"transports": ["websocket"],
"path": "/socket.io",
"hostname": "www.perplexity.ai",
"secure": True,
"port": "443",
"extra_headers": {
"Cookie": PPLX_COOKIE,
"User-Agent": USER_AGENT,
"Accept": "*/*",
"priority": "u=1, i",
"Referer": "https://www.perplexity.ai/",
},
}
# 创建 aiohttp 应用
app = web.Application()
# API 密钥验证中间件
@web.middleware
async def validate_api_key(request, handler):
api_key = request.headers.get("x-api-key")
if api_key != API_KEY:
log_request(request, 401)
return web.json_response({"error": "无效的 API 密钥"}, status=401)
return await handler(request)
# 日志记录函数
def log_request(request, status):
timestamp = datetime.now().isoformat()
ip = request.remote or request.transport.get_extra_info("peername")[0]
route = request.path
print(f"{timestamp} - {ip} - {route} - {status}")
# 根路由处理
async def root_handler(request):
log_request(request, 200)
return web.json_response(
{
"message": "欢迎使用 Perplexity AI 代理 API",
"endpoints": {
"/ai/v1/messages": {
"method": "POST",
"description": "向 AI 发送消息",
"headers": {
"x-api-key": "你的 API 密钥(必需)",
"Content-Type": "application/json",
},
"body": {
"messages": "消息对象数组",
"stream": "布尔值(true 表示流式响应)",
# 其他可能的参数...
},
}
},
}
)
# 创建事件流数据
def create_event(event, data):
if isinstance(data, dict):
data = json.dumps(data)
return f"event: {event}\ndata: {data}\n\n"
# AI 消息处理
@web.post("/ai/v1/messages")
@validate_api_key
async def ai_messages_handler(request):
try:
json_body = await request.json()
if json_body.get("stream") is False:
log_request(request, 200)
return web.json_response(
{
"id": str(uuid.uuid4()),
"content": [
{"text": "请打开流式传输。"},
{"id": "string", "name": "string", "input": {}},
],
"model": "string",
"stop_reason": "end_turn",
"stop_sequence": "string",
"usage": {"input_tokens": 0, "output_tokens": 0},
}
)
elif json_body.get("stream") is True:
# 处理消息... (与 JavaScript 版本类似)
previous_messages = "\n\n".join([msg.get("content", "") for msg in json_body.get("messages", [])])
msgid = str(uuid.uuid4())
response = web.StreamResponse(
status=200,
reason="OK",
headers={"Content-Type": "text/event-stream;charset=utf-8"},
)
await response.prepare(request)
await response.write(
create_event(
"message_start",
{
"type": "message_start",
"message": {
"id": msgid,
# ... (其余部分与 JavaScript 版本类似)
},
},
).encode()
)
# 使用 AsyncClient 连接到 Socket.IO 服务器
async with AsyncClient(connector=connector) as socket:
await socket.connect("wss://www.perplexity.ai/", **opts)
# 发送 perplexity_ask 事件 (需要根据Perplexity.ai的API调整)
await socket.emit("perplexity_ask", previous_messages)
# 处理 Socket.IO 事件... (与 JavaScript 版本类似)
# ...处理 query_progress, disconnect, error, connect_error 等事件,并写入 response
@socket.on("query_progress")
async def on_query_progress(data):
if data.get("text"):
text = json.loads(data["text"])
chunk = text["chunks"][-1] if text.get("chunks") else None
if chunk:
await response.write(create_event("content_block_delta", { # ...}).encode())
# ... other event handlers
return response
else:
raise ValueError("无效的请求")
except (json.JSONDecodeError, ValueError, Exception) as e:
console.log(e) # debug
log_request(request, 400)
return web.json_response({"error": str(e)}, status=400)
# 处理其他路由
async def not_found_handler(request):
log_request(request, 404)
return web.Response(text="未找到", status=404)
# 错误处理中间件
@web.middleware
async def error_middleware(request, handler):
try:
return await handler(request)
except Exception as err:
print(err, traceback.format_exc()) # debug
log_request(request, 500)
return web.Response(text="服务器错误", status=500)
# 添加路由和中间件
app.add_routes([web.get("/", root_handler), web.post("/ai/v1/messages", ai_messages_handler)])
app.add_routes([web.static("/static", "static")])
app.middleware.append(error_middleware)
app.router.add_route("*", "/{path_info:.*}", not_found_handler)
# 启动服务器
if __name__ == "__main__":
port = int(os.environ.get("PORT", 8081))
if not API_KEY:
print("警告:未设置 PPLX_KEY 环境变量。API 密钥验证将失败。")
web.run_app(app, port=port)