smgc commited on
Commit
601fe9d
1 Parent(s): a8a4ce9

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +161 -143
app.py CHANGED
@@ -2,15 +2,14 @@ import os
2
  import json
3
  import uuid
4
  import time
5
- import asyncio
6
- import socketio
7
  import socketio
8
  import requests
9
  import logging
10
  import re
11
- from datetime import datetime
12
- from flask import Flask, request, Response, jsonify
13
- from threading import Event
14
 
15
  app = Flask(__name__)
16
 
@@ -51,6 +50,7 @@ else:
51
  transport = None
52
 
53
  sio = socketio.AsyncClient(http_session=transport, logger=False, engineio_logger=False)
 
54
  connect_opts = {'transports': ['websocket', 'polling']}
55
 
56
  sio_opts = {
@@ -126,7 +126,7 @@ def root():
126
  })
127
 
128
  @app.route('/ai/v1/messages', methods=['POST'])
129
- async def messages():
130
  auth_error = validate_api_key()
131
  if auth_error:
132
  return auth_error
@@ -145,11 +145,11 @@ async def messages():
145
  total_output_tokens = 0
146
 
147
  if not stream:
148
- return await handle_non_stream(previous_messages, msg_id, model, input_tokens)
149
 
150
  log_request(request.remote_addr, request.path, 200)
151
 
152
- async def generate():
153
  nonlocal total_output_tokens
154
  start_time = time.time()
155
  last_activity_time = start_time
@@ -179,10 +179,144 @@ async def messages():
179
  yield send_event("content_block_start", {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
180
  yield send_event("ping", {"type": "ping"})
181
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
182
  @sio.event
183
  async def query_progress(data):
184
- nonlocal total_output_tokens, response_text, last_activity_time
185
- last_activity_time = time.time()
186
  if 'text' in data:
187
  text = json.loads(data['text'])
188
  chunk = text['chunks'][-1] if text['chunks'] else None
@@ -190,8 +324,8 @@ async def messages():
190
  response_text.append(chunk)
191
  chunk_tokens = calculate_tokens(chunk)
192
  total_output_tokens += chunk_tokens
193
- logger.info("Received chunk", extra={
194
- 'event_type': 'chunk_received',
195
  'data': {
196
  'chunk': chunk,
197
  'tokens': chunk_tokens,
@@ -200,17 +334,11 @@ async def messages():
200
  })
201
 
202
  if data.get('final', False):
203
- logger.info("Final response received", extra={
204
- 'event_type': 'response_complete',
205
- 'data': {
206
- 'total_tokens': total_output_tokens
207
- }
208
- })
209
  response_event.set()
210
 
211
  @sio.event
212
  async def connect():
213
- logger.info("Connected to Perplexity AI", extra={'event_type': 'connection_established'})
214
  emit_data = {
215
  "version": "2.9",
216
  "source": "default",
@@ -226,132 +354,27 @@ async def messages():
226
  "query_source": "home"
227
  }
228
  await sio.emit('perplexity_ask', (previous_messages, emit_data))
229
- logger.info("Sent query to Perplexity AI", extra={
230
- 'event_type': 'query_sent',
231
- 'data': {
232
- 'message': previous_messages[:100] + '...' if len(previous_messages) > 100 else previous_messages
233
- }
234
- })
235
-
236
- async def heartbeat():
237
- while not response_event.is_set():
238
- await sio.emit('ping')
239
- await asyncio.sleep(25)
240
 
241
  try:
242
  await sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])
243
- heartbeat_task = asyncio.create_task(heartbeat())
244
-
245
- while not response_event.is_set() and (time.time() - start_time) < timeout:
246
- current_time = time.time()
247
- if current_time - last_activity_time > 60: # 如果60秒内没有活动,记录警告
248
- logger.warning("No activity for 60 seconds", extra={'event_type': 'inactivity_warning'})
249
- await asyncio.sleep(1)
250
- while response_text:
251
- chunk = response_text.pop(0)
252
- yield send_event("content_block_delta", {
253
- "type": "content_block_delta",
254
- "index": 0,
255
- "delta": {"type": "text_delta", "text": chunk},
256
- })
257
-
258
- if not response_event.is_set():
259
- logger.warning(f"Request timed out after {timeout} seconds", extra={
260
- 'event_type': 'request_timeout',
261
- 'data': {
262
- 'timeout': timeout,
263
- 'input_tokens': input_tokens,
264
- 'output_tokens': total_output_tokens,
265
- 'elapsed_time': time.time() - start_time
266
- }
267
- })
268
- yield send_event("content_block_delta", {
269
- "type": "content_block_delta",
270
- "index": 0,
271
- "delta": {"type": "text_delta", "text": f"Request timed out after {timeout} seconds"},
272
- })
273
-
274
- except Exception as e:
275
- logger.error(f"Error during socket connection: {str(e)}", exc_info=True)
276
- yield send_event("content_block_delta", {
277
- "type": "content_block_delta",
278
- "index": 0,
279
- "delta": {"type": "text_delta", "text": f"Error during socket connection: {str(e)}"},
280
  })
281
  finally:
282
- heartbeat_task.cancel()
283
  if sio.connected:
284
  await sio.disconnect()
285
 
286
- yield send_event("content_block_stop", {"type": "content_block_stop", "index": 0})
287
- yield send_event("message_delta", {
288
- "type": "message_delta",
289
- "delta": {"stop_reason": "end_turn", "stop_sequence": None},
290
- "usage": {"output_tokens": total_output_tokens},
291
- })
292
- yield send_event("message_stop", {"type": "message_stop"})
293
-
294
- return Response(generate(), content_type='text/event-stream')
295
-
296
- except Exception as e:
297
- logger.error(f"Request error: {str(e)}", exc_info=True)
298
- log_request(request.remote_addr, request.path, 400)
299
- return jsonify({"error": str(e)}), 400
300
-
301
- def handle_non_stream(previous_messages, msg_id, model, input_tokens):
302
- try:
303
- response_event = Event()
304
- response_text = []
305
- total_output_tokens = 0
306
- start_time = time.time()
307
- timeout = max(60, input_tokens / 1000) # 动态设置超时时间,最少60秒
308
-
309
- def on_query_progress(data):
310
- nonlocal total_output_tokens, response_text
311
- if 'text' in data:
312
- text = json.loads(data['text'])
313
- chunk = text['chunks'][-1] if text['chunks'] else None
314
- if chunk:
315
- response_text.append(chunk)
316
- chunk_tokens = calculate_tokens(chunk)
317
- total_output_tokens += chunk_tokens
318
- logger.info("Received chunk (non-stream)", extra={
319
- 'event_type': 'chunk_received_non_stream',
320
- 'data': {
321
- 'chunk': chunk,
322
- 'tokens': chunk_tokens,
323
- 'total_tokens': total_output_tokens
324
- }
325
- })
326
-
327
- if data.get('final', False):
328
- response_event.set()
329
-
330
- def on_connect():
331
- logger.info("Connected to Perplexity AI (non-stream)", extra={'event_type': 'connection_established_non_stream'})
332
- emit_data = {
333
- "version": "2.9",
334
- "source": "default",
335
- "attachments": [],
336
- "language": "en-GB",
337
- "timezone": "Europe/London",
338
- "mode": "concise",
339
- "is_related_query": False,
340
- "is_default_related_query": False,
341
- "visitor_id": str(uuid.uuid4()),
342
- "frontend_context_uuid": str(uuid.uuid4()),
343
- "prompt_source": "user",
344
- "query_source": "home"
345
- }
346
- sio.emit('perplexity_ask', (previous_messages, emit_data))
347
-
348
- sio.on('connect', on_connect)
349
- sio.on('query_progress', on_query_progress)
350
-
351
- sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])
352
-
353
- # Wait for response with timeout
354
- response_event.wait(timeout=timeout)
355
 
356
  if not response_text:
357
  logger.warning(f"No response received (non-stream) after {timeout} seconds", extra={
@@ -389,9 +412,6 @@ def handle_non_stream(previous_messages, msg_id, model, input_tokens):
389
  except Exception as e:
390
  logger.error(f"Error during non-stream socket connection: {str(e)}", exc_info=True)
391
  return jsonify({"error": str(e)}), 500
392
- finally:
393
- if sio.connected:
394
- sio.disconnect()
395
 
396
  @app.errorhandler(404)
397
  def not_found(error):
@@ -413,6 +433,4 @@ if __name__ == '__main__':
413
  })
414
  if not API_KEY:
415
  logger.warning("PPLX_KEY environment variable is not set", extra={'event_type': 'config_warning'})
416
- app.run(host='0.0.0.0', port=port, debug=True)
417
-
418
-
 
2
  import json
3
  import uuid
4
  import time
5
+ from datetime import datetime
6
+ from flask import Flask, request, Response, jsonify
7
  import socketio
8
  import requests
9
  import logging
10
  import re
11
+ import asyncio
12
+ from threading import Thread
 
13
 
14
  app = Flask(__name__)
15
 
 
50
  transport = None
51
 
52
  sio = socketio.AsyncClient(http_session=transport, logger=False, engineio_logger=False)
53
+
54
  connect_opts = {'transports': ['websocket', 'polling']}
55
 
56
  sio_opts = {
 
126
  })
127
 
128
  @app.route('/ai/v1/messages', methods=['POST'])
129
+ def messages():
130
  auth_error = validate_api_key()
131
  if auth_error:
132
  return auth_error
 
145
  total_output_tokens = 0
146
 
147
  if not stream:
148
+ return handle_non_stream(previous_messages, msg_id, model, input_tokens)
149
 
150
  log_request(request.remote_addr, request.path, 200)
151
 
152
+ def generate():
153
  nonlocal total_output_tokens
154
  start_time = time.time()
155
  last_activity_time = start_time
 
179
  yield send_event("content_block_start", {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
180
  yield send_event("ping", {"type": "ping"})
181
 
182
+ async def run_socket_io():
183
+ @sio.event
184
+ async def query_progress(data):
185
+ nonlocal total_output_tokens, response_text, last_activity_time
186
+ last_activity_time = time.time()
187
+ if 'text' in data:
188
+ text = json.loads(data['text'])
189
+ chunk = text['chunks'][-1] if text['chunks'] else None
190
+ if chunk:
191
+ response_text.append(chunk)
192
+ chunk_tokens = calculate_tokens(chunk)
193
+ total_output_tokens += chunk_tokens
194
+ logger.info("Received chunk", extra={
195
+ 'event_type': 'chunk_received',
196
+ 'data': {
197
+ 'chunk': chunk,
198
+ 'tokens': chunk_tokens,
199
+ 'total_tokens': total_output_tokens
200
+ }
201
+ })
202
+
203
+ if data.get('final', False):
204
+ logger.info("Final response received", extra={
205
+ 'event_type': 'response_complete',
206
+ 'data': {
207
+ 'total_tokens': total_output_tokens
208
+ }
209
+ })
210
+ response_event.set()
211
+
212
+ @sio.event
213
+ async def connect():
214
+ logger.info("Connected to Perplexity AI", extra={'event_type': 'connection_established'})
215
+ emit_data = {
216
+ "version": "2.9",
217
+ "source": "default",
218
+ "attachments": [],
219
+ "language": "en-GB",
220
+ "timezone": "Europe/London",
221
+ "mode": "concise",
222
+ "is_related_query": False,
223
+ "is_default_related_query": False,
224
+ "visitor_id": str(uuid.uuid4()),
225
+ "frontend_context_uuid": str(uuid.uuid4()),
226
+ "prompt_source": "user",
227
+ "query_source": "home"
228
+ }
229
+ await sio.emit('perplexity_ask', (previous_messages, emit_data))
230
+ logger.info("Sent query to Perplexity AI", extra={
231
+ 'event_type': 'query_sent',
232
+ 'data': {
233
+ 'message': previous_messages[:100] + '...' if len(previous_messages) > 100 else previous_messages
234
+ }
235
+ })
236
+
237
+ async def heartbeat():
238
+ while not response_event.is_set():
239
+ await sio.emit('ping')
240
+ await asyncio.sleep(25)
241
+
242
+ try:
243
+ await sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])
244
+ heartbeat_task = asyncio.create_task(heartbeat())
245
+
246
+ while not response_event.is_set() and (time.time() - start_time) < timeout:
247
+ current_time = time.time()
248
+ if current_time - last_activity_time > 60: # 如果60秒内没有活动,记录警告
249
+ logger.warning("No activity for 60 seconds", extra={'event_type': 'inactivity_warning'})
250
+ await asyncio.sleep(1)
251
+
252
+ if not response_event.is_set():
253
+ logger.warning(f"Request timed out after {timeout} seconds", extra={
254
+ 'event_type': 'request_timeout',
255
+ 'data': {
256
+ 'timeout': timeout,
257
+ 'input_tokens': input_tokens,
258
+ 'output_tokens': total_output_tokens,
259
+ 'elapsed_time': time.time() - start_time
260
+ }
261
+ })
262
+
263
+ except Exception as e:
264
+ logger.error(f"Error during socket connection: {str(e)}", exc_info=True)
265
+ finally:
266
+ heartbeat_task.cancel()
267
+ if sio.connected:
268
+ await sio.disconnect()
269
+
270
+ # 在新线程中运行异步Socket.IO
271
+ thread = Thread(target=lambda: asyncio.run(run_socket_io()))
272
+ thread.start()
273
+
274
+ while not response_event.is_set() and (time.time() - start_time) < timeout:
275
+ time.sleep(0.1)
276
+ while response_text:
277
+ chunk = response_text.pop(0)
278
+ yield send_event("content_block_delta", {
279
+ "type": "content_block_delta",
280
+ "index": 0,
281
+ "delta": {"type": "text_delta", "text": chunk},
282
+ })
283
+
284
+ if not response_event.is_set():
285
+ yield send_event("content_block_delta", {
286
+ "type": "content_block_delta",
287
+ "index": 0,
288
+ "delta": {"type": "text_delta", "text": f"Request timed out after {timeout} seconds"},
289
+ })
290
+
291
+ thread.join() # 等待Socket.IO线程完成
292
+
293
+ yield send_event("content_block_stop", {"type": "content_block_stop", "index": 0})
294
+ yield send_event("message_delta", {
295
+ "type": "message_delta",
296
+ "delta": {"stop_reason": "end_turn", "stop_sequence": None},
297
+ "usage": {"output_tokens": total_output_tokens},
298
+ })
299
+ yield send_event("message_stop", {"type": "message_stop"})
300
+
301
+ return Response(generate(), content_type='text/event-stream')
302
+
303
+ except Exception as e:
304
+ logger.error(f"Request error: {str(e)}", exc_info=True)
305
+ log_request(request.remote_addr, request.path, 400)
306
+ return jsonify({"error": str(e)}), 400
307
+
308
+ def handle_non_stream(previous_messages, msg_id, model, input_tokens):
309
+ try:
310
+ response_event = asyncio.Event()
311
+ response_text = []
312
+ total_output_tokens = 0
313
+ start_time = time.time()
314
+ timeout = max(300, input_tokens / 100) # 动态设置超时时间,最少300秒
315
+
316
+ async def run_socket_io():
317
  @sio.event
318
  async def query_progress(data):
319
+ nonlocal total_output_tokens, response_text
 
320
  if 'text' in data:
321
  text = json.loads(data['text'])
322
  chunk = text['chunks'][-1] if text['chunks'] else None
 
324
  response_text.append(chunk)
325
  chunk_tokens = calculate_tokens(chunk)
326
  total_output_tokens += chunk_tokens
327
+ logger.info("Received chunk (non-stream)", extra={
328
+ 'event_type': 'chunk_received_non_stream',
329
  'data': {
330
  'chunk': chunk,
331
  'tokens': chunk_tokens,
 
334
  })
335
 
336
  if data.get('final', False):
 
 
 
 
 
 
337
  response_event.set()
338
 
339
  @sio.event
340
  async def connect():
341
+ logger.info("Connected to Perplexity AI (non-stream)", extra={'event_type': 'connection_established_non_stream'})
342
  emit_data = {
343
  "version": "2.9",
344
  "source": "default",
 
354
  "query_source": "home"
355
  }
356
  await sio.emit('perplexity_ask', (previous_messages, emit_data))
 
 
 
 
 
 
 
 
 
 
 
357
 
358
  try:
359
  await sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])
360
+ await asyncio.wait_for(response_event.wait(), timeout=timeout)
361
+ except asyncio.TimeoutError:
362
+ logger.warning(f"Request timed out after {timeout} seconds (non-stream)", extra={
363
+ 'event_type': 'request_timeout_non_stream',
364
+ 'data': {
365
+ 'timeout': timeout,
366
+ 'input_tokens': input_tokens,
367
+ 'elapsed_time': time.time() - start_time
368
+ }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
369
  })
370
  finally:
 
371
  if sio.connected:
372
  await sio.disconnect()
373
 
374
+ # 在新线程中运行异步Socket.IO
375
+ thread = Thread(target=lambda: asyncio.run(run_socket_io()))
376
+ thread.start()
377
+ thread.join()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
378
 
379
  if not response_text:
380
  logger.warning(f"No response received (non-stream) after {timeout} seconds", extra={
 
412
  except Exception as e:
413
  logger.error(f"Error during non-stream socket connection: {str(e)}", exc_info=True)
414
  return jsonify({"error": str(e)}), 500
 
 
 
415
 
416
  @app.errorhandler(404)
417
  def not_found(error):
 
433
  })
434
  if not API_KEY:
435
  logger.warning("PPLX_KEY environment variable is not set", extra={'event_type': 'config_warning'})
436
+ app.run(host='0.0.0.0', port=port)