import gradio as gr import moviepy.editor as mpy import numpy as np import os from omegaconf import OmegaConf from tqdm import tqdm import shutil import time from avatar_generator import Avatar # # 指定保存文件的目录 # SAVE_DIR = "./uploaded_files" # os.makedirs(SAVE_DIR, exist_ok=True) # 创建目录(如果不存在) # 全局变量,用于控制任务是否应当终止 should_stop = False # 定义逐帧处理的函数 def process_files(file1, file2): global should_stop should_stop = False # 重置停止标志 yield None, None, None, "Starting Process!" file_path1 = file1.name file_path2 = file2.name pose_data = np.load(file_path1) exp_data = np.load(file_path2) # save pose_path = './test_data/AMASS/online_test_pose_data.npz' exp_path = './test_data/face_exp/online_test_exp_data.npy' np.savez(pose_path, **pose_data) np.save(exp_path, exp_data) # with open(file1.name, 'rb') as fsrc: # with open(file_path1, 'wb') as fdst: # shutil.copyfileobj(fsrc, fdst) # with open(file2.name, 'rb') as fsrc: # with open(file_path2, 'wb') as fdst: # shutil.copyfileobj(fsrc, fdst) conf = OmegaConf.load('configs/example.yaml') avatar = Avatar(conf) avatar.build_dataset(pose_path, exp_path) lenth = min(len(avatar.body_dataset), len(avatar.head_dataloader),20) output_frames = [] start_time = time.time() for idx in tqdm(range(lenth)): if should_stop: yield None, None, None, None break # 任务应当终止时跳出循环 frame = avatar.reder_frame(idx) # rgb2bgr frame = frame[..., ::-1] output_frames.append(frame) elapsed_time = time.time() - start_time estimated_total_time = (elapsed_time / (idx + 1)) * lenth remaining_time = estimated_total_time - elapsed_time yield frame, None, (idx + 1) / lenth * 100, f"{elapsed_time:.2f} sec/{estimated_total_time:.2f} sec" if not should_stop: output_path = "./output/output_video.mp4" final_video = mpy.ImageSequenceClip(output_frames, fps=25) final_video.write_videofile(output_path, codec='libx264') yield output_frames[-1], output_path, 100.0, "Processing completed!" # 清除操作 def clear_files(): global should_stop should_stop = True # 设置停止标志 # 返回空值以清空界面元素 return None, None, None, None, None, None # 创建 Gradio 接口 with gr.Blocks(css=""" .equal-height { height: 425px; /* 设置为你希望的高度 */ display: flex; flex-direction: column; justify-content: center; align-items: center; } .equal-height input { height: 100%; /* 输入框占满整个容器高度 */ } .output-container { height: 400px; /* 输出框的高度 */ } .custom-text { height: 80px; /* 输出框的高度 */ } """) as demo: with gr.Row(): # 左侧列,用于放置文件输入 with gr.Column(scale=1): with gr.Row(elem_classes="equal-height"): file_input1 = gr.File(label="Upload File (Body Pose)") file_input2 = gr.File(label="Upload File (Face EXP)") with gr.Column(scale=2): with gr.Row(): # 中间列,用于放置帧输出 with gr.Column(scale=1): frame_output = gr.Image(label="Current Frame Output", elem_classes="output-container") # 输出当前帧图像 # 右侧列,用于放置视频输出 with gr.Column(scale=1): video_output = gr.Video(label="Processed Video Output", elem_classes="output-container") # 输出视频 # progress_bar = gr.Label(label="Progress") with gr.Row(): with gr.Column(scale=2): progress_bar = gr.Slider(visible=True, minimum=0, maximum=100, step=1, label="Progress %",elem_classes="custom-text") # 使用Slider模拟进度条 with gr.Column(scale=1): output_time = gr.Textbox(label='Processing Time/Estimate Time', elem_classes="custom-text") # time_label = gr.Label(value="", label="Estimated Time Remaining", elem_classes="custom-label") # with gr.Row(): # progress_bar = gr.Progress() # 添加进度条 with gr.Row(): process_button = gr.Button("Start Processing Files") clear_button = gr.Button("Clear or Stop Processing") # 定义按钮的功能 process_button.click( fn=process_files, inputs=[file_input1, file_input2], outputs=[frame_output, video_output, progress_bar, output_time], show_progress=False ) clear_button.click( fn= clear_files, inputs=[], outputs=[file_input1, file_input2, frame_output, video_output, progress_bar, output_time] ) # 启动应用 demo.launch()