# pplx2api / app.py
import os
import json
import uuid
from datetime import datetime
from flask import Flask, request, Response, jsonify
import socketio
import requests
import logging
from threading import Event, Timer
import re

app = Flask(__name__)

# Custom log formatter: every record is emitted as a single JSON document
class CustomFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "message": self.remove_ansi_escape(record.getMessage()),
        }
        if hasattr(record, 'event_type'):
            log_data['event_type'] = record.event_type
        if hasattr(record, 'data'):
            log_data['data'] = record.data
        return json.dumps(log_data, ensure_ascii=False, indent=2)

    def remove_ansi_escape(self, text):
        # Strip ANSI colour/control sequences so log messages stay plain text
        ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
        return ansi_escape.sub('', text)
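
# For illustration (values are made up), a call such as
#   logger.info("Request logged", extra={'event_type': 'request_log', 'data': {...}})
# is rendered by the formatter above as one pretty-printed JSON document:
# {
#   "timestamp": "2024-01-01 12:00:00,000",
#   "level": "INFO",
#   "message": "Request logged",
#   "event_type": "request_log",
#   "data": {"ip": "127.0.0.1", "route": "/", "status": 200}
# }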

def setup_logging():
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    handler.setFormatter(CustomFormatter())
    logger.addHandler(handler)

logger = logging.getLogger(__name__)

# Read the client-facing API key from the environment
API_KEY = os.environ.get('PPLX_KEY')

# Optional outbound proxy, e.g. PROXY_URL=http://127.0.0.1:7890 (illustrative)
proxy_url = os.environ.get('PROXY_URL')

if proxy_url:
    proxies = {
        'http': proxy_url,
        'https': proxy_url
    }
    transport = requests.Session()
    transport.proxies.update(proxies)
else:
    transport = None

sio = socketio.Client(http_session=transport, logger=False, engineio_logger=False)

# Socket.IO connection options
connect_opts = {
    'transports': ['websocket', 'polling'],
}

# Extra handshake headers; the cookie and user agent come from the environment
sio_opts = {
    'extraHeaders': {
        'Cookie': os.environ.get('PPLX_COOKIE'),
        'User-Agent': os.environ.get('USER_AGENT'),
        'Accept': '*/*',
        'priority': 'u=1, i',
        'Referer': 'https://www.perplexity.ai/',
    }
}

def log_request(ip, route, status):
    timestamp = datetime.now().isoformat()
    logger.info("Request logged", extra={
        'event_type': 'request_log',
        'data': {
            'ip': ip,
            'route': route,
            'status': status,
            'timestamp': timestamp
        }
    })

def validate_api_key():
    api_key = request.headers.get('x-api-key')
    if api_key != API_KEY:
        log_request(request.remote_addr, request.path, 401)
        return jsonify({"error": "Invalid API key"}), 401
    return None

def normalize_content(content):
    # Flatten message content (string, dict, or list of parts) into plain text
    if isinstance(content, str):
        return content
    elif isinstance(content, dict):
        return json.dumps(content, ensure_ascii=False)
    elif isinstance(content, list):
        return " ".join([normalize_content(item) for item in content])
    else:
        return ""

def calculate_tokens(text):
    # Rough token estimate: character count for text containing any non-ASCII
    # characters, whitespace-separated word count otherwise
    if re.search(r'[^\x00-\x7F]', text):
        return len(text)
    else:
        tokens = text.split()
        return len(tokens)
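
# Illustrative behaviour of the heuristic:
#   calculate_tokens("hello world") -> 2  (whitespace-separated word count)
#   calculate_tokens("你好世界")     -> 4  (character count once non-ASCII appears)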

def create_event(event, data):
    if isinstance(data, dict):
        data = json.dumps(data, ensure_ascii=False)
    return f"event: {event}\ndata: {data}\n\n"

@app.route('/')
def root():
    log_request(request.remote_addr, request.path, 200)
    return jsonify({
        "message": "Welcome to the Perplexity AI Proxy API",
        "endpoints": {
            "/ai/v1/messages": {
                "method": "POST",
                "description": "Send a message to the AI",
                "headers": {
                    "x-api-key": "Your API key (required)",
                    "Content-Type": "application/json"
                },
                "body": {
                    "messages": "Array of message objects",
                    "stream": "Boolean (true for streaming response)",
                    "model": "Model to be used (optional, defaults to claude-3-opus-20240229)"
                }
            }
        }
    })
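
# A minimal client call against the endpoint described above might look like
# this sketch (host, port, and key are placeholders, assuming the default
# PORT of 8081):
#   curl -N http://localhost:8081/ai/v1/messages \
#     -H 'x-api-key: YOUR_KEY' -H 'Content-Type: application/json' \
#     -d '{"messages": [{"role": "user", "content": "Hello"}], "stream": true}'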

def split_messages(messages, max_tokens_per_chunk=8000):
    # Group messages into chunks whose combined token estimate stays under the budget
    chunks = []
    current_chunk = []
    current_chunk_tokens = 0
    for message in messages:
        message_tokens = calculate_tokens(normalize_content(message['content']))
        if current_chunk_tokens + message_tokens > max_tokens_per_chunk and current_chunk:
            chunks.append(current_chunk)
            current_chunk = []
            current_chunk_tokens = 0
        current_chunk.append(message)
        current_chunk_tokens += message_tokens
    if current_chunk:
        chunks.append(current_chunk)
    return chunks
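
# Illustrative grouping with max_tokens_per_chunk=3 and messages costing
# 2, 2, and 1 tokens: the first message fills chunk one and the other two
# share chunk two -> [[m1], [m2, m3]]. A single oversized message still
# becomes its own chunk; messages are never split internally.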

@app.route('/ai/v1/messages', methods=['POST'])
def messages():
    auth_error = validate_api_key()
    if auth_error:
        return auth_error

    try:
        json_body = request.json
        model = json_body.get('model', 'claude-3-opus-20240229')
        stream = json_body.get('stream', True)
        messages = json_body.get('messages', [])

        # Split the conversation into token-bounded chunks
        chunks = split_messages(messages)
        total_chunks = len(chunks)
        logger.info(f"Input split into {total_chunks} chunks", extra={
            'event_type': 'input_split',
            'data': {'total_chunks': total_chunks}
        })

        msg_id = str(uuid.uuid4())
        total_input_tokens = sum(calculate_tokens(normalize_content(msg['content'])) for msg in messages)
        total_output_tokens = 0
        full_response = []

        def send_event(event_type, data):
            event = create_event(event_type, data)
            logger.info(f"Sending {event_type} event", extra={
                'event_type': event_type,
                'data': {'content': event}
            })
            return event

        def generate():
            nonlocal total_output_tokens
            yield send_event("message_start", {
                "type": "message_start",
                "message": {
                    "id": msg_id,
                    "type": "message",
                    "role": "assistant",
                    "model": model,
                    "content": [],
                    "stop_reason": None,
                    "stop_sequence": None,
                    "usage": {"input_tokens": total_input_tokens, "output_tokens": 0},
                },
            })
            yield send_event("content_block_start", {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})

            for chunk_index, chunk in enumerate(chunks):
                chunk_input = "\n\n".join([normalize_content(msg['content']) for msg in chunk])
                chunk_input_tokens = calculate_tokens(chunk_input)

                response_event = Event()
                timeout_event = Event()
                response_text = []

                # Scale the timeout with input size: 30 s per ~1000 tokens, clamped to [30, 300] s
                timeout_seconds = max(30, min(300, chunk_input_tokens // 1000 * 30))

                yield send_event("chunk_start", {
                    "type": "chunk_start",
                    "chunk_index": chunk_index,
                    "total_chunks": total_chunks,
                })

                def on_query_progress(data):
                    nonlocal response_text
                    if 'text' in data:
                        text = json.loads(data['text'])
                        new_chunk = text['chunks'][-1] if text['chunks'] else None
                        if new_chunk:
                            response_text.append(new_chunk)
                    if data.get('final', False):
                        response_event.set()

                def on_connect():
                    logger.info(f"Connected to Perplexity AI for chunk {chunk_index + 1}/{total_chunks}", extra={
                        'event_type': 'chunk_connection_established',
                        'data': {'chunk_index': chunk_index, 'total_chunks': total_chunks}
                    })
                    sio.emit('perplexity_ask', (chunk_input, get_emit_data()))

                sio.on('connect', on_connect)
                sio.on('query_progress', on_query_progress)

                def timeout_handler():
                    logger.warning(f"Chunk {chunk_index + 1}/{total_chunks} timed out after {timeout_seconds} seconds", extra={
                        'event_type': 'chunk_timeout',
                        'data': {'chunk_index': chunk_index, 'total_chunks': total_chunks, 'timeout_seconds': timeout_seconds}
                    })
                    timeout_event.set()
                    response_event.set()

                timer = Timer(timeout_seconds, timeout_handler)
                timer.start()

                # Helper: flush collected text as content_block_delta events
                def drain():
                    nonlocal total_output_tokens
                    while response_text:
                        new_chunk = response_text.pop(0)
                        full_response.append(new_chunk)
                        total_output_tokens += calculate_tokens(new_chunk)
                        yield send_event("content_block_delta", {
                            "type": "content_block_delta",
                            "index": 0,
                            "delta": {"type": "text_delta", "text": new_chunk},
                        })

                try:
                    sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])
                    while not response_event.is_set() and not timeout_event.is_set():
                        sio.sleep(0.1)
                        yield from drain()  # stream text as it arrives
                    yield from drain()      # flush anything left after the final event
                except Exception as e:
                    logger.error(f"Error during chunk {chunk_index + 1}/{total_chunks} processing: {str(e)}", exc_info=True)
                    yield send_event("content_block_delta", {
                        "type": "content_block_delta",
                        "index": 0,
                        "delta": {"type": "text_delta", "text": f"Error processing chunk {chunk_index + 1}/{total_chunks}: {str(e)}"},
                    })
                finally:
                    timer.cancel()
                    if sio.connected:
                        sio.disconnect()

                yield send_event("chunk_end", {
                    "type": "chunk_end",
                    "chunk_index": chunk_index,
                    "total_chunks": total_chunks,
                })
yield send_event("content_block_stop", {"type": "content_block_stop", "index": 0})
yield send_event("message_delta", {
"type": "message_delta",
"delta": {"stop_reason": "end_turn", "stop_sequence": None},
"usage": {"input_tokens": total_input_tokens, "output_tokens": total_output_tokens},
})
yield send_event("message_stop", {"type": "message_stop"})

        if stream:
            return Response(generate(), content_type='text/event-stream')
        else:
            # Non-stream mode: consume the generator and collect the text deltas.
            # (Collected into a separate list so the text is not double-counted
            # against full_response, which generate() also appends to.)
            collected = []
            for event in generate():
                if 'content_block_delta' in event:
                    data = json.loads(event.split('data: ')[1])
                    collected.append(data['delta']['text'])
            return jsonify({
                "content": [{"text": ''.join(collected), "type": "text"}],
                "id": msg_id,
                "model": model,
                "role": "assistant",
                "stop_reason": "end_turn",
                "stop_sequence": None,
                "type": "message",
                "usage": {
                    "input_tokens": total_input_tokens,
                    "output_tokens": total_output_tokens,
                },
            })
    except Exception as e:
        logger.error(f"Request error: {str(e)}", exc_info=True)
        log_request(request.remote_addr, request.path, 500)
        return jsonify({"error": str(e)}), 500

def get_emit_data():
    # Payload accompanying a 'perplexity_ask' emit; fields modelled on the
    # request the Perplexity web client sends
    return {
        "version": "2.9",
        "source": "default",
        "attachments": [],
        "language": "en-GB",
        "timezone": "Europe/London",
        "mode": "concise",
        "is_related_query": False,
        "is_default_related_query": False,
        "visitor_id": str(uuid.uuid4()),
        "frontend_context_uuid": str(uuid.uuid4()),
        "prompt_source": "user",
        "query_source": "home"
    }

# Single-shot non-stream path (not wired to any route above; kept as an
# alternative to the chunked generator flow)
def handle_non_stream(previous_messages, msg_id, model, input_tokens):
    try:
        response_event = Event()
        response_text = []
        total_output_tokens = 0

        def on_query_progress(data):
            nonlocal total_output_tokens, response_text
            if 'text' in data:
                text = json.loads(data['text'])
                chunk = text['chunks'][-1] if text['chunks'] else None
                if chunk:
                    response_text.append(chunk)
                    chunk_tokens = calculate_tokens(chunk)
                    total_output_tokens += chunk_tokens
            if data.get('final', False):
                response_event.set()

        def on_connect():
            logger.info("Connected to Perplexity AI (non-stream)", extra={'event_type': 'connection_established_non_stream'})
            sio.emit('perplexity_ask', (previous_messages, get_emit_data()))

        sio.on('connect', on_connect)
        sio.on('query_progress', on_query_progress)
        sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])

        # Wait for the final response, with a timeout
        response_event.wait(timeout=30)

        if not response_text:
            logger.warning("No response received (non-stream)", extra={'event_type': 'no_response_non_stream'})
            return jsonify({"error": "No response received"}), 504

        full_response = {
            "content": [{"text": ''.join(response_text), "type": "text"}],
            "id": msg_id,
            "model": model,
            "role": "assistant",
            "stop_reason": "end_turn",
            "stop_sequence": None,
            "type": "message",
            "usage": {
                "input_tokens": input_tokens,
                "output_tokens": total_output_tokens,
            },
        }
        logger.info("Sending non-stream response", extra={
            'event_type': 'non_stream_response',
            'data': {'content': full_response}
        })
        return Response(json.dumps(full_response, ensure_ascii=False), content_type='application/json')
    except Exception as e:
        logger.error(f"Error during non-stream socket connection: {str(e)}", exc_info=True)
        return jsonify({"error": str(e)}), 500
    finally:
        if sio.connected:
            sio.disconnect()

@app.errorhandler(404)
def not_found(error):
    log_request(request.remote_addr, request.path, 404)
    return "Not Found", 404

@app.errorhandler(500)
def server_error(error):
    logger.error(f"Server error: {str(error)}", exc_info=True)
    log_request(request.remote_addr, request.path, 500)
    return "Something broke!", 500

if __name__ == '__main__':
    setup_logging()
    port = int(os.environ.get('PORT', 8081))
    logger.info("Perplexity proxy starting", extra={
        'event_type': 'server_start',
        'data': {'port': port}
    })
    if not API_KEY:
        logger.warning("PPLX_KEY environment variable is not set", extra={'event_type': 'config_warning'})
    app.run(host='0.0.0.0', port=port)
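
# Example local run (environment variable names as used above; values are
# placeholders):
#   export PPLX_KEY='client-facing API key'
#   export PPLX_COOKIE='Perplexity session cookie'
#   export USER_AGENT='Mozilla/5.0 ...'
#   export PORT=8081        # optional, defaults to 8081
#   python app.py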