Spaces:
Running
on
Zero
Running
on
Zero
import os | |
import requests | |
import time | |
import torch | |
import spaces | |
from diffusers import StableDiffusionControlNetImg2ImgPipeline, ControlNetModel, DDIMScheduler | |
from diffusers.pipelines.stable_diffusion import StableDiffusionSafetyChecker | |
from diffusers.models import AutoencoderKL | |
from diffusers.models.attention_processor import AttnProcessor2_0 | |
from PIL import Image | |
import cv2 | |
import pandas as pd | |
from RealESRGAN import RealESRGAN | |
import gradio as gr | |
from gradio_imageslider import ImageSlider | |
USE_TORCH_COMPILE = False | |
ENABLE_CPU_OFFLOAD = os.getenv("ENABLE_CPU_OFFLOAD", "0") == "1" | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
def download_file(url, folder_path, filename): | |
if not os.path.exists(folder_path): | |
os.makedirs(folder_path) | |
file_path = os.path.join(folder_path, filename) | |
if os.path.isfile(file_path): | |
print(f"File already exists: {file_path}") | |
else: | |
response = requests.get(url, stream=True) | |
if response.status_code == 200: | |
with open(file_path, 'wb') as file: | |
for chunk in response.iter_content(chunk_size=1024): | |
file.write(chunk) | |
print(f"File successfully downloaded and saved: {file_path}") | |
else: | |
print(f"Error downloading the file. Status code: {response.status_code}") | |
def download_models(): | |
models = { | |
"MODEL": ("https://huggingface.co/dantea1118/juggernaut_reborn/resolve/main/juggernaut_reborn.safetensors?download=true", "models/models/Stable-diffusion", "juggernaut_reborn.safetensors"), | |
"UPSCALER_X2": ("https://huggingface.co/ai-forever/Real-ESRGAN/resolve/main/RealESRGAN_x2.pth?download=true", "models/upscalers/", "RealESRGAN_x2.pth"), | |
"UPSCALER_X4": ("https://huggingface.co/ai-forever/Real-ESRGAN/resolve/main/RealESRGAN_x4.pth?download=true", "models/upscalers/", "RealESRGAN_x4.pth"), | |
"NEGATIVE_1": ("https://huggingface.co/philz1337x/embeddings/resolve/main/verybadimagenegative_v1.3.pt?download=true", "models/embeddings", "verybadimagenegative_v1.3.pt"), | |
"NEGATIVE_2": ("https://huggingface.co/datasets/AddictiveFuture/sd-negative-embeddings/resolve/main/JuggernautNegative-neg.pt?download=true", "models/embeddings", "JuggernautNegative-neg.pt"), | |
"LORA_1": ("https://huggingface.co/philz1337x/loras/resolve/main/SDXLrender_v2.0.safetensors?download=true", "models/Lora", "SDXLrender_v2.0.safetensors"), | |
"LORA_2": ("https://huggingface.co/philz1337x/loras/resolve/main/more_details.safetensors?download=true", "models/Lora", "more_details.safetensors"), | |
"CONTROLNET": ("https://huggingface.co/lllyasviel/ControlNet-v1-1/resolve/main/control_v11f1e_sd15_tile.pth?download=true", "models/ControlNet", "control_v11f1e_sd15_tile.pth"), | |
"VAE": ("https://huggingface.co/stabilityai/sd-vae-ft-mse-original/resolve/main/vae-ft-mse-840000-ema-pruned.safetensors?download=true", "models/VAE", "vae-ft-mse-840000-ema-pruned.safetensors"), | |
} | |
for model, (url, folder, filename) in models.items(): | |
download_file(url, folder, filename) | |
download_models() | |
def timer_func(func): | |
def wrapper(*args, **kwargs): | |
start_time = time.time() | |
result = func(*args, **kwargs) | |
end_time = time.time() | |
print(f"{func.__name__} took {end_time - start_time:.2f} seconds") | |
return result | |
return wrapper | |
class LazyLoadPipeline: | |
def __init__(self): | |
self.pipe = None | |
def load(self): | |
if self.pipe is None: | |
print("Starting to load the pipeline...") | |
self.pipe = self.setup_pipeline() | |
print(f"Moving pipeline to device: {device}") | |
self.pipe.to(device) | |
if USE_TORCH_COMPILE: | |
print("Compiling the model...") | |
self.pipe.unet = torch.compile(self.pipe.unet, mode="reduce-overhead", fullgraph=True) | |
def setup_pipeline(self): | |
print("Setting up the pipeline...") | |
controlnet = ControlNetModel.from_single_file( | |
"models/ControlNet/control_v11f1e_sd15_tile.pth", torch_dtype=torch.float16 | |
) | |
safety_checker = StableDiffusionSafetyChecker.from_pretrained("CompVis/stable-diffusion-safety-checker") | |
model_path = "models/models/Stable-diffusion/juggernaut_reborn.safetensors" | |
pipe = StableDiffusionControlNetImg2ImgPipeline.from_single_file( | |
model_path, | |
controlnet=controlnet, | |
torch_dtype=torch.float16, | |
use_safetensors=True, | |
safety_checker=safety_checker | |
) | |
vae = AutoencoderKL.from_single_file( | |
"models/VAE/vae-ft-mse-840000-ema-pruned.safetensors", | |
torch_dtype=torch.float16 | |
) | |
pipe.vae = vae | |
pipe.load_textual_inversion("models/embeddings/verybadimagenegative_v1.3.pt") | |
pipe.load_textual_inversion("models/embeddings/JuggernautNegative-neg.pt") | |
pipe.load_lora_weights("models/Lora/SDXLrender_v2.0.safetensors") | |
pipe.fuse_lora(lora_scale=0.5) | |
pipe.load_lora_weights("models/Lora/more_details.safetensors") | |
pipe.fuse_lora(lora_scale=1.) | |
pipe.scheduler = DDIMScheduler.from_config(pipe.scheduler.config) | |
pipe.enable_freeu(s1=0.9, s2=0.2, b1=1.3, b2=1.4) | |
return pipe | |
def __call__(self, *args, **kwargs): | |
return self.pipe(*args, **kwargs) | |
class LazyRealESRGAN: | |
def __init__(self, device, scale): | |
self.device = device | |
self.scale = scale | |
self.model = None | |
def load_model(self): | |
if self.model is None: | |
self.model = RealESRGAN(self.device, scale=self.scale) | |
self.model.load_weights(f'models/upscalers/RealESRGAN_x{self.scale}.pth', download=False) | |
def predict(self, img): | |
self.load_model() | |
return self.model.predict(img) | |
lazy_realesrgan_x2 = LazyRealESRGAN(device, scale=2) | |
lazy_realesrgan_x4 = LazyRealESRGAN(device, scale=4) | |
def resize_and_upscale(input_image, resolution): | |
scale = 2 if resolution <= 2048 else 4 | |
input_image = input_image.convert("RGB") | |
W, H = input_image.size | |
k = float(resolution) / min(H, W) | |
H = int(round(H * k / 64.0)) * 64 | |
W = int(round(W * k / 64.0)) * 64 | |
img = input_image.resize((W, H), resample=Image.LANCZOS) | |
if scale == 2: | |
img = lazy_realesrgan_x2.predict(img) | |
else: | |
img = lazy_realesrgan_x4.predict(img) | |
return img | |
def create_hdr_effect(original_image, hdr): | |
if hdr == 0: | |
return original_image | |
cv_original = cv2.cvtColor(pd.DataFrame(original_image).to_numpy(), cv2.COLOR_RGB2BGR) | |
factors = [1.0 - 0.9 * hdr, 1.0 - 0.7 * hdr, 1.0 - 0.45 * hdr, | |
1.0 - 0.25 * hdr, 1.0, 1.0 + 0.2 * hdr, | |
1.0 + 0.4 * hdr, 1.0 + 0.6 * hdr, 1.0 + 0.8 * hdr] | |
images = [cv2.convertScaleAbs(cv_original, alpha=factor) for factor in factors] | |
merge_mertens = cv2.createMergeMertens() | |
hdr_image = merge_mertens.process(images) | |
hdr_image_8bit = pd.DataFrame(np.clip(hdr_image * 255, 0, 255).astype('uint8')).to_numpy() | |
return Image.fromarray(cv2.cvtColor(hdr_image_8bit, cv2.COLOR_BGR2RGB)) | |
def apply_denoising(image, strength): | |
return cv2.fastNlMeansDenoisingColored(image, None, strength * 10, strength * 10, 7, 21) | |
def apply_sharpening(image, intensity): | |
kernel = pd.DataFrame([[0, -1, 0], [-1, 5 + intensity * 4, -1], [0, -1, 0]]).to_numpy() | |
return cv2.filter2D(image, -1, kernel) | |
def prepare_image(input_image, resolution, hdr): | |
resized_image = resize_and_upscale(input_image, resolution) | |
if hdr > 0: | |
resized_image = create_hdr_effect(resized_image, hdr) | |
return resized_image | |
lazy_pipe = LazyLoadPipeline() | |
def gradio_process_images(input_images, model_choice, custom_prompt, custom_negative_prompt, resolution, num_inference_steps, strength, hdr, guidance_scale, denoising_strength, sharpening_intensity): | |
results = [] | |
for input_image in input_images: | |
lazy_pipe.load() | |
condition_image = prepare_image(input_image, resolution, hdr) | |
options = { | |
"prompt": custom_prompt, | |
"negative_prompt": custom_negative_prompt, | |
"image": condition_image, | |
"strength": strength, | |
"guidance_scale": guidance_scale, | |
"num_inference_steps": num_inference_steps, | |
"control_image": condition_image, | |
} | |
result = lazy_pipe(**options).images[0] | |
result = pd.DataFrame(result).to_numpy() | |
if denoising_strength > 0: | |
result = apply_denoising(result, denoising_strength) | |
if sharpening_intensity > 0: | |
result = apply_sharpening(result, sharpening_intensity) | |
results.append(result) | |
return results | |
def update_live_preview(input_image, model_choice, custom_prompt, custom_negative_prompt, resolution, num_inference_steps, strength, hdr, guidance_scale): | |
lazy_pipe.load() | |
condition_image = prepare_image(input_image, resolution, hdr) | |
options = { | |
"prompt": custom_prompt, | |
"negative_prompt": custom_negative_prompt, | |
"image": condition_image, | |
"strength": strength, | |
"guidance_scale": guidance_scale, | |
"num_inference_steps": num_inference_steps, | |
"control_image": condition_image, | |
} | |
result = lazy_pipe(**options).images[0] | |
return pd.DataFrame(result).to_numpy() | |
# Gradio Interface | |
input_images = gr.File(label="Input Images", type="file", multiple=True) | |
model_choice = gr.Dropdown(choices=["Model A", "Model B"], value="Model A", label="Select Model") | |
custom_prompt = gr.Textbox(label="Custom Prompt", value="masterpiece, best quality, highres") | |
custom_negative_prompt = gr.Textbox(label="Custom Negative Prompt", value="low quality, normal quality, ugly, blurry, blur, lowres, bad anatomy, bad hands, cropped, worst quality, verybadimagenegative_v1.3, JuggernautNegative-neg") | |
resolution = gr.Slider(minimum=256, maximum=4096, value=512, step=64, label="Resolution") | |
num_inference_steps = gr.Slider(minimum=1, maximum=100, value=20, step=1, label="Number of Inference Steps") | |
strength = gr.Slider(minimum=0.0, maximum=1.0, value=0.75, step=0.05, label="Strength") | |
hdr = gr.Slider(minimum=0.0, maximum=1.0, value=0.5, step=0.05, label="HDR Effect") | |
guidance_scale = gr.Slider(minimum=0.1, maximum=30.0, value=7.5, step=0.1, label="Guidance Scale") | |
denoising_strength = gr.Slider(minimum=0, maximum=1, value=0.5, step=0.1, label="Denoising Strength") | |
sharpening_intensity = gr.Slider(minimum=0, maximum=1, value=0.5, step=0.1, label="Sharpening Intensity") | |
output_slider = ImageSlider(label="Processed Images") | |
live_preview = gr.Image(type="numpy", label="Live Preview") | |
run_button = gr.Button("Run") | |
run_button.click( | |
fn=gradio_process_images, | |
inputs=[input_images, model_choice, custom_prompt, custom_negative_prompt, resolution, num_inference_steps, strength, hdr, guidance_scale, denoising_strength, sharpening_intensity], | |
outputs=output_slider | |
) | |
# Live preview updates every second | |
gr.Interface( | |
fn=update_live_preview, | |
inputs=[input_images, model_choice, custom_prompt, custom_negative_prompt, resolution, num_inference_steps, strength, hdr, guidance_scale], | |
outputs=live_preview, | |
live=True | |
) | |
demo = gr.Interface( | |
fn=gradio_process_images, | |
inputs=[input_images, model_choice, custom_prompt, custom_negative_prompt, resolution, num_inference_steps, strength, hdr, guidance_scale, denoising_strength, sharpening_intensity], | |
outputs=output_slider | |
) | |
demo.launch() | |