# Page banner captured along with this file -- Spaces status: "Runtime error".
import os | |
import time | |
from io import BytesIO | |
import uuid | |
import torch | |
import gradio as gr | |
import spaces | |
import numpy as np | |
from einops import rearrange | |
from PIL import Image, ExifTags | |
from dataclasses import dataclass | |
from flux.sampling import denoise, get_noise, get_schedule, prepare, unpack, prepare_tokens | |
from flux.util import configs, embed_watermark, load_ae, load_clip, load_flow_model, load_t5 | |
import jax | |
import jax.numpy as jnp | |
from flax import nnx | |
from jax import Array as Tensor | |
from einops import repeat | |
class SamplingOptions: | |
prompt: str | |
width: int | |
height: int | |
num_steps: int | |
guidance: float | |
seed: int | None | |
# Presumably the score cutoff for the NSFW classifier. Nothing in the visible
# code reads it: the classifier is never loaded and its scoring call is
# commented out.
NSFW_THRESHOLD = 0.85
def get_models(name: str, device: torch.device, offload: bool, is_schnell: bool):
    """Load the networks used by the demo and return them in split form.

    Args:
        name: Model name forwarded to ``load_flow_model`` and ``load_ae``.
        device: Device handed to the loaders. The text encoders always get
            this value; the flow model and autoencoder get ``"cpu"`` instead
            when ``offload`` is true.
        offload: Whether the flow model and autoencoder are loaded on CPU.
        is_schnell: Selects the T5 ``max_length`` (256 when true, else 512).

    Returns:
        A 7-tuple: ``nnx.split`` of the flow model, of the autoencoder and of
        T5, the T5 tokenizer, ``nnx.split`` of CLIP, the CLIP tokenizer, and
        ``None`` in the slot reserved for the NSFW classifier (not loaded).
    """
    t5_max_length = 256 if is_schnell else 512
    weights_device = "cpu" if offload else device

    t5 = load_t5(device, max_length=t5_max_length)
    clip = load_clip(device)
    model = load_flow_model(name, device=weights_device)
    ae = load_ae(name, device=weights_device)

    return (
        nnx.split(model),
        nnx.split(ae),
        nnx.split(t5),
        t5.tokenizer,
        nnx.split(clip),
        clip.tokenizer,
        None,  # NSFW classifier slot; the classifier is not loaded
    )
def encode(ae, x):
    """Rebuild the autoencoder from its split form and run its encoder on ``x``.

    Args:
        ae: The pieces produced by ``nnx.split`` for the autoencoder; they
            are unpacked into ``nnx.merge``.
        x: Input passed straight to the merged module's ``encode`` method.

    Returns:
        Whatever the merged autoencoder's ``encode`` returns.
    """
    autoencoder = nnx.merge(*ae)
    encoded = autoencoder.encode(x)
    return encoded
def _generate(model, ae, t5, clip, x, t5_tokens, clip_tokens, num_steps, guidance,
              shift=True):
    """Run the full text-to-image pipeline on an initial noise tensor.

    The four networks arrive in split form (as produced by ``nnx.split``) and
    are merged back into modules here, so that this function can be wrapped
    by ``jax.jit``. Image-to-image support (``init_image`` /
    ``image2image_strength``) is disabled in this demo.

    Args:
        model, ae, t5, clip: Split flow model, autoencoder and text encoders.
        x: Initial noise, unpacked below as (batch, height, width, channels).
        t5_tokens, clip_tokens: Token ids for the prompt.
        num_steps: Number of denoising steps (static under jit).
        guidance: Guidance value forwarded to ``denoise``.
        shift: Forwarded to ``get_schedule`` (static under jit).

    Returns:
        The output of ``ae.decode`` for the denoised, unpacked latents.
    """
    _, h, w, _ = x.shape

    model = nnx.merge(*model)
    ae = nnx.merge(*ae)
    t5 = nnx.merge(*t5)
    clip = nnx.merge(*clip)

    # The schedule wants the image sequence length, h * w // 4 for this
    # channels-last tensor. The previous expression,
    # x.shape[-1] * x.shape[-2] // 4, is the channels-first formula and
    # evaluated to channels * w // 4 here, which skews the schedule whenever
    # shift is enabled.
    # NOTE(review): confirm against get_schedule's expected argument.
    timesteps = get_schedule(
        num_steps,
        h * w // 4,
        shift=shift,
    )

    inp = prepare(t5, clip, x, t5_tokens, clip_tokens)
    x = denoise(model, **inp, timesteps=timesteps, guidance=guidance)
    # h and w are multiplied by 8 to give the size passed to unpack.
    x = unpack(x.astype(jnp.float32), h * 8, w * 8)
    x = ae.decode(x)
    return x


# num_steps and shift are marked static, so each distinct value (and each new
# input shape) triggers a fresh compilation.
generate = jax.jit(_generate, static_argnames=("num_steps", "shift"))
def prepare_tokens(t5_tokenizer, clip_tokenizer, prompt: str | list[str]) -> tuple[Tensor, Tensor]: | |
if isinstance(prompt, str): | |
prompt = [prompt] | |
t5_tokens = t5_tokenizer( | |
prompt, | |
truncation=True, | |
max_length=512, | |
return_length=False, | |
return_overflowing_tokens=False, | |
padding="max_length", | |
return_tensors="jax", | |
)["input_ids"] | |
clip_tokens = clip_tokenizer( | |
prompt, | |
truncation=True, | |
max_length=77, | |
return_length=False, | |
return_overflowing_tokens=False, | |
padding="max_length", | |
return_tensors="jax", | |
)["input_ids"] | |
return t5_tokens, clip_tokens | |
class FluxGenerator:
    """Owns the loaded Flux networks and serves text-to-image requests.

    The networks are kept in the split form returned by ``get_models`` and
    handed to the jitted ``generate`` function on every request.
    Image-to-image input and NSFW filtering are both disabled in this demo.
    """

    def __init__(self, model_name: str, device: str, offload: bool):
        """Load all models for ``model_name``.

        Args:
            model_name: Model to load; ``"flux-schnell"`` also switches the
                sampler to the non-shifted schedule.
            device: Accepted for interface compatibility but ignored;
                ``None`` is always passed on to the loaders.
            offload: Forwarded to ``get_models``.
        """
        self.device = None
        self.offload = offload
        self.model_name = model_name
        self.is_schnell = model_name == "flux-schnell"
        (
            self.model,
            self.ae,
            self.t5,
            self.t5_tokenizer,
            self.clip,
            self.clip_tokenizer,
            self.nsfw_classifier,
        ) = get_models(
            model_name,
            device=self.device,
            offload=self.offload,
            is_schnell=self.is_schnell,
        )
        # PRNG state for drawing a seed when the caller asks for a random
        # one. The key is fixed, so the sequence of "random" seeds repeats
        # after every process restart.
        self.key = jax.random.key(0)

    def generate_image(
        self,
        img_size,
        num_steps,
        guidance,
        seed,
        prompt,
        add_sampling_metadata=True,
    ):
        """Generate one image and save it as a JPEG with EXIF metadata.

        Args:
            img_size: ``"1,024x1,024"`` selects 1024x1024; any other value
                selects 512x512.
            num_steps: Number of denoising steps.
            guidance: Guidance value passed to the sampler.
            seed: Integer seed (or its string form). ``-1`` or a blank string
                requests a randomly drawn seed.
            prompt: Text prompt.
            add_sampling_metadata: When true, the prompt is stored in the
                EXIF ``ImageDescription`` tag.

        Returns:
            ``(image, runtime_seconds, seed_string, file_path, None)``, in
            the order of the UI outputs; the last slot is the (unused)
            warning text.

        Raises:
            ValueError: If ``seed`` is a non-blank string that is not an
                integer.
        """
        # The seed comes from a textbox; treat an empty entry like -1
        # instead of failing inside int().
        if isinstance(seed, str) and not seed.strip():
            seed = -1
        seed = int(seed)
        if seed == -1:
            seed = None

        if img_size == "1,024x1,024":
            width, height = 1024, 1024
        else:
            width, height = 512, 512

        opts = SamplingOptions(
            prompt=prompt,
            width=width,
            height=height,
            num_steps=num_steps,
            guidance=guidance,
            seed=seed,
        )

        if opts.seed is None:
            key, self.key = jax.random.split(self.key, 2)
            # Convert to a plain int so the seed has the same type as a
            # user-supplied one.
            opts.seed = int(jax.random.randint(key, (), 0, 2**30))

        print(f"Generating '{opts.prompt}' with seed {opts.seed}")
        t0 = time.perf_counter()

        # prepare input
        t5_tokens, clip_tokens = prepare_tokens(
            self.t5_tokenizer, self.clip_tokenizer, prompt=opts.prompt
        )
        x = get_noise(
            1,
            opts.height,
            opts.width,
            device=None,
            dtype=jnp.bfloat16,
            seed=opts.seed,
        )
        x = generate(
            self.model,
            self.ae,
            self.t5,
            self.clip,
            x,
            t5_tokens,
            clip_tokens,
            opts.num_steps,
            opts.guidance,
            shift=(not self.is_schnell),
        )
        # JAX dispatches work asynchronously; wait for the result so the
        # measured time covers the actual computation, not just the dispatch.
        x = x.block_until_ready()
        t1 = time.perf_counter()
        runtime = t1 - t0

        # bring into PIL format: map [-1, 1] to [0, 255] for the first sample
        x = jnp.clip(x, -1, 1)
        img = Image.fromarray(np.asarray((127.5 * (x[0] + 1.0))).astype(np.uint8))

        # NSFW gating is disabled, so every image is saved and returned. The
        # former rejection branch was unreachable and returned four values
        # for five UI outputs.
        filename = f"output/gradio/{uuid.uuid4()}.jpg"
        os.makedirs(os.path.dirname(filename), exist_ok=True)

        exif_data = Image.Exif()
        exif_data[ExifTags.Base.Software] = "AI generated;txt2img;flux"
        exif_data[ExifTags.Base.Make] = "Black Forest Labs"
        exif_data[ExifTags.Base.Model] = self.model_name
        if add_sampling_metadata:
            exif_data[ExifTags.Base.ImageDescription] = prompt
        img.save(filename, format="jpeg", exif=exif_data, quality=95, subsampling=0)

        return img, runtime, str(opts.seed), filename, None
def create_demo(model_name: str, device: str = "cuda", offload: bool = False):
    """Build the Gradio Blocks UI around a ``FluxGenerator``.

    Args:
        model_name: Model to load; ``"flux-schnell"`` changes the default
            step count to 4 and makes the guidance slider non-interactive.
        device: Forwarded to ``FluxGenerator``.
        offload: Forwarded to ``FluxGenerator``.

    Returns:
        The ``gr.Blocks`` instance, not yet launched.
    """
    flux_generator = FluxGenerator(model_name, device, offload)
    is_schnell = model_name == "flux-schnell"
    default_steps = 4 if is_schnell else 50

    with open("./assets/banner.html") as banner_file:
        banner_html = banner_file.read()

    # Static UI texts, kept out of the layout code for readability.
    banner_iframe = f"""<iframe scrolling="no" style="width: 100%; height: 125px; border: 0" srcdoc='{banner_html}'>"""
    intro_md = "🚀 [Flux-Flax](https://github.com/lkwq007/flux-flax) is a JAX implementation of Flux models. 1-step time statistics for `FLUX.1-schnell`: `0.4s` for 1024x1024, `0.1s` for 512x512; 2-step: `0.6s` for 1024x1024, `0.2s` for 512x512; 4-step: `2.4s` for 1024x1024, `0.8s` for 512x512.\n"
    note_md = "\n💡 Note: More resolutions are supports, but here this demo limits to 1024x1024 and 512x512 to avoid jit recompilation (which takes 130s). Flux-Flax also support `FLUX.1-dev`, 50-step time statistics: `18s` for 1024x1024, `6s` for 512x512"
    default_prompt = 'a photo of a forest with mist swirling around the tree trunks. The word "FLUX" is painted over it in big, red brush strokes with visible texture'

    # NOTE(review): the source lost its indentation, so the nesting of the
    # layout containers below is a reconstruction -- confirm against the
    # deployed layout.
    with gr.Blocks() as app:
        with gr.Column(elem_id="app-container"):
            gr.HTML(banner_iframe)
            gr.Markdown(intro_md)
            with gr.Row():
                # Left: results.
                with gr.Column(scale=3):
                    image_out = gr.Image(label="Generated Image")
                    warning_out = gr.Textbox(label="Warning", visible=False)
                    download_out = gr.File(label="Download full-resolution")
                    gr.Markdown(note_md)
                # Right: inputs and statistics.
                with gr.Column(scale=1):
                    prompt_in = gr.Textbox(label="Prompt", value=default_prompt)
                    run_button = gr.Button("Generate")
                    with gr.Row():
                        seed_out = gr.Number(label="Used Seed")
                        runtime_out = gr.Number(label="Inference Time", precision=3)
                    with gr.Row():
                        seed_in = gr.Textbox(-1, label="Seed (-1 for random)")
                        size_in = gr.Radio(["1,024x1,024", "512x512"], label="Image Resolution", value="1,024x1,024")
                    steps_in = gr.Slider(1, 50, default_steps, step=1, label="Number of steps")
                    guidance_in = gr.Slider(1.0, 10.0, 3.5, step=0.1, label="Guidance", interactive=not is_schnell)
                    metadata_in = gr.Checkbox(label="Add sampling parameters to metadata?", value=True)

        # Input order matches generate_image's positional parameters; output
        # order matches the tuple it returns.
        run_button.click(
            fn=flux_generator.generate_image,
            inputs=[size_in, steps_in, guidance_in, seed_in, prompt_in, metadata_in],
            outputs=[image_out, runtime_out, seed_out, download_out, warning_out],
        )

    return app
# Disabled command-line entry point, kept for reference. The model name,
# device and offload flag are hard-coded in the create_demo call below.
# if __name__ == "__main__":
#     import argparse
#     parser = argparse.ArgumentParser(description="Flux")
#     parser.add_argument("--name", type=str, default="flux-schnell", choices=list(configs.keys()), help="Model name")
#     parser.add_argument("--device", type=str, default="cpu", help="Device to use")
#     parser.add_argument("--offload", action="store_true", help="Offload model to CPU when not in use")
#     parser.add_argument("--share", action="store_true", help="Create a public link to your demo")
#     args = parser.parse_args()

# Build the UI for "flux-schnell" (device None, no offload) and start serving.
# Both statements run at module level, i.e. whenever this file is executed or
# imported.
demo = create_demo("flux-schnell", None, False)
demo.launch()