lanbogao commited on
Commit
5c48e61
1 Parent(s): d53a7e5

1. Add api subtitle-dl to stream download subtitle.

Browse files

2. Add id,url, thumbnail to fetchSubtitleUrls, fetchAnySubtitle

Files changed (2) hide show
  1. fetchYoutubeSubtitle.py +225 -96
  2. main.py +77 -8
fetchYoutubeSubtitle.py CHANGED
@@ -1,6 +1,7 @@
1
  import os
2
  import json
3
  import math
 
4
  import time
5
  import traceback
6
  from typing import Optional
@@ -22,7 +23,52 @@ debug = os.getenv("DEBUG")
22
  # }
23
  # ]
24
  # }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  def getUrlFromSubtitleItem(item, lang="en", subType="vtt"):
 
26
  for subtitle in item[lang]:
27
  if lang != "live_chat" and subType == "xml":
28
  if debug:
@@ -38,10 +84,21 @@ def getUrlFromSubtitleItem(item, lang="en", subType="vtt"):
38
  return None
39
 
40
 
41
- def getRequestedSubtitlesUrl(info_dict, lang, subType):
42
  item = info_dict.get("requested_subtitles")
43
- if item:
44
- langs = item.keys()
 
 
 
 
 
 
 
 
 
 
 
45
  for l in langs:
46
  if l.startswith(lang):
47
  item = {l: [item[l]]} if type(item[l]) == dict else item
@@ -58,6 +115,7 @@ def getSubtitleLangUrl(
58
  lang="en",
59
  subType="vtt",
60
  subTitleKeys=["subtitles", "automatic_captions"],
 
61
  ):
62
  for subtitle_item in subTitleKeys:
63
  langs = info_dict.get(subtitle_item).keys()
@@ -68,15 +126,18 @@ def getSubtitleLangUrl(
68
  print("getSubtitleLangUrl lang:{}".format(lang))
69
  return url
70
 
71
- for subtitle_item in subTitleKeys:
72
- langs = info_dict.get(subtitle_item).keys()
73
- for l in langs:
74
- if l.startswith(lang):
75
- url = getUrlFromSubtitleItem(info_dict.get(subtitle_item), l, subType)
76
- if url:
77
- if debug:
78
- print("getSubtitleLangUrl lang:{} url:{}".format(l, url))
79
- return url
 
 
 
80
 
81
  return None
82
 
@@ -111,47 +172,21 @@ async def fetchSubtitle(
111
  subType: Optional[str] = "vtt",
112
  proxy: Optional[str] = None,
113
  ) -> dict:
114
- return await fetchSubtitlebyType(url, lang, subType, proxy)
115
 
116
 
117
- async def fetchSubtitlebyType(
118
  url: str,
119
  lang: Optional[str] = "en",
120
  subType: Optional[str] = "vtt",
 
121
  proxy: Optional[str] = None,
122
  ) -> dict:
123
  # lang-code or lang.* .* is regex
124
  # reqLang = lang if len(lang.split("-")) > 1 or lang.endswith(".*") else lang + ".*"
125
 
126
- ydl_opts = {
127
- "noplaylist": True,
128
- "writesubtitles": True,
129
- "writeautomaticsub": True,
130
- # "listsubtitles": True,
131
- # "subtitlesformat": subType, # mark due to default youtube no srt and xml format
132
- "subtitleslangs": [
133
- lang,
134
- "-live_chat",
135
- ], # filter live chat to requested_subtitles
136
- "skip_download": True,
137
- "socket_timeout": 10,
138
- "extractor_retries": 0,
139
- # "debug_printtraffic": True,
140
- "extractor_args": {
141
- "youtube": {
142
- "player_skip": [
143
- "configs",
144
- "initial",
145
- ], # skip "webpage" will cause l2P5PgL1LfI missing some langs,
146
- "player_client": ["ios"],
147
- "skip": ["hls", "dash"], # don't skip "translated_subs"
148
- }
149
- },
150
- }
151
 
152
- if proxy:
153
- ydl_opts.update({"proxy": proxy, "socket_timeout": 20})
154
- # print(ydl_opts)
155
  title = "unknow"
156
  duration = ""
157
  try:
@@ -161,52 +196,53 @@ async def fetchSubtitlebyType(
161
  title = info_dict.get("title", "unknow")
162
  seconds = info_dict.get("duration")
163
  duration = str(seconds) if seconds else ""
164
- isSrt = False
165
- if info_dict.get("extractor") == "youtube" and subType == "srt":
166
- subType = "xml"
167
- isSrt = True
 
 
 
 
 
168
  if debug:
169
  print(
170
  "subtitles.keys(): {} automatic_captions: {} requested_subtitles: {}".format(
171
  info_dict.get("subtitles").keys(),
172
  info_dict.get("automatic_captions").keys(),
173
- info_dict.get("requested_subtitles").keys(),
 
 
 
 
174
  )
175
  )
176
 
177
- # subtitle_url = getRequestedSubtitlesUrl(info_dict, lang, subType)
178
- # if not subtitle_url:
179
- # subtitle_url = getSubtitleLangUrl(info_dict, lang, subType)
180
- # if not subtitle_url:
181
- # subtitle_url = getSubtitleOtherUrl(info_dict, lang, subType)
182
-
183
  subtitle_funcs = [
184
  getRequestedSubtitlesUrl,
185
  getSubtitleLangUrl,
186
  getSubtitleOtherUrl,
187
  ]
188
  for index in range(len(subtitle_funcs)):
189
- subtitle_url = subtitle_funcs[index](info_dict, lang, subType)
190
  if subtitle_url:
191
  # print("subtitle_url: {}".format(subtitle_url))
192
- with ydl.urlopen(subtitle_url) as response:
193
- subtitle = (
194
- xml_caption_to_srt(response.read().decode())
195
- if isSrt
196
- else response.read().decode()
197
  )
198
- print(
199
- "function index:{}, url:{}, title:{}, duration:{} len(subtitle): {}".format(
200
- index, url, title, duration, len(subtitle or "")
201
- )
202
- )
203
- if subtitle is not None:
204
- return {
205
- "title": title,
206
- "duration": duration,
207
- "subtitle": subtitle,
208
- "chapters": info_dict.get("chapters", None),
209
- }
210
  except Exception as e:
211
  print(e)
212
  traceback.print_exc()
@@ -234,7 +270,7 @@ def is_spaces_only(variable):
234
  return True
235
 
236
 
237
- def xml_caption_to_srt(xml_captions: str) -> str:
238
  """Convert xml caption tracks to "SubRip Subtitle (srt)".
239
  :param str xml_captions:
240
  XML formatted caption tracks.
@@ -246,7 +282,7 @@ def xml_caption_to_srt(xml_captions: str) -> str:
246
  caption = unescape(
247
  text.replace("\n", " ").replace(" ", " "),
248
  )
249
- if len(caption) == 0 or is_spaces_only(caption):
250
  continue
251
  try:
252
  duration = float(child.attrib["dur"])
@@ -262,31 +298,39 @@ def xml_caption_to_srt(xml_captions: str) -> str:
262
  text=caption,
263
  )
264
  segments.append(line)
265
- # return None if no text in xml
266
- return "\n".join(segments).strip() if len(segments) > 0 else None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
267
 
268
 
269
  async def fetchSubtitleUrls(url: str, proxy: Optional[str] = None) -> json:
270
- ydl_opts = {
271
- "noplaylist": True,
272
- "writesubtitles": True,
273
- "writeautomaticsub": True,
274
- # "allsubtitles": False,
275
- # "listsubtitles": True,
276
- "skip_download": True,
277
- "socket_timeout": 10,
278
- "extractor_retries": 0,
279
- # "debug_printtraffic": True,
280
- "extractor_args": {
281
- "youtube": {
282
- "player_skip": ["configs", "initial"],
283
- "player_client": ["ios"],
284
- "skip": ["hls", "dash"], # , "translated_subs"
285
- }
286
- },
287
- }
288
- if proxy:
289
- ydl_opts.update({"proxy": proxy, "socket_timeout": 20})
290
 
291
  title = "unknow"
292
  duration = ""
@@ -296,9 +340,16 @@ async def fetchSubtitleUrls(url: str, proxy: Optional[str] = None) -> json:
296
  title = info_dict.get("title", "unknow")
297
  seconds = info_dict.get("duration")
298
  duration = str(seconds) if seconds else ""
299
-
 
 
 
 
300
  return {
 
 
301
  "title": title,
 
302
  "duration": duration,
303
  "subtitles": info_dict.get("subtitles"),
304
  "automatic_captions": info_dict.get("automatic_captions"),
@@ -306,3 +357,81 @@ async def fetchSubtitleUrls(url: str, proxy: Optional[str] = None) -> json:
306
 
307
  except Exception as e:
308
  return {"error": str(e)}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import os
2
  import json
3
  import math
4
+ import re
5
  import time
6
  import traceback
7
  from typing import Optional
 
23
  # }
24
  # ]
25
  # }
26
+
27
+
28
+ def getSubtitleOptions(
29
+ lang: Optional[str] = None,
30
+ proxy: Optional[str] = None,
31
+ ):
32
+ ydl_opts = {
33
+ "noplaylist": True,
34
+ "writesubtitles": True,
35
+ "writeautomaticsub": True,
36
+ # "listsubtitles": True,
37
+ # "subtitlesformat": subType, # mark due to default youtube no srt and xml format
38
+ "skip_download": True,
39
+ "socket_timeout": 10,
40
+ "extractor_retries": 0,
41
+ # "debug_printtraffic": True,
42
+ "extractor_args": {
43
+ "youtube": {
44
+ "player_skip": [
45
+ "configs",
46
+ "initial",
47
+ ], # skip "webpage" will cause l2P5PgL1LfI missing some langs,
48
+ "player_client": ["ios"],
49
+ "skip": ["hls", "dash"], # don't skip "translated_subs"
50
+ }
51
+ },
52
+ }
53
+
54
+ if lang:
55
+ ydl_opts.update(
56
+ {
57
+ "subtitleslangs": [
58
+ lang,
59
+ "-live_chat",
60
+ ]
61
+ }
62
+ ) # filter live chat to requested_subtitles
63
+ if proxy:
64
+ ydl_opts.update({"proxy": proxy, "socket_timeout": 20})
65
+
66
+ # print(ydl_opts)
67
+ return ydl_opts
68
+
69
+
70
  def getUrlFromSubtitleItem(item, lang="en", subType="vtt"):
71
+ # print("item: {}, lang: {}, subType: {}".format(item, lang, subType))
72
  for subtitle in item[lang]:
73
  if lang != "live_chat" and subType == "xml":
74
  if debug:
 
84
  return None
85
 
86
 
87
+ def getRequestedSubtitlesUrl(info_dict, lang, subType, isLangKey=False):
88
  item = info_dict.get("requested_subtitles")
89
+ if not item:
90
+ return None
91
+
92
+ langs = item.keys()
93
+ if lang in langs:
94
+ item = {lang: [item[lang]]} if type(item[lang]) == dict else item
95
+ url = getUrlFromSubtitleItem(item, lang, subType)
96
+ if url:
97
+ if debug:
98
+ print("getRequestedSubtitlesUrl lang:{}".format(lang))
99
+ return url
100
+
101
+ if not isLangKey:
102
  for l in langs:
103
  if l.startswith(lang):
104
  item = {l: [item[l]]} if type(item[l]) == dict else item
 
115
  lang="en",
116
  subType="vtt",
117
  subTitleKeys=["subtitles", "automatic_captions"],
118
+ isLangKey=False,
119
  ):
120
  for subtitle_item in subTitleKeys:
121
  langs = info_dict.get(subtitle_item).keys()
 
126
  print("getSubtitleLangUrl lang:{}".format(lang))
127
  return url
128
 
129
+ if not isLangKey:
130
+ for subtitle_item in subTitleKeys:
131
+ langs = info_dict.get(subtitle_item).keys()
132
+ for l in langs:
133
+ if l.startswith(lang):
134
+ url = getUrlFromSubtitleItem(
135
+ info_dict.get(subtitle_item), l, subType
136
+ )
137
+ if url:
138
+ if debug:
139
+ print("getSubtitleLangUrl lang:{} url:{}".format(l, url))
140
+ return url
141
 
142
  return None
143
 
 
172
  subType: Optional[str] = "vtt",
173
  proxy: Optional[str] = None,
174
  ) -> dict:
175
+ return await fetchAnySubtitle(url, lang, subType, proxy)
176
 
177
 
178
+ async def fetchAnySubtitle(
179
  url: str,
180
  lang: Optional[str] = "en",
181
  subType: Optional[str] = "vtt",
182
+ skipEmpty: bool = True,
183
  proxy: Optional[str] = None,
184
  ) -> dict:
185
  # lang-code or lang.* .* is regex
186
  # reqLang = lang if len(lang.split("-")) > 1 or lang.endswith(".*") else lang + ".*"
187
 
188
+ ydl_opts = getSubtitleOptions(lang, proxy)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
189
 
 
 
 
190
  title = "unknow"
191
  duration = ""
192
  try:
 
196
  title = info_dict.get("title", "unknow")
197
  seconds = info_dict.get("duration")
198
  duration = str(seconds) if seconds else ""
199
+ thumbnail = info_dict.get("thumbnail")
200
+ if ".webp" in thumbnail:
201
+ thumbnail = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(
202
+ info_dict.get("id")
203
+ )
204
+
205
+ reqType = subType
206
+ if info_dict.get("extractor") == "youtube" and subType in ["srt", "txt"]:
207
+ reqType = "xml"
208
  if debug:
209
  print(
210
  "subtitles.keys(): {} automatic_captions: {} requested_subtitles: {}".format(
211
  info_dict.get("subtitles").keys(),
212
  info_dict.get("automatic_captions").keys(),
213
+ (
214
+ info_dict.get("requested_subtitles").keys()
215
+ if info_dict.get("requested_subtitles")
216
+ else {}
217
+ ),
218
  )
219
  )
220
 
 
 
 
 
 
 
221
  subtitle_funcs = [
222
  getRequestedSubtitlesUrl,
223
  getSubtitleLangUrl,
224
  getSubtitleOtherUrl,
225
  ]
226
  for index in range(len(subtitle_funcs)):
227
+ subtitle_url = subtitle_funcs[index](info_dict, lang, reqType)
228
  if subtitle_url:
229
  # print("subtitle_url: {}".format(subtitle_url))
230
+ subtitle = fetchSubtitlebydlUrl(ydl, subType, subtitle_url)
231
+ print(
232
+ "function index:{}, url:{}, title:{}, duration:{} len(subtitle): {}".format(
233
+ index, url, title, duration, len(subtitle or "")
 
234
  )
235
+ )
236
+ if subtitle is not None:
237
+ return {
238
+ "id": info_dict.get("id"),
239
+ "url": url,
240
+ "title": title,
241
+ "thumbnail": thumbnail,
242
+ "duration": duration,
243
+ "subtitle": subtitle,
244
+ "chapters": info_dict.get("chapters", None),
245
+ }
 
246
  except Exception as e:
247
  print(e)
248
  traceback.print_exc()
 
270
  return True
271
 
272
 
273
+ def xml_caption_to_srt(xml_captions: str, skip_empty: bool = True) -> str:
274
  """Convert xml caption tracks to "SubRip Subtitle (srt)".
275
  :param str xml_captions:
276
  XML formatted caption tracks.
 
282
  caption = unescape(
283
  text.replace("\n", " ").replace(" ", " "),
284
  )
285
+ if skip_empty and len(caption) == 0 or is_spaces_only(caption):
286
  continue
287
  try:
288
  duration = float(child.attrib["dur"])
 
298
  text=caption,
299
  )
300
  segments.append(line)
301
+
302
+ if skip_empty:
303
+ # return None if no text in xml
304
+ return "\n".join(segments).strip() if len(segments) > 0 else None
305
+ return "\n".join(segments).strip()
306
+
307
+
308
+ def xml_caption_to_txt(xml_captions: str, skip_empty: bool = True) -> str:
309
+ """Convert xml caption tracks to "SubRip Subtitle (srt)".
310
+ :param str xml_captions:
311
+ XML formatted caption tracks.
312
+ """
313
+ segments = []
314
+ root = ElementTree.fromstring(xml_captions)
315
+ for i, child in enumerate(list(root)):
316
+ text = child.text or ""
317
+ caption = unescape(
318
+ text.replace("\n", " ").replace(" ", " "),
319
+ )
320
+ if skip_empty and (len(caption) == 0 or is_spaces_only(caption)):
321
+ continue
322
+
323
+ line = "{text}\n".format(text=caption)
324
+ segments.append(line)
325
+
326
+ if skip_empty:
327
+ "\n".join(segments).strip() if len(segments) > 0 else None
328
+
329
+ return "\n".join(segments).strip()
330
 
331
 
332
  async def fetchSubtitleUrls(url: str, proxy: Optional[str] = None) -> json:
333
+ ydl_opts = getSubtitleOptions(proxy)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
334
 
335
  title = "unknow"
336
  duration = ""
 
340
  title = info_dict.get("title", "unknow")
341
  seconds = info_dict.get("duration")
342
  duration = str(seconds) if seconds else ""
343
+ thumbnail = info_dict.get("thumbnail")
344
+ if ".webp" in thumbnail:
345
+ thumbnail = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(
346
+ info_dict.get("id")
347
+ )
348
  return {
349
+ "id": info_dict.get("id"),
350
+ "url": url,
351
  "title": title,
352
+ "thumbnail": thumbnail,
353
  "duration": duration,
354
  "subtitles": info_dict.get("subtitles"),
355
  "automatic_captions": info_dict.get("automatic_captions"),
 
357
 
358
  except Exception as e:
359
  return {"error": str(e)}
360
+
361
+
362
+ def fetchSubtitlebydlUrl(ydl, subType, dlUrl, skipEmpty=True):
363
+ dlUrl = dlUrl if subType not in ["srt", "txt"] else re.sub(r"&fmt=[\w]+", "", dlUrl)
364
+
365
+ try:
366
+ with ydl.urlopen(dlUrl) as resp:
367
+ if subType == "srt":
368
+ return xml_caption_to_srt(resp.read().decode(), skipEmpty)
369
+ elif subType == "txt":
370
+ return xml_caption_to_txt(resp.read().decode(), skipEmpty)
371
+ else:
372
+ return resp.read().decode()
373
+ except Exception as e:
374
+ print(e)
375
+ return None
376
+
377
+
378
+ def getSubtitleUrlByLang(info_dict, lang, subType, isLangKey):
379
+ subtitle_funcs = [
380
+ getRequestedSubtitlesUrl,
381
+ getSubtitleLangUrl,
382
+ ]
383
+ for index in range(len(subtitle_funcs)):
384
+ subtitle_url = subtitle_funcs[index](
385
+ info_dict, lang, subType, isLangKey=isLangKey
386
+ )
387
+ print("getSubtitleUrlByLang subtitle_url: {}".format(subtitle_url))
388
+ if subtitle_url:
389
+ return subtitle_url
390
+
391
+
392
+ async def fetchSubtitleByInfo(
393
+ url: str, subType: str, dlInfo, proxy: Optional[str] = None
394
+ ):
395
+ try:
396
+ reqType = "xml" if subType in ["srt", "txt"] else subType
397
+ ydl_opts = getSubtitleOptions(dlInfo.get("lang", None), proxy)
398
+ with yt_dlp.YoutubeDL(ydl_opts) as ydl:
399
+ subtitle = None
400
+ if "dlUrl" in dlInfo:
401
+ subtitle = fetchSubtitlebydlUrl(
402
+ ydl, subType, dlInfo.get("dlUrl"), False
403
+ )
404
+ if subtitle is not None:
405
+ return subtitle
406
+
407
+ info_dict = ydl.extract_info(url, download=False)
408
+ if debug:
409
+ print(
410
+ "subtitles.keys(): {} automatic_captions: {} requested_subtitles: {}".format(
411
+ info_dict.get("subtitles").keys(),
412
+ info_dict.get("automatic_captions").keys(),
413
+ (
414
+ info_dict.get("requested_subtitles").keys()
415
+ if info_dict.get("requested_subtitles")
416
+ else {}
417
+ ),
418
+ )
419
+ )
420
+
421
+ subtitleUrl = None
422
+ if "langKey" in dlInfo:
423
+ subtitleUrl = getSubtitleUrlByLang(
424
+ info_dict, dlInfo.get("langKey"), reqType, True
425
+ )
426
+ if subtitleUrl is None:
427
+ subtitleUrl = getSubtitleUrlByLang(
428
+ info_dict, dlInfo.get("lang"), reqType, False
429
+ )
430
+
431
+ print("subtitleUrl: {}".format(subtitleUrl))
432
+ subtitle = fetchSubtitlebydlUrl(ydl, subType, subtitleUrl, False)
433
+ return subtitle
434
+ except Exception as e:
435
+ print(e)
436
+ traceback.print_exc()
437
+ return {"error": str(e)}
main.py CHANGED
@@ -1,14 +1,17 @@
 
1
  import os
2
- from fastapi import FastAPI, Header, Request
3
- from fastapi.responses import JSONResponse
4
- from typing import Annotated
5
- from fetchYoutubeSubtitle import fetchSubtitle, fetchSubtitleUrls
 
6
 
7
  token = os.getenv("HF_X_TOKEN")
8
  app = FastAPI()
9
 
 
10
  @app.get("/")
11
- def read_root(request:Request):
12
  print(request.headers)
13
  print(request.client.host)
14
  print(request.client.port)
@@ -21,7 +24,13 @@ def read_json():
21
 
22
 
23
  @app.get("/subtitle/")
24
- async def get_subtitle(url: str, subtype: str="srt", lang:str='en', proxy: str=None, x_token: Annotated[str | None, Header()] = None):
 
 
 
 
 
 
25
  if token != x_token:
26
  return JSONResponse({"error": "Invalid token"})
27
  subtitle = await fetchSubtitle(url, lang=lang, subType=subtype, proxy=proxy)
@@ -29,8 +38,68 @@ async def get_subtitle(url: str, subtype: str="srt", lang:str='en', proxy: str=N
29
 
30
 
31
  @app.get("/subtitle-urls/")
32
- async def get_subtitleUrls(url: str, proxy: str=None, x_token: Annotated[str | None, Header()] = None):
 
 
33
  if token != x_token:
34
  return JSONResponse({"error": "Invalid token"})
35
  subtitles = await fetchSubtitleUrls(url, proxy=proxy)
36
- return JSONResponse(content=subtitles)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
  import os
3
+ from fastapi import FastAPI, Header, Request, Response, HTTPException
4
+ from fastapi.responses import JSONResponse, StreamingResponse
5
+ import traceback
6
+ from typing import Annotated, Optional
7
+ from fetchYoutubeSubtitle import fetchSubtitle, fetchSubtitleUrls, fetchSubtitleByInfo
8
 
9
  token = os.getenv("HF_X_TOKEN")
10
  app = FastAPI()
11
 
12
+
13
  @app.get("/")
14
+ def read_root(request: Request):
15
  print(request.headers)
16
  print(request.client.host)
17
  print(request.client.port)
 
24
 
25
 
26
  @app.get("/subtitle/")
27
+ async def get_subtitle(
28
+ url: str,
29
+ subtype: str = "srt",
30
+ lang: str = "en",
31
+ proxy: str = None,
32
+ x_token: Annotated[str | None, Header()] = None,
33
+ ):
34
  if token != x_token:
35
  return JSONResponse({"error": "Invalid token"})
36
  subtitle = await fetchSubtitle(url, lang=lang, subType=subtype, proxy=proxy)
 
38
 
39
 
40
  @app.get("/subtitle-urls/")
41
+ async def get_subtitleUrls(
42
+ url: str, proxy: str = None, x_token: Annotated[str | None, Header()] = None
43
+ ):
44
  if token != x_token:
45
  return JSONResponse({"error": "Invalid token"})
46
  subtitles = await fetchSubtitleUrls(url, proxy=proxy)
47
+ return JSONResponse(content=subtitles)
48
+
49
+
50
+ def download_file(content, chunk_size):
51
+ num_chunks = (len(content) // chunk_size) + 1
52
+ for i in range(num_chunks):
53
+ start = i * chunk_size
54
+ end = (i + 1) * chunk_size
55
+ yield content[start:end]
56
+
57
+
58
+ @app.get("/subtitle-dl/")
59
+ async def download(
60
+ url: str,
61
+ fileName: str,
62
+ fileType: str,
63
+ info: str, # download info
64
+ proxy: str = None,
65
+ x_token: Annotated[str | None, Header()] = None,
66
+ request: Request = None,
67
+ ):
68
+ if token != x_token:
69
+ raise HTTPException(status_code=401, detail="Invalid token")
70
+
71
+ try:
72
+ dlInfo = json.loads(info)
73
+ # print(
74
+ # "url: {}, fileName: {}, fileType: {}, dlInfo: {}".format(
75
+ # url, fileName, fileType, dlInfo
76
+ # )
77
+ # )
78
+ subtitle = await fetchSubtitleByInfo(url, fileType, dlInfo, proxy=proxy)
79
+
80
+ excluded_headers = [
81
+ "content-encoding",
82
+ "content-length",
83
+ "transfer-encoding",
84
+ "connection",
85
+ ]
86
+ headers = [
87
+ (name, value)
88
+ for (name, value) in request.headers.items()
89
+ if name.lower() not in excluded_headers
90
+ ]
91
+ headers.append(("Content-Type", "text/plain"))
92
+ headers.append(
93
+ (
94
+ "Content-Disposition",
95
+ f'attachment; filename="{fileName.encode("utf-8").decode("latin-1")}.{fileType}"',
96
+ )
97
+ )
98
+
99
+ return StreamingResponse(
100
+ download_file(subtitle, 8192), headers=dict(headers), status_code=200
101
+ )
102
+ except Exception as e:
103
+ print(e)
104
+ traceback.print_exc()
105
+ raise HTTPException(status_code=500, detail="Internal Server Error")