lanbogao's picture
1. Add retry to video extraction, and use the proxy on retry if one is configured.
eb92911
raw
history blame
3.15 kB
import json
import os
from fastapi import FastAPI, Header, Request, Response, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse
import traceback
from typing import Annotated, Optional
from fetchYoutubeSubtitle import fetchSubtitle, fetchSubtitleUrls, fetchSubtitleByInfo
# Shared secret read from the HF_X_TOKEN environment variable (None when the
# variable is unset). The subtitle endpoints in this file compare it with
# their `x_token` header parameter before doing any work.
token = os.getenv("HF_X_TOKEN")
# FastAPI application instance; the route decorators below register handlers on it.
app = FastAPI()
@app.get("/")
def read_root(request: Request):
    """Log basic request details to stdout and return a fixed greeting.

    Args:
        request: The incoming request; its headers and, when available, the
            client's host and port are printed for debugging.

    Returns:
        The dict ``{"Hello": "World!"}``.
    """
    # NOTE(review): this prints every request header, which includes any
    # token header the caller sent. Confirm that is acceptable for the logs.
    print(request.headers)

    # The framework documents ``request.client`` as possibly ``None`` (for
    # example when the server cannot determine the peer address). Guard it so
    # the health/greeting route never fails with an AttributeError.
    client = request.client
    if client is not None:
        print(client.host)
        print(client.port)

    return {"Hello": "World!"}
@app.get("/json")
def read_json():
    """Return the fixed greeting payload wrapped explicitly in a ``JSONResponse``."""
    greeting = {"Hello": "World!"}
    return JSONResponse(content=greeting)
@app.get("/subtitle/")
async def get_subtitle(
    url: str,
    subtype: str = "srt",
    lang: str = "en",
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
):
    """Fetch a subtitle via ``fetchSubtitle`` and return the result as JSON.

    Args:
        url: Forwarded to ``fetchSubtitle`` as its first argument.
        subtype: Forwarded to ``fetchSubtitle`` as ``subType`` (default ``"srt"``).
        lang: Forwarded to ``fetchSubtitle`` as ``lang`` (default ``"en"``).
        proxy: Optional value forwarded to ``fetchSubtitle`` as ``proxy``.
        x_token: Token taken from the request headers; must equal the
            module-level ``token``.

    Returns:
        A ``JSONResponse`` wrapping whatever ``fetchSubtitle`` returns. When
        the token does not match, a ``JSONResponse`` with body
        ``{"error": "Invalid token"}`` and HTTP status 401.
    """
    # NOTE(review): if HF_X_TOKEN is unset, ``token`` is None and a request
    # without the header passes this check. Confirm that is intended.
    if token != x_token:
        # Keep the existing error body for current clients, but report the
        # failure with a 401 status (as /subtitle-dl/ does) instead of 200.
        return JSONResponse({"error": "Invalid token"}, status_code=401)

    subtitle = await fetchSubtitle(url, lang=lang, subType=subtype, proxy=proxy)
    return JSONResponse(content=subtitle)
@app.get("/subtitle-urls/")
async def get_subtitleUrls(
    url: str,
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
):
    """Return the result of ``fetchSubtitleUrls`` for ``url`` as JSON.

    Args:
        url: Forwarded to ``fetchSubtitleUrls`` as its first argument.
        proxy: Optional value forwarded to ``fetchSubtitleUrls`` as ``proxy``.
        x_token: Token taken from the request headers; must equal the
            module-level ``token``.

    Returns:
        A ``JSONResponse`` wrapping whatever ``fetchSubtitleUrls`` returns.
        When the token does not match, a ``JSONResponse`` with body
        ``{"error": "Invalid token"}`` and HTTP status 401.
    """
    # NOTE(review): if HF_X_TOKEN is unset, ``token`` is None and a request
    # without the header passes this check. Confirm that is intended.
    if token != x_token:
        # Keep the existing error body for current clients, but report the
        # failure with a 401 status (as /subtitle-dl/ does) instead of 200.
        return JSONResponse({"error": "Invalid token"}, status_code=401)

    subtitles = await fetchSubtitleUrls(url, proxy=proxy)
    return JSONResponse(content=subtitles)
def download_file(content, chunk_size):
    """Yield ``content`` in consecutive slices of at most ``chunk_size`` items.

    Args:
        content: Any sliceable sequence with a length (``str`` or ``bytes``
            in this file's usage).
        chunk_size: Maximum length of each yielded slice; must be positive.

    Yields:
        Non-empty slices of ``content`` in order. Nothing is yielded for
        empty content, and no empty trailing slice is produced when the
        length is an exact multiple of ``chunk_size``.

    Raises:
        ValueError: If ``chunk_size`` is not positive. As this is a
            generator, the error surfaces on first iteration.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    # Stepping by chunk_size visits exactly the start offsets that exist,
    # which avoids the extra empty chunk a "len // size + 1" count produces.
    for start in range(0, len(content), chunk_size):
        yield content[start:start + chunk_size]
def _content_disposition(fileName: str, fileType: str) -> str:
    """Build the ``attachment`` Content-Disposition value for ``fileName.fileType``."""

    def header_safe(text: str) -> str:
        # Both values come straight from the query string. Drop ASCII control
        # characters (CR/LF in particular) so they cannot inject extra header
        # lines, then backslash-escape the characters that would otherwise
        # terminate or corrupt the quoted filename.
        printable = "".join(ch for ch in text if ch >= " " and ch != "\x7f")
        return printable.replace("\\", "\\\\").replace('"', '\\"')

    # Re-decoding the UTF-8 bytes as latin-1 keeps non-latin-1 names
    # representable in the header (presumably because the framework encodes
    # header values as latin-1 -- confirm against the server in use).
    name = header_safe(fileName).encode("utf-8").decode("latin-1")
    extension = header_safe(fileType)
    return f'attachment; filename="{name}.{extension}"'


@app.get("/subtitle-dl/")
async def download(
    url: str,
    fileName: str,
    fileType: str,
    info: str,  # download info
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
    request: Request = None,
):
    """Fetch a subtitle via ``fetchSubtitleByInfo`` and stream it as a file download.

    Args:
        url: Forwarded to ``fetchSubtitleByInfo`` as its first argument.
        fileName: Base name used in the ``Content-Disposition`` header.
        fileType: Forwarded to ``fetchSubtitleByInfo`` and used as the file
            extension in the ``Content-Disposition`` header.
        info: JSON-encoded download info; it is decoded and the result is
            forwarded to ``fetchSubtitleByInfo``.
        proxy: Optional value forwarded to ``fetchSubtitleByInfo`` as ``proxy``.
        x_token: Token taken from the request headers; must equal the
            module-level ``token``.
        request: The incoming request. It is no longer read here and is kept
            only so the endpoint signature stays unchanged.

    Returns:
        A ``StreamingResponse`` (status 200, ``Content-Type: text/plain``)
        that sends the subtitle in 8192-item chunks as an attachment.

    Raises:
        HTTPException: 401 when the token does not match, 400 when ``info``
            is not valid JSON, 500 when fetching or building the response fails.
    """
    if token != x_token:
        raise HTTPException(status_code=401, detail="Invalid token")

    try:
        dlInfo = json.loads(info)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError; RecursionError covers
        # pathologically nested input. Anything else is a genuine bug and
        # should not be reported as a client error.
        raise HTTPException(status_code=400, detail="Invalid params") from None

    try:
        subtitle = await fetchSubtitleByInfo(url, fileType, dlInfo, proxy=proxy)

        # Only the two headers this endpoint owns are sent. Echoing the
        # inbound request headers into the response would leak the caller's
        # token header and any proxy-added headers, and could introduce a
        # second, conflicting content-type entry.
        headers = {
            "Content-Type": "text/plain",
            "Content-Disposition": _content_disposition(fileName, fileType),
        }
        return StreamingResponse(
            download_file(subtitle, 8192), headers=headers, status_code=200
        )
    except Exception:
        # Top-level boundary: log the full traceback (which already contains
        # the exception message) and return an opaque 500 to the client.
        traceback.print_exc()
        raise HTTPException(status_code=500, detail="Internal Server Error")