Spaces:
Runtime error
Runtime error
import json | |
import os | |
from fastapi import FastAPI, Header, Request, Response, HTTPException | |
from fastapi.responses import JSONResponse, StreamingResponse | |
import traceback | |
from typing import Annotated, Optional | |
from fetchYoutubeSubtitle import fetchSubtitle, fetchSubtitleUrls, fetchSubtitleByInfo | |
# Shared secret, read once at import time. The handlers in this module compare
# it against the value of the incoming X-Token header. None when the
# HF_X_TOKEN environment variable is not set.
token = os.environ.get("HF_X_TOKEN")

# The FastAPI application object for this module.
app = FastAPI()
def read_root(request: Request):
    """Log basic facts about the caller and return a fixed greeting.

    Prints the request headers and the client's host and port to stdout.

    Args:
        request: The incoming request object.

    Returns:
        The dict ``{"Hello": "World!"}``.
    """
    print(request.headers)
    # ``request.client`` is None when the server cannot supply a peer address;
    # dereferencing it blindly would turn this endpoint into a 500.
    client = request.client
    print(client.host if client is not None else None)
    print(client.port if client is not None else None)
    return {"Hello": "World!"}
def read_json():
    """Return the fixed greeting payload wrapped in a ``JSONResponse``."""
    payload = {"Hello": "World!"}
    return JSONResponse(content=payload)
async def get_subtitle(
    url: str,
    subtype: str = "srt",
    lang: str = "en",
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
):
    """Fetch a subtitle via ``fetchSubtitle`` and return the result as JSON.

    Args:
        url: Passed to ``fetchSubtitle`` as its first argument.
        subtype: Passed to ``fetchSubtitle`` as ``subType`` (default ``"srt"``).
        lang: Passed to ``fetchSubtitle`` as ``lang`` (default ``"en"``).
        proxy: Optional value passed to ``fetchSubtitle`` as ``proxy``.
        x_token: Value of the ``X-Token`` request header, if present.

    Returns:
        A ``JSONResponse`` wrapping whatever ``fetchSubtitle`` returned, or a
        401 ``JSONResponse`` with body ``{"error": "Invalid token"}`` when the
        header does not match the module-level ``token``.
    """
    # NOTE(review): when HF_X_TOKEN is unset, ``token`` is None, so a request
    # that sends no X-Token header passes this check. Confirm that is intended.
    if token != x_token:
        # Keep the original body for existing clients, but signal the failure
        # with 401 like ``download`` does instead of a misleading 200.
        return JSONResponse({"error": "Invalid token"}, status_code=401)

    subtitle = await fetchSubtitle(url, lang=lang, subType=subtype, proxy=proxy)
    return JSONResponse(content=subtitle)
async def get_subtitleUrls(
    url: str,
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
):
    """Look up subtitle URLs via ``fetchSubtitleUrls`` and return them as JSON.

    Args:
        url: Passed to ``fetchSubtitleUrls`` as its first argument.
        proxy: Optional value passed to ``fetchSubtitleUrls`` as ``proxy``.
        x_token: Value of the ``X-Token`` request header, if present.

    Returns:
        A ``JSONResponse`` wrapping whatever ``fetchSubtitleUrls`` returned, or
        a 401 ``JSONResponse`` with body ``{"error": "Invalid token"}`` when
        the header does not match the module-level ``token``.
    """
    # NOTE(review): when HF_X_TOKEN is unset, ``token`` is None, so a request
    # that sends no X-Token header passes this check. Confirm that is intended.
    if token != x_token:
        # Keep the original body for existing clients, but signal the failure
        # with 401 like ``download`` does instead of a misleading 200.
        return JSONResponse({"error": "Invalid token"}, status_code=401)

    subtitles = await fetchSubtitleUrls(url, proxy=proxy)
    return JSONResponse(content=subtitles)
def download_file(content, chunk_size):
    """Yield *content* as consecutive slices of at most *chunk_size* items.

    Concatenating the yielded slices reproduces *content*. No empty slice is
    produced: content whose length is an exact multiple of *chunk_size* yields
    exactly ``len(content) // chunk_size`` slices, and empty content yields
    nothing.

    Args:
        content: Any sliceable object with a length (e.g. ``str`` or ``bytes``).
        chunk_size: Maximum length of each slice; must be positive.

    Yields:
        Successive slices of *content*.

    Raises:
        ValueError: If *chunk_size* is not positive. Raised when iteration
            starts, because this is a generator.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    # Stepping the start offset avoids the off-by-one of ``len // size + 1``,
    # which emitted a trailing empty chunk on exact multiples.
    for start in range(0, len(content), chunk_size):
        yield content[start:start + chunk_size]
async def download(
    url: str,
    fileName: str,
    fileType: str,
    info: str,  # download info
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
    request: Request = None,
):
    """Fetch a subtitle via ``fetchSubtitleByInfo`` and stream it as an attachment.

    Args:
        url: Passed to ``fetchSubtitleByInfo`` as its first argument.
        fileName: Base name used in the ``Content-Disposition`` header.
        fileType: Passed to ``fetchSubtitleByInfo`` and used as the file
            extension in the ``Content-Disposition`` header.
        info: JSON-encoded string; decoded with ``json.loads`` and passed to
            ``fetchSubtitleByInfo``.
        proxy: Optional value passed to ``fetchSubtitleByInfo`` as ``proxy``.
        x_token: Value of the ``X-Token`` request header, if present.
        request: The incoming request. No longer read by this handler; kept so
            the signature stays unchanged.

    Returns:
        A ``StreamingResponse`` (status 200, ``Content-Type: text/plain``) that
        streams the fetched subtitle in 8192-item chunks.

    Raises:
        HTTPException: 401 when the token does not match, 400 when ``info`` is
            not valid JSON, 500 when fetching or building the response fails.
    """
    # NOTE(review): when HF_X_TOKEN is unset, ``token`` is None, so a request
    # that sends no X-Token header passes this check. Confirm that is intended.
    if token != x_token:
        raise HTTPException(status_code=401, detail="Invalid token")

    try:
        dlInfo = json.loads(info)
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid params") from None

    def _header_safe(text):
        # Drop CR/LF (header injection) and double quotes (would terminate the
        # quoted filename early); both values come straight from the caller.
        return "".join(ch for ch in text if ch not in '\r\n"')

    try:
        subtitle = await fetchSubtitleByInfo(url, fileType, dlInfo, proxy=proxy)

        # Re-interpret the UTF-8 bytes as latin-1 so that non-latin-1 names
        # survive header encoding and go out on the wire as UTF-8.
        base_name = _header_safe(fileName).encode("utf-8").decode("latin-1")
        extension = _header_safe(fileType)

        # Build the response headers from scratch. Copying the *request*
        # headers into the response (as this handler used to) echoed values
        # such as host, cookie and x-token back to the client and produced a
        # duplicate Content-Type header.
        headers = {
            "Content-Type": "text/plain",
            "Content-Disposition": f'attachment; filename="{base_name}.{extension}"',
        }
        return StreamingResponse(
            download_file(subtitle, 8192), headers=headers, status_code=200
        )
    except Exception as e:
        print(e)
        traceback.print_exc()
        raise HTTPException(status_code=500, detail="Internal Server Error") from e