pplx2api / app.py
smgc's picture
Update app.py
b9e956c verified
raw
history blame
12.6 kB
import hmac
import json
import logging
import os
import re
import uuid
from datetime import datetime
from threading import Event

import requests
import socketio
from flask import Flask, request, Response, jsonify
# Create the Flask application.
app = Flask(__name__)

# Custom log format shared by every logger in the process.
log_format = "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)

# Separate loggers for the app itself and for the socket.io / engine.io clients.
app_logger = logging.getLogger('app')
sio_logger = logging.getLogger('socketio.client')
engineio_logger = logging.getLogger('engineio.client')

# Raise the client loggers' thresholds to hide their verbose debug output.
sio_logger.setLevel(logging.WARNING)
engineio_logger.setLevel(logging.WARNING)

# API key that callers must present, read from the environment.
API_KEY = os.environ.get('PPLX_KEY')

# Optional outbound proxy: when PROXY_URL is set, route both http and https
# traffic of the socket.io client's HTTP session through it.
proxy_url = os.environ.get('PROXY_URL')
if not proxy_url:
    transport = None
else:
    proxies = dict.fromkeys(('http', 'https'), proxy_url)
    transport = requests.Session()
    transport.proxies.update(proxies)

sio = socketio.Client(
    http_session=transport,
    logger=sio_logger,
    engineio_logger=engineio_logger,
)

# Connection options.
connect_opts = {
    'transports': ['websocket', 'polling'],  # allow falling back to polling
}

# Other options: headers sent with the upstream connection.
sio_opts = {
    'extraHeaders': {
        'Cookie': os.environ.get('PPLX_COOKIE'),
        'User-Agent': os.environ.get('USER_AGENT'),
        'Accept': '*/*',
        'priority': 'u=1, i',
        'Referer': 'https://www.perplexity.ai/',
    }
}
def log_request(ip, route, status):
    """Write one access-log line: current ISO timestamp, client IP, route, status."""
    app_logger.info(f"{datetime.now().isoformat()} - {ip} - {route} - {status}")
def validate_api_key():
    """Check the request's ``x-api-key`` header against the configured key.

    Returns:
        ``None`` when the caller is authorised; otherwise a
        ``(response, 401)`` tuple that the view should return as-is.
    """
    supplied = request.headers.get('x-api-key')

    # Fail closed: if PPLX_KEY is not configured, or the header is missing,
    # nobody is authorised. A plain ``supplied != API_KEY`` would accept a
    # request without the header whenever API_KEY is None (None == None).
    authorized = (
        bool(API_KEY)
        and supplied is not None
        # Constant-time comparison on bytes (compare_digest rejects non-ASCII str).
        and hmac.compare_digest(supplied.encode('utf-8'), API_KEY.encode('utf-8'))
    )
    if not authorized:
        log_request(request.remote_addr, request.path, 401)
        return create_json_response({"error": "Invalid API key"}), 401
    return None
def normalize_content(content):
    """Recursively coerce a message's ``content`` value into a string.

    Strings pass through unchanged, dicts are serialized as JSON (non-ASCII
    characters kept as-is), and lists are normalized item by item and joined
    with single spaces. Any other type yields an empty string.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, dict):
        return json.dumps(content, ensure_ascii=False)
    if isinstance(content, list):
        parts = (normalize_content(entry) for entry in content)
        return " ".join(parts)
    return ""
def calculate_tokens(text):
    """Return a rough token count for *text*.

    If the text contains any non-ASCII character (e.g. Chinese), every
    character counts as one token; otherwise the count is the number of
    whitespace-separated words.
    """
    has_non_ascii = re.search(r'[^\x00-\x7F]', text)
    return len(text) if has_non_ascii else len(text.split())
def create_json_response(data):
    """Serialize *data* to JSON, log the payload, and wrap it in a Flask Response."""
    body = json.dumps(data, ensure_ascii=False)
    app_logger.info(f"Sending JSON response: {body}")
    return Response(body, content_type='application/json')
@app.route('/')
def root():
    """Log the request and return a JSON description of the proxy's endpoint."""
    log_request(request.remote_addr, request.path, 200)

    required_headers = {
        "x-api-key": "Your API key (required)",
        "Content-Type": "application/json"
    }
    body_fields = {
        "messages": "Array of message objects",
        "stream": "Boolean (true for streaming response)",
        "model": "Model to be used (optional, defaults to claude-3-opus-20240229)"
    }
    messages_endpoint = {
        "method": "POST",
        "description": "Send a message to the AI",
        "headers": required_headers,
        "body": body_fields,
    }
    return create_json_response({
        "message": "Welcome to the Perplexity AI Proxy API",
        "endpoints": {"/ai/v1/messages": messages_endpoint},
    })
@app.route('/ai/v1/messages', methods=['POST'])
def messages():
    """Handle POST /ai/v1/messages by relaying the conversation to Perplexity.

    The JSON body supplies ``messages`` (required), ``model`` (defaults to
    ``claude-3-opus-20240229``) and ``stream`` (defaults to ``True``). All
    message contents are normalized to text and joined with blank lines to
    form a single prompt.

    Returns:
        * the auth error tuple from ``validate_api_key`` when the key is wrong;
        * the result of ``handle_non_stream`` when ``stream`` is falsy;
        * otherwise a ``text/event-stream`` response that emits
          ``message_start``, ``content_block_start``, ``ping``, one
          ``content_block_delta`` per upstream chunk, then
          ``content_block_stop``, ``message_delta`` and ``message_stop``;
        * a ``({"error": ...}, 400)`` response if preparing the request fails.
    """
    auth_error = validate_api_key()
    if auth_error:
        return auth_error

    try:
        json_body = request.json
        model = json_body.get('model', 'claude-3-opus-20240229')
        stream = json_body.get('stream', True)
        previous_messages = "\n\n".join(
            normalize_content(msg['content']) for msg in json_body['messages']
        )
        input_tokens = calculate_tokens(previous_messages)
        msg_id = str(uuid.uuid4())

        if not stream:
            return handle_non_stream(previous_messages, msg_id, model, input_tokens)

        # State shared between the socket.io callbacks and the generator.
        # Only needed on the streaming path, so it is created after the
        # non-stream early return.
        response_event = Event()
        response_text = []

        log_request(request.remote_addr, request.path, 200)

        def generate():
            """Yield the SSE frames for one streamed answer."""
            try:
                yield create_event("message_start", {
                    "type": "message_start",
                    "message": {
                        "id": msg_id,
                        "type": "message",
                        "role": "assistant",
                        "content": [],
                        "model": model,
                        "stop_reason": None,
                        "stop_sequence": None,
                        "usage": {"input_tokens": input_tokens, "output_tokens": 1},
                    },
                })
                yield create_event("content_block_start", {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
                yield create_event("ping", {"type": "ping"})

                def on_connect():
                    # Send the prompt as soon as the upstream socket is ready.
                    app_logger.info("Connected to Perplexity AI")
                    emit_data = {
                        "version": "2.9",
                        "source": "default",
                        "attachments": [],
                        "language": "en-GB",
                        "timezone": "Europe/London",
                        "mode": "concise",
                        "is_related_query": False,
                        "is_default_related_query": False,
                        "visitor_id": str(uuid.uuid4()),
                        "frontend_context_uuid": str(uuid.uuid4()),
                        "prompt_source": "user",
                        "query_source": "home"
                    }
                    sio.emit('perplexity_ask', (previous_messages, emit_data))

                def on_query_progress(data):
                    # Keep the newest chunk of each progress event; the
                    # 'final' flag marks the end of the answer.
                    if 'text' in data:
                        text = json.loads(data['text'])
                        chunk = text['chunks'][-1] if text['chunks'] else None
                        if chunk:
                            response_text.append(chunk)
                    if data.get('final', False):
                        response_event.set()

                def on_disconnect():
                    app_logger.info("Disconnected from Perplexity AI")
                    response_event.set()

                def on_connect_error(data):
                    app_logger.error(f"Connection error: {data}")
                    response_text.append(f"Error connecting to Perplexity AI: {data}")
                    response_event.set()

                # NOTE(review): `sio` is a single module-level client, so these
                # registrations replace those of any other in-flight request.
                # Concurrent requests are not isolated from each other.
                sio.on('connect', on_connect)
                sio.on('query_progress', on_query_progress)
                sio.on('disconnect', on_disconnect)
                sio.on('connect_error', on_connect_error)
                sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])

                # Forward chunks as they arrive instead of waiting for the
                # whole answer. Chunks are read by index (not popped) so the
                # full text is still available for the token count below.
                sent = 0
                while True:
                    # Sample the flag BEFORE draining: callbacks append their
                    # chunk before setting the event, so everything queued up
                    # to this point is flushed on the last pass.
                    finished = response_event.is_set()
                    while sent < len(response_text):
                        chunk = response_text[sent]
                        sent += 1
                        app_logger.info(f"Returning chunk to client: {chunk}")
                        yield create_event("content_block_delta", {
                            "type": "content_block_delta",
                            "index": 0,
                            "delta": {"type": "text_delta", "text": chunk},
                        })
                    if finished:
                        break
                    sio.sleep(0.1)

                output_tokens = calculate_tokens(''.join(response_text))
                yield create_event("content_block_stop", {"type": "content_block_stop", "index": 0})
                yield create_event("message_delta", {
                    "type": "message_delta",
                    "delta": {"stop_reason": "end_turn", "stop_sequence": None},
                    "usage": {"input_tokens": input_tokens, "output_tokens": output_tokens},
                })
                yield create_event("message_stop", {"type": "message_stop"})
            except Exception as e:
                # Headers are already sent, so report the failure in-band.
                app_logger.error(f"Error in generate function: {str(e)}")
                yield create_event("error", {"type": "error", "message": str(e)})
            finally:
                if sio.connected:
                    sio.disconnect()

        return Response(generate(), content_type='text/event-stream')
    except Exception as e:
        app_logger.error(f"Request error: {str(e)}")
        log_request(request.remote_addr, request.path, 400)
        return create_json_response({"error": str(e)}), 400
def handle_non_stream(previous_messages, msg_id, model, input_tokens):
    """Ask Perplexity and return the complete answer as one JSON message.

    Args:
        previous_messages: The prompt text sent upstream.
        msg_id: Identifier placed in the response's ``id`` field.
        model: Model name echoed back in the response.
        input_tokens: Token estimate for the prompt, echoed in ``usage``.

    Returns:
        A JSON response holding the joined answer text and usage counts.
        If the 30-second wait expires with no text received, a
        ``({"error": ...}, 504)`` tuple. If any exception is raised, a
        ``({"error": ...}, 500)`` tuple.
    """
    try:
        response_event = Event()
        response_text = []

        def on_connect():
            # Send the prompt as soon as the upstream socket is ready.
            app_logger.info("Connected to Perplexity AI")
            emit_data = {
                "version": "2.9",
                "source": "default",
                "attachments": [],
                "language": "en-GB",
                "timezone": "Europe/London",
                "mode": "concise",
                "is_related_query": False,
                "is_default_related_query": False,
                "visitor_id": str(uuid.uuid4()),
                "frontend_context_uuid": str(uuid.uuid4()),
                "prompt_source": "user",
                "query_source": "home"
            }
            sio.emit('perplexity_ask', (previous_messages, emit_data))

        def on_query_progress(data):
            # Keep the newest chunk of each progress event; the 'final' flag
            # marks the end of the answer.
            if 'text' in data:
                text = json.loads(data['text'])
                chunk = text['chunks'][-1] if text['chunks'] else None
                if chunk:
                    response_text.append(chunk)
            if data.get('final', False):
                response_event.set()

        def on_disconnect():
            app_logger.info("Disconnected from Perplexity AI")
            response_event.set()

        def on_connect_error(data):
            app_logger.error(f"Connection error: {data}")
            response_text.append(f"Error connecting to Perplexity AI: {data}")
            response_event.set()

        # NOTE(review): `sio` is a single module-level client, so these
        # registrations replace those of any other in-flight request.
        sio.on('connect', on_connect)
        sio.on('query_progress', on_query_progress)
        sio.on('disconnect', on_disconnect)
        sio.on('connect_error', on_connect_error)
        sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])

        # wait() returns False when the timeout expires before the event is set.
        completed = response_event.wait(timeout=30)
        if not completed:
            app_logger.warning("Timed out after 30 seconds waiting for Perplexity AI")
            if not response_text:
                # Nothing arrived at all: report the timeout instead of
                # returning an empty answer as a success.
                return create_json_response({"error": "Timed out waiting for Perplexity AI"}), 504

        answer = ''.join(response_text)
        output_tokens = calculate_tokens(answer)
        full_response = {
            "content": [{"text": answer, "type": "text"}],
            "id": msg_id,
            "model": model,
            "role": "assistant",
            "stop_reason": "end_turn",
            "stop_sequence": None,
            "type": "message",
            "usage": {
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
            },
        }
        return create_json_response(full_response)
    except Exception as e:
        app_logger.error(f"Error during socket connection: {str(e)}")
        return create_json_response({"error": str(e)}), 500
    finally:
        if sio.connected:
            sio.disconnect()
@app.errorhandler(404)
def not_found(error):
    """Log the unmatched route and answer with a JSON 404 body."""
    log_request(request.remote_addr, request.path, 404)
    payload = {"error": "Not Found"}
    return create_json_response(payload), 404
@app.errorhandler(500)
def server_error(error):
    """Log the failure and the request, then answer with a JSON 500 body."""
    app_logger.error(f"Server error: {str(error)}")
    log_request(request.remote_addr, request.path, 500)
    payload = {"error": "Internal Server Error"}
    return create_json_response(payload), 500
def create_event(event, data):
    """Format one server-sent event frame, log it, and return it.

    A dict *data* is serialized to JSON (non-ASCII characters kept as-is);
    any other value is interpolated into the frame unchanged.
    """
    payload = json.dumps(data, ensure_ascii=False) if isinstance(data, dict) else data
    frame = f"event: {event}\ndata: {payload}\n\n"
    app_logger.info(f"Sending SSE event: {frame}")
    return frame
if __name__ == '__main__':
    # Script entry point: listen on $PORT (default 8081) on all interfaces.
    port = int(os.getenv('PORT', 8081))
    app_logger.info(f"Perplexity proxy listening on port {port}")
    if not API_KEY:
        app_logger.warning(
            "Warning: PPLX_KEY environment variable is not set. API key validation will fail."
        )
    app.run(host='0.0.0.0', port=port)