pplx2api / app.py
smgc's picture
Update app.py
511f8f6 verified
raw
history blame
11.4 kB
import os
import json
import uuid
from datetime import datetime
from flask import Flask, request, Response, jsonify
import socketio
import requests
import logging
from threading import Event
import re
from functools import wraps
# 创建 Flask 应用
app = Flask(__name__)
# 自定义日志格式
log_format = "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)
app_logger = logging.getLogger('app')
# 从环境变量中获取 API 密钥
API_KEY = os.environ.get('PPLX_KEY')
# 代理设置
proxy_url = os.environ.get('PROXY_URL')
transport = requests.Session()
if proxy_url:
transport.proxies.update({'http': proxy_url, 'https': proxy_url})
sio = socketio.Client(http_session=transport, logger=True, engineio_logger=True)
# 连接选项
connect_opts = {'transports': ['websocket', 'polling']}
# 其他选项
sio_opts = {
'extraHeaders': {
'Cookie': os.environ.get('PPLX_COOKIE'),
'User-Agent': os.environ.get('USER_AGENT'),
'Accept': '*/*',
'priority': 'u=1, i',
'Referer': 'https://www.perplexity.ai/',
}
}
def log_request(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = datetime.now()
response = func(*args, **kwargs)
duration = (datetime.now() - start_time).total_seconds()
app_logger.info(f"{request.remote_addr} - {request.method} {request.path} - {response.status_code} - {duration:.2f}s")
return response
return wrapper
def validate_api_key(func):
@wraps(func)
def wrapper(*args, **kwargs):
api_key = request.headers.get('x-api-key')
if api_key != API_KEY:
app_logger.warning(f"Invalid API key attempt from {request.remote_addr}")
return jsonify({"error": "Invalid API key"}), 401
return func(*args, **kwargs)
return wrapper
def normalize_content(content):
if isinstance(content, str):
return content
elif isinstance(content, (dict, list)):
return json.dumps(content, ensure_ascii=False)
return str(content)
def calculate_tokens(text):
return len(re.findall(r'\w+|[^\w\s]', text, re.UNICODE))
def create_json_response(data, status_code=200):
response = jsonify(data)
response.status_code = status_code
app_logger.debug(f"Sending JSON response: {json.dumps(data, ensure_ascii=False)}")
return response
@app.route('/')
@log_request
def root():
return create_json_response({
"message": "Welcome to the Perplexity AI Proxy API",
"endpoints": {
"/ai/v1/messages": {
"method": "POST",
"description": "Send a message to the AI",
"headers": {
"x-api-key": "Your API key (required)",
"Content-Type": "application/json"
},
"body": {
"messages": "Array of message objects",
"stream": "Boolean (true for streaming response)",
"model": "Model to be used (optional, defaults to claude-3-opus-20240229)"
}
}
}
})
@app.route('/ai/v1/messages', methods=['POST'])
@log_request
@validate_api_key
def messages():
try:
json_body = request.json
model = json_body.get('model', 'claude-3-opus-20240229')
stream = json_body.get('stream', True)
previous_messages = "\n\n".join([normalize_content(msg['content']) for msg in json_body['messages']])
input_tokens = calculate_tokens(previous_messages)
msg_id = str(uuid.uuid4())
response_event = Event()
response_text = []
if not stream:
return handle_non_stream(previous_messages, msg_id, model, input_tokens)
def generate():
try:
yield create_sse_event("message_start", {
"type": "message_start",
"message": {
"id": msg_id,
"type": "message",
"role": "assistant",
"content": [],
"model": model,
"stop_reason": None,
"stop_sequence": None,
"usage": {"input_tokens": input_tokens, "output_tokens": 1},
},
})
yield create_sse_event("content_block_start", {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
yield create_sse_event("ping", {"type": "ping"})
sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])
@sio.on('connect')
def on_connect():
app_logger.info("Connected to Perplexity AI")
emit_data = {
"version": "2.9",
"source": "default",
"attachments": [],
"language": "en-GB",
"timezone": "Europe/London",
"mode": "concise",
"is_related_query": False,
"is_default_related_query": False,
"visitor_id": str(uuid.uuid4()),
"frontend_context_uuid": str(uuid.uuid4()),
"prompt_source": "user",
"query_source": "home"
}
sio.emit('perplexity_ask', (previous_messages, emit_data))
@sio.on('query_progress')
def on_query_progress(data):
if 'text' in data:
text = json.loads(data['text'])
chunk = text['chunks'][-1] if text['chunks'] else None
if chunk:
response_text.append(chunk)
yield create_sse_event("content_block_delta", {
"type": "content_block_delta",
"index": 0,
"delta": {"type": "text_delta", "text": chunk},
})
if data.get('final', False):
response_event.set()
@sio.on('disconnect')
def on_disconnect():
app_logger.info("Disconnected from Perplexity AI")
response_event.set()
@sio.on('connect_error')
def on_connect_error(data):
app_logger.error(f"Connection error: {data}")
yield create_sse_event("error", {"type": "error", "message": f"Error connecting to Perplexity AI: {data}"})
response_event.set()
while not response_event.is_set():
sio.sleep(0.1)
output_tokens = calculate_tokens(''.join(response_text))
yield create_sse_event("content_block_stop", {"type": "content_block_stop", "index": 0})
yield create_sse_event("message_delta", {
"type": "message_delta",
"delta": {"stop_reason": "end_turn", "stop_sequence": None},
"usage": {"input_tokens": input_tokens, "output_tokens": output_tokens},
})
yield create_sse_event("message_stop", {"type": "message_stop"})
except Exception as e:
app_logger.error(f"Error in generate function: {str(e)}")
yield create_sse_event("error", {"type": "error", "message": str(e)})
finally:
if sio.connected:
sio.disconnect()
return Response(generate(), content_type='text/event-stream; charset=utf-8')
except Exception as e:
app_logger.error(f"Request error: {str(e)}")
return create_json_response({"error": str(e)}, 400)
def handle_non_stream(previous_messages, msg_id, model, input_tokens):
try:
response_event = Event()
response_text = []
sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])
@sio.on('connect')
def on_connect():
app_logger.info("Connected to Perplexity AI")
emit_data = {
"version": "2.9",
"source": "default",
"attachments": [],
"language": "en-GB",
"timezone": "Europe/London",
"mode": "concise",
"is_related_query": False,
"is_default_related_query": False,
"visitor_id": str(uuid.uuid4()),
"frontend_context_uuid": str(uuid.uuid4()),
"prompt_source": "user",
"query_source": "home"
}
sio.emit('perplexity_ask', (previous_messages, emit_data))
@sio.on('query_progress')
def on_query_progress(data):
if 'text' in data:
text = json.loads(data['text'])
chunk = text['chunks'][-1] if text['chunks'] else None
if chunk:
response_text.append(chunk)
if data.get('final', False):
response_event.set()
@sio.on('disconnect')
def on_disconnect():
app_logger.info("Disconnected from Perplexity AI")
response_event.set()
@sio.on('connect_error')
def on_connect_error(data):
app_logger.error(f"Connection error: {data}")
response_text.append(f"Error connecting to Perplexity AI: {data}")
response_event.set()
response_event.wait(timeout=30)
output_tokens = calculate_tokens(''.join(response_text))
full_response = {
"content": [{"text": ''.join(response_text), "type": "text"}],
"id": msg_id,
"model": model,
"role": "assistant",
"stop_reason": "end_turn",
"stop_sequence": None,
"type": "message",
"usage": {
"input_tokens": input_tokens,
"output_tokens": output_tokens,
},
}
return create_json_response(full_response)
except Exception as e:
app_logger.error(f"Error during socket connection: {str(e)}")
return create_json_response({"error": str(e)}, 500)
finally:
if sio.connected:
sio.disconnect()
@app.errorhandler(404)
def not_found(error):
return create_json_response({"error": "Not Found"}, 404)
@app.errorhandler(500)
def server_error(error):
app_logger.error(f"Server error: {str(error)}")
return create_json_response({"error": "Internal Server Error"}, 500)
def create_sse_event(event, data):
json_data = json.dumps(data, ensure_ascii=False)
event_str = f"event: {event}\ndata: {json_data}\n\n"
app_logger.debug(f"Sending SSE event: {event_str}")
return event_str
if __name__ == '__main__':
port = int(os.environ.get('PORT', 8081))
app_logger.info(f"Perplexity proxy listening on port {port}")
if not API_KEY:
app_logger.warning("Warning: PPLX_KEY environment variable is not set. API key validation will fail.")
app.run(host='0.0.0.0', port=port, debug=False)