smgc commited on
Commit
b0eee7b
1 Parent(s): 601fe9d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +124 -124
app.py CHANGED
@@ -9,7 +9,9 @@ import requests
9
  import logging
10
  import re
11
  import asyncio
12
- from threading import Thread
 
 
13
 
14
  app = Flask(__name__)
15
 
@@ -140,46 +142,48 @@ def messages():
140
  input_tokens = calculate_tokens(previous_messages)
141
 
142
  msg_id = str(uuid.uuid4())
143
- response_event = asyncio.Event()
144
- response_text = []
145
- total_output_tokens = 0
146
-
147
  if not stream:
148
  return handle_non_stream(previous_messages, msg_id, model, input_tokens)
149
 
150
  log_request(request.remote_addr, request.path, 200)
151
 
152
  def generate():
153
- nonlocal total_output_tokens
154
- start_time = time.time()
155
- last_activity_time = start_time
156
- timeout = max(300, input_tokens / 100) # 动态设置超时时间,最少300秒
157
-
158
- def send_event(event_type, data):
159
- event = create_event(event_type, data)
160
- logger.info(f"Sending {event_type} event", extra={
161
- 'event_type': event_type,
162
- 'data': {'content': event}
163
- })
164
- return event
165
-
166
- yield send_event("message_start", {
167
- "type": "message_start",
168
- "message": {
169
- "id": msg_id,
170
- "type": "message",
171
- "role": "assistant",
172
- "model": model,
173
- "content": [],
174
- "stop_reason": None,
175
- "stop_sequence": None,
176
- "usage": {"input_tokens": input_tokens, "output_tokens": total_output_tokens},
177
- },
178
- })
179
- yield send_event("content_block_start", {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
180
- yield send_event("ping", {"type": "ping"})
181
 
182
  async def run_socket_io():
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
183
  @sio.event
184
  async def query_progress(data):
185
  nonlocal total_output_tokens, response_text, last_activity_time
@@ -247,7 +251,14 @@ def messages():
247
  current_time = time.time()
248
  if current_time - last_activity_time > 60: # 如果60秒内没有活动,记录警告
249
  logger.warning("No activity for 60 seconds", extra={'event_type': 'inactivity_warning'})
250
- await asyncio.sleep(1)
 
 
 
 
 
 
 
251
 
252
  if not response_event.is_set():
253
  logger.warning(f"Request timed out after {timeout} seconds", extra={
@@ -259,44 +270,33 @@ def messages():
259
  'elapsed_time': time.time() - start_time
260
  }
261
  })
 
 
 
 
 
262
 
263
  except Exception as e:
264
  logger.error(f"Error during socket connection: {str(e)}", exc_info=True)
265
- finally:
266
- heartbeat_task.cancel()
267
- if sio.connected:
268
- await sio.disconnect()
269
-
270
- # 在新线程中运行异步Socket.IO
271
- thread = Thread(target=lambda: asyncio.run(run_socket_io()))
272
- thread.start()
273
-
274
- while not response_event.is_set() and (time.time() - start_time) < timeout:
275
- time.sleep(0.1)
276
- while response_text:
277
- chunk = response_text.pop(0)
278
  yield send_event("content_block_delta", {
279
  "type": "content_block_delta",
280
  "index": 0,
281
- "delta": {"type": "text_delta", "text": chunk},
282
  })
 
 
 
 
283
 
284
- if not response_event.is_set():
285
- yield send_event("content_block_delta", {
286
- "type": "content_block_delta",
287
- "index": 0,
288
- "delta": {"type": "text_delta", "text": f"Request timed out after {timeout} seconds"},
289
  })
 
290
 
291
- thread.join() # 等待Socket.IO线程完成
292
-
293
- yield send_event("content_block_stop", {"type": "content_block_stop", "index": 0})
294
- yield send_event("message_delta", {
295
- "type": "message_delta",
296
- "delta": {"stop_reason": "end_turn", "stop_sequence": None},
297
- "usage": {"output_tokens": total_output_tokens},
298
- })
299
- yield send_event("message_stop", {"type": "message_stop"})
300
 
301
  return Response(generate(), content_type='text/event-stream')
302
 
@@ -306,75 +306,71 @@ def messages():
306
  return jsonify({"error": str(e)}), 400
307
 
308
  def handle_non_stream(previous_messages, msg_id, model, input_tokens):
309
- try:
310
- response_event = asyncio.Event()
 
 
311
  response_text = []
312
  total_output_tokens = 0
313
  start_time = time.time()
314
  timeout = max(300, input_tokens / 100) # 动态设置超时时间,最少300秒
315
 
316
- async def run_socket_io():
317
- @sio.event
318
- async def query_progress(data):
319
- nonlocal total_output_tokens, response_text
320
- if 'text' in data:
321
- text = json.loads(data['text'])
322
- chunk = text['chunks'][-1] if text['chunks'] else None
323
- if chunk:
324
- response_text.append(chunk)
325
- chunk_tokens = calculate_tokens(chunk)
326
- total_output_tokens += chunk_tokens
327
- logger.info("Received chunk (non-stream)", extra={
328
- 'event_type': 'chunk_received_non_stream',
329
- 'data': {
330
- 'chunk': chunk,
331
- 'tokens': chunk_tokens,
332
- 'total_tokens': total_output_tokens
333
- }
334
- })
335
 
336
- if data.get('final', False):
337
- response_event.set()
338
-
339
- @sio.event
340
- async def connect():
341
- logger.info("Connected to Perplexity AI (non-stream)", extra={'event_type': 'connection_established_non_stream'})
342
- emit_data = {
343
- "version": "2.9",
344
- "source": "default",
345
- "attachments": [],
346
- "language": "en-GB",
347
- "timezone": "Europe/London",
348
- "mode": "concise",
349
- "is_related_query": False,
350
- "is_default_related_query": False,
351
- "visitor_id": str(uuid.uuid4()),
352
- "frontend_context_uuid": str(uuid.uuid4()),
353
- "prompt_source": "user",
354
- "query_source": "home"
 
 
 
 
 
 
 
 
 
 
 
 
 
355
  }
356
- await sio.emit('perplexity_ask', (previous_messages, emit_data))
357
-
358
- try:
359
- await sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])
360
- await asyncio.wait_for(response_event.wait(), timeout=timeout)
361
- except asyncio.TimeoutError:
362
- logger.warning(f"Request timed out after {timeout} seconds (non-stream)", extra={
363
- 'event_type': 'request_timeout_non_stream',
364
- 'data': {
365
- 'timeout': timeout,
366
- 'input_tokens': input_tokens,
367
- 'elapsed_time': time.time() - start_time
368
- }
369
- })
370
- finally:
371
- if sio.connected:
372
- await sio.disconnect()
373
-
374
- # 在新线程中运行异步Socket.IO
375
- thread = Thread(target=lambda: asyncio.run(run_socket_io()))
376
- thread.start()
377
- thread.join()
378
 
379
  if not response_text:
380
  logger.warning(f"No response received (non-stream) after {timeout} seconds", extra={
@@ -409,9 +405,13 @@ def handle_non_stream(previous_messages, msg_id, model, input_tokens):
409
  })
410
  return Response(json.dumps(full_response, ensure_ascii=False), content_type='application/json')
411
 
 
 
412
  except Exception as e:
413
  logger.error(f"Error during non-stream socket connection: {str(e)}", exc_info=True)
414
  return jsonify({"error": str(e)}), 500
 
 
415
 
416
  @app.errorhandler(404)
417
  def not_found(error):
 
9
  import logging
10
  import re
11
  import asyncio
12
+ from concurrent.futures import ThreadPoolExecutor
13
+
14
+ thread_pool = ThreadPoolExecutor(max_workers=10)
15
 
16
  app = Flask(__name__)
17
 
 
142
  input_tokens = calculate_tokens(previous_messages)
143
 
144
  msg_id = str(uuid.uuid4())
145
+
 
 
 
146
  if not stream:
147
  return handle_non_stream(previous_messages, msg_id, model, input_tokens)
148
 
149
  log_request(request.remote_addr, request.path, 200)
150
 
151
  def generate():
152
+ loop = asyncio.new_event_loop()
153
+ asyncio.set_event_loop(loop)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
154
 
155
  async def run_socket_io():
156
+ response_event = asyncio.Event()
157
+ response_text = []
158
+ total_output_tokens = 0
159
+ start_time = time.time()
160
+ last_activity_time = start_time
161
+ timeout = max(300, input_tokens / 100) # 动态设置超时时间,最少300秒
162
+
163
+ def send_event(event_type, data):
164
+ event = create_event(event_type, data)
165
+ logger.info(f"Sending {event_type} event", extra={
166
+ 'event_type': event_type,
167
+ 'data': {'content': event}
168
+ })
169
+ return event
170
+
171
+ yield send_event("message_start", {
172
+ "type": "message_start",
173
+ "message": {
174
+ "id": msg_id,
175
+ "type": "message",
176
+ "role": "assistant",
177
+ "model": model,
178
+ "content": [],
179
+ "stop_reason": None,
180
+ "stop_sequence": None,
181
+ "usage": {"input_tokens": input_tokens, "output_tokens": total_output_tokens},
182
+ },
183
+ })
184
+ yield send_event("content_block_start", {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
185
+ yield send_event("ping", {"type": "ping"})
186
+
187
  @sio.event
188
  async def query_progress(data):
189
  nonlocal total_output_tokens, response_text, last_activity_time
 
251
  current_time = time.time()
252
  if current_time - last_activity_time > 60: # 如果60秒内没有活动,记录警告
253
  logger.warning("No activity for 60 seconds", extra={'event_type': 'inactivity_warning'})
254
+ await asyncio.sleep(0.1)
255
+ while response_text:
256
+ chunk = response_text.pop(0)
257
+ yield send_event("content_block_delta", {
258
+ "type": "content_block_delta",
259
+ "index": 0,
260
+ "delta": {"type": "text_delta", "text": chunk},
261
+ })
262
 
263
  if not response_event.is_set():
264
  logger.warning(f"Request timed out after {timeout} seconds", extra={
 
270
  'elapsed_time': time.time() - start_time
271
  }
272
  })
273
+ yield send_event("content_block_delta", {
274
+ "type": "content_block_delta",
275
+ "index": 0,
276
+ "delta": {"type": "text_delta", "text": f"Request timed out after {timeout} seconds"},
277
+ })
278
 
279
  except Exception as e:
280
  logger.error(f"Error during socket connection: {str(e)}", exc_info=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
281
  yield send_event("content_block_delta", {
282
  "type": "content_block_delta",
283
  "index": 0,
284
+ "delta": {"type": "text_delta", "text": f"Error during socket connection: {str(e)}"},
285
  })
286
+ finally:
287
+ heartbeat_task.cancel()
288
+ if sio.connected:
289
+ await sio.disconnect()
290
 
291
+ yield send_event("content_block_stop", {"type": "content_block_stop", "index": 0})
292
+ yield send_event("message_delta", {
293
+ "type": "message_delta",
294
+ "delta": {"stop_reason": "end_turn", "stop_sequence": None},
295
+ "usage": {"output_tokens": total_output_tokens},
296
  })
297
+ yield send_event("message_stop", {"type": "message_stop"})
298
 
299
+ return loop.run_until_complete(run_socket_io())
 
 
 
 
 
 
 
 
300
 
301
  return Response(generate(), content_type='text/event-stream')
302
 
 
306
  return jsonify({"error": str(e)}), 400
307
 
308
  def handle_non_stream(previous_messages, msg_id, model, input_tokens):
309
+ loop = asyncio.new_event_loop()
310
+ asyncio.set_event_loop(loop)
311
+
312
+ async def run_non_stream():
313
  response_text = []
314
  total_output_tokens = 0
315
  start_time = time.time()
316
  timeout = max(300, input_tokens / 100) # 动态设置超时时间,最少300秒
317
 
318
+ @sio.event
319
+ async def query_progress(data):
320
+ nonlocal total_output_tokens, response_text
321
+ if 'text' in data:
322
+ text = json.loads(data['text'])
323
+ chunk = text['chunks'][-1] if text['chunks'] else None
324
+ if chunk:
325
+ response_text.append(chunk)
326
+ chunk_tokens = calculate_tokens(chunk)
327
+ total_output_tokens += chunk_tokens
328
+ logger.info("Received chunk (non-stream)", extra={
329
+ 'event_type': 'chunk_received_non_stream',
330
+ 'data': {
331
+ 'chunk': chunk,
332
+ 'tokens': chunk_tokens,
333
+ 'total_tokens': total_output_tokens
334
+ }
335
+ })
 
336
 
337
+ if data.get('final', False):
338
+ return True
339
+
340
+ @sio.event
341
+ async def connect():
342
+ logger.info("Connected to Perplexity AI (non-stream)", extra={'event_type': 'connection_established_non_stream'})
343
+ emit_data = {
344
+ "version": "2.9",
345
+ "source": "default",
346
+ "attachments": [],
347
+ "language": "en-GB",
348
+ "timezone": "Europe/London",
349
+ "mode": "concise",
350
+ "is_related_query": False,
351
+ "is_default_related_query": False,
352
+ "visitor_id": str(uuid.uuid4()),
353
+ "frontend_context_uuid": str(uuid.uuid4()),
354
+ "prompt_source": "user",
355
+ "query_source": "home"
356
+ }
357
+ await sio.emit('perplexity_ask', (previous_messages, emit_data))
358
+
359
+ try:
360
+ await sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])
361
+ await asyncio.wait_for(sio.wait(), timeout=timeout)
362
+ except asyncio.TimeoutError:
363
+ logger.warning(f"Request timed out after {timeout} seconds (non-stream)", extra={
364
+ 'event_type': 'request_timeout_non_stream',
365
+ 'data': {
366
+ 'timeout': timeout,
367
+ 'input_tokens': input_tokens,
368
+ 'elapsed_time': time.time() - start_time
369
  }
370
+ })
371
+ finally:
372
+ if sio.connected:
373
+ await sio.disconnect()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
374
 
375
  if not response_text:
376
  logger.warning(f"No response received (non-stream) after {timeout} seconds", extra={
 
405
  })
406
  return Response(json.dumps(full_response, ensure_ascii=False), content_type='application/json')
407
 
408
+ try:
409
+ return loop.run_until_complete(run_non_stream())
410
  except Exception as e:
411
  logger.error(f"Error during non-stream socket connection: {str(e)}", exc_info=True)
412
  return jsonify({"error": str(e)}), 500
413
+ finally:
414
+ loop.close()
415
 
416
  @app.errorhandler(404)
417
  def not_found(error):