# pplx2api / app.py
# smgc's picture
# Update app.py
# 601fe9d verified
# raw
# history blame
# 17.2 kB
import asyncio
import hmac
import json
import logging
import os
import re
import threading
import time
import uuid
from collections import deque
from datetime import datetime
from threading import Thread

import requests
import socketio
from flask import Flask, request, Response, jsonify
# Flask application object; the route and error-handler decorators in this
# module register their view functions on it.
app = Flask(__name__)
class CustomFormatter(logging.Formatter):
    """Render each log record as a pretty-printed JSON object.

    The object always carries ``timestamp``, ``level`` and ``message``.
    ``event_type`` and ``data`` are copied from the record when the caller
    supplied them through ``extra=``, and ``exception`` holds the formatted
    traceback when the record carries exception information.
    """

    # Compiled once for the class instead of on every formatted record.
    _ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')

    def format(self, record):
        """Return *record* serialized as an indented JSON string."""
        log_data = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "message": self.remove_ansi_escape(record.getMessage()),
        }
        if hasattr(record, 'event_type'):
            log_data['event_type'] = record.event_type
        if hasattr(record, 'data'):
            log_data['data'] = record.data
        if record.exc_info:
            # Callers log with exc_info=True; without this the traceback
            # would never reach the output.
            log_data['exception'] = self.formatException(record.exc_info)
        # default=str: a value in ``data`` that json cannot encode is
        # stringified, so the record is still emitted instead of raising
        # inside the logging handler.
        return json.dumps(log_data, ensure_ascii=False, indent=2, default=str)

    def remove_ansi_escape(self, text):
        """Return *text* with ANSI escape sequences removed."""
        return self._ANSI_ESCAPE.sub('', text)
def setup_logging():
    """Set the root logger to INFO and attach a JSON-formatting stream handler."""
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(CustomFormatter())

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(stream_handler)
# Module-level logger used by the request handlers in this file.
logger = logging.getLogger(__name__)

# Value that incoming requests must present in the ``x-api-key`` header.
API_KEY = os.environ.get('PPLX_KEY')

# Optional outbound proxy: when PROXY_URL is set, a requests session is
# configured to route both http and https traffic through it.
proxy_url = os.environ.get('PROXY_URL')
transport = None
if proxy_url:
    proxies = dict.fromkeys(('http', 'https'), proxy_url)
    transport = requests.Session()
    transport.proxies.update(proxies)

# NOTE(review): ``transport`` is a requests.Session, yet it is handed to the
# asyncio client as ``http_session`` -- confirm the installed python-socketio
# version accepts that session type.
sio = socketio.AsyncClient(
    http_session=transport,
    logger=False,
    engineio_logger=False,
)

# Keyword arguments expanded into ``sio.connect``.
connect_opts = dict(transports=['websocket', 'polling'])

# Headers sent on connect; Cookie and User-Agent are read from the
# environment once, at import time.
sio_opts = {
    'extraHeaders': {
        'Cookie': os.environ.get('PPLX_COOKIE'),
        'User-Agent': os.environ.get('USER_AGENT'),
        'Accept': '*/*',
        'priority': 'u=1, i',
        'Referer': 'https://www.perplexity.ai/',
    },
}
def log_request(ip, route, status):
    """Emit a ``request_log`` record describing one handled request.

    Args:
        ip: Client address to record.
        route: Request path to record.
        status: HTTP status code that was (or will be) returned.
    """
    details = {
        'ip': ip,
        'route': route,
        'status': status,
        'timestamp': datetime.now().isoformat(),
    }
    logger.info("Request logged", extra={'event_type': 'request_log', 'data': details})
def validate_api_key():
    """Check the ``x-api-key`` request header against ``API_KEY``.

    The check fails closed: if ``API_KEY`` is not configured, every request
    is rejected. (A plain ``!=`` comparison would treat a missing header as
    equal to a missing key and let the request through.) The comparison uses
    ``hmac.compare_digest`` so its duration does not depend on how many
    leading characters match.

    Returns:
        ``None`` when the key is valid, otherwise a ``(response, 401)`` tuple
        that the caller should return as-is.
    """
    api_key = request.headers.get('x-api-key')
    is_valid = (
        bool(API_KEY)
        and api_key is not None
        # Compare bytes: compare_digest rejects non-ASCII str arguments.
        and hmac.compare_digest(api_key.encode('utf-8'), API_KEY.encode('utf-8'))
    )
    if not is_valid:
        log_request(request.remote_addr, request.path, 401)
        return jsonify({"error": "Invalid API key"}), 401
    return None
def normalize_content(content):
    """Flatten a message ``content`` value into a single string.

    Strings pass through unchanged, dicts are serialized as JSON (non-ASCII
    characters kept as-is), lists are normalized item by item and joined with
    single spaces, and any other type yields an empty string.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, dict):
        return json.dumps(content, ensure_ascii=False)
    if isinstance(content, list):
        return " ".join(map(normalize_content, content))
    return ""
def calculate_tokens(text):
    """Return a rough token count for *text*.

    Pure-ASCII text is counted in whitespace-separated words; text that
    contains any non-ASCII character is counted in characters.
    """
    if text.isascii():
        return len(text.split())
    return len(text)
def create_event(event, data):
    """Format one server-sent event frame.

    Dict payloads are serialized as JSON (non-ASCII characters kept as-is);
    any other payload is interpolated as given.
    """
    payload = json.dumps(data, ensure_ascii=False) if isinstance(data, dict) else data
    return "event: {}\ndata: {}\n\n".format(event, payload)
@app.route('/')
def root():
    """Log the request and return a JSON description of the API."""
    log_request(request.remote_addr, request.path, 200)

    messages_endpoint = {
        "method": "POST",
        "description": "Send a message to the AI",
        "headers": {
            "x-api-key": "Your API key (required)",
            "Content-Type": "application/json",
        },
        "body": {
            "messages": "Array of message objects",
            "stream": "Boolean (optional, defaults to false)",
            "model": "Model to be used (optional, defaults to claude-3-5-sonnet-20240620)",
        },
    }
    overview = {
        "message": "Welcome to the Perplexity AI Proxy API",
        "endpoints": {"/ai/v1/messages": messages_endpoint},
    }
    return jsonify(overview)
@app.route('/ai/v1/messages', methods=['POST'])
def messages():
    """Handle ``POST /ai/v1/messages`` by relaying the conversation upstream.

    The JSON body supplies ``messages`` (each item's ``content`` is flattened
    with ``normalize_content`` and the results are joined with blank lines)
    plus the optional ``model`` and ``stream`` fields. Non-streaming requests
    are delegated to ``handle_non_stream``. Streaming requests receive a
    ``text/event-stream`` response that emits ``message_start``,
    ``content_block_start`` and ``ping``, then one ``content_block_delta``
    per received chunk, and finally ``content_block_stop``,
    ``message_delta`` and ``message_stop``.

    Returns:
        The 401 tuple from ``validate_api_key`` when the key check fails, a
        ``(json, 400)`` tuple when the request cannot be processed, otherwise
        the streaming or non-streaming response.
    """
    auth_error = validate_api_key()
    if auth_error:
        return auth_error
    try:
        json_body = request.json
        model = json_body.get('model', 'claude-3-5-sonnet-20240620')
        stream = json_body.get('stream', False)
        previous_messages = "\n\n".join([normalize_content(msg['content']) for msg in json_body['messages']])
        input_tokens = calculate_tokens(previous_messages)
        msg_id = str(uuid.uuid4())

        if not stream:
            return handle_non_stream(previous_messages, msg_id, model, input_tokens)

        # The completion flag is set by the Socket.IO thread and polled by the
        # response generator, so it must be a thread-safe primitive
        # (asyncio.Event is not).
        response_event = threading.Event()
        # Chunk queue: appended to by the Socket.IO thread, drained by the generator.
        response_text = deque()
        # Error messages captured in the Socket.IO thread.
        socket_errors = []
        total_output_tokens = 0

        log_request(request.remote_addr, request.path, 200)

        def generate():
            start_time = time.time()
            last_activity_time = start_time
            # Scale the deadline with the prompt size, but never below 300 seconds.
            timeout = max(300, input_tokens / 100)

            def send_event(event_type, data):
                event = create_event(event_type, data)
                logger.info(f"Sending {event_type} event", extra={
                    'event_type': event_type,
                    'data': {'content': event}
                })
                return event

            def drain_chunks():
                # Only this generator pops from the queue, so the emptiness
                # check followed by popleft() cannot race another consumer.
                while response_text:
                    yield send_event("content_block_delta", {
                        "type": "content_block_delta",
                        "index": 0,
                        "delta": {"type": "text_delta", "text": response_text.popleft()},
                    })

            yield send_event("message_start", {
                "type": "message_start",
                "message": {
                    "id": msg_id,
                    "type": "message",
                    "role": "assistant",
                    "model": model,
                    "content": [],
                    "stop_reason": None,
                    "stop_sequence": None,
                    "usage": {"input_tokens": input_tokens, "output_tokens": total_output_tokens},
                },
            })
            yield send_event("content_block_start", {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
            yield send_event("ping", {"type": "ping"})

            async def run_socket_io():
                # A client per request: registering handlers on (and
                # disconnecting) a shared module-level client would let
                # concurrent requests clobber each other.
                client = socketio.AsyncClient(http_session=transport, logger=False, engineio_logger=False)
                # Stays None if connect() fails, so cleanup can tell.
                heartbeat_task = None

                @client.event
                async def query_progress(data):
                    nonlocal total_output_tokens, last_activity_time
                    last_activity_time = time.time()
                    if 'text' in data:
                        text = json.loads(data['text'])
                        chunk = text['chunks'][-1] if text['chunks'] else None
                        if chunk:
                            response_text.append(chunk)
                            chunk_tokens = calculate_tokens(chunk)
                            total_output_tokens += chunk_tokens
                            logger.info("Received chunk", extra={
                                'event_type': 'chunk_received',
                                'data': {
                                    'chunk': chunk,
                                    'tokens': chunk_tokens,
                                    'total_tokens': total_output_tokens
                                }
                            })
                    if data.get('final', False):
                        logger.info("Final response received", extra={
                            'event_type': 'response_complete',
                            'data': {
                                'total_tokens': total_output_tokens
                            }
                        })
                        response_event.set()

                @client.event
                async def connect():
                    logger.info("Connected to Perplexity AI", extra={'event_type': 'connection_established'})
                    emit_data = {
                        "version": "2.9",
                        "source": "default",
                        "attachments": [],
                        "language": "en-GB",
                        "timezone": "Europe/London",
                        "mode": "concise",
                        "is_related_query": False,
                        "is_default_related_query": False,
                        "visitor_id": str(uuid.uuid4()),
                        "frontend_context_uuid": str(uuid.uuid4()),
                        "prompt_source": "user",
                        "query_source": "home"
                    }
                    await client.emit('perplexity_ask', (previous_messages, emit_data))
                    logger.info("Sent query to Perplexity AI", extra={
                        'event_type': 'query_sent',
                        'data': {
                            'message': previous_messages[:100] + '...' if len(previous_messages) > 100 else previous_messages
                        }
                    })

                async def heartbeat():
                    while not response_event.is_set():
                        await client.emit('ping')
                        await asyncio.sleep(25)

                try:
                    await client.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])
                    heartbeat_task = asyncio.create_task(heartbeat())
                    last_warning_time = 0.0
                    while not response_event.is_set() and (time.time() - start_time) < timeout:
                        current_time = time.time()
                        inactive = current_time - last_activity_time > 60
                        # Warn at most once per 60 seconds of silence rather
                        # than on every one-second tick.
                        if inactive and current_time - last_warning_time > 60:
                            logger.warning("No activity for 60 seconds", extra={'event_type': 'inactivity_warning'})
                            last_warning_time = current_time
                        await asyncio.sleep(1)
                    if not response_event.is_set():
                        logger.warning(f"Request timed out after {timeout} seconds", extra={
                            'event_type': 'request_timeout',
                            'data': {
                                'timeout': timeout,
                                'input_tokens': input_tokens,
                                'output_tokens': total_output_tokens,
                                'elapsed_time': time.time() - start_time
                            }
                        })
                except Exception as e:
                    logger.error(f"Error during socket connection: {str(e)}", exc_info=True)
                    socket_errors.append(str(e))
                finally:
                    if heartbeat_task is not None:
                        heartbeat_task.cancel()
                    if client.connected:
                        await client.disconnect()

            # Run the asynchronous Socket.IO session in its own thread.
            thread = Thread(target=lambda: asyncio.run(run_socket_io()))
            thread.start()

            # Stop polling as soon as the worker is done, whether it finished,
            # failed or timed out, instead of always waiting out the deadline.
            while (
                not response_event.is_set()
                and thread.is_alive()
                and (time.time() - start_time) < timeout
            ):
                time.sleep(0.1)
                yield from drain_chunks()

            # Flush what is queued, wait for the Socket.IO thread to finish,
            # then flush again so chunks that arrived after the last poll are
            # not dropped.
            yield from drain_chunks()
            thread.join()
            yield from drain_chunks()

            if not response_event.is_set():
                if socket_errors:
                    failure_text = f"Upstream connection error: {socket_errors[0]}"
                else:
                    failure_text = f"Request timed out after {timeout} seconds"
                yield send_event("content_block_delta", {
                    "type": "content_block_delta",
                    "index": 0,
                    "delta": {"type": "text_delta", "text": failure_text},
                })

            yield send_event("content_block_stop", {"type": "content_block_stop", "index": 0})
            yield send_event("message_delta", {
                "type": "message_delta",
                "delta": {"stop_reason": "end_turn", "stop_sequence": None},
                "usage": {"output_tokens": total_output_tokens},
            })
            yield send_event("message_stop", {"type": "message_stop"})

        return Response(generate(), content_type='text/event-stream')
    except Exception as e:
        logger.error(f"Request error: {str(e)}", exc_info=True)
        log_request(request.remote_addr, request.path, 400)
        return jsonify({"error": str(e)}), 400
def handle_non_stream(previous_messages, msg_id, model, input_tokens):
    """Send the prompt upstream and return the complete answer as one JSON body.

    A Socket.IO session runs in a helper thread until the upstream marks the
    answer as final or the deadline expires; the received chunks are then
    concatenated into a single message object.

    Args:
        previous_messages: The flattened conversation text sent as the query.
        msg_id: Identifier to place in the response's ``id`` field.
        model: Model name echoed back in the response.
        input_tokens: Prompt token estimate; reported in ``usage`` and used to
            scale the deadline.

    Returns:
        A JSON ``Response`` with the message on success, a ``(json, 504)``
        tuple when no text was received, or a ``(json, 500)`` tuple when the
        session raised an exception.
    """
    try:
        response_text = []
        # Exceptions raised in the helper thread, so they can be re-raised
        # here and reach the ``except`` below instead of being lost.
        worker_errors = []
        total_output_tokens = 0
        start_time = time.time()
        # Scale the deadline with the prompt size, but never below 300 seconds.
        timeout = max(300, input_tokens / 100)

        async def run_socket_io():
            # Created inside the coroutine so the event belongs to the loop
            # that awaits it.
            response_event = asyncio.Event()
            # A client per request: registering handlers on (and
            # disconnecting) a shared module-level client would let
            # concurrent requests clobber each other.
            client = socketio.AsyncClient(http_session=transport, logger=False, engineio_logger=False)

            @client.event
            async def query_progress(data):
                nonlocal total_output_tokens
                if 'text' in data:
                    text = json.loads(data['text'])
                    chunk = text['chunks'][-1] if text['chunks'] else None
                    if chunk:
                        response_text.append(chunk)
                        chunk_tokens = calculate_tokens(chunk)
                        total_output_tokens += chunk_tokens
                        logger.info("Received chunk (non-stream)", extra={
                            'event_type': 'chunk_received_non_stream',
                            'data': {
                                'chunk': chunk,
                                'tokens': chunk_tokens,
                                'total_tokens': total_output_tokens
                            }
                        })
                if data.get('final', False):
                    response_event.set()

            @client.event
            async def connect():
                logger.info("Connected to Perplexity AI (non-stream)", extra={'event_type': 'connection_established_non_stream'})
                emit_data = {
                    "version": "2.9",
                    "source": "default",
                    "attachments": [],
                    "language": "en-GB",
                    "timezone": "Europe/London",
                    "mode": "concise",
                    "is_related_query": False,
                    "is_default_related_query": False,
                    "visitor_id": str(uuid.uuid4()),
                    "frontend_context_uuid": str(uuid.uuid4()),
                    "prompt_source": "user",
                    "query_source": "home"
                }
                await client.emit('perplexity_ask', (previous_messages, emit_data))

            try:
                await client.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])
                await asyncio.wait_for(response_event.wait(), timeout=timeout)
            except asyncio.TimeoutError:
                logger.warning(f"Request timed out after {timeout} seconds (non-stream)", extra={
                    'event_type': 'request_timeout_non_stream',
                    'data': {
                        'timeout': timeout,
                        'input_tokens': input_tokens,
                        'elapsed_time': time.time() - start_time
                    }
                })
            finally:
                if client.connected:
                    await client.disconnect()

        def worker():
            try:
                asyncio.run(run_socket_io())
            except Exception as exc:
                # Hand the failure to the calling thread; it is re-raised there.
                worker_errors.append(exc)

        # Run the asynchronous Socket.IO session in its own thread.
        thread = Thread(target=worker)
        thread.start()
        thread.join()

        if worker_errors:
            raise worker_errors[0]

        if not response_text:
            logger.warning(f"No response received (non-stream) after {timeout} seconds", extra={
                'event_type': 'no_response_non_stream',
                'data': {
                    'timeout': timeout,
                    'input_tokens': input_tokens,
                    'elapsed_time': time.time() - start_time
                }
            })
            return jsonify({"error": f"No response received after {timeout} seconds"}), 504

        full_response = {
            "content": [{"text": ''.join(response_text), "type": "text"}],
            "id": msg_id,
            "model": model,
            "role": "assistant",
            "stop_reason": "end_turn",
            "stop_sequence": None,
            "type": "message",
            "usage": {
                "input_tokens": input_tokens,
                "output_tokens": total_output_tokens,
            },
        }
        logger.info("Sending non-stream response", extra={
            'event_type': 'non_stream_response',
            'data': {
                'content': full_response,
                'elapsed_time': time.time() - start_time
            }
        })
        return Response(json.dumps(full_response, ensure_ascii=False), content_type='application/json')
    except Exception as e:
        logger.error(f"Error during non-stream socket connection: {str(e)}", exc_info=True)
        return jsonify({"error": str(e)}), 500
@app.errorhandler(404)
def not_found(error):
    """Log the unmatched request and answer with a plain-text 404."""
    status = 404
    log_request(request.remote_addr, request.path, status)
    return "Not Found", status
@app.errorhandler(500)
def server_error(error):
    """Log the failure and the request, then answer with a plain-text 500."""
    status = 500
    logger.error("Server error: {}".format(str(error)), exc_info=True)
    log_request(request.remote_addr, request.path, status)
    return "Something broke!", status
if __name__ == '__main__':
    # Script entry point: configure logging, report the chosen port, warn if
    # no API key is configured, then serve on all interfaces.
    setup_logging()
    port = int(os.environ.get('PORT', 8081))
    startup_details = {
        'event_type': 'server_start',
        'data': {'port': port},
    }
    logger.info("Perplexity proxy starting", extra=startup_details)
    if not API_KEY:
        logger.warning(
            "PPLX_KEY environment variable is not set",
            extra={'event_type': 'config_warning'},
        )
    app.run(host='0.0.0.0', port=port)