"""PNG optimisation service.

Downloads PNG images by URL, recompresses them with the bundled ``oxipng``
binary, uploads the results to an image host and returns the new URLs.
Exposed over HTTP through a small FastAPI application.
"""

import logging
from asyncio import create_subprocess_exec, gather, sleep
from asyncio import create_subprocess_shell  # noqa: F401  (no longer used here; kept so the module namespace is unchanged)
from pathlib import Path
from random import choice
from subprocess import PIPE, CalledProcessError
from typing import Any, List
from uuid import uuid4

from fastapi import FastAPI, HTTPException
from fastapi.responses import PlainTextResponse
from httpx import AsyncClient, HTTPStatusError, RequestError
from pydantic import BaseModel, HttpUrl

logger = logging.getLogger(__name__)

# The optimiser binary is expected to sit next to this file; make sure it is
# executable before the first request needs it.
oxipng_bin = Path(__file__).parent / 'oxipng'
oxipng_bin.chmod(0o755)

# NOTE(security): these imgbb API keys are committed in source. They should be
# rotated and loaded from configuration/secrets instead of living in the repo.
tokens = [
    '7e0ea3da6a73d77003c1abba7f0ea13c',
    'bc2e68b5918e5bb59ebca6c05d73daf9',
    'fecbfbe0938bcd1df27b7a9be1702cc9',
    '04e9981d4d0981964cb4c9753173244d',
    'dee75b07981c7aa211628ea7c7cbc03d',
]


async def download_png(url: str, client: AsyncClient, retries: int = 5) -> Path:
    """Download ``url`` into a uniquely named ``.png`` file in the working directory.

    Args:
        url: Address of the image to fetch.
        client: Shared httpx client used for the request.
        retries: Maximum number of attempts (must be at least 1).

    Returns:
        Path of the file the response body was written to.

    Raises:
        ValueError: If ``retries`` is less than 1.
        HTTPStatusError, RequestError: If the last attempt fails.
    """
    if retries < 1:
        raise ValueError('retries must be at least 1')

    for attempt in range(retries):
        try:
            response = await client.get(url, timeout=30.0)
            response.raise_for_status()
        except (HTTPStatusError, RequestError):
            if attempt == retries - 1:
                raise
            # Exponential backoff: 1s, 2s, 4s, ...
            await sleep(2 ** attempt)
        else:
            file_path = Path(f'{uuid4()}.png')
            file_path.write_bytes(response.content)
            return file_path

    # The loop always returns or raises because retries >= 1.
    raise AssertionError('unreachable')


async def download_pngs(urls: str | list[str]) -> list[Path]:
    """Download one or several PNGs concurrently.

    Args:
        urls: A single URL or a list of URLs.

    Returns:
        Paths of the downloaded files, in the same order as ``urls``.

    Raises:
        Whatever ``download_png`` raised for the first failing URL (in input
        order). Files already written by the other downloads are removed
        before the error is propagated, so a failed batch leaves nothing behind.
    """
    urls = [urls] if isinstance(urls, str) else urls
    async with AsyncClient() as client:
        # Collect every outcome so that successful downloads can be cleaned
        # up if any sibling download failed.
        results = await gather(
            *(download_png(url, client) for url in urls),
            return_exceptions=True,
        )

    failures = [result for result in results if isinstance(result, BaseException)]
    if failures:
        for result in results:
            if isinstance(result, Path):
                result.unlink(missing_ok=True)
        raise failures[0]
    return list(results)


async def optimize_png(image_path: Path, retries: int = 3) -> None:
    """Recompress ``image_path`` in place with oxipng.

    Args:
        image_path: PNG file to optimise; it is overwritten with the result.
        retries: Maximum number of attempts (must be at least 1).

    Raises:
        ValueError: If ``retries`` is less than 1.
        CalledProcessError: If oxipng still exits non-zero on the last attempt.
    """
    if retries < 1:
        raise ValueError('retries must be at least 1')

    # An argument list (no shell) keeps paths containing spaces or shell
    # metacharacters from being split or interpreted.
    command = [
        str(oxipng_bin.resolve()),
        '--opt', '2',
        '--strip', 'safe',
        '--out', str(image_path),
        str(image_path),
    ]
    for attempt in range(retries):
        process = await create_subprocess_exec(*command, stdout=PIPE, stderr=PIPE)
        stdout, stderr = await process.communicate()
        if process.returncode == 0:
            return
        if attempt == retries - 1:
            raise CalledProcessError(process.returncode, command, output=stdout, stderr=stderr)
        # Exponential backoff: 1s, 2s, 4s, ...
        await sleep(2 ** attempt)


async def optimize_pngs(image_paths: list[str | Path] | str | Path) -> None:
    """Optimise one or several PNG files concurrently.

    Args:
        image_paths: A single path or a list of paths (``str`` or ``Path``).

    Raises:
        Whatever ``optimize_png`` raises for a file that cannot be optimised.
    """
    if not isinstance(image_paths, list):
        image_paths = [image_paths]
    await gather(*(optimize_png(Path(image_file)) for image_file in image_paths))


async def telegraph_upload_png(file_path: str | Path) -> str | None:
    """Upload a PNG to telegra.ph.

    Args:
        file_path: File to upload.

    Returns:
        The URL built from the ``src`` field of the response, or ``None``
        when the file is missing, larger than 5 MiB, or the upload fails for
        any reason (this uploader is best-effort so callers can fall back).
    """
    file_path = Path(file_path)
    if not file_path.is_file() or file_path.stat().st_size > 5 * 1024 * 1024:
        return None

    url = 'https://telegra.ph/upload'
    base_url = url.rsplit('/', 1)[0]
    headers = {
        'authority': url.rsplit('/')[2],
        'accept': 'application/json, text/javascript, */*; q=0.01',
        'origin': base_url,
        'referer': base_url,
        'x-requested-with': 'XMLHttpRequest',
    }
    try:
        async with AsyncClient() as client:
            response = await client.post(
                url,
                headers=headers,
                files={'file': ('blob', file_path.read_bytes(), 'image/png')},
            )
        response.raise_for_status()
        result: Any = response.json()
        # Failures are reported as a mapping with an 'error' key; a success
        # payload is indexed as a list whose first item carries 'src'.
        if 'error' in result:
            return None
        return base_url + result[0]['src']
    except Exception as exc:
        # Deliberately best-effort. ``Exception`` (not a bare ``except``) so
        # task cancellation and KeyboardInterrupt still propagate, while an
        # unexpected payload shape no longer escapes and blocks the fallback.
        logger.warning('telegra.ph upload failed for %s (%s)', file_path, type(exc).__name__)
        return None


async def upload_image_to_imgbb(file_path: Path) -> str | None:
    """Upload an image to imgbb using a randomly chosen API key.

    Args:
        file_path: File to upload.

    Returns:
        The URL from the response's ``data.url`` field, or ``None`` when the
        API does not report success or the upload fails for any reason.
    """
    url = f'https://api.imgbb.com/1/upload?key={choice(tokens)}'
    try:
        with file_path.open('rb') as file:
            files = {'image': (file_path.name, file, 'image/png')}
            async with AsyncClient() as client:
                response = await client.post(url, files=files, data={}, timeout=30)
        response.raise_for_status()
        payload: Any = response.json()
        if payload.get('success'):
            return payload['data']['url']
        return None
    except Exception as exc:
        # Best-effort uploader; only the exception type is logged because the
        # exception message can contain the request URL, which embeds the key.
        logger.warning('imgbb upload failed for %s (%s)', file_path, type(exc).__name__)
        return None


async def upload_image(file_path: Path | str) -> str | None:
    """Upload an image, trying telegra.ph first and imgbb as a fallback.

    Returns:
        The URL of the uploaded image, or ``None`` if both uploads failed.
    """
    file_path = Path(file_path)
    return await telegraph_upload_png(file_path) or await upload_image_to_imgbb(file_path)


async def optimize_and_upload(images_urls: list[str] | str) -> list[str]:
    """Download, optimise and re-upload the given images.

    Args:
        images_urls: A single image URL or a list of them.

    Returns:
        URLs of the successfully uploaded optimised images. Images whose
        upload failed are omitted.

    Raises:
        Any error raised while downloading or optimising the images.
    """
    images_urls = [images_urls] if isinstance(images_urls, str) else images_urls
    images_paths = await download_pngs(images_urls)
    try:
        await optimize_pngs(images_paths)
        new_images_urls = []
        for image_path in images_paths:
            new_url = await upload_image(image_path)
            if new_url:
                new_images_urls.append(new_url)
        return new_images_urls
    finally:
        # Always remove the temporary files, including when optimisation or
        # an upload raises part-way through.
        for image_path in images_paths:
            image_path.unlink(missing_ok=True)


app = FastAPI()


class ImageURLs(BaseModel):
    """Request body for the optimisation endpoint: the image URLs to process."""

    urls: List[HttpUrl]


@app.get('/')
async def read_root():
    """Return a fixed plain-text response with status 200."""
    return PlainTextResponse('ну пролапс, ну и что', status_code=200)


@app.post('/pngopt_by_urls/')
async def optimize_images_endpoint(image_urls: ImageURLs):
    """Optimise the images at the given URLs and return their new URLs.

    Any failure in the pipeline is reported as HTTP 500 with the error
    message as the detail.
    """
    try:
        optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls])
        return {"optimized_urls": optimized_urls}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


if __name__ == "__main__":
    from uvicorn import run as uvicorn_run

    uvicorn_run(app, host='0.0.0.0', port=7860, timeout_keep_alive=90)