smgc commited on
Commit
da3156c
1 Parent(s): b0eee7b

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +136 -137
app.py CHANGED
@@ -9,9 +9,7 @@ import requests
9
  import logging
10
  import re
11
  import asyncio
12
- from concurrent.futures import ThreadPoolExecutor
13
-
14
- thread_pool = ThreadPoolExecutor(max_workers=10)
15
 
16
  app = Flask(__name__)
17
 
@@ -148,157 +146,159 @@ def messages():
148
 
149
  log_request(request.remote_addr, request.path, 200)
150
 
151
- def generate():
152
- loop = asyncio.new_event_loop()
153
- asyncio.set_event_loop(loop)
154
-
155
- async def run_socket_io():
156
- response_event = asyncio.Event()
157
- response_text = []
158
- total_output_tokens = 0
159
- start_time = time.time()
160
- last_activity_time = start_time
161
- timeout = max(300, input_tokens / 100) # 动态设置超时时间,最少300秒
162
-
163
- def send_event(event_type, data):
164
- event = create_event(event_type, data)
165
- logger.info(f"Sending {event_type} event", extra={
166
- 'event_type': event_type,
167
- 'data': {'content': event}
168
- })
169
- return event
170
-
171
- yield send_event("message_start", {
172
- "type": "message_start",
173
- "message": {
174
- "id": msg_id,
175
- "type": "message",
176
- "role": "assistant",
177
- "model": model,
178
- "content": [],
179
- "stop_reason": None,
180
- "stop_sequence": None,
181
- "usage": {"input_tokens": input_tokens, "output_tokens": total_output_tokens},
182
- },
183
  })
184
- yield send_event("content_block_start", {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
185
- yield send_event("ping", {"type": "ping"})
186
-
187
- @sio.event
188
- async def query_progress(data):
189
- nonlocal total_output_tokens, response_text, last_activity_time
190
- last_activity_time = time.time()
191
- if 'text' in data:
192
- text = json.loads(data['text'])
193
- chunk = text['chunks'][-1] if text['chunks'] else None
194
- if chunk:
195
- response_text.append(chunk)
196
- chunk_tokens = calculate_tokens(chunk)
197
- total_output_tokens += chunk_tokens
198
- logger.info("Received chunk", extra={
199
- 'event_type': 'chunk_received',
200
- 'data': {
201
- 'chunk': chunk,
202
- 'tokens': chunk_tokens,
203
- 'total_tokens': total_output_tokens
204
- }
205
- })
206
-
207
- if data.get('final', False):
208
- logger.info("Final response received", extra={
209
- 'event_type': 'response_complete',
 
 
 
 
 
210
  'data': {
 
 
211
  'total_tokens': total_output_tokens
212
  }
213
  })
214
- response_event.set()
215
-
216
- @sio.event
217
- async def connect():
218
- logger.info("Connected to Perplexity AI", extra={'event_type': 'connection_established'})
219
- emit_data = {
220
- "version": "2.9",
221
- "source": "default",
222
- "attachments": [],
223
- "language": "en-GB",
224
- "timezone": "Europe/London",
225
- "mode": "concise",
226
- "is_related_query": False,
227
- "is_default_related_query": False,
228
- "visitor_id": str(uuid.uuid4()),
229
- "frontend_context_uuid": str(uuid.uuid4()),
230
- "prompt_source": "user",
231
- "query_source": "home"
232
- }
233
- await sio.emit('perplexity_ask', (previous_messages, emit_data))
234
- logger.info("Sent query to Perplexity AI", extra={
235
- 'event_type': 'query_sent',
236
  'data': {
237
- 'message': previous_messages[:100] + '...' if len(previous_messages) > 100 else previous_messages
238
  }
239
  })
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
240
 
241
- async def heartbeat():
242
- while not response_event.is_set():
243
- await sio.emit('ping')
244
- await asyncio.sleep(25)
245
-
246
- try:
247
- await sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])
248
- heartbeat_task = asyncio.create_task(heartbeat())
249
-
250
- while not response_event.is_set() and (time.time() - start_time) < timeout:
251
- current_time = time.time()
252
- if current_time - last_activity_time > 60: # 如果60秒内没有活动,记录警告
253
- logger.warning("No activity for 60 seconds", extra={'event_type': 'inactivity_warning'})
254
- await asyncio.sleep(0.1)
255
- while response_text:
256
- chunk = response_text.pop(0)
257
- yield send_event("content_block_delta", {
258
- "type": "content_block_delta",
259
- "index": 0,
260
- "delta": {"type": "text_delta", "text": chunk},
261
- })
262
-
263
- if not response_event.is_set():
264
- logger.warning(f"Request timed out after {timeout} seconds", extra={
265
- 'event_type': 'request_timeout',
266
- 'data': {
267
- 'timeout': timeout,
268
- 'input_tokens': input_tokens,
269
- 'output_tokens': total_output_tokens,
270
- 'elapsed_time': time.time() - start_time
271
- }
272
- })
273
  yield send_event("content_block_delta", {
274
  "type": "content_block_delta",
275
  "index": 0,
276
- "delta": {"type": "text_delta", "text": f"Request timed out after {timeout} seconds"},
277
  })
278
-
279
- except Exception as e:
280
- logger.error(f"Error during socket connection: {str(e)}", exc_info=True)
 
 
 
 
 
 
 
 
281
  yield send_event("content_block_delta", {
282
  "type": "content_block_delta",
283
  "index": 0,
284
- "delta": {"type": "text_delta", "text": f"Error during socket connection: {str(e)}"},
285
  })
286
- finally:
287
- heartbeat_task.cancel()
288
- if sio.connected:
289
- await sio.disconnect()
290
-
291
- yield send_event("content_block_stop", {"type": "content_block_stop", "index": 0})
292
- yield send_event("message_delta", {
293
- "type": "message_delta",
294
- "delta": {"stop_reason": "end_turn", "stop_sequence": None},
295
- "usage": {"output_tokens": total_output_tokens},
296
  })
297
- yield send_event("message_stop", {"type": "message_stop"})
 
 
 
 
 
 
 
 
 
 
 
298
 
299
- return loop.run_until_complete(run_socket_io())
 
 
 
 
 
 
300
 
301
- return Response(generate(), content_type='text/event-stream')
302
 
303
  except Exception as e:
304
  logger.error(f"Request error: {str(e)}", exc_info=True)
@@ -306,9 +306,6 @@ def messages():
306
  return jsonify({"error": str(e)}), 400
307
 
308
  def handle_non_stream(previous_messages, msg_id, model, input_tokens):
309
- loop = asyncio.new_event_loop()
310
- asyncio.set_event_loop(loop)
311
-
312
  async def run_non_stream():
313
  response_text = []
314
  total_output_tokens = 0
@@ -405,6 +402,8 @@ def handle_non_stream(previous_messages, msg_id, model, input_tokens):
405
  })
406
  return Response(json.dumps(full_response, ensure_ascii=False), content_type='application/json')
407
 
 
 
408
  try:
409
  return loop.run_until_complete(run_non_stream())
410
  except Exception as e:
 
9
  import logging
10
  import re
11
  import asyncio
12
+ from functools import partial
 
 
13
 
14
  app = Flask(__name__)
15
 
 
146
 
147
  log_request(request.remote_addr, request.path, 200)
148
 
149
+ async def run_socket_io():
150
+ response_event = asyncio.Event()
151
+ response_text = []
152
+ total_output_tokens = 0
153
+ start_time = time.time()
154
+ last_activity_time = start_time
155
+ timeout = max(300, input_tokens / 100) # 动态设置超时时间,最少300秒
156
+
157
+ def send_event(event_type, data):
158
+ event = create_event(event_type, data)
159
+ logger.info(f"Sending {event_type} event", extra={
160
+ 'event_type': event_type,
161
+ 'data': {'content': event}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
162
  })
163
+ return event
164
+
165
+ yield send_event("message_start", {
166
+ "type": "message_start",
167
+ "message": {
168
+ "id": msg_id,
169
+ "type": "message",
170
+ "role": "assistant",
171
+ "model": model,
172
+ "content": [],
173
+ "stop_reason": None,
174
+ "stop_sequence": None,
175
+ "usage": {"input_tokens": input_tokens, "output_tokens": total_output_tokens},
176
+ },
177
+ })
178
+ yield send_event("content_block_start", {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
179
+ yield send_event("ping", {"type": "ping"})
180
+
181
+ @sio.event
182
+ async def query_progress(data):
183
+ nonlocal total_output_tokens, response_text, last_activity_time
184
+ last_activity_time = time.time()
185
+ if 'text' in data:
186
+ text = json.loads(data['text'])
187
+ chunk = text['chunks'][-1] if text['chunks'] else None
188
+ if chunk:
189
+ response_text.append(chunk)
190
+ chunk_tokens = calculate_tokens(chunk)
191
+ total_output_tokens += chunk_tokens
192
+ logger.info("Received chunk", extra={
193
+ 'event_type': 'chunk_received',
194
  'data': {
195
+ 'chunk': chunk,
196
+ 'tokens': chunk_tokens,
197
  'total_tokens': total_output_tokens
198
  }
199
  })
200
+
201
+ if data.get('final', False):
202
+ logger.info("Final response received", extra={
203
+ 'event_type': 'response_complete',
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
204
  'data': {
205
+ 'total_tokens': total_output_tokens
206
  }
207
  })
208
+ response_event.set()
209
+
210
+ @sio.event
211
+ async def connect():
212
+ logger.info("Connected to Perplexity AI", extra={'event_type': 'connection_established'})
213
+ emit_data = {
214
+ "version": "2.9",
215
+ "source": "default",
216
+ "attachments": [],
217
+ "language": "en-GB",
218
+ "timezone": "Europe/London",
219
+ "mode": "concise",
220
+ "is_related_query": False,
221
+ "is_default_related_query": False,
222
+ "visitor_id": str(uuid.uuid4()),
223
+ "frontend_context_uuid": str(uuid.uuid4()),
224
+ "prompt_source": "user",
225
+ "query_source": "home"
226
+ }
227
+ await sio.emit('perplexity_ask', (previous_messages, emit_data))
228
+ logger.info("Sent query to Perplexity AI", extra={
229
+ 'event_type': 'query_sent',
230
+ 'data': {
231
+ 'message': previous_messages[:100] + '...' if len(previous_messages) > 100 else previous_messages
232
+ }
233
+ })
234
 
235
+ async def heartbeat():
236
+ while not response_event.is_set():
237
+ await sio.emit('ping')
238
+ await asyncio.sleep(25)
239
+
240
+ try:
241
+ await sio.connect('wss://www.perplexity.ai/', **connect_opts, headers=sio_opts['extraHeaders'])
242
+ heartbeat_task = asyncio.create_task(heartbeat())
243
+
244
+ while not response_event.is_set() and (time.time() - start_time) < timeout:
245
+ current_time = time.time()
246
+ if current_time - last_activity_time > 60: # 如果60秒内没有活动,记录警告
247
+ logger.warning("No activity for 60 seconds", extra={'event_type': 'inactivity_warning'})
248
+ await asyncio.sleep(0.1)
249
+ while response_text:
250
+ chunk = response_text.pop(0)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
251
  yield send_event("content_block_delta", {
252
  "type": "content_block_delta",
253
  "index": 0,
254
+ "delta": {"type": "text_delta", "text": chunk},
255
  })
256
+
257
+ if not response_event.is_set():
258
+ logger.warning(f"Request timed out after {timeout} seconds", extra={
259
+ 'event_type': 'request_timeout',
260
+ 'data': {
261
+ 'timeout': timeout,
262
+ 'input_tokens': input_tokens,
263
+ 'output_tokens': total_output_tokens,
264
+ 'elapsed_time': time.time() - start_time
265
+ }
266
+ })
267
  yield send_event("content_block_delta", {
268
  "type": "content_block_delta",
269
  "index": 0,
270
+ "delta": {"type": "text_delta", "text": f"Request timed out after {timeout} seconds"},
271
  })
272
+
273
+ except Exception as e:
274
+ logger.error(f"Error during socket connection: {str(e)}", exc_info=True)
275
+ yield send_event("content_block_delta", {
276
+ "type": "content_block_delta",
277
+ "index": 0,
278
+ "delta": {"type": "text_delta", "text": f"Error during socket connection: {str(e)}"},
 
 
 
279
  })
280
+ finally:
281
+ heartbeat_task.cancel()
282
+ if sio.connected:
283
+ await sio.disconnect()
284
+
285
+ yield send_event("content_block_stop", {"type": "content_block_stop", "index": 0})
286
+ yield send_event("message_delta", {
287
+ "type": "message_delta",
288
+ "delta": {"stop_reason": "end_turn", "stop_sequence": None},
289
+ "usage": {"output_tokens": total_output_tokens},
290
+ })
291
+ yield send_event("message_stop", {"type": "message_stop"})
292
 
293
+ def run_async():
294
+ loop = asyncio.new_event_loop()
295
+ asyncio.set_event_loop(loop)
296
+ try:
297
+ return loop.run_until_complete(run_socket_io())
298
+ finally:
299
+ loop.close()
300
 
301
+ return Response(run_async(), content_type='text/event-stream')
302
 
303
  except Exception as e:
304
  logger.error(f"Request error: {str(e)}", exc_info=True)
 
306
  return jsonify({"error": str(e)}), 400
307
 
308
  def handle_non_stream(previous_messages, msg_id, model, input_tokens):
 
 
 
309
  async def run_non_stream():
310
  response_text = []
311
  total_output_tokens = 0
 
402
  })
403
  return Response(json.dumps(full_response, ensure_ascii=False), content_type='application/json')
404
 
405
+ loop = asyncio.new_event_loop()
406
+ asyncio.set_event_loop(loop)
407
  try:
408
  return loop.run_until_complete(run_non_stream())
409
  except Exception as e: