"""Text-to-motion dataset loaders (Text2MotionDataset, HumanML3D, KIT)."""
import torch
from torch.utils import data
import numpy as np
from os.path import join as pjoin
import random
import codecs as cs
from tqdm.auto import tqdm

from utils.word_vectorizer import WordVectorizer, POS_enumerator
from utils.motion_process import recover_from_ric
class Text2MotionDataset(data.Dataset):
    """Dataset for the Text2Motion generation task.

    Pairs motion feature sequences (HML-style representation) with their
    text captions.  Subclasses (``HumanML3D``, ``KIT``) override the class
    attributes below with dataset-specific values before delegating here.
    """

    # Overridden by subclasses before __init__ runs.
    data_root = ""           # root folder holding new_joint_vecs/, texts/, split files
    min_motion_len = 40      # motions shorter than this (frames) are dropped
    joints_num = None        # number of skeleton joints
    dim_pose = None          # dimensionality of one pose feature vector
    max_motion_length = 196  # every sample is cropped/padded to this many frames

    def __init__(self, opt, split, mode="train", accelerator=None):
        """Load all motions and captions of ``split``.

        Args:
            opt: options namespace.  Fields read depend on ``mode``:
                ``meta_dir``, ``eval_meta_dir``, ``dataset_name``,
                ``glove_dir``, ``feat_bias``, ``debug``, and optionally
                ``max_text_len`` / ``unit_length``.
            split: split name; ``<data_root>/<split>.txt`` lists sample ids.
            mode: one of 'train', 'eval', 'gt_eval', 'xyz_gt', 'hml_gt';
                selects the normalization stats and the ``__getitem__``
                return layout.
            accelerator: optional accelerate.Accelerator; silences the
                progress bar on non-main processes and ensures the stats
                are saved exactly once.

        Raises:
            ValueError: if ``mode`` is not supported.
            RuntimeError: if the split yields no usable motions.
        """
        self.max_text_len = getattr(opt, "max_text_len", 20)
        self.unit_length = getattr(opt, "unit_length", 4)
        self.mode = mode
        motion_dir = pjoin(self.data_root, "new_joint_vecs")
        text_dir = pjoin(self.data_root, "texts")
        if mode not in ["train", "eval", "gt_eval", "xyz_gt", "hml_gt"]:
            raise ValueError(
                f"Mode '{mode}' is not supported. Please use one of: 'train', 'eval', 'gt_eval', 'xyz_gt','hml_gt'."
            )

        # Pick the normalization statistics that match the mode.
        mean, std = None, None
        if mode == "gt_eval":
            print(pjoin(opt.eval_meta_dir, f"{opt.dataset_name}_std.npy"))
            # used by T2M models (including evaluators)
            mean = np.load(pjoin(opt.eval_meta_dir, f"{opt.dataset_name}_mean.npy"))
            std = np.load(pjoin(opt.eval_meta_dir, f"{opt.dataset_name}_std.npy"))
        elif mode == "eval":
            print(pjoin(opt.meta_dir, "std.npy"))
            # used by our models during inference
            mean = np.load(pjoin(opt.meta_dir, "mean.npy"))
            std = np.load(pjoin(opt.meta_dir, "std.npy"))
        else:
            # used by our models during train
            mean = np.load(pjoin(self.data_root, "Mean.npy"))
            std = np.load(pjoin(self.data_root, "Std.npy"))

        if mode == "eval":
            # Stats of the T2M evaluator models (used downstream to
            # translate our normalization into theirs).
            self.mean_for_eval = np.load(
                pjoin(opt.eval_meta_dir, f"{opt.dataset_name}_mean.npy")
            )
            self.std_for_eval = np.load(
                pjoin(opt.eval_meta_dir, f"{opt.dataset_name}_std.npy")
            )
        if mode in ["gt_eval", "eval"]:
            # GloVe-based word embeddings for the text-to-motion evaluators.
            self.w_vectorizer = WordVectorizer(opt.glove_dir, "our_vab")

        data_dict = {}
        id_list = []
        split_file = pjoin(self.data_root, f"{split}.txt")
        with cs.open(split_file, "r") as f:
            for line in f:  # iterate lazily instead of readlines()
                id_list.append(line.strip())
        # was `opt.debug == True`; getattr keeps this safe when the
        # option is absent
        if getattr(opt, "debug", False):
            id_list = id_list[:1000]

        new_name_list = []
        length_list = []
        for name in tqdm(
            id_list,
            disable=(
                not accelerator.is_local_main_process
                if accelerator is not None
                else False
            ),
        ):
            motion = np.load(pjoin(motion_dir, name + ".npy"))
            # Drop motions that are too short or too long (>= 200 frames).
            if len(motion) < self.min_motion_len or len(motion) >= 200:
                continue
            text_data = []
            flag = False
            with cs.open(pjoin(text_dir, name + ".txt")) as f:
                for line in f:
                    # Annotation line format: caption#tokens#from_tag#to_tag
                    text_dict = {}
                    line_split = line.strip().split("#")
                    caption = line_split[0]
                    try:
                        tokens = line_split[1].split(" ")
                        f_tag = float(line_split[2])
                        to_tag = float(line_split[3])
                        f_tag = 0.0 if np.isnan(f_tag) else f_tag
                        to_tag = 0.0 if np.isnan(to_tag) else to_tag
                    except (IndexError, ValueError):
                        # Malformed line (missing fields / non-numeric tags):
                        # fall back to dummy tokens and the default 0-8 s
                        # span.  (Was a bare `except:`.)
                        tokens = ["a/NUM", "a/NUM"]
                        f_tag = 0.0
                        to_tag = 8.0
                    text_dict["caption"] = caption
                    text_dict["tokens"] = tokens
                    if f_tag == 0.0 and to_tag == 0.0:
                        # Caption describes the whole motion.
                        flag = True
                        text_data.append(text_dict)
                    else:
                        # Caption describes a sub-clip: cut it out.  Tags are
                        # in seconds; the factor 20 presumably reflects a
                        # 20 fps feature rate — confirm against the data.
                        n_motion = motion[int(f_tag * 20) : int(to_tag * 20)]
                        if len(n_motion) < self.min_motion_len or (
                            len(n_motion) >= 200
                        ):
                            continue
                        # Register the sub-clip under a unique prefixed name.
                        new_name = (
                            random.choice("ABCDEFGHIJKLMNOPQRSTUVW") + "_" + name
                        )
                        while new_name in data_dict:
                            new_name = (
                                random.choice("ABCDEFGHIJKLMNOPQRSTUVW") + "_" + name
                            )
                        data_dict[new_name] = {
                            "motion": n_motion,
                            "length": len(n_motion),
                            "text": [text_dict],
                        }
                        new_name_list.append(new_name)
                        length_list.append(len(n_motion))
            if flag:
                data_dict[name] = {
                    "motion": motion,
                    "length": len(motion),
                    "text": text_data,
                }
                new_name_list.append(name)
                length_list.append(len(motion))

        if not new_name_list:
            # zip(*[]) below would otherwise raise a cryptic ValueError.
            raise RuntimeError(
                f"No usable motions found for split '{split}' under {self.data_root}."
            )
        # Sort samples by motion length (ascending).
        name_list, length_list = zip(
            *sorted(zip(new_name_list, length_list), key=lambda x: x[1])
        )

        if mode == "train":
            if opt.dataset_name != "amass":
                joints_num = self.joints_num
                # Divide the std of selected feature groups by feat_bias;
                # the remaining groups (ric_data, rot_data, local_velocity)
                # keep their std unchanged (the original divided those
                # slices by 1.0 — a no-op, removed here).
                # root_rot_velocity (B, seq_len, 1)
                std[0:1] = std[0:1] / opt.feat_bias
                # root_linear_velocity (B, seq_len, 2)
                std[1:3] = std[1:3] / opt.feat_bias
                # root_y (B, seq_len, 1)
                std[3:4] = std[3:4] / opt.feat_bias
                # foot contact (B, seq_len, 4)
                std[4 + (joints_num - 1) * 9 + joints_num * 3 :] = (
                    std[4 + (joints_num - 1) * 9 + joints_num * 3 :] / opt.feat_bias
                )
                # Sanity-check the expected HML feature layout.
                assert 4 + (joints_num - 1) * 9 + joints_num * 3 + 4 == mean.shape[-1]
            if accelerator is not None and accelerator.is_main_process:
                # Persist the (possibly re-scaled) stats once, for inference.
                np.save(pjoin(opt.meta_dir, "mean.npy"), mean)
                np.save(pjoin(opt.meta_dir, "std.npy"), std)

        self.mean = mean
        self.std = std
        self.data_dict = data_dict
        self.name_list = name_list

    def inv_transform(self, data):
        """Undo Z-normalization: map normalized features back to raw scale."""
        return data * self.std + self.mean

    def __len__(self):
        """Number of (sub-)motions registered for this split."""
        return len(self.data_dict)

    def __getitem__(self, idx):
        """Return one sample; the tuple layout depends on ``self.mode``.

        'train' / 'hml_gt': ``(caption, motion, m_length)``;
        'eval' / 'gt_eval': T2M-evaluator tuple
        ``(word_embeddings, pos_one_hots, caption, sent_len, motion,
        m_length, tokens_str)``; 'xyz_gt': joint positions tensor.
        """
        data = self.data_dict[self.name_list[idx]]
        motion, m_length, text_list = data["motion"], data["length"], data["text"]
        # Randomly select a caption.
        text_data = random.choice(text_list)
        caption = text_data["caption"]

        # Z-normalization (skipped for raw ground-truth modes).
        if self.mode not in ["xyz_gt", "hml_gt"]:
            motion = (motion - self.mean) / self.std

        # Crop motion.
        if self.mode in ["eval", "gt_eval"]:
            # Crop to a multiple of unit_length with small random variation
            # ('double' drops one extra unit).
            if self.unit_length < 10:
                coin2 = np.random.choice(["single", "single", "double"])
            else:
                coin2 = "single"
            if coin2 == "double":
                m_length = (m_length // self.unit_length - 1) * self.unit_length
            elif coin2 == "single":
                m_length = (m_length // self.unit_length) * self.unit_length
            start = random.randint(0, len(motion) - m_length)
            motion = motion[start : start + m_length]
        elif m_length >= self.max_motion_length:
            # Random temporal crop down to the fixed maximum length.
            start = random.randint(0, len(motion) - self.max_motion_length)
            motion = motion[start : start + self.max_motion_length]
            m_length = self.max_motion_length

        # Zero-pad motion up to max_motion_length.
        if m_length < self.max_motion_length:
            motion = np.concatenate(
                [
                    motion,
                    np.zeros((self.max_motion_length - m_length, motion.shape[1])),
                ],
                axis=0,
            )
        assert len(motion) == self.max_motion_length

        if self.mode in ["gt_eval", "eval"]:
            # Word embeddings for text-to-motion evaluation.
            tokens = text_data["tokens"]
            if len(tokens) < self.max_text_len:
                # pad with "unk" after adding sos/eos markers
                tokens = ["sos/OTHER"] + tokens + ["eos/OTHER"]
                sent_len = len(tokens)
                tokens = tokens + ["unk/OTHER"] * (self.max_text_len + 2 - sent_len)
            else:
                # crop, then add sos/eos markers
                tokens = tokens[: self.max_text_len]
                tokens = ["sos/OTHER"] + tokens + ["eos/OTHER"]
                sent_len = len(tokens)
            pos_one_hots = []
            word_embeddings = []
            for token in tokens:
                word_emb, pos_oh = self.w_vectorizer[token]
                pos_one_hots.append(pos_oh[None, :])
                word_embeddings.append(word_emb[None, :])
            pos_one_hots = np.concatenate(pos_one_hots, axis=0)
            word_embeddings = np.concatenate(word_embeddings, axis=0)
            return (
                word_embeddings,
                pos_one_hots,
                caption,
                sent_len,
                motion,
                m_length,
                "_".join(tokens),
            )
        elif self.mode in ["xyz_gt"]:
            # Convert the HML motion representation to skeleton xyz points
            # (padding frames recover as all-zero joints).
            motion = torch.from_numpy(motion).float()
            pred_joints = recover_from_ric(
                motion, self.joints_num
            )  # (nframe, njoints, 3)
            # Put the skeleton on the floor: subtract the global minimum
            # of the Y axis over all frames and joints.
            floor_height = pred_joints.min(dim=0)[0].min(dim=0)[0][1]
            pred_joints[:, :, 1] -= floor_height
            return pred_joints

        return caption, motion, m_length
class HumanML3D(Text2MotionDataset):
    """Text2MotionDataset configured for the HumanML3D dataset."""

    def __init__(self, opt, split="train", mode="train", accelerator=None):
        # Constants consumed by the Text2MotionDataset loader.
        self.data_root = "./data/HumanML3D"
        self.min_motion_len = 40
        self.joints_num = 22
        self.dim_pose = 263
        self.max_motion_length = 196
        if not accelerator:
            print("\n Loading %s mode HumanML3D dataset ..." % mode)
        else:
            accelerator.print(
                "\n Loading %s mode HumanML3D %s dataset ..." % (mode, split)
            )
        super().__init__(opt, split, mode, accelerator)
class KIT(Text2MotionDataset):
    """Text2MotionDataset configured for the KIT-ML dataset."""

    def __init__(self, opt, split="train", mode="train", accelerator=None):
        # Constants consumed by the Text2MotionDataset loader.
        self.data_root = "./data/KIT-ML"
        self.min_motion_len = 24
        self.joints_num = 21
        self.dim_pose = 251
        self.max_motion_length = 196
        if not accelerator:
            print("\n Loading %s mode KIT dataset ..." % mode)
        else:
            accelerator.print("\n Loading %s mode KIT %s dataset ..." % (mode, split))
        super().__init__(opt, split, mode, accelerator)