# Spaces: Running  (page-header residue from the hosting site; not part of the program)
import gradio as gr | |
import moviepy.editor as mpy | |
import numpy as np | |
import os | |
from omegaconf import OmegaConf | |
from tqdm import tqdm | |
import shutil | |
import time | |
from avatar_generator import Avatar | |
# # Directory in which uploaded files are saved
# SAVE_DIR = "./uploaded_files"
# os.makedirs(SAVE_DIR, exist_ok=True)  # create the directory if it does not exist
# Module-level flag polled by the frame loop: process_files resets it to
# False when a job starts, clear_files sets it to True to request a stop.
should_stop: bool = False
# Frame-by-frame processing of the two uploaded files.
_MAX_FRAMES = 20  # upper bound on the number of frames rendered per request
_OUTPUT_FPS = 25  # frame rate of the encoded video
_POSE_PATH = './test_data/AMASS/online_test_pose_data.npz'
_EXP_PATH = './test_data/face_exp/online_test_exp_data.npy'
_CONFIG_PATH = 'configs/example.yaml'
_VIDEO_PATH = "./output/output_video.mp4"


def _uploaded_path(upload):
    """Return the filesystem path of an upload (object with ``.name`` or a plain path)."""
    return getattr(upload, "name", upload)


def process_files(file1, file2):
    """Render an avatar video from an uploaded pose file and expression file.

    This is a generator. Every yielded value is a 4-tuple
    ``(current_frame, video_path, progress_percent, status_text)``; entries
    that should be left empty are ``None``.

    Args:
        file1: Uploaded body-pose archive (loaded with ``np.load`` and
            re-saved with ``np.savez``). Either an object exposing the path
            as ``.name`` or a plain path string. ``None`` if nothing was
            uploaded.
        file2: Uploaded face-expression array (loaded with ``np.load`` and
            re-saved with ``np.save``). Same accepted forms as ``file1``.

    Yields:
        A start message, then one tuple per rendered frame, then a final
        tuple carrying the path of the encoded video. If an upload is
        missing or there is nothing to render, a single explanatory status
        tuple is yielded instead. If the module-level ``should_stop`` flag is
        set while rendering, an all-``None`` tuple is yielded and no video is
        written.
    """
    global should_stop
    should_stop = False  # a new job always starts with the stop flag cleared
    yield None, None, None, "Starting Process!"

    # Without this guard a missing upload fails on ``None.name``.
    if file1 is None or file2 is None:
        yield None, None, None, "Please upload both the body pose file and the face expression file."
        return

    file_path1 = _uploaded_path(file1)
    file_path2 = _uploaded_path(file2)

    # The target directories are not guaranteed to exist on a fresh checkout.
    for target in (_POSE_PATH, _EXP_PATH, _VIDEO_PATH):
        os.makedirs(os.path.dirname(target), exist_ok=True)

    # Re-save the uploads at the fixed locations handed to the avatar below.
    # NOTE(review): these paths are shared by every request, so concurrent
    # jobs would overwrite each other's inputs — confirm single-user usage.
    with np.load(file_path1) as pose_data:  # closes the archive's file handle
        np.savez(_POSE_PATH, **pose_data)
    np.save(_EXP_PATH, np.load(file_path2))

    conf = OmegaConf.load(_CONFIG_PATH)
    avatar = Avatar(conf)
    avatar.build_dataset(_POSE_PATH, _EXP_PATH)

    frame_count = min(len(avatar.body_dataset), len(avatar.head_dataloader), _MAX_FRAMES)
    if frame_count == 0:
        # Nothing to render; encoding an empty clip would fail.
        yield None, None, None, "No frames to process."
        return

    output_frames = []
    start_time = time.monotonic()
    for idx in tqdm(range(frame_count)):
        if should_stop:
            # Stop requested: blank the outputs and abandon the job.
            yield None, None, None, None
            break
        # NOTE(review): 'reder_frame' looks like a typo of 'render_frame'; the
        # name is kept because Avatar is defined outside this file — confirm.
        frame = avatar.reder_frame(idx)
        # Reverse the channel order (the original comment labels this rgb2bgr).
        frame = frame[..., ::-1]
        output_frames.append(frame)

        done = idx + 1
        elapsed_time = time.monotonic() - start_time
        # Linear extrapolation from the average time per frame so far.
        estimated_total_time = (elapsed_time / done) * frame_count
        yield frame, None, done / frame_count * 100, f"{elapsed_time:.2f} sec/{estimated_total_time:.2f} sec"

    if should_stop:
        return

    final_video = mpy.ImageSequenceClip(output_frames, fps=_OUTPUT_FPS)
    final_video.write_videofile(_VIDEO_PATH, codec='libx264')
    yield output_frames[-1], _VIDEO_PATH, 100.0, "Processing completed!"
# Clear / stop handler
def clear_files():
    """Raise the stop flag and return empty values for the UI.

    Sets the module-level ``should_stop`` flag to True and returns a tuple
    of six ``None`` values.
    """
    global should_stop
    should_stop = True  # signal a stop request
    # Six empty values to blank the interface elements.
    return (None,) * 6
# Build the Gradio interface: two file uploads, a current-frame image, a
# video player, a progress slider, a timing textbox and two buttons.
# The css string below defines the classes referenced through elem_classes.
with gr.Blocks(css=""" | |
.equal-height { | |
height: 425px; /* 设置为你希望的高度 */ | |
display: flex; | |
flex-direction: column; | |
justify-content: center; | |
align-items: center; | |
} | |
.equal-height input { | |
height: 100%; /* 输入框占满整个容器高度 */ | |
} | |
.output-container { | |
height: 400px; /* 输出框的高度 */ | |
} | |
.custom-text { | |
height: 80px; /* 输出框的高度 */ | |
} | |
""") as demo: | |
with gr.Row(): | |
# Left column: holds the file inputs
with gr.Column(scale=1): | |
with gr.Row(elem_classes="equal-height"): | |
file_input1 = gr.File(label="Upload File (Body Pose)") | |
file_input2 = gr.File(label="Upload File (Face EXP)") | |
with gr.Column(scale=2): | |
with gr.Row(): | |
# Middle column: holds the per-frame output
with gr.Column(scale=1): | |
frame_output = gr.Image(label="Current Frame Output", elem_classes="output-container") # shows the current frame image
# Right column: holds the video output
with gr.Column(scale=1): | |
video_output = gr.Video(label="Processed Video Output", elem_classes="output-container") # shows the output video
# progress_bar = gr.Label(label="Progress")
with gr.Row(): | |
with gr.Column(scale=2): | |
progress_bar = gr.Slider(visible=True, minimum=0, maximum=100, step=1, label="Progress %",elem_classes="custom-text") # a Slider stands in for a progress bar
with gr.Column(scale=1): | |
output_time = gr.Textbox(label='Processing Time/Estimate Time', elem_classes="custom-text") | |
# time_label = gr.Label(value="", label="Estimated Time Remaining", elem_classes="custom-label")
# with gr.Row():
# progress_bar = gr.Progress() # add a progress bar
with gr.Row(): | |
process_button = gr.Button("Start Processing Files") | |
clear_button = gr.Button("Clear or Stop Processing") | |
# Wire the buttons to their handlers.
# process_files receives the two uploads; the four outputs listed here are
# filled, in order, from each tuple it yields.
process_button.click( | |
fn=process_files, | |
inputs=[file_input1, file_input2], | |
outputs=[frame_output, video_output, progress_bar, output_time], | |
show_progress=False | |
) | |
# clear_files takes no inputs; its six return values map, in order, onto
# the six components listed as outputs.
clear_button.click( | |
fn= clear_files, | |
inputs=[], | |
outputs=[file_input1, file_input2, frame_output, video_output, progress_bar, output_time] | |
) | |
# Start the app only when this file is run as a script, so that importing
# the module (for reuse or inspection) does not start a web server.
if __name__ == "__main__":
    demo.launch()