import torch
import imageio
import os
import gradio as gr
import subprocess
from subprocess import getoutput

from diffusers.schedulers import EulerAncestralDiscreteScheduler
from transformers import T5EncoderModel, T5Tokenizer
from allegro.pipelines.pipeline_allegro import AllegroPipeline
from allegro.models.vae.vae_allegro import AllegroAutoencoderKL3D
from allegro.models.transformers.transformer_3d_allegro import AllegroTransformer3DModel
# from allegro.models.transformers.block import AttnProcessor2_0

from huggingface_hub import snapshot_download

# # Override attention processor initialization
# AttnProcessor2_0.__init__ = lambda self, *args, **kwargs: super(AttnProcessor2_0, self).__init__()

weights_dir = './allegro_weights'
os.makedirs(weights_dir, exist_ok=True)

is_shared_ui = False
is_gpu_associated = torch.cuda.is_available()

# Download weights only if the directory is empty (os.makedirs above already
# created it, so an os.path.exists check would always pass and skip the download)
if not os.listdir(weights_dir):
    snapshot_download(
        repo_id='rhymes-ai/Allegro',
        allow_patterns=[
            'scheduler/**',
            'text_encoder/**',
            'tokenizer/**',
            'transformer/**',
            'vae/**',
        ],
        local_dir=weights_dir,
    )

def single_inference(user_prompt, save_path, guidance_scale, num_sampling_steps, seed, enable_cpu_offload):
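    """Run one text-to-video generation with the Allegro pipeline and save it to save_path."""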
    dtype = torch.float16  # Changed from torch.bfloat16

    # Load models
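    # The VAE is kept in full float32 precision, while the text encoder and transformer below use float16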
    vae = AllegroAutoencoderKL3D.from_pretrained(
        "./allegro_weights/vae/", 
        torch_dtype=torch.float32
    ).cuda()
    vae.eval()

    text_encoder = T5EncoderModel.from_pretrained("./allegro_weights/text_encoder/", torch_dtype=dtype)
    text_encoder.eval()

    tokenizer = T5Tokenizer.from_pretrained("./allegro_weights/tokenizer/")

    scheduler = EulerAncestralDiscreteScheduler()

    transformer = AllegroTransformer3DModel.from_pretrained("./allegro_weights/transformer/", torch_dtype=dtype).cuda()
    transformer.eval()

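    # Assemble the full pipeline from the individually loaded components and move it to the first GPU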
    allegro_pipeline = AllegroPipeline(
        vae=vae,
        text_encoder=text_encoder,
        tokenizer=tokenizer,
        scheduler=scheduler,
        transformer=transformer
    ).to("cuda:0")

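    # Quality/style template; the user prompt is inserted at the {} placeholder below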
    positive_prompt = """
    (masterpiece), (best quality), (ultra-detailed), (unwatermarked), 
    {} 
    emotional, harmonious, vignette, 4k epic detailed, shot on kodak, 35mm photo, 
    sharp focus, high budget, cinemascope, moody, epic, gorgeous
    """

    negative_prompt = """
    nsfw, lowres, bad anatomy, bad hands, text, error, missing fingers, extra digit, fewer digits, cropped, worst quality, 
    low quality, normal quality, jpeg artifacts, signature, watermark, username, blurry.
    """

    # Process user prompt
    user_prompt = positive_prompt.format(user_prompt.lower().strip())

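    # Sequential CPU offload keeps submodules on CPU and moves them to GPU only when needed,
    # trading generation speed for a much lower peak VRAM footprint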
    if enable_cpu_offload:
        allegro_pipeline.enable_sequential_cpu_offload()

    # Clear memory before generation
    # torch.cuda.empty_cache()

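    # Run the diffusion sampling loop: 88 frames at 1280x720, returned as a single video array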
    out_video = allegro_pipeline(
        user_prompt, 
        negative_prompt=negative_prompt, 
        num_frames=88,
        height=720,
        width=1280,
        num_inference_steps=num_sampling_steps,
        guidance_scale=guidance_scale,
        max_sequence_length=512,
        generator=torch.Generator(device="cuda:0").manual_seed(seed)
    ).video[0]

    # Save video
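    # 88 frames written at 15 fps is roughly a 6-second clip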
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    imageio.mimwrite(save_path, out_video, fps=15, quality=8)

    return save_path


# Gradio interface function
def run_inference(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload, progress=gr.Progress(track_tqdm=True)):
    save_path = "./output_videos/generated_video.mp4"
    result_path = single_inference(user_prompt, save_path, guidance_scale, num_sampling_steps, seed, enable_cpu_offload)
    return result_path

# Create Gradio interface
with gr.Blocks() as demo:
    with gr.Column():
        gr.Markdown("# Allegro Video Generation")
        gr.Markdown("Generate a video based on a text prompt using the Allegro pipeline.")
        
        user_prompt = gr.Textbox(label="User Prompt")
        with gr.Row():
            guidance_scale = gr.Slider(minimum=0, maximum=20, step=0.1, label="Guidance Scale", value=7.5)
            num_sampling_steps = gr.Slider(minimum=10, maximum=100, step=1, label="Number of Sampling Steps", value=20)
        with gr.Row():
            seed = gr.Slider(minimum=0, maximum=10000, step=1, label="Random Seed", value=42)
            enable_cpu_offload = gr.Checkbox(label="Enable CPU Offload", value=True, scale=1)
            
        submit_btn = gr.Button("Generate Video")
        video_output = gr.Video(label="Generated Video")

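        # Example prompts; with cache_examples=False, clicking one only fills the prompt box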
        gr.Examples(
            examples=[
                ["A Monkey is playing bass guitar."],
                ["An astronaut riding a horse."],
                ["A tiny finch on a branch with spring flowers on background."]
            ],
            inputs=[user_prompt],
            outputs=video_output,
            fn=lambda x: None,
            cache_examples=False
        )

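    # Wire the button to the inference function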
    submit_btn.click(
        fn=run_inference,
        inputs=[user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload],
        outputs=video_output
    )

# Launch the interface
demo.launch(show_error=True)