import hmac
import json
import logging
import os
import re
import time
import uuid
from datetime import datetime
from threading import Event

import requests
import socketio
from flask import Flask, Response, jsonify, request

app = Flask(__name__)


class CustomFormatter(logging.Formatter):
    """Render each log record as an indented JSON document.

    The output always carries ``timestamp``, ``level`` and ``message``; the
    optional ``event_type`` and ``data`` attributes (supplied by callers via
    ``extra=``) are copied through when present on the record.
    """

    # Compiled once at class creation instead of on every formatted record.
    _ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')

    def format(self, record):
        """Return *record* serialized as a JSON string."""
        log_data = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "message": self.remove_ansi_escape(record.getMessage()),
        }
        if hasattr(record, 'event_type'):
            log_data['event_type'] = record.event_type
        if hasattr(record, 'data'):
            log_data['data'] = record.data
        return json.dumps(log_data, ensure_ascii=False, indent=2)

    def remove_ansi_escape(self, text):
        """Return *text* with ANSI escape sequences stripped."""
        return self._ANSI_ESCAPE.sub('', text)


def setup_logging():
    """Attach a JSON-formatting stream handler to the root logger at INFO."""
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    handler.setFormatter(CustomFormatter())
    logger.addHandler(handler)


logger = logging.getLogger(__name__)

# Key that clients must present in the ``x-api-key`` header.
API_KEY = os.environ.get('PPLX_KEY')

# Optional outbound proxy: when PROXY_URL is set, the socket.io client is
# given a requests session that routes http and https through it.
proxy_url = os.environ.get('PROXY_URL')
if proxy_url:
    proxies = {'http': proxy_url, 'https': proxy_url}
    transport = requests.Session()
    transport.proxies.update(proxies)
else:
    transport = None

sio = socketio.Client(http_session=transport, logger=False, engineio_logger=False)

connect_opts = {'transports': ['websocket', 'polling']}

_upstream_headers = {
    'Cookie': os.environ.get('PPLX_COOKIE'),
    'User-Agent': os.environ.get('USER_AGENT'),
    'Accept': '*/*',
    'priority': 'u=1, i',
    'Referer': 'https://www.perplexity.ai/',
}
sio_opts = {
    # Drop headers whose environment variable is unset so that a literal
    # None is never handed to the transport as a header value.
    'extraHeaders': {
        name: value for name, value in _upstream_headers.items() if value is not None
    }
}


def log_request(ip, route, status):
    """Emit a structured ``request_log`` record for one handled request.

    Args:
        ip: Remote address of the client.
        route: Request path.
        status: HTTP status code being returned.
    """
    timestamp = datetime.now().isoformat()
    logger.info("Request logged", extra={
        'event_type': 'request_log',
        'data': {
            'ip': ip,
            'route': route,
            'status': status,
            'timestamp': timestamp,
        },
    })


def validate_api_key():
    """Check the ``x-api-key`` request header against ``API_KEY``.

    Returns:
        ``None`` when the key is valid, otherwise a ``(response, 401)`` tuple
        that the route should return as-is.
    """
    api_key = request.headers.get('x-api-key')
    # Fail closed: with no configured key (or no supplied key) a plain
    # inequality test would treat None == None as a successful match.
    authorized = bool(API_KEY) and bool(api_key) and hmac.compare_digest(
        api_key.encode('utf-8'), API_KEY.encode('utf-8')
    )
    if not authorized:
        log_request(request.remote_addr, request.path, 401)
        return jsonify({"error": "Invalid API key"}), 401
    return None
def normalize_content(content):
    """Flatten a message ``content`` value into a single string.

    Strings are returned unchanged, dicts are JSON-encoded, lists are
    normalized element-wise and joined with a single space, and any other
    type becomes the empty string.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, dict):
        return json.dumps(content, ensure_ascii=False)
    if isinstance(content, list):
        return " ".join(normalize_content(item) for item in content)
    return ""


def calculate_tokens(text):
    """Return a rough token count for *text*.

    Text containing any non-ASCII character is counted per character;
    pure-ASCII text is counted per whitespace-separated word.
    """
    if re.search(r'[^\x00-\x7F]', text):
        return len(text)
    return len(text.split())


def create_event(event, data):
    """Format one server-sent event; dict payloads are JSON-encoded first."""
    if isinstance(data, dict):
        data = json.dumps(data, ensure_ascii=False)
    return f"event: {event}\ndata: {data}\n\n"


def _build_emit_data():
    """Return the option payload sent alongside a ``perplexity_ask`` query."""
    return {
        "version": "2.9",
        "source": "default",
        "attachments": [],
        "language": "en-GB",
        "timezone": "Europe/London",
        "mode": "concise",
        "is_related_query": False,
        "is_default_related_query": False,
        "visitor_id": str(uuid.uuid4()),
        "frontend_context_uuid": str(uuid.uuid4()),
        "prompt_source": "user",
        "query_source": "home",
    }


def _extract_chunk(data):
    """Return the last chunk of a ``query_progress`` payload, or ``None``.

    ``data['text']`` is expected to be a JSON string holding a ``chunks``
    list; payloads without ``text`` or with an empty list yield ``None``.
    """
    if 'text' not in data:
        return None
    text = json.loads(data['text'])
    return text['chunks'][-1] if text['chunks'] else None


@app.route('/')
def root():
    """Describe the API and its single message endpoint."""
    log_request(request.remote_addr, request.path, 200)
    return jsonify({
        "message": "Welcome to the Perplexity AI Proxy API",
        "endpoints": {
            "/ai/v1/messages": {
                "method": "POST",
                "description": "Send a message to the AI",
                "headers": {
                    "x-api-key": "Your API key (required)",
                    "Content-Type": "application/json"
                },
                "body": {
                    "messages": "Array of message objects",
                    "stream": "Boolean (optional, defaults to false)",
                    "model": "Model to be used (optional, defaults to claude-3-5-sonnet-20240620)"
                }
            }
        }
    })


def _stream_response(previous_messages, msg_id, model, input_tokens):
    """Yield server-sent events for one streamed completion.

    Emits ``message_start``, ``content_block_start`` and ``ping``, then one
    ``content_block_delta`` per received chunk, and finally
    ``content_block_stop``, ``message_delta`` and ``message_stop``.

    NOTE(review): ``sio`` is a single module-level client and the handlers
    registered here replace any registered by another request; confirm that
    requests are never served concurrently.
    """
    response_event = Event()
    pending_chunks = []
    total_output_tokens = 0
    start_time = time.time()
    last_activity_time = start_time
    last_warning_time = 0.0
    # Dynamic timeout that scales with input size, with a 60 second floor.
    timeout = max(60, input_tokens / 1000)

    def send_event(event_type, data):
        event = create_event(event_type, data)
        logger.info(f"Sending {event_type} event", extra={
            'event_type': event_type,
            'data': {'content': event}
        })
        return event

    def drain():
        # Forward every chunk queued by the socket callback so far.
        while pending_chunks:
            yield send_event("content_block_delta", {
                "type": "content_block_delta",
                "index": 0,
                "delta": {"type": "text_delta", "text": pending_chunks.pop(0)},
            })

    yield send_event("message_start", {
        "type": "message_start",
        "message": {
            "id": msg_id,
            "type": "message",
            "role": "assistant",
            "model": model,
            "content": [],
            "stop_reason": None,
            "stop_sequence": None,
            "usage": {"input_tokens": input_tokens, "output_tokens": total_output_tokens},
        },
    })
    yield send_event("content_block_start", {
        "type": "content_block_start",
        "index": 0,
        "content_block": {"type": "text", "text": ""},
    })
    yield send_event("ping", {"type": "ping"})

    def on_query_progress(data):
        nonlocal total_output_tokens, last_activity_time
        last_activity_time = time.time()
        chunk = _extract_chunk(data)
        if chunk:
            pending_chunks.append(chunk)
            chunk_tokens = calculate_tokens(chunk)
            total_output_tokens += chunk_tokens
            logger.info("Received chunk", extra={
                'event_type': 'chunk_received',
                'data': {
                    'chunk': chunk,
                    'tokens': chunk_tokens,
                    'total_tokens': total_output_tokens
                }
            })
        if data.get('final', False):
            logger.info("Final response received", extra={
                'event_type': 'response_complete',
                'data': {'total_tokens': total_output_tokens}
            })
            response_event.set()

    def on_connect():
        logger.info("Connected to Perplexity AI", extra={'event_type': 'connection_established'})
        sio.emit('perplexity_ask', (previous_messages, _build_emit_data()))
        logger.info("Sent query to Perplexity AI", extra={
            'event_type': 'query_sent',
            'data': {
                'message': (previous_messages[:100] + '...')
                if len(previous_messages) > 100 else previous_messages
            }
        })

    sio.on('connect', on_connect)
    sio.on('query_progress', on_query_progress)

    try:
        sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])

        while not response_event.is_set() and (time.time() - start_time) < timeout:
            current_time = time.time()
            # Warn after 10 seconds without activity, but at most once per
            # 10 seconds rather than on every 0.1 second poll.
            if (current_time - last_activity_time > 10
                    and current_time - last_warning_time > 10):
                logger.warning("No activity for 10 seconds", extra={'event_type': 'inactivity_warning'})
                last_warning_time = current_time
            sio.sleep(0.1)
            yield from drain()

        # A chunk (including the final one) can land between the last drain
        # and the loop-condition check; flush it before finishing.
        yield from drain()

        if not response_event.is_set():
            logger.warning(f"Request timed out after {timeout} seconds", extra={
                'event_type': 'request_timeout',
                'data': {
                    'timeout': timeout,
                    'input_tokens': input_tokens,
                    'output_tokens': total_output_tokens,
                    'elapsed_time': time.time() - start_time
                }
            })
            yield send_event("content_block_delta", {
                "type": "content_block_delta",
                "index": 0,
                "delta": {"type": "text_delta", "text": f"Request timed out after {timeout} seconds"},
            })
    except Exception as e:
        logger.error(f"Error during socket connection: {str(e)}", exc_info=True)
        yield send_event("content_block_delta", {
            "type": "content_block_delta",
            "index": 0,
            "delta": {"type": "text_delta", "text": f"Error during socket connection: {str(e)}"},
        })
    finally:
        # Cleanup only: yielding here would raise RuntimeError when the
        # generator is closed early (e.g. the client disconnects).
        if sio.connected:
            sio.disconnect()

    yield send_event("content_block_stop", {"type": "content_block_stop", "index": 0})
    yield send_event("message_delta", {
        "type": "message_delta",
        "delta": {"stop_reason": "end_turn", "stop_sequence": None},
        "usage": {"output_tokens": total_output_tokens},
    })
    yield send_event("message_stop", {"type": "message_stop"})


@app.route('/ai/v1/messages', methods=['POST'])
def messages():
    """Handle a message request, streamed or not.

    Expects a JSON object with a ``messages`` list and optional ``model``
    and ``stream`` fields. Returns a ``text/event-stream`` response when
    ``stream`` is truthy, otherwise the result of ``handle_non_stream``.
    Authentication failures return 401; any request error returns 400.
    """
    auth_error = validate_api_key()
    if auth_error:
        return auth_error

    try:
        json_body = request.json
        if not isinstance(json_body, dict) or not isinstance(json_body.get('messages'), list):
            raise ValueError("Request body must be a JSON object with a 'messages' array")

        model = json_body.get('model', 'claude-3-5-sonnet-20240620')
        stream = json_body.get('stream', False)
        previous_messages = "\n\n".join(
            normalize_content(msg['content']) for msg in json_body['messages']
        )
        input_tokens = calculate_tokens(previous_messages)
        msg_id = str(uuid.uuid4())

        if not stream:
            result = handle_non_stream(previous_messages, msg_id, model, input_tokens)
            # Error paths return (response, status); success returns a Response.
            status = result[1] if isinstance(result, tuple) else result.status_code
            log_request(request.remote_addr, request.path, status)
            return result

        log_request(request.remote_addr, request.path, 200)
        return Response(
            _stream_response(previous_messages, msg_id, model, input_tokens),
            content_type='text/event-stream',
        )
    except Exception as e:
        logger.error(f"Request error: {str(e)}", exc_info=True)
        log_request(request.remote_addr, request.path, 400)
        return jsonify({"error": str(e)}), 400


def handle_non_stream(previous_messages, msg_id, model, input_tokens):
    """Run one query to completion and return the whole answer as JSON.

    Args:
        previous_messages: Flattened conversation text sent as the query.
        msg_id: Identifier placed in the response ``id`` field.
        model: Model name echoed back in the response.
        input_tokens: Estimated input size; also scales the timeout.

    Returns:
        A JSON ``Response`` on success, ``(response, 504)`` when no chunk
        arrived before the timeout, or ``(response, 500)`` on any exception.
        Chunks received before a timeout are returned as a normal response.
    """
    try:
        response_event = Event()
        response_text = []
        total_output_tokens = 0
        start_time = time.time()
        # Dynamic timeout that scales with input size, with a 60 second floor.
        timeout = max(60, input_tokens / 1000)

        def on_query_progress(data):
            nonlocal total_output_tokens
            chunk = _extract_chunk(data)
            if chunk:
                response_text.append(chunk)
                chunk_tokens = calculate_tokens(chunk)
                total_output_tokens += chunk_tokens
                logger.info("Received chunk (non-stream)", extra={
                    'event_type': 'chunk_received_non_stream',
                    'data': {
                        'chunk': chunk,
                        'tokens': chunk_tokens,
                        'total_tokens': total_output_tokens
                    }
                })
            if data.get('final', False):
                response_event.set()

        def on_connect():
            logger.info("Connected to Perplexity AI (non-stream)",
                        extra={'event_type': 'connection_established_non_stream'})
            sio.emit('perplexity_ask', (previous_messages, _build_emit_data()))

        sio.on('connect', on_connect)
        sio.on('query_progress', on_query_progress)

        sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])

        # Wait for the final progress event, bounded by the timeout.
        response_event.wait(timeout=timeout)

        if not response_text:
            logger.warning(f"No response received (non-stream) after {timeout} seconds", extra={
                'event_type': 'no_response_non_stream',
                'data': {
                    'timeout': timeout,
                    'input_tokens': input_tokens,
                    'elapsed_time': time.time() - start_time
                }
            })
            return jsonify({"error": f"No response received after {timeout} seconds"}), 504

        full_response = {
            "content": [{"text": ''.join(response_text), "type": "text"}],
            "id": msg_id,
            "model": model,
            "role": "assistant",
            "stop_reason": "end_turn",
            "stop_sequence": None,
            "type": "message",
            "usage": {
                "input_tokens": input_tokens,
                "output_tokens": total_output_tokens,
            },
        }
        logger.info("Sending non-stream response", extra={
            'event_type': 'non_stream_response',
            'data': {
                'content': full_response,
                'elapsed_time': time.time() - start_time
            }
        })
        return Response(json.dumps(full_response, ensure_ascii=False), content_type='application/json')
    except Exception as e:
        logger.error(f"Error during non-stream socket connection: {str(e)}", exc_info=True)
        return jsonify({"error": str(e)}), 500
    finally:
        if sio.connected:
            sio.disconnect()


@app.errorhandler(404)
def not_found(error):
    """Log the miss and return a plain-text 404."""
    log_request(request.remote_addr, request.path, 404)
    return "Not Found", 404


@app.errorhandler(500)
def server_error(error):
    """Log the failure and return a plain-text 500."""
    logger.error(f"Server error: {str(error)}", exc_info=True)
    log_request(request.remote_addr, request.path, 500)
    return "Something broke!", 500


if __name__ == '__main__':
    setup_logging()
    port = int(os.environ.get('PORT', 8081))
    logger.info("Perplexity proxy starting", extra={
        'event_type': 'server_start',
        'data': {'port': port}
    })
    if not API_KEY:
        logger.warning("PPLX_KEY environment variable is not set", extra={'event_type': 'config_warning'})
    app.run(host='0.0.0.0', port=port)