import os import time import gradio as gr import threading from myconfigs import * from oss_utils import ossService from gen_client import * myHumanGen = HumanGenService() oss_service = ossService() # enable oss resources, if disabled use example mp4 resources from local directory. ENABLE_OSS_RESOURCES = True TEST_ENABLED = os.getenv('TEST_ENABLED', 'False') result_video_oss_url = {} # user_id, (list[signed_url), list[oss_path])] def tab_func_template(): prompt = "" return prompt, 'template_mode' def tab_func_prompt(): ref_video_path = None return ref_video_path, 'prompt_mode' with open('script.txt', encoding="utf-8") as f: script_text_to_load_results = f.read() # def dataset_func(evt: gr.SelectData): # ref_video_path = evt.value[0] # print("[dataset_func] ref_video_path: ", ref_video_path) # # input_prompt = 'haha' # if not os.path.exists(ref_video_path): # raise gr.Error(f"The input video {ref_video_path} is not existed!") # video_file_name = os.path.basename(ref_video_path) # __, input_prompt = myHumanGen.template_video_2_prompt(video_file_name) # return ref_video_path,input_prompt def video_2_prompt_func(ref_video_path): print("[dataset_func] ref_video_path: %s" % ref_video_path) if not os.path.exists(ref_video_path): raise gr.Error(f"The input video {ref_video_path} is not existed!") video_file_name = os.path.basename(ref_video_path) print("[dataset_func] video_file_name: %s" % video_file_name) __, input_prompt = myHumanGen.template_video_2_prompt(video_file_name) print("[dataset_func] input_prompt: %s" % input_prompt) return ref_video_path,input_prompt def get_user_result_video_list(uuid, date_string, num): directory='video_generation/Service/'+date_string+'/'+uuid+'/' for obj in oss2.ObjectIterator(oss_service.bucket, prefix=directory, delimiter='/'): print(f"目录存在:{directory}") break else: print(f"目录不存在:{directory}") return [],[] no_check_video_list = [] no_check_timer_list = [] for obj in oss2.ObjectIterator(oss_service.bucket, prefix=directory, delimiter = '/'): if obj.is_prefix(): # 判断obj为文件夹。 file_full_path = obj.key+'result.mp4' exist = oss_service.bucket.object_exists(file_full_path) if not exist: print(f'file_full_path is not existed') tmp_directory=obj.key print(f'tmp_directory = {tmp_directory}') for obj_xxx in oss2.ObjectIterator(oss_service.bucket, prefix=tmp_directory, delimiter = '/'): print(f'obj_xxx.key = {obj_xxx.key}') if obj_xxx.is_prefix(): # 判断obj为文件夹 pass else: import re pattern = r"dreamoving-.*-result\.mp4" # Extract the MP4 file name file_name_xxx = os.path.basename(obj_xxx.key) match = re.search(pattern, obj_xxx.key) if match and len(match.group()) == len(file_name_xxx): file_full_path = obj_xxx.key print(f'successfully match file_full_path: {file_full_path}') exist = True break else: pass if exist: object_meta = oss_service.bucket.head_object(file_full_path) bytes_num = object_meta.headers.get('Content-Length') # bytes # print(f"Object Size: {bytes_num} bytes") mb_num = float(bytes_num) / (1000 ** 2) # MB # print(f"Object Size: {mb_num} MB") if mb_num > 0.1: # 大于100KB last_modified = object_meta.headers.get('Last-Modified') # print(f"Last Modified: {last_modified}") from email.utils import parsedate_to_datetime # 将 HTTP-date 格式字符串转换为 datetime 对象 last_modified_datetime = parsedate_to_datetime(last_modified) # 将 datetime 对象转换为 Unix 时间戳(整数秒)-自1970年1月1日UTC以来的秒数 last_modified_timestamp = int(last_modified_datetime.timestamp()) # print(f"Last Modified (timestamp): {last_modified_timestamp}") no_check_video_list.append(file_full_path) no_check_timer_list.append(last_modified_timestamp) else: print(f'文件太小: {file_full_path}') else: # 判断obj为文件。 print(f'这不是文件夹: {obj.key}') # last_modified = obj.last_modified # 文件的最后修改时间 # print(f"File: {obj.key}, Last Modified: {last_modified}") valid_video_list = [] valid_image_list = [] if len(no_check_video_list) > 0: if len(no_check_video_list) > 1: # sort by time zipped_lists = zip(no_check_timer_list, no_check_video_list) # 合并为元组 sorted_pairs = sorted(zipped_lists, key=lambda x: x[0], reverse=True) # 从大到小 list1_sorted, list2_sorted = zip(*sorted_pairs) no_check_timer_list = list(list1_sorted) no_check_video_list = list(list2_sorted) for file_full_path in no_check_video_list: oss_video_path = "oss://vigen-invi/" + file_full_path print(f'生成的视频: {oss_video_path}') _, video_url = oss_service.sign(oss_video_path, timeout=3600*100) valid_video_list.append(video_url) style = "video/snapshot,t_1000,f_jpg,w_544,h_768,m_fast" params1 = {'x-oss-process': style} _, snapshot_image = oss_service.sign(oss_video_path, timeout=3600*100, params=params1) valid_image_list.append(snapshot_image) if len(valid_video_list) >= num: break return valid_video_list, valid_image_list def refresh_video(uuid, request_id): notes, process_status = myHumanGen.get_ranking_location(uuid) if is_wanx_platform: uuid = 'wanx_lab' if uuid is None or uuid == '': uuid = 'test_version_phone' print(f'[refresh_video] uuid: {uuid}') print(f'[refresh_video] request_id: {request_id}') new_list = [] new_image_list = [] if process_status == 'runing': print(f'process_status: {process_status}') # video_oss_path = "oss://vigen-invi/video_generation/logo/runing.mp4" # _, video_url = oss_service.sign(video_oss_path, timeout=3600*100) # new_list.append(video_url) # img_oss_path = "oss://vigen-invi/video_generation/logo/runing.png" # _, img_url = oss_service.sign(img_oss_path, timeout=3600*100) # new_image_list.append(img_url) new_list.append(None) new_image_list.append(None) date_string = datetime.datetime.now().strftime('%Y-%m-%d') valid_video_list, valid_image_list = get_user_result_video_list(uuid, date_string, 3) new_list = new_list + valid_video_list new_image_list = new_image_list + valid_image_list if len(new_list) < 4: date_string_yesterday = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d') valid_video_list1, valid_image_list1 = get_user_result_video_list(uuid, date_string_yesterday, 4-len(new_list)) new_list = new_list + valid_video_list1 new_image_list = new_image_list + valid_image_list1 if len(new_list) < 4: date_string_bf_yesterday = (datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d') valid_video_list2, valid_image_list2 = get_user_result_video_list(uuid, date_string_bf_yesterday, 4-len(new_list)) new_list = new_list + valid_video_list2 new_image_list = new_image_list + valid_image_list2 if len(new_list) < 4: for i in range(4-len(new_list)): new_list.append(None) new_image_list.append(None) else: date_string = datetime.datetime.now().strftime('%Y-%m-%d') valid_video_list, valid_image_list = get_user_result_video_list(uuid, date_string, 4) new_list = valid_video_list new_image_list = valid_image_list if len(new_list) < 4: date_string_yesterday = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d') valid_video_list1, valid_image_list1 = get_user_result_video_list(uuid, date_string_yesterday, 4-len(new_list)) new_list = new_list + valid_video_list1 new_image_list = new_image_list + valid_image_list1 if len(new_list) < 4: date_string_bf_yesterday = (datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d') valid_video_list2, valid_image_list2 = get_user_result_video_list(uuid, date_string_bf_yesterday, 4-len(new_list)) new_list = new_list + valid_video_list2 new_image_list = new_image_list + valid_image_list2 if len(new_list) < 4: for i in range(4-len(new_list)): new_list.append(None) new_image_list.append(None) return notes, new_list[0], new_list[1], new_list[2], new_list[3], new_image_list[0], new_image_list[1], new_image_list[2], new_image_list[3] with gr.Blocks(title = "追影", css='style.css', theme=gr.themes.Soft( radius_size=gr.themes.sizes.radius_sm, text_size=gr.themes.sizes.text_md ) ) as demo: with gr.Row(): gr.HTML(f"""