TANGO / datasets /process_testdata.py
H-Liu1997's picture
init
31f2f28
raw
history blame
No virus
10.7 kB
# import smplx
# import torch
# import pickle
# import numpy as np
# # Global: Load the SMPL-X model once
# smplx_model = smplx.create(
# "/content/drive/MyDrive/003_Codes/TANGO-JointEmbedding/emage/smplx_models/",
# model_type='smplx',
# gender='NEUTRAL_2020',
# use_face_contour=False,
# num_betas=300,
# num_expression_coeffs=100,
# ext='npz',
# use_pca=True,
# num_pca_comps=12,
# ).to("cuda").eval()
# device = "cuda"
# def pkl_to_npz(pkl_path, npz_path):
# # Load the pickle file
# with open(pkl_path, "rb") as f:
# pkl_example = pickle.load(f)
# bs = 1
# n = pkl_example["expression"].shape[0] # Assuming this is the batch size
# # Convert numpy arrays to torch tensors
# def to_tensor(numpy_array):
# return torch.tensor(numpy_array, dtype=torch.float32).to(device)
# # Ensure that betas are loaded from the pickle data, converting them to torch tensors
# betas = to_tensor(pkl_example["betas"])
# transl = to_tensor(pkl_example["transl"])
# expression = to_tensor(pkl_example["expression"])
# jaw_pose = to_tensor(pkl_example["jaw_pose"])
# global_orient = to_tensor(pkl_example["global_orient"])
# body_pose_axis = to_tensor(pkl_example["body_pose_axis"])
# left_hand_pose = to_tensor(pkl_example['left_hand_pose'])
# right_hand_pose = to_tensor(pkl_example['right_hand_pose'])
# leye_pose = to_tensor(pkl_example['leye_pose'])
# reye_pose = to_tensor(pkl_example['reye_pose'])
# # Pass the loaded data into the SMPL-X model
# gt_vertex = smplx_model(
# betas=betas,
# transl=transl, # Translation
# expression=expression, # Expression
# jaw_pose=jaw_pose, # Jaw pose
# global_orient=global_orient, # Global orientation
# body_pose=body_pose_axis, # Body pose
# left_hand_pose=left_hand_pose, # Left hand pose
# right_hand_pose=right_hand_pose, # Right hand pose
# return_full_pose=True,
# leye_pose=leye_pose, # Left eye pose
# reye_pose=reye_pose, # Right eye pose
# )
# # Save the relevant data to an npz file
# np.savez(npz_path,
# betas=pkl_example["betas"],
# poses=gt_vertex["full_pose"].cpu().numpy(),
# expressions=pkl_example["expression"],
# trans=pkl_example["transl"],
# model='smplx2020',
# gender='neutral',
# mocap_frame_rate=30,
# )
# from tqdm import tqdm
# import os
# def convert_all_pkl_in_folder(folder_path):
# # Collect all .pkl files
# pkl_files = []
# for root, dirs, files in os.walk(folder_path):
# for file in files:
# if file.endswith(".pkl"):
# pkl_files.append(os.path.join(root, file))
# # Process each file with a progress bar
# for pkl_path in tqdm(pkl_files, desc="Converting .pkl to .npz"):
# npz_path = pkl_path.replace(".pkl", ".npz") # Replace .pkl with .npz
# pkl_to_npz(pkl_path, npz_path)
# convert_all_pkl_in_folder("/content/oliver/oliver/")
# import os
# import json
# def collect_dataset_info(root_dir):
# dataset_info = []
# for root, dirs, files in os.walk(root_dir):
# for file in files:
# if file.endswith(".npz"):
# video_id = file[:-4] # Removing the .npz extension to get the video ID
# # Construct the paths based on the current root directory
# motion_path = os.path.join(root)
# video_path = os.path.join(root)
# audio_path = os.path.join(root)
# # Determine the mode (train, val, test) by checking parent directory
# mode = root.split(os.sep)[-2] # Assuming mode is one folder up in hierarchy
# dataset_info.append({
# "video_id": video_id,
# "video_path": video_path,
# "audio_path": audio_path,
# "motion_path": motion_path,
# "mode": mode
# })
# return dataset_info
# # Set the root directory path of your dataset
# root_dir = '/content/oliver/oliver/' # Adjust this to your actual root directory
# dataset_info = collect_dataset_info(root_dir)
# output_file = '/content/drive/MyDrive/003_Codes/TANGO-JointEmbedding/datasets/show-oliver-original.json'
# # Save the dataset information to a JSON file
# with open(output_file, 'w') as json_file:
# json.dump(dataset_info, json_file, indent=4)
# print(f"Dataset information saved to {output_file}")
# import os
# import json
# import numpy as np
# def load_npz(npz_path):
# try:
# data = np.load(npz_path)
# return data
# except Exception as e:
# print(f"Error loading {npz_path}: {e}")
# return None
# def generate_clips(data, stride, window_length):
# clips = []
# for entry in data:
# npz_data = load_npz(os.path.join(entry['motion_path'],entry['video_id']+".npz"))
# # Only continue if the npz file is successfully loaded
# if npz_data is None:
# continue
# # Determine the total length of the sequence from npz data
# total_frames = npz_data["poses"].shape[0]
# # Generate clips based on stride and window_length
# for start_idx in range(0, total_frames - window_length + 1, stride):
# end_idx = start_idx + window_length
# clip = {
# "video_id": entry["video_id"],
# "video_path": entry["video_path"],
# "audio_path": entry["audio_path"],
# "motion_path": entry["motion_path"],
# "mode": entry["mode"],
# "start_idx": start_idx,
# "end_idx": end_idx
# }
# clips.append(clip)
# return clips
# # Load the existing dataset JSON file
# input_json = '/content/drive/MyDrive/003_Codes/TANGO-JointEmbedding/datasets/show-oliver-original.json'
# with open(input_json, 'r') as f:
# dataset_info = json.load(f)
# # Set stride and window length
# stride = 40 # Adjust stride as needed
# window_length = 64 # Adjust window length as needed
# # Generate clips for all data
# clips_data = generate_clips(dataset_info, stride, window_length)
# # Save the filtered clips data to a new JSON file
# output_json = f'/content/drive/MyDrive/003_Codes/TANGO-JointEmbedding/datasets/show-oliver-s{stride}_w{window_length}.json'
# with open(output_json, 'w') as f:
# json.dump(clips_data, f, indent=4)
# print(f"Filtered clips data saved to {output_json}")
from ast import Expression
import os
import numpy as np
import wave
from moviepy.editor import VideoFileClip
def split_npz(npz_path, output_prefix):
try:
# Load the npz file
data = np.load(npz_path)
# Get the arrays and split them along the time dimension (T)
poses = data["poses"]
betas = data["betas"]
expressions = data["expressions"]
trans = data["trans"]
# Determine the halfway point (T/2)
half = poses.shape[0] // 2
# Save the first half (0-5 seconds)
np.savez(output_prefix + "_0_5.npz",
betas=betas[:half],
poses=poses[:half],
expressions=expressions[:half],
trans=trans[:half],
model=data['model'],
gender=data['gender'],
mocap_frame_rate=data['mocap_frame_rate'])
# Save the second half (5-10 seconds)
np.savez(output_prefix + "_5_10.npz",
betas=betas[half:],
poses=poses[half:],
expressions=expressions[half:],
trans=trans[half:],
model=data['model'],
gender=data['gender'],
mocap_frame_rate=data['mocap_frame_rate'])
print(f"NPZ split saved for {output_prefix}")
except Exception as e:
print(f"Error processing NPZ file {npz_path}: {e}")
def split_wav(wav_path, output_prefix):
try:
with wave.open(wav_path, 'rb') as wav_file:
params = wav_file.getparams()
frames = wav_file.readframes(wav_file.getnframes())
half_frame = len(frames) // 2
# Create two half files
for i, start_frame in enumerate([0, half_frame]):
with wave.open(f"{output_prefix}_{i*5}_{(i+1)*5}.wav", 'wb') as out_wav:
out_wav.setparams(params)
if i == 0:
out_wav.writeframes(frames[:half_frame])
else:
out_wav.writeframes(frames[half_frame:])
print(f"WAV split saved for {output_prefix}")
except Exception as e:
print(f"Error processing WAV file {wav_path}: {e}")
def split_mp4(mp4_path, output_prefix):
try:
clip = VideoFileClip(mp4_path)
for i in range(2):
subclip = clip.subclip(i*5, (i+1)*5)
subclip.write_videofile(f"{output_prefix}_{i*5}_{(i+1)*5}.mp4", codec="libx264", audio_codec="aac")
print(f"MP4 split saved for {output_prefix}")
except Exception as e:
print(f"Error processing MP4 file {mp4_path}: {e}")
def process_files(root_dir, output_dir):
import json
clips = []
dirs = os.listdir(root_dir)
for dir in dirs:
video_id = dir
output_prefix = os.path.join(output_dir, video_id)
root = os.path.join(root_dir, dir)
npz_path = os.path.join(root, video_id + ".npz")
wav_path = os.path.join(root, video_id + ".wav")
mp4_path = os.path.join(root, video_id + ".mp4")
# split_npz(npz_path, output_prefix)
# split_wav(wav_path, output_prefix)
# split_mp4(mp4_path, output_prefix)
clip = {
"video_id": video_id,
"video_path": root,
"audio_path": root,
"motion_path": root,
"mode": "test",
"start_idx": 0,
"end_idx": 150
}
clips.append(clip)
output_json = output_dir + "/test.json"
with open(output_json, 'w') as f:
json.dump(clips, f, indent=4)
# Set the root directory path of your dataset and output directory
root_dir = '/content/oliver/oliver/Abortion_Laws_-_Last_Week_Tonight_with_John_Oliver_HBO-DRauXXz6t0Y.webm/test/'
output_dir = '/content/test'
# Make sure the output directory exists
os.makedirs(output_dir, exist_ok=True)
# Process all the files
process_files(root_dir, output_dir)