"""FastAPI service exposing subtitle lookup and download endpoints.

The endpoints delegate the actual fetching to the helpers imported from
``fetchYoutubeSubtitle`` and are protected by a shared secret read from the
``HF_X_TOKEN`` environment variable, which clients send as ``X-Token``.
"""

import json
import os
import secrets
import traceback
from typing import Annotated, Optional

from fastapi import FastAPI, Header, HTTPException, Request, Response
from fastapi.responses import JSONResponse, StreamingResponse

from fetchYoutubeSubtitle import fetchSubtitle, fetchSubtitleUrls, fetchSubtitleByInfo

# Shared secret expected in the ``X-Token`` request header.
# NOTE(review): when HF_X_TOKEN is unset this is None, and requests that send
# no X-Token header are then accepted (same as the original ``!=`` check).
# Confirm that running without a configured token is meant to be open access.
token = os.getenv("HF_X_TOKEN")

app = FastAPI()


def _is_authorized(x_token: str | None) -> bool:
    """Return True when *x_token* matches the configured ``token``.

    Gives the same answer as ``token == x_token`` but compares the two
    strings in constant time so the secret cannot be probed via timing.
    """
    if token is None or x_token is None:
        # compare_digest rejects None; fall back to the plain identity check.
        return token is None and x_token is None
    # Compare as bytes: compare_digest only accepts ASCII-only str objects.
    # "surrogatepass" keeps the encode from failing on unusual env values.
    return secrets.compare_digest(
        token.encode("utf-8", "surrogatepass"),
        x_token.encode("utf-8", "surrogatepass"),
    )


@app.get("/")
def read_root(request: Request):
    """Log the caller's headers and address, then return a greeting.

    NOTE(review): the printed headers include any ``X-Token`` the client
    sent, so the secret ends up in the process output.
    """
    print(request.headers)
    # The client address is not guaranteed to be available; skip it if absent
    # instead of failing the request with an AttributeError.
    if request.client is not None:
        print(request.client.host)
        print(request.client.port)
    return {"Hello": "World!"}


@app.get("/json")
def read_json():
    """Return the greeting wrapped in an explicit ``JSONResponse``."""
    return JSONResponse(content={"Hello": "World!"})


@app.get("/subtitle/")
async def get_subtitle(
    url: str,
    subtype: str = "srt",
    lang: str = "en",
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
):
    """Fetch a subtitle for *url* and return it as JSON.

    Args:
        url: Video URL handed to ``fetchSubtitle``.
        subtype: Subtitle format, forwarded as ``subType``.
        lang: Subtitle language, forwarded as ``lang``.
        proxy: Optional proxy forwarded to ``fetchSubtitle``.
        x_token: Value of the ``X-Token`` request header.

    Returns:
        The ``fetchSubtitle`` result as a JSON response, or a 401 response
        with ``{"error": "Invalid token"}`` when the token does not match.
    """
    if not _is_authorized(x_token):
        # Same body as before, but with a real 401 status so clients and
        # proxies do not treat an authentication failure as a success.
        return JSONResponse({"error": "Invalid token"}, status_code=401)
    subtitle = await fetchSubtitle(url, lang=lang, subType=subtype, proxy=proxy)
    return JSONResponse(content=subtitle)


@app.get("/subtitle-urls/")
async def get_subtitleUrls(
    url: str,
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
):
    """Return the result of ``fetchSubtitleUrls`` for *url* as JSON.

    Args:
        url: Video URL handed to ``fetchSubtitleUrls``.
        proxy: Optional proxy forwarded to ``fetchSubtitleUrls``.
        x_token: Value of the ``X-Token`` request header.

    Returns:
        The helper's result as a JSON response, or a 401 response with
        ``{"error": "Invalid token"}`` when the token does not match.
    """
    if not _is_authorized(x_token):
        return JSONResponse({"error": "Invalid token"}, status_code=401)
    subtitles = await fetchSubtitleUrls(url, proxy=proxy)
    return JSONResponse(content=subtitles)


def download_file(content, chunk_size: int):
    """Yield *content* in consecutive slices of at most *chunk_size* items.

    Only non-empty slices are produced: content whose length is an exact
    multiple of *chunk_size* (including empty content) no longer ends with
    an empty trailing chunk.

    Args:
        content: Any sliceable object with a length (e.g. ``str`` or
            ``bytes``).
        chunk_size: Maximum length of each yielded slice; must be positive.
    """
    for start in range(0, len(content), chunk_size):
        yield content[start:start + chunk_size]


@app.get("/subtitle-dl/")
async def download(
    url: str,
    fileName: str,
    fileType: str,
    info: str,  # download info
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
    request: Request = None,
):
    """Fetch a subtitle and stream it back as a file attachment.

    Args:
        url: Video URL handed to ``fetchSubtitleByInfo``.
        fileName: Base name used in the ``Content-Disposition`` header.
        fileType: Extension appended to *fileName*; also forwarded to
            ``fetchSubtitleByInfo``.
        info: JSON-encoded download info, decoded and forwarded to
            ``fetchSubtitleByInfo``.
        proxy: Optional proxy forwarded to ``fetchSubtitleByInfo``.
        x_token: Value of the ``X-Token`` request header.
        request: Incoming request; kept for interface compatibility and no
            longer read.

    Returns:
        A ``text/plain`` streaming response carrying the subtitle.

    Raises:
        HTTPException: 401 for a bad token, 400 when *info* is not valid
            JSON, 500 when fetching the subtitle fails.
    """
    if not _is_authorized(x_token):
        raise HTTPException(status_code=401, detail="Invalid token")

    try:
        dlInfo = json.loads(info)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        raise HTTPException(status_code=400, detail="Invalid params") from None

    try:
        subtitle = await fetchSubtitleByInfo(url, fileType, dlInfo, proxy=proxy)
    except Exception as err:
        print(err)
        traceback.print_exc()
        raise HTTPException(status_code=500, detail="Internal Server Error") from err

    # The incoming request headers are deliberately NOT copied onto the
    # response: doing so reflected X-Token, Host, Accept, ... back to the
    # client and could duplicate Content-Type.

    # Re-decoding the UTF-8 bytes as latin-1 presumably lets the raw UTF-8
    # bytes reach the client through a latin-1 header encoder — TODO confirm.
    # NOTE(review): a double quote in fileName/fileType is not escaped and
    # would terminate the quoted filename early.
    wire_name = fileName.encode("utf-8").decode("latin-1")
    response_headers = {
        "Content-Type": "text/plain",
        "Content-Disposition": f'attachment; filename="{wire_name}.{fileType}"',
    }
    return StreamingResponse(
        download_file(subtitle, 8192),
        headers=response_headers,
        status_code=200,
    )