import os import importlib import importlib.util import subprocess import sys import re import logging from functools import lru_cache logging.getLogger("torch.distributed.nn").setLevel(logging.ERROR) # sshh... logging.getLogger("xformers").addFilter(lambda record: 'A matching Triton is not available' not in record.getMessage()) python = sys.executable git = os.environ.get('GIT', "git") default_command_live = (os.environ.get('LAUNCH_LIVE_OUTPUT') == "1") index_url = os.environ.get('INDEX_URL', "") modules_path = os.path.dirname(os.path.realpath(__file__)) script_path = os.path.dirname(modules_path) dir_repos = "repositories" fooocus_tag = '1.0.0' @lru_cache() def commit_hash(): try: return subprocess.check_output([git, "rev-parse", "HEAD"], shell=False, encoding='utf8').strip() except Exception: return "" def git_clone(url, dir, name, commithash=None): # TODO clone into temporary dir and move if successful if os.path.exists(dir): if commithash is None: return current_hash = run(f'"{git}" -C "{dir}" rev-parse HEAD', None, f"Couldn't determine {name}'s hash: {commithash}", live=False).strip() if current_hash == commithash: return run(f'"{git}" -C "{dir}" fetch', f"Fetching updates for {name}...", f"Couldn't fetch {name}") run(f'"{git}" -C "{dir}" checkout {commithash}', f"Checking out commit for {name} with hash: {commithash}...", f"Couldn't checkout commit {commithash} for {name}", live=True) return run(f'"{git}" clone "{url}" "{dir}"', f"Cloning {name} into {dir}...", f"Couldn't clone {name}", live=True) if commithash is not None: run(f'"{git}" -C "{dir}" checkout {commithash}', None, "Couldn't checkout {name}'s hash: {commithash}") def repo_dir(name): return os.path.join(script_path, dir_repos, name) def is_installed(package): try: spec = importlib.util.find_spec(package) except ModuleNotFoundError: return False return spec is not None def run(command, desc=None, errdesc=None, custom_env=None, live: bool = default_command_live) -> str: if desc is not None: print(desc) run_kwargs = { "args": command, "shell": True, "env": os.environ if custom_env is None else custom_env, "encoding": 'utf8', "errors": 'ignore', } if not live: run_kwargs["stdout"] = run_kwargs["stderr"] = subprocess.PIPE result = subprocess.run(**run_kwargs) if result.returncode != 0: error_bits = [ f"{errdesc or 'Error running command'}.", f"Command: {command}", f"Error code: {result.returncode}", ] if result.stdout: error_bits.append(f"stdout: {result.stdout}") if result.stderr: error_bits.append(f"stderr: {result.stderr}") raise RuntimeError("\n".join(error_bits)) return (result.stdout or "") def run_pip(command, desc=None, live=default_command_live): index_url_line = f' --index-url {index_url}' if index_url != '' else '' return run(f'"{python}" -m pip {command} --prefer-binary{index_url_line}', desc=f"Installing {desc}", errdesc=f"Couldn't install {desc}", live=live) re_requirement = re.compile(r"\s*([-_a-zA-Z0-9]+)\s*(?:==\s*([-+_.a-zA-Z0-9]+))?\s*") def requirements_met(requirements_file): """ Does a simple parse of a requirements.txt file to determine if all rerqirements in it are already installed. Returns True if so, False if not installed or parsing fails. """ import importlib.metadata import packaging.version with open(requirements_file, "r", encoding="utf8") as file: for line in file: if line.strip() == "": continue m = re.match(re_requirement, line) if m is None: return False package = m.group(1).strip() version_required = (m.group(2) or "").strip() if version_required == "": continue try: version_installed = importlib.metadata.version(package) except Exception: return False if packaging.version.parse(version_required) != packaging.version.parse(version_installed): return False return True