File size: 7,267 Bytes
4597eac
f10d12b
4597eac
 
f10d12b
4597eac
41eb20c
4597eac
 
41eb20c
 
4597eac
41eb20c
 
4597eac
f10d12b
 
 
 
 
4597eac
bcefe57
 
4597eac
 
41eb20c
4597eac
 
 
 
 
 
 
 
f10d12b
 
4597eac
 
 
 
f10d12b
 
4597eac
 
 
 
 
 
 
 
 
 
 
f10d12b
346a13c
e3007c2
346a13c
e3007c2
4597eac
346a13c
4597eac
 
 
 
 
f10d12b
4597eac
 
 
 
 
 
 
 
 
f10d12b
4597eac
 
 
 
 
 
 
 
f10d12b
4597eac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f10d12b
4597eac
 
 
 
 
 
 
f10d12b
 
 
 
 
 
 
 
 
 
4597eac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
from asyncio import create_subprocess_shell, gather, sleep
from logging import ERROR, INFO, basicConfig, getLogger
from pathlib import Path
from random import choice
from shutil import rmtree
from subprocess import CalledProcessError, PIPE
from typing import Any, List
from uuid import uuid4

from fastapi import FastAPI, HTTPException
from fastapi.responses import PlainTextResponse
from httpx import AsyncClient, HTTPStatusError, RequestError
from pydantic import BaseModel, HttpUrl
from uvicorn import run as uvicorn_run

# Logging verbosity switch: INFO when True, otherwise only ERROR and above.
need_logging = True

basicConfig(level = INFO if need_logging else ERROR)
logger = getLogger(__name__)

# The oxipng executable is expected next to this file. If none of its execute
# bits are set, it is made executable (mode 0o755) at import time; stat()
# raises if the file does not exist.
oxipng_bin = Path(__file__).parent / 'oxipng'
if not oxipng_bin.stat().st_mode & 0o111:
    oxipng_bin.chmod(0o755)

# Pool of imgbb API keys; one is picked at random for each imgbb upload.
# NOTE(review): these keys are committed in source — consider loading them
# from the environment instead.
tokens = [
    # (original author's note) I don't really care about these tokens
    '7e0ea3da6a73d77003c1abba7f0ea13c',
    'bc2e68b5918e5bb59ebca6c05d73daf9',
    'fecbfbe0938bcd1df27b7a9be1702cc9',
    '04e9981d4d0981964cb4c9753173244d',
    'dee75b07981c7aa211628ea7c7cbc03d',
]


async def download_png(url: str, folder: str, client: AsyncClient, retries: int = 5) -> Path:
    """Fetch ``url`` and store the response body as a PNG file.

    The file is written to ``<this file's directory>/<folder>/<uuid4>.png``;
    the folder is created when missing.

    Args:
        url: Address of the image to fetch.
        folder: Name of the sub-directory (relative to this file) to write into.
        client: Shared httpx client used for the request.
        retries: Maximum number of attempts.

    Returns:
        Path of the written file (implicitly ``None`` when ``retries`` < 1,
        because no attempt is made).

    Raises:
        HTTPStatusError, RequestError: re-raised once the last attempt fails.
    """
    logger.info(f'загрузка изображения: {url}')
    final_attempt = retries - 1
    for attempt in range(retries):
        try:
            response = await client.get(url, timeout=30.0)
            response.raise_for_status()
        except (HTTPStatusError, RequestError):
            if attempt >= final_attempt:
                raise
            # Exponential back-off between attempts: 1s, 2s, 4s, ...
            await sleep(2 ** attempt)
        else:
            destination = Path(__file__).parent / folder / f'{uuid4()}.png'
            destination.parent.mkdir(parents=True, exist_ok=True)
            destination.write_bytes(response.content)
            return destination


async def download_pngs(urls: str | list[str]) -> list[Path]:
    """Download every valid URL concurrently into one per-batch folder.

    Entries that are empty, whitespace-only or contain a newline are dropped
    (with a warning) before downloading.

    Args:
        urls: A single URL or a list of URLs.

    Returns:
        Paths of the downloaded files, in the order of the valid URLs.

    Raises:
        HTTPStatusError, RequestError: propagated from ``download_png`` when a
            download fails after all its retries.
    """
    urls = [urls] if isinstance(urls, str) else urls
    logger.info(f'скачивается список список из {len(urls)}: {urls}')
    # The coze bot has a bug: with chat context enabled, its workflow appends
    # the previous answer to the links, so multi-line entries are discarded.
    valid_urls = [url for url in urls if url and '\n' not in url and url.strip()]
    if len(valid_urls) != len(urls):
        logger.warning(f'некорректные ссылки удалены из списка: {set(urls) - set(valid_urls)}')
    if not valid_urls:
        return []
    # One folder for the whole batch: the caller removes the parent directory
    # of the downloaded files, so a folder per file would leave directories
    # behind.
    batch_folder = str(uuid4())
    async with AsyncClient() as client:
        tasks = [download_png(url, batch_folder, client) for url in valid_urls]
        return list(await gather(*tasks))


async def optimize_png(image_path: Path, retries: int = 3) -> None:
    """Optimize a PNG file in place with the bundled ``oxipng`` binary.

    The binary is started directly with an argument list (no shell), so paths
    containing spaces or shell metacharacters are passed through verbatim.

    Args:
        image_path: File to optimize; the result overwrites the same file.
        retries: Maximum number of attempts.

    Raises:
        CalledProcessError: if oxipng still exits with a non-zero status on
            the last attempt.
    """
    # Imported locally because the module-level imports only bring in
    # create_subprocess_shell.
    from asyncio import create_subprocess_exec

    command = [
        str(oxipng_bin.resolve()),
        '--opt', '2',
        '--strip', 'safe',
        '--out', str(image_path),
        str(image_path),
    ]
    logger.info(f'оптимизация картинки {image_path}')
    for attempt in range(retries):
        process = await create_subprocess_exec(*command, stdout=PIPE, stderr=PIPE)
        stdout, stderr = await process.communicate()
        if process.returncode == 0:
            return
        # Include oxipng's stderr so the failure can be diagnosed from logs.
        details = stderr.decode(errors='replace').strip()
        logger.error(f'ошибка при оптимизации {image_path}: {details}')
        if attempt >= retries - 1:
            raise CalledProcessError(process.returncode, command, output=stdout, stderr=stderr)
        # Exponential back-off between attempts: 1s, 2s, ...
        await sleep(2 ** attempt)


async def optimize_pngs(image_paths: list[str | Path] | str | Path) -> None:
    """Run ``optimize_png`` concurrently over one or many image files.

    Args:
        image_paths: A single path or a list of paths (``str`` or ``Path``).
            Anything that is not a ``list`` is treated as a single path.
    """
    if isinstance(image_paths, list):
        candidates = image_paths
    else:
        candidates = [image_paths]
    image_paths = list(map(Path, candidates))
    logger.info(f'оптимизируется список список из {len(image_paths)}: {image_paths}')
    await gather(*map(optimize_png, image_paths))


async def telegraph_upload_png(file_path: str | Path) -> str | None:
    file_path = Path(file_path)
    if not file_path.is_file() or file_path.stat().st_size > 5 * 1024 * 1024:
        return None
    url = 'https://telegra.ph/upload'
    headers = {
        'authority': url.rsplit('/')[2],
        'accept': 'application/json, text/javascript, */*; q=0.01',
        'origin': url.rsplit('/', 1)[0],
        'referer': url.rsplit('/', 1)[0],
        'x-requested-with': 'XMLHttpRequest',
    }
    async with AsyncClient() as client:
        try:
            response = await client.post(url, headers=headers, files={'file': ('blob', file_path.read_bytes(), 'image/png')})
            response.raise_for_status()
            result = response.json()
        except:
            return None
    if response.is_success and 'error' not in result:
        link = result[0]['src']
        return url.rsplit('/', 1)[0] + link
    else:
        return None


async def upload_image_to_imgbb(file_path: Path) -> str | None:
    """Upload an image file to imgbb and return its public URL.

    A randomly chosen key from ``tokens`` is used for the request. This is a
    best-effort upload: every failure is reported as ``None``.

    Args:
        file_path: Image to upload.

    Returns:
        The URL reported by imgbb, or ``None`` if the file could not be read,
        the request failed, or the response did not indicate success.
    """
    url = f'https://api.imgbb.com/1/upload?key={choice(tokens)}'
    try:
        with file_path.open('rb') as file:
            files = {'image': (file_path.name, file, 'image/png')}
            async with AsyncClient() as client:
                response = await client.post(url, files=files, timeout=30)
        response.raise_for_status()
        payload = response.json()
        if payload.get('success'):
            return payload['data']['url']
    except Exception as e:
        # `Exception` (not a bare except) so task cancellation still propagates.
        logger.warning(f'не удалось загрузить {file_path} на imgbb: {e}')
    return None


async def upload_image(file_path: Path | str) -> str | None:
    """Upload an image, trying telegra.ph first and imgbb as the fallback.

    Args:
        file_path: Image to upload.

    Returns:
        The URL from the first host that yields a non-empty result, or the
        imgbb result (possibly ``None``) if telegra.ph yields nothing.
    """
    image = Path(file_path)
    telegraph_link = await telegraph_upload_png(image)
    if telegraph_link:
        return telegraph_link
    return await upload_image_to_imgbb(image)


async def optimize_and_upload(images_urls: list[str] | str) -> list[str]:
    """Download PNGs, optimize them, re-upload them and clean up local files.

    Args:
        images_urls: A single URL or a list of URLs.

    Returns:
        URLs of the successfully re-uploaded images. Images whose upload
        failed are skipped; the list is empty when nothing was downloaded.

    Raises:
        Exception: errors from downloading or optimizing propagate to the
            caller; temporary directories are removed in either case.
    """
    images_urls = [images_urls] if isinstance(images_urls, str) else images_urls
    logger.info(f'принятые ссылки в обработку ({len(images_urls)}): {images_urls}')
    images_paths = await download_pngs(images_urls)
    new_images_urls = []
    try:
        await optimize_pngs(images_paths)
        for image_path in images_paths:
            new_url = await upload_image(image_path)
            if new_url:
                new_images_urls.append(new_url)
            logger.info(f'загружено изображение {image_path} в {new_url}')
            # Free disk space as soon as each image has been handled.
            try:
                image_path.unlink()
            except Exception as e:
                logger.error(f'не удалось удалить файл {image_path}: {e}')
        logger.info(f'новые ссылки: ({len(new_images_urls)}): {new_images_urls}')
    finally:
        # Remove every distinct download directory, not just the first one,
        # and do it even when optimization or upload raised. With no
        # downloaded files this loop is simply a no-op.
        for folder in dict.fromkeys(image_path.parent for image_path in images_paths):
            try:
                rmtree(folder)
            except Exception as e:
                logger.error(f'не удалось удалить файл {folder}: {e}')
    return new_images_urls


# ASGI application object; the route handlers in this module are registered on it.
app = FastAPI()


class ImageURLs(BaseModel):
    # Request body of the optimization endpoint: a list of image URLs, each
    # validated by pydantic's HttpUrl type.
    urls: List[HttpUrl]


@app.get('/')
async def read_root():
    # Root route: always answers with a fixed plain-text body and HTTP 200.
    body = 'ну пролапс, ну и что'
    return PlainTextResponse(body, status_code=200)


@app.post('/pngopt_by_urls/')
async def optimize_images_endpoint(image_urls: ImageURLs):
    """Download, optimize and re-host the PNG images at the given URLs.

    Returns a JSON object whose ``optimized_urls`` key lists the new URLs.
    Any failure while processing is logged with its traceback and reported
    as HTTP 500 with the error text as detail.
    """
    try:
        optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls])
    except Exception as e:
        # Keep the traceback in the server log; the client only gets the message.
        logger.exception('ошибка при обработке запроса /pngopt_by_urls/')
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"optimized_urls": optimized_urls}


if __name__ == "__main__":
    # When executed as a script, serve the app with uvicorn on all interfaces,
    # port 7860, with a 90-second keep-alive timeout.
    uvicorn_run(app, host='0.0.0.0', port=7860, timeout_keep_alive=90)