import os import socket import sys import json import traceback import logging import shlex import urllib from rich.logging import RichHandler from sdkit.utils import log as sdkit_log # hack, so we can overwrite the log config from easydiffusion import task_manager from easydiffusion.utils import log # Remove all handlers associated with the root logger object. for handler in logging.root.handlers[:]: logging.root.removeHandler(handler) LOG_FORMAT = "%(asctime)s.%(msecs)03d %(levelname)s %(threadName)s %(message)s" logging.basicConfig( level=logging.INFO, format=LOG_FORMAT, datefmt="%X", handlers=[RichHandler(markup=True, rich_tracebacks=False, show_time=False, show_level=False)], ) SD_DIR = os.getcwd() SD_UI_DIR = os.getenv("SD_UI_PATH", None) CONFIG_DIR = os.path.abspath(os.path.join(SD_UI_DIR, "..", "scripts")) MODELS_DIR = os.path.abspath(os.path.join(SD_DIR, "..", "models")) USER_PLUGINS_DIR = os.path.abspath(os.path.join(SD_DIR, "..", "plugins")) CORE_PLUGINS_DIR = os.path.abspath(os.path.join(SD_UI_DIR, "plugins")) USER_UI_PLUGINS_DIR = os.path.join(USER_PLUGINS_DIR, "ui") CORE_UI_PLUGINS_DIR = os.path.join(CORE_PLUGINS_DIR, "ui") USER_SERVER_PLUGINS_DIR = os.path.join(USER_PLUGINS_DIR, "server") UI_PLUGINS_SOURCES = ((CORE_UI_PLUGINS_DIR, "core"), (USER_UI_PLUGINS_DIR, "user")) sys.path.append(os.path.dirname(SD_UI_DIR)) sys.path.append(USER_SERVER_PLUGINS_DIR) OUTPUT_DIRNAME = "Stable Diffusion UI" # in the user's home folder PRESERVE_CONFIG_VARS = ["FORCE_FULL_PRECISION"] TASK_TTL = 15 * 60 # Discard last session's task timeout APP_CONFIG_DEFAULTS = { # auto: selects the cuda device with the most free memory, cuda: use the currently active cuda device. "render_devices": "auto", # valid entries: 'auto', 'cpu' or 'cuda:N' (where N is a GPU index) "update_branch": "main", "ui": { "open_browser_on_start": True, }, } IMAGE_EXTENSIONS = [".png", ".apng", ".jpg", ".jpeg", ".jfif", ".pjpeg", ".pjp", ".jxl", ".gif", ".webp", ".avif", ".svg"] CUSTOM_MODIFIERS_DIR = os.path.abspath(os.path.join(SD_DIR, "..", "modifiers")) CUSTOM_MODIFIERS_PORTRAIT_EXTENSIONS=[".portrait", "_portrait", " portrait", "-portrait"] CUSTOM_MODIFIERS_LANDSCAPE_EXTENSIONS=[".landscape", "_landscape", " landscape", "-landscape"] def init(): os.makedirs(USER_UI_PLUGINS_DIR, exist_ok=True) os.makedirs(USER_SERVER_PLUGINS_DIR, exist_ok=True) load_server_plugins() update_render_threads() def getConfig(default_val=APP_CONFIG_DEFAULTS): try: config_json_path = os.path.join(CONFIG_DIR, "config.json") if not os.path.exists(config_json_path): config = default_val else: with open(config_json_path, "r", encoding="utf-8") as f: config = json.load(f) if "net" not in config: config["net"] = {} if os.getenv("SD_UI_BIND_PORT") is not None: config["net"]["listen_port"] = int(os.getenv("SD_UI_BIND_PORT")) else: config["net"]["listen_port"] = 9000 if os.getenv("SD_UI_BIND_IP") is not None: config["net"]["listen_to_network"] = os.getenv("SD_UI_BIND_IP") == "0.0.0.0" else: config["net"]["listen_to_network"] = True return config except Exception as e: log.warn(traceback.format_exc()) return default_val def setConfig(config): try: # config.json config_json_path = os.path.join(CONFIG_DIR, "config.json") with open(config_json_path, "w", encoding="utf-8") as f: json.dump(config, f) except: log.error(traceback.format_exc()) try: # config.bat config_bat_path = os.path.join(CONFIG_DIR, "config.bat") config_bat = [] if "update_branch" in config: config_bat.append(f"@set update_branch={config['update_branch']}") config_bat.append(f"@set SD_UI_BIND_PORT={config['net']['listen_port']}") bind_ip = "0.0.0.0" if config["net"]["listen_to_network"] else "127.0.0.1" config_bat.append(f"@set SD_UI_BIND_IP={bind_ip}") # Preserve these variables if they are set for var in PRESERVE_CONFIG_VARS: if os.getenv(var) is not None: config_bat.append(f"@set {var}={os.getenv(var)}") if len(config_bat) > 0: with open(config_bat_path, "w", encoding="utf-8") as f: f.write("\n".join(config_bat)) except: log.error(traceback.format_exc()) try: # config.sh config_sh_path = os.path.join(CONFIG_DIR, "config.sh") config_sh = ["#!/bin/bash"] if "update_branch" in config: config_sh.append(f"export update_branch={config['update_branch']}") config_sh.append(f"export SD_UI_BIND_PORT={config['net']['listen_port']}") bind_ip = "0.0.0.0" if config["net"]["listen_to_network"] else "127.0.0.1" config_sh.append(f"export SD_UI_BIND_IP={bind_ip}") # Preserve these variables if they are set for var in PRESERVE_CONFIG_VARS: if os.getenv(var) is not None: config_bat.append(f'export {var}="{shlex.quote(os.getenv(var))}"') if len(config_sh) > 1: with open(config_sh_path, "w", encoding="utf-8") as f: f.write("\n".join(config_sh)) except: log.error(traceback.format_exc()) def save_to_config(ckpt_model_name, vae_model_name, hypernetwork_model_name, vram_usage_level): config = getConfig() if "model" not in config: config["model"] = {} config["model"]["stable-diffusion"] = ckpt_model_name config["model"]["vae"] = vae_model_name config["model"]["hypernetwork"] = hypernetwork_model_name if vae_model_name is None or vae_model_name == "": del config["model"]["vae"] if hypernetwork_model_name is None or hypernetwork_model_name == "": del config["model"]["hypernetwork"] config["vram_usage_level"] = vram_usage_level setConfig(config) def update_render_threads(): config = getConfig() render_devices = config.get("render_devices", "auto") active_devices = task_manager.get_devices()["active"].keys() log.debug(f"requesting for render_devices: {render_devices}") task_manager.update_render_threads(render_devices, active_devices) def getUIPlugins(): plugins = [] for plugins_dir, dir_prefix in UI_PLUGINS_SOURCES: for file in os.listdir(plugins_dir): if file.endswith(".plugin.js"): plugins.append(f"/plugins/{dir_prefix}/{file}") return plugins def load_server_plugins(): if not os.path.exists(USER_SERVER_PLUGINS_DIR): return import importlib def load_plugin(file): mod_path = file.replace(".py", "") return importlib.import_module(mod_path) def apply_plugin(file, plugin): if hasattr(plugin, "get_cond_and_uncond"): import sdkit.generate.image_generator sdkit.generate.image_generator.get_cond_and_uncond = plugin.get_cond_and_uncond log.info(f"Overridden get_cond_and_uncond with the one in the server plugin: {file}") for file in os.listdir(USER_SERVER_PLUGINS_DIR): file_path = os.path.join(USER_SERVER_PLUGINS_DIR, file) if (not os.path.isdir(file_path) and not file_path.endswith("_plugin.py")) or ( os.path.isdir(file_path) and not file_path.endswith("_plugin") ): continue try: log.info(f"Loading server plugin: {file}") mod = load_plugin(file) log.info(f"Applying server plugin: {file}") apply_plugin(file, mod) except: log.warn(f"Error while loading a server plugin") log.warn(traceback.format_exc()) def getIPConfig(): try: ips = socket.gethostbyname_ex(socket.gethostname()) ips[2].append(ips[0]) return ips[2] except Exception as e: log.exception(e) return [] def open_browser(): config = getConfig() ui = config.get("ui", {}) net = config.get("net", {"listen_port": 9000}) port = net.get("listen_port", 9000) if ui.get("open_browser_on_start", True): import webbrowser webbrowser.open(f"http://localhost:{port}") def get_image_modifiers(): modifiers_json_path = os.path.join(SD_UI_DIR, "modifiers.json") modifier_categories = {} original_category_order=[] with open(modifiers_json_path, "r", encoding="utf-8") as f: modifiers_file = json.load(f) # The trailing slash is needed to support symlinks if not os.path.isdir(f"{CUSTOM_MODIFIERS_DIR}/"): return modifiers_file # convert modifiers from a list of objects to a dict of dicts for category_item in modifiers_file: category_name = category_item['category'] original_category_order.append(category_name) category = {} for modifier_item in category_item['modifiers']: modifier = {} for preview_item in modifier_item['previews']: modifier[preview_item['name']] = preview_item['path'] category[modifier_item['modifier']] = modifier modifier_categories[category_name] = category def scan_directory(directory_path: str, category_name="Modifiers"): for entry in os.scandir(directory_path): if entry.is_file(): file_extension = list(filter(lambda e: entry.name.endswith(e), IMAGE_EXTENSIONS)) if len(file_extension) == 0: continue modifier_name = entry.name[: -len(file_extension[0])] modifier_path = f"custom/{entry.path[len(CUSTOM_MODIFIERS_DIR) + 1:]}" # URL encode path segments modifier_path = "/".join(map(lambda segment: urllib.parse.quote(segment), modifier_path.split("/"))) is_portrait = True is_landscape = True portrait_extension = list(filter(lambda e: modifier_name.lower().endswith(e), CUSTOM_MODIFIERS_PORTRAIT_EXTENSIONS)) landscape_extension = list(filter(lambda e: modifier_name.lower().endswith(e), CUSTOM_MODIFIERS_LANDSCAPE_EXTENSIONS)) if len(portrait_extension) > 0: is_landscape = False modifier_name = modifier_name[: -len(portrait_extension[0])] elif len(landscape_extension) > 0: is_portrait = False modifier_name = modifier_name[: -len(landscape_extension[0])] if (category_name not in modifier_categories): modifier_categories[category_name] = {} category = modifier_categories[category_name] if (modifier_name not in category): category[modifier_name] = {} if (is_portrait or "portrait" not in category[modifier_name]): category[modifier_name]["portrait"] = modifier_path if (is_landscape or "landscape" not in category[modifier_name]): category[modifier_name]["landscape"] = modifier_path elif entry.is_dir(): scan_directory( entry.path, entry.name if directory_path==CUSTOM_MODIFIERS_DIR else f"{category_name}/{entry.name}", ) scan_directory(CUSTOM_MODIFIERS_DIR) custom_categories = sorted( [cn for cn in modifier_categories.keys() if cn not in original_category_order], key=str.casefold, ) # convert the modifiers back into a list of objects modifier_categories_list = [] for category_name in [*original_category_order, *custom_categories]: category = { 'category': category_name, 'modifiers': [] } for modifier_name in sorted(modifier_categories[category_name].keys(), key=str.casefold): modifier = { 'modifier': modifier_name, 'previews': [] } for preview_name, preview_path in modifier_categories[category_name][modifier_name].items(): modifier['previews'].append({ 'name': preview_name, 'path': preview_path }) category['modifiers'].append(modifier) modifier_categories_list.append(category) return modifier_categories_list