"""Flask proxy exposing ``/ai/v1/messages`` backed by Perplexity AI over Socket.IO."""

import hmac
import json
import logging
import os
import queue
import re
import time
import uuid
from datetime import datetime
from functools import wraps
from threading import Event  # noqa: F401  (kept: existing file-level import)

import requests
import socketio
from flask import Flask, request, Response, jsonify

# Create the Flask application.
app = Flask(__name__)

# Custom log format.
log_format = "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)
app_logger = logging.getLogger('app')

# API key that callers must present, read from the environment.
API_KEY = os.environ.get('PPLX_KEY')

# Optional outbound proxy settings.
proxy_url = os.environ.get('PROXY_URL')
transport = requests.Session()
if proxy_url:
    transport.proxies.update({'http': proxy_url, 'https': proxy_url})

# NOTE: this single client is shared by every request; handlers are
# re-registered per request, so concurrent requests can interfere.
sio = socketio.Client(http_session=transport, logger=True, engineio_logger=True)

# Connection options.
connect_opts = {'transports': ['websocket', 'polling']}

# Other options (headers sent with the upstream connection).
sio_opts = {
    'extraHeaders': {
        'Cookie': os.environ.get('PPLX_COOKIE'),
        'User-Agent': os.environ.get('USER_AGENT'),
        'Accept': '*/*',
        'priority': 'u=1, i',
        'Referer': 'https://www.perplexity.ai/',
    }
}

# Upstream endpoint and how long (seconds) to wait for upstream events.
PERPLEXITY_URL = 'wss://www.perplexity.ai/'
RESPONSE_TIMEOUT = 30

# Kinds of events passed from Socket.IO handlers to the request thread.
_CHUNK = "chunk"
_ERROR = "error"
_DONE = "done"


def log_request(func):
    """Decorate a view so each call is logged with status code and duration."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = datetime.now()
        # Views may return tuples such as ``(body, status)``; normalize to a
        # Response object so ``status_code`` is always available.
        response = app.make_response(func(*args, **kwargs))
        duration = (datetime.now() - start_time).total_seconds()
        app_logger.info(f"{request.remote_addr} - {request.method} {request.path} - {response.status_code} - {duration:.2f}s")
        return response
    return wrapper


def validate_api_key(func):
    """Decorate a view so it rejects requests lacking the configured API key.

    Requests are refused with 401 when the ``x-api-key`` header is missing,
    does not match, or when no key is configured on the server at all.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        api_key = request.headers.get('x-api-key')
        # Reject when either side is missing: otherwise an unset PPLX_KEY
        # combined with an absent header would compare equal (None == None).
        authorized = bool(API_KEY) and bool(api_key) and hmac.compare_digest(
            api_key.encode('utf-8'), API_KEY.encode('utf-8')
        )
        if not authorized:
            app_logger.warning(f"Invalid API key attempt from {request.remote_addr}")
            return jsonify({"error": "Invalid API key"}), 401
        return func(*args, **kwargs)
    return wrapper


def normalize_content(content):
    """Return *content* as a string; dicts and lists are JSON-encoded."""
    if isinstance(content, str):
        return content
    if isinstance(content, (dict, list)):
        return json.dumps(content, ensure_ascii=False)
    return str(content)


def calculate_tokens(text):
    """Return a rough token count: word runs plus individual punctuation marks."""
    return len(re.findall(r'\w+|[^\w\s]', text, re.UNICODE))


def create_json_response(data, status_code=200):
    """Build a JSON response for *data* with the given HTTP status code."""
    response = jsonify(data)
    response.status_code = status_code
    if app_logger.isEnabledFor(logging.DEBUG):
        app_logger.debug(f"Sending JSON response: {json.dumps(data, ensure_ascii=False)}")
    return response


def _build_ask_payload():
    """Return the option payload sent alongside a ``perplexity_ask`` event."""
    return {
        "version": "2.9",
        "source": "default",
        "attachments": [],
        "language": "en-GB",
        "timezone": "Europe/London",
        "mode": "concise",
        "is_related_query": False,
        "is_default_related_query": False,
        "visitor_id": str(uuid.uuid4()),
        "frontend_context_uuid": str(uuid.uuid4()),
        "prompt_source": "user",
        "query_source": "home",
    }


def _start_query(prompt, events):
    """Register Socket.IO handlers feeding *events*, then connect upstream.

    Handlers run on Socket.IO's own threads, so they only push
    ``(kind, payload)`` tuples onto the *events* queue; the request thread
    consumes them. Handlers are registered before connecting so the
    ``connect`` event cannot be missed.
    """
    def on_connect():
        app_logger.info("Connected to Perplexity AI")
        sio.emit('perplexity_ask', (prompt, _build_ask_payload()))

    def on_query_progress(data):
        try:
            if 'text' in data:
                text = json.loads(data['text'])
                chunk = text['chunks'][-1] if text['chunks'] else None
                if chunk:
                    events.put((_CHUNK, chunk))
            if data.get('final', False):
                events.put((_DONE, None))
        except (ValueError, KeyError, TypeError, IndexError, AttributeError) as exc:
            # Surface malformed upstream payloads instead of dying silently
            # in the background thread and leaving the request waiting.
            app_logger.error(f"Malformed query_progress payload: {exc}")
            events.put((_ERROR, f"Malformed response from Perplexity AI: {exc}"))

    def on_disconnect():
        app_logger.info("Disconnected from Perplexity AI")
        events.put((_DONE, None))

    def on_connect_error(data):
        app_logger.error(f"Connection error: {data}")
        events.put((_ERROR, f"Error connecting to Perplexity AI: {data}"))

    sio.on('connect', on_connect)
    sio.on('query_progress', on_query_progress)
    sio.on('disconnect', on_disconnect)
    sio.on('connect_error', on_connect_error)

    # Drop headers whose environment variable is unset (value None).
    headers = {k: v for k, v in sio_opts['extraHeaders'].items() if v is not None}
    sio.connect(PERPLEXITY_URL, **connect_opts, headers=headers)


def _iter_events(events, timeout, *, idle=False):
    """Yield ``(kind, payload)`` chunk/error events until the query finishes.

    Iteration stops on a done event, after yielding an error event, or when
    *timeout* seconds elapse. With ``idle=True`` the timeout restarts after
    every event (time between events); otherwise it bounds the whole wait.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            kind, payload = events.get(timeout=max(deadline - time.monotonic(), 0))
        except queue.Empty:
            app_logger.warning("Timed out waiting for Perplexity AI response")
            return
        if kind == _DONE:
            return
        yield kind, payload
        if kind == _ERROR:
            return
        if idle:
            deadline = time.monotonic() + timeout


def _stream_events(prompt, msg_id, model, input_tokens):
    """Generate the SSE event stream for one streaming message request."""
    response_text = []
    events = queue.Queue()
    try:
        yield create_sse_event("message_start", {
            "type": "message_start",
            "message": {
                "id": msg_id,
                "type": "message",
                "role": "assistant",
                "content": [],
                "model": model,
                "stop_reason": None,
                "stop_sequence": None,
                "usage": {"input_tokens": input_tokens, "output_tokens": 1},
            },
        })
        yield create_sse_event("content_block_start", {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
        yield create_sse_event("ping", {"type": "ping"})

        _start_query(prompt, events)

        for kind, payload in _iter_events(events, RESPONSE_TIMEOUT, idle=True):
            if kind == _CHUNK:
                response_text.append(payload)
                yield create_sse_event("content_block_delta", {
                    "type": "content_block_delta",
                    "index": 0,
                    "delta": {"type": "text_delta", "text": payload},
                })
            else:
                yield create_sse_event("error", {"type": "error", "message": payload})

        output_tokens = calculate_tokens(''.join(response_text))

        yield create_sse_event("content_block_stop", {"type": "content_block_stop", "index": 0})
        yield create_sse_event("message_delta", {
            "type": "message_delta",
            "delta": {"stop_reason": "end_turn", "stop_sequence": None},
            "usage": {"input_tokens": input_tokens, "output_tokens": output_tokens},
        })
        yield create_sse_event("message_stop", {"type": "message_stop"})
    except Exception as e:
        app_logger.error(f"Error in generate function: {str(e)}")
        yield create_sse_event("error", {"type": "error", "message": str(e)})
    finally:
        if sio.connected:
            sio.disconnect()


@app.route('/')
@log_request
def root():
    """Describe the API and its endpoints."""
    return create_json_response({
        "message": "Welcome to the Perplexity AI Proxy API",
        "endpoints": {
            "/ai/v1/messages": {
                "method": "POST",
                "description": "Send a message to the AI",
                "headers": {
                    "x-api-key": "Your API key (required)",
                    "Content-Type": "application/json"
                },
                "body": {
                    "messages": "Array of message objects",
                    "stream": "Boolean (true for streaming response)",
                    "model": "Model to be used (optional, defaults to claude-3-opus-20240229)"
                }
            }
        }
    })


@app.route('/ai/v1/messages', methods=['POST'])
@log_request
@validate_api_key
def messages():
    """Forward the posted messages upstream and return the answer.

    Responds with a server-sent event stream unless the body sets
    ``stream`` to a false value, in which case a single JSON message is
    returned. Invalid request bodies yield a 400 JSON error.
    """
    try:
        json_body = request.json
        if not isinstance(json_body, dict) or not isinstance(json_body.get('messages'), list):
            return create_json_response(
                {"error": "Request body must be a JSON object with a 'messages' array"}, 400
            )

        model = json_body.get('model', 'claude-3-opus-20240229')
        stream = json_body.get('stream', True)
        previous_messages = "\n\n".join(
            normalize_content(msg['content']) for msg in json_body['messages']
        )
        input_tokens = calculate_tokens(previous_messages)
        msg_id = str(uuid.uuid4())

        if not stream:
            return handle_non_stream(previous_messages, msg_id, model, input_tokens)

        return Response(
            _stream_events(previous_messages, msg_id, model, input_tokens),
            content_type='text/event-stream; charset=utf-8',
        )
    except Exception as e:
        app_logger.error(f"Request error: {str(e)}")
        return create_json_response({"error": str(e)}, 400)


def handle_non_stream(previous_messages, msg_id, model, input_tokens):
    """Query upstream and return the complete answer as one JSON message.

    Waits at most ``RESPONSE_TIMEOUT`` seconds overall; whatever text has
    arrived by then is returned. A connection error reported by the client
    is included in the response text. Unexpected failures yield a 500.
    """
    try:
        events = queue.Queue()
        _start_query(previous_messages, events)

        # Chunks and any connection-error message are both part of the text.
        answer = ''.join(payload for _, payload in _iter_events(events, RESPONSE_TIMEOUT))
        output_tokens = calculate_tokens(answer)

        full_response = {
            "content": [{"text": answer, "type": "text"}],
            "id": msg_id,
            "model": model,
            "role": "assistant",
            "stop_reason": "end_turn",
            "stop_sequence": None,
            "type": "message",
            "usage": {
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
            },
        }
        return create_json_response(full_response)
    except Exception as e:
        app_logger.error(f"Error during socket connection: {str(e)}")
        return create_json_response({"error": str(e)}, 500)
    finally:
        if sio.connected:
            sio.disconnect()


@app.errorhandler(404)
def not_found(error):
    """Return a JSON body for unknown routes."""
    return create_json_response({"error": "Not Found"}, 404)


@app.errorhandler(500)
def server_error(error):
    """Log an unhandled server error and return a JSON body."""
    app_logger.error(f"Server error: {str(error)}")
    return create_json_response({"error": "Internal Server Error"}, 500)


def create_sse_event(event, data):
    """Format *data* as one server-sent event named *event*."""
    json_data = json.dumps(data, ensure_ascii=False)
    event_str = f"event: {event}\ndata: {json_data}\n\n"
    if app_logger.isEnabledFor(logging.DEBUG):
        app_logger.debug(f"Sending SSE event: {event_str}")
    return event_str


if __name__ == '__main__':
    port = int(os.environ.get('PORT', 8081))
    app_logger.info(f"Perplexity proxy listening on port {port}")
    if not API_KEY:
        app_logger.warning("Warning: PPLX_KEY environment variable is not set. API key validation will fail.")
    app.run(host='0.0.0.0', port=port, debug=False)