Spaces:
Sleeping
Sleeping
import json | |
import pathlib | |
import time | |
from typing import cast | |
import click | |
import filetype | |
from tqdm import tqdm | |
from watchdog.events import FileSystemEvent, FileSystemEventHandler | |
from watchdog.observers import Observer | |
from ..bg import remove | |
from ..session_factory import new_session | |
from ..sessions import sessions_names | |
def p_command(
    model: str,
    extras: str,
    input: pathlib.Path,
    output: pathlib.Path,
    watch: bool,
    **kwargs,
) -> None:
    """Run ``remove`` over every image found under ``input``, writing PNGs to ``output``.

    A single session is created with ``new_session(model)`` and reused for
    every file. Each regular file found by a recursive glob of ``input`` is
    handed to ``process``; files that ``filetype`` cannot identify, or whose
    MIME type does not contain ``"image"``, are skipped.

    Args:
        model: Name passed to ``new_session``.
        extras: Optional JSON text whose decoded value is merged into
            ``kwargs``. An empty/missing value is ignored silently; a value
            that cannot be decoded or merged is ignored with a printed warning.
        input: Directory that is scanned for files.
        output: Directory that receives ``<input file name>.png``. Only the
            file name is kept, so the sub-directory layout of ``input`` is
            not reproduced and same-named inputs map to the same output.
        watch: When true, print one line per processed file instead of
            showing a progress bar, and after the initial pass keep watching
            ``input`` for file-system events until the process is interrupted.
        **kwargs: Extra keyword arguments forwarded to ``remove``.
    """
    # ``extras`` is optional, so "nothing supplied" stays silent; only a value
    # the user actually typed and that we have to discard is reported.
    if extras:
        try:
            kwargs.update(json.loads(extras))
        except (TypeError, ValueError) as exc:
            # json.JSONDecodeError is a ValueError; TypeError/ValueError also
            # cover decoded values that dict.update() cannot merge.
            print(f"ignoring invalid extras {extras!r}: {exc}")

    session = new_session(model)

    def process(each_input: pathlib.Path) -> None:
        """Convert one input file; failures are printed and never propagate."""
        try:
            mimetype = filetype.guess(each_input)
            if mimetype is None or "image" not in mimetype.mime:
                return

            each_output = (output / each_input.name).with_suffix(".png")
            each_output.parent.mkdir(parents=True, exist_ok=True)

            # Existing results are never overwritten. In watch mode this also
            # keeps repeated events for the same file from redoing the work.
            if each_output.exists():
                return

            # The result is computed before write_bytes() opens the target, so
            # a failing remove() does not leave an empty output file behind.
            each_output.write_bytes(
                cast(
                    bytes,
                    remove(each_input.read_bytes(), session=session, **kwargs),
                )
            )

            if watch:
                print(
                    f"processed: {each_input.absolute()} -> {each_output.absolute()}"
                )
        except Exception as e:
            # Best effort per file: one bad input must not abort the batch.
            print(e)

    # Materialise the glob so tqdm knows the total up front.
    inputs = list(input.glob("**/*"))
    if not watch:
        inputs = tqdm(inputs)

    for each_input in inputs:
        if not each_input.is_dir():
            process(each_input)

    if not watch:
        return

    # Event types that never trigger processing.
    skipped_event_types = ("deleted", "closed")

    class EventHandler(FileSystemEventHandler):
        """Feed every relevant file event back into ``process``."""

        def on_any_event(self, event: FileSystemEvent) -> None:
            if event.is_directory or event.event_type in skipped_event_types:
                return
            process(pathlib.Path(event.src_path))

    observer = Observer()
    event_handler = EventHandler()
    # NOTE(review): the initial scan above is recursive ("**/*") while the
    # watch is not; files added to sub-directories later are not picked up.
    # Kept as in the original -- confirm whether this is intended.
    observer.schedule(event_handler, input, recursive=False)
    observer.start()

    try:
        # Keep the calling thread alive; events are handled via the observer.
        while True:
            time.sleep(1)
    finally:
        observer.stop()
        observer.join()