Spaces:
Runtime error
Runtime error
import torch | |
import imageio | |
import os | |
import gradio as gr | |
import subprocess | |
from subprocess import getoutput | |
from diffusers.schedulers import EulerAncestralDiscreteScheduler | |
from transformers import T5EncoderModel, T5Tokenizer | |
from allegro.pipelines.pipeline_allegro import AllegroPipeline | |
from allegro.models.vae.vae_allegro import AllegroAutoencoderKL3D | |
from allegro.models.transformers.transformer_3d_allegro import AllegroTransformer3DModel | |
# from allegro.models.transformers.block import AttnProcessor2_0 | |
from huggingface_hub import snapshot_download | |
# # Override attention processor initialization | |
# AttnProcessor2_0.__init__ = lambda self, *args, **kwargs: super(AttnProcessor2_0, self).__init__() | |
weights_dir = './allegro_weights' | |
os.makedirs(weights_dir, exist_ok=True) | |
is_shared_ui = False | |
is_gpu_associated = torch.cuda.is_available() | |
# Download weights if not present | |
if not os.path.exists(weights_dir): | |
snapshot_download( | |
repo_id='rhymes-ai/Allegro', | |
allow_patterns=[ | |
'scheduler/**', | |
'text_encoder/**', | |
'tokenizer/**', | |
'transformer/**', | |
'vae/**', | |
], | |
local_dir=weights_dir, | |
) | |
def single_inference(user_prompt, save_path, guidance_scale, num_sampling_steps, seed, enable_cpu_offload): | |
dtype = torch.float16 # Changed from torch.bfloat16 | |
# Load models | |
vae = AllegroAutoencoderKL3D.from_pretrained( | |
"./allegro_weights/vae/", | |
torch_dtype=torch.float32 | |
).cuda() | |
vae.eval() | |
text_encoder = T5EncoderModel.from_pretrained("./allegro_weights/text_encoder/", torch_dtype=dtype) | |
text_encoder.eval() | |
tokenizer = T5Tokenizer.from_pretrained("./allegro_weights/tokenizer/") | |
scheduler = EulerAncestralDiscreteScheduler() | |
transformer = AllegroTransformer3DModel.from_pretrained("./allegro_weights/transformer/", torch_dtype=dtype).cuda() | |
transformer.eval() | |
allegro_pipeline = AllegroPipeline( | |
vae=vae, | |
text_encoder=text_encoder, | |
tokenizer=tokenizer, | |
scheduler=scheduler, | |
transformer=transformer | |
).to("cuda:0") | |
positive_prompt = """ | |
(masterpiece), (best quality), (ultra-detailed), (unwatermarked), | |
{} | |
emotional, harmonious, vignette, 4k epic detailed, shot on kodak, 35mm photo, | |
sharp focus, high budget, cinemascope, moody, epic, gorgeous | |
""" | |
negative_prompt = """ | |
nsfw, lowres, bad anatomy, bad hands, text, error, missing fingers, extra digit, fewer digits, cropped, worst quality, | |
low quality, normal quality, jpeg artifacts, signature, watermark, username, blurry. | |
""" | |
# Process user prompt | |
user_prompt = positive_prompt.format(user_prompt.lower().strip()) | |
if enable_cpu_offload: | |
allegro_pipeline.enable_sequential_cpu_offload() | |
# Clear memory before generation | |
# torch.cuda.empty_cache() | |
out_video = allegro_pipeline( | |
user_prompt, | |
negative_prompt=negative_prompt, | |
num_frames=88, | |
height=720, | |
width=1280, | |
num_inference_steps=num_sampling_steps, | |
guidance_scale=guidance_scale, | |
max_sequence_length=512, | |
generator=torch.Generator(device="cuda:0").manual_seed(seed) | |
).video[0] | |
# Save video | |
os.makedirs(os.path.dirname(save_path), exist_ok=True) | |
imageio.mimwrite(save_path, out_video, fps=15, quality=8) | |
return save_path | |
# Gradio interface function | |
def run_inference(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload, progress=gr.Progress(track_tqdm=True)): | |
save_path = "./output_videos/generated_video.mp4" | |
result_path = single_inference(user_prompt, save_path, guidance_scale, num_sampling_steps, seed, enable_cpu_offload) | |
return result_path | |
# Create Gradio interface | |
with gr.Blocks() as demo: | |
with gr.Column(): | |
gr.Markdown("# Allegro Video Generation") | |
gr.Markdown("Generate a video based on a text prompt using the Allegro pipeline.") | |
user_prompt = gr.Textbox(label="User Prompt") | |
with gr.Row(): | |
guidance_scale = gr.Slider(minimum=0, maximum=20, step=0.1, label="Guidance Scale", value=7.5) | |
num_sampling_steps = gr.Slider(minimum=10, maximum=100, step=1, label="Number of Sampling Steps", value=20) | |
with gr.Row(): | |
seed = gr.Slider(minimum=0, maximum=10000, step=1, label="Random Seed", value=42) | |
enable_cpu_offload = gr.Checkbox(label="Enable CPU Offload", value=True, scale=1) | |
submit_btn = gr.Button("Generate Video") | |
video_output = gr.Video(label="Generated Video") | |
gr.Examples( | |
examples=[ | |
["A Monkey is playing bass guitar."], | |
["An astronaut riding a horse."], | |
["A tiny finch on a branch with spring flowers on background."] | |
], | |
inputs=[user_prompt], | |
outputs=video_output, | |
fn=lambda x: None, | |
cache_examples=False | |
) | |
submit_btn.click( | |
fn=run_inference, | |
inputs=[user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload], | |
outputs=video_output | |
) | |
# Launch the interface | |
demo.launch(show_error=True) |