## Restormer: Efficient Transformer for High-Resolution Image Restoration
## Syed Waqas Zamir, Aditya Arora, Salman Khan, Munawar Hayat, Fahad Shahbaz Khan, and Ming-Hsuan Yang
## https://arxiv.org/abs/2111.09881
# NOTE(review): the header above looks carried over from another project; the
# code in this file is a DaGAN animation script (see the parser description).
"""Helpers for animating a source image with a driving video (DaGAN demo).

The module exposes three functions:

* ``normalize_kp``    -- re-express driving keypoints relative to the source.
* ``find_best_frame`` -- pick the driving frame whose face pose is closest to
  the source image.
* ``make_animation``  -- run depth estimation, keypoint detection and the
  generator frame by frame.
"""

import torch
import torch.nn.functional as F
import os
import argparse
from scipy.spatial import ConvexHull
import numpy as np

try:
    from tqdm import tqdm
except ImportError:  # the progress bar is cosmetic; keep working without it
    def tqdm(iterable, *args, **kwargs):
        """Fallback used when tqdm is not installed: return *iterable* as is."""
        return iterable


parser = argparse.ArgumentParser(description='Test DaGAN on your own images')
parser.add_argument('--source_image', default='./temp/source.jpg', type=str,
                    help='Directory of input source image')
parser.add_argument('--driving_video', default='./temp/driving.mp4', type=str,
                    help='Directory for driving video')
# The help text used to be a copy of the --driving_video one.
parser.add_argument('--output', default='./temp/result.mp4', type=str,
                    help='Path of the output video')

if __name__ == "__main__":
    # Run as a script: keep strict parsing so typos in flags are reported.
    args = parser.parse_args()
else:
    # Imported (test runner, notebook, web app, ...): the host's argv may hold
    # arguments that are not ours, so do not abort the import on them.
    args, _unknown_args = parser.parse_known_args()


def normalize_kp(kp_source, kp_driving, kp_driving_initial, adapt_movement_scale=False,
                 use_relative_movement=False, use_relative_jacobian=False):
    """Return driving keypoints adjusted relative to the source keypoints.

    Args:
        kp_source: dict with ``'value'`` (and ``'jacobian'`` when
            ``use_relative_jacobian`` is set) for the source image.
        kp_driving: same kind of dict for the current driving frame. It is not
            modified; a shallow copy is returned.
        kp_driving_initial: same kind of dict for the reference driving frame.
        adapt_movement_scale: when true, scale the keypoint displacement by
            ``sqrt(source hull area) / sqrt(initial driving hull area)``.
        use_relative_movement: when true, replace ``'value'`` with
            ``source + scale * (driving - driving_initial)``.
        use_relative_jacobian: when true (and ``use_relative_movement`` is
            true), replace ``'jacobian'`` with
            ``driving @ inverse(driving_initial) @ source``.

    Returns:
        A new dict with the same keys as ``kp_driving``.

    Note:
        Only batch element 0 is used to compute the movement scale.
        Presumably ``'value'`` is (batch, num_kp, 2) -- TODO confirm.
    """
    if adapt_movement_scale:
        # For 2-D input, ConvexHull.volume is the enclosed area.
        source_area = ConvexHull(kp_source['value'][0].data.cpu().numpy()).volume
        driving_area = ConvexHull(kp_driving_initial['value'][0].data.cpu().numpy()).volume
        movement_scale = float(np.sqrt(source_area) / np.sqrt(driving_area))
    else:
        movement_scale = 1

    kp_new = dict(kp_driving)

    if use_relative_movement:
        # The subtraction allocates a new tensor, so the in-place multiply
        # below never touches the caller's data.
        kp_value_diff = kp_driving['value'] - kp_driving_initial['value']
        kp_value_diff *= movement_scale
        kp_new['value'] = kp_value_diff + kp_source['value']

        if use_relative_jacobian:
            jacobian_diff = torch.matmul(kp_driving['jacobian'],
                                         torch.inverse(kp_driving_initial['jacobian']))
            kp_new['jacobian'] = torch.matmul(jacobian_diff, kp_source['jacobian'])

    return kp_new


def find_best_frame(source, driving, cpu=False):
    """Return the index of the driving frame whose face best matches ``source``.

    Facial landmarks of the first detected face are centred and divided by the
    square root of their 2-D convex-hull area; frames are then compared to the
    source by the sum of squared landmark differences.

    Args:
        source: source image; it is multiplied by 255 before detection, so it
            is presumably a float image in [0, 1] -- TODO confirm.
        driving: iterable of driving frames in the same format.
        cpu: run the landmark detector on the CPU instead of CUDA.

    Returns:
        Index (into ``driving``) of the closest frame. Frames in which no face
        is detected are skipped; if no frame has a face, 0 is returned.

    Raises:
        ValueError: if no face is detected in ``source``.
    """
    import face_alignment

    def _normalized_landmarks(detector, image):
        """Landmarks of the first face in *image*, or None if none is found."""
        detections = detector.get_landmarks(255 * image)
        if not detections:  # None (or an empty list) when no face is detected
            return None
        kp = detections[0]
        kp = kp - kp.mean(axis=0, keepdims=True)
        scale = np.sqrt(ConvexHull(kp[:, :2]).volume)
        kp[:, :2] = kp[:, :2] / scale
        return kp

    # NOTE(review): newer face_alignment releases may spell this enum member
    # differently (LandmarksType.TWO_D) -- confirm against the pinned version.
    fa = face_alignment.FaceAlignment(face_alignment.LandmarksType._2D, flip_input=True,
                                      device='cpu' if cpu else 'cuda')

    kp_source = _normalized_landmarks(fa, source)
    if kp_source is None:
        raise ValueError("no face detected in the source image")

    norm = float('inf')
    frame_num = 0
    # Wrap the sequence (not the enumerate object) so tqdm can show the total.
    for i, image in enumerate(tqdm(driving)):
        kp_driving = _normalized_landmarks(fa, image)
        if kp_driving is None:
            continue
        new_norm = (np.abs(kp_source - kp_driving) ** 2).sum()
        if new_norm < norm:
            norm = new_norm
            frame_num = i
    return frame_num


def _lookup_depth_net(explicit, global_name):
    """Return *explicit* if given, else the module-level object *global_name*."""
    if explicit is not None:
        return explicit
    try:
        return globals()[global_name]
    except KeyError:
        raise NameError(
            f"name '{global_name}' is not defined; pass {global_name}=... to "
            "make_animation or define it at module level"
        ) from None


def make_animation(source_image, driving_video, generator, kp_detector, relative=True,
                   adapt_movement_scale=True, cpu=False, depth_encoder=None, depth_decoder=None):
    """Animate ``source_image`` with the motion of ``driving_video``.

    For every driving frame a depth map is predicted, concatenated to the RGB
    frame as an extra channel, fed to ``kp_detector``, normalised against the
    source keypoints and passed to ``generator``.

    Args:
        source_image: H x W x C array (converted to float32, channels first).
        driving_video: sequence of H x W x C frames.
        generator: callable invoked as ``generator(source, kp_source=...,
            kp_driving=..., source_depth=..., driving_depth=...)``; its result
            must contain ``'prediction'``.
        kp_detector: callable mapping an image+depth tensor to a keypoint dict.
        relative: forwarded to ``normalize_kp`` as both
            ``use_relative_movement`` and ``use_relative_jacobian``.
        adapt_movement_scale: forwarded to ``normalize_kp``.
        cpu: keep tensors on the CPU instead of moving them to CUDA.
        depth_encoder, depth_decoder: depth networks. When omitted, the
            module-level globals of the same names are used (the original
            behaviour); ``depth_decoder(depth_encoder(x))`` must return a
            mapping containing the key ``("disp", 0)``.

    Returns:
        ``(sources, drivings, predictions, depth_gray)``: four lists with one
        H x W x C numpy array per driving frame. ``depth_gray`` holds
        ``1 - depth / max(depth)`` for each frame.

    Raises:
        NameError: if the depth networks are neither passed nor defined at
            module level.
    """
    depth_encoder = _lookup_depth_net(depth_encoder, 'depth_encoder')
    depth_decoder = _lookup_depth_net(depth_decoder, 'depth_decoder')

    sources = []
    drivings = []
    with torch.no_grad():
        predictions = []
        depth_gray = []

        # (H, W, C) -> (1, C, H, W) and (T, H, W, C) -> (1, C, T, H, W)
        source = torch.tensor(source_image[np.newaxis].astype(np.float32)).permute(0, 3, 1, 2)
        driving = torch.tensor(
            np.array(driving_video)[np.newaxis].astype(np.float32)).permute(0, 4, 1, 2, 3)
        if not cpu:
            source = source.cuda()
            driving = driving.cuda()

        depth_source = depth_decoder(depth_encoder(source))[("disp", 0)]
        depth_driving = depth_decoder(depth_encoder(driving[:, :, 0]))[("disp", 0)]

        # Keypoints are detected on RGB + depth (depth appended on dim 1).
        # Depth-free variant kept for reference:
        # kp_source = kp_detector(source)
        # kp_driving_initial = kp_detector(driving[:, :, 0])
        kp_source = kp_detector(torch.cat((source, depth_source), 1))
        kp_driving_initial = kp_detector(torch.cat((driving[:, :, 0], depth_driving), 1))

        for frame_idx in tqdm(range(driving.shape[2])):
            # Already on the right device: it is a slice of `driving`.
            driving_frame = driving[:, :, frame_idx]

            depth_map = depth_decoder(depth_encoder(driving_frame))[("disp", 0)]

            gray_driving = np.transpose(depth_map.data.cpu().numpy(), [0, 2, 3, 1])[0]
            gray_driving = 1 - gray_driving / np.max(gray_driving)

            kp_driving = kp_detector(torch.cat((driving_frame, depth_map), 1))
            kp_norm = normalize_kp(kp_source=kp_source, kp_driving=kp_driving,
                                   kp_driving_initial=kp_driving_initial,
                                   use_relative_movement=relative,
                                   use_relative_jacobian=relative,
                                   adapt_movement_scale=adapt_movement_scale)
            out = generator(source, kp_source=kp_source, kp_driving=kp_norm,
                            source_depth=depth_source, driving_depth=depth_map)

            drivings.append(np.transpose(driving_frame.data.cpu().numpy(), [0, 2, 3, 1])[0])
            sources.append(np.transpose(source.data.cpu().numpy(), [0, 2, 3, 1])[0])
            predictions.append(
                np.transpose(out['prediction'].data.cpu().numpy(), [0, 2, 3, 1])[0])
            depth_gray.append(gray_driving)
    return sources, drivings, predictions, depth_gray


# ---------------------------------------------------------------------------
# Disabled driver script, kept for reference.
# NOTE(review): the indentation below is reconstructed and should be checked
# before re-enabling. Also note that `yaml.load(f)` is called without a Loader
# and the final print refers to `out_dir`, which is never defined.
# ---------------------------------------------------------------------------
# with open("config/vox-adv-256.yaml") as f:
#     config = yaml.load(f)
# generator = G.SPADEDepthAwareGenerator(**config['model_params']['generator_params'],**config['model_params']['common_params'])
# config['model_params']['common_params']['num_channels'] = 4
# kp_detector = KPD.KPDetector(**config['model_params']['kp_detector_params'],**config['model_params']['common_params'])
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# g_checkpoint = torch.load("generator.pt", map_location=device)
# kp_checkpoint = torch.load("kp_detector.pt", map_location=device)
# ckp_generator = OrderedDict((k.replace('module.',''),v) for k,v in g_checkpoint.items())
# generator.load_state_dict(ckp_generator)
# ckp_kp_detector = OrderedDict((k.replace('module.',''),v) for k,v in kp_checkpoint.items())
# kp_detector.load_state_dict(ckp_kp_detector)
# depth_encoder = depth.ResnetEncoder(18, False)
# depth_decoder = depth.DepthDecoder(num_ch_enc=depth_encoder.num_ch_enc, scales=range(4))
# loaded_dict_enc = torch.load('encoder.pth')
# loaded_dict_dec = torch.load('depth.pth')
# filtered_dict_enc = {k: v for k, v in loaded_dict_enc.items() if k in depth_encoder.state_dict()}
# depth_encoder.load_state_dict(filtered_dict_enc)
# ckp_depth_decoder= {k: v for k, v in loaded_dict_dec.items() if k in depth_decoder.state_dict()}
# depth_decoder.load_state_dict(ckp_depth_decoder)
# depth_encoder.eval()
# depth_decoder.eval()
# # device = torch.device('cpu')
# # stx()
# generator = generator.to(device)
# kp_detector = kp_detector.to(device)
# depth_encoder = depth_encoder.to(device)
# depth_decoder = depth_decoder.to(device)
# generator.eval()
# kp_detector.eval()
# depth_encoder.eval()
# depth_decoder.eval()
# img_multiple_of = 8
# with torch.inference_mode():
#     if torch.cuda.is_available():
#         torch.cuda.ipc_collect()
#         torch.cuda.empty_cache()
#     source_image = imageio.imread(args.source_image)
#     reader = imageio.get_reader(args.driving_video)
#     fps = reader.get_meta_data()['fps']
#     driving_video = []
#     try:
#         for im in reader:
#             driving_video.append(im)
#     except RuntimeError:
#         pass
#     reader.close()
#     source_image = resize(source_image, (256, 256))[..., :3]
#     driving_video = [resize(frame, (256, 256))[..., :3] for frame in driving_video]
#     i = find_best_frame(source_image, driving_video)
#     print ("Best frame: " + str(i))
#     driving_forward = driving_video[i:]
#     driving_backward = driving_video[:(i+1)][::-1]
#     sources_forward, drivings_forward, predictions_forward,depth_forward = make_animation(source_image, driving_forward, generator, kp_detector, relative=True, adapt_movement_scale=True, cpu=False)
#     sources_backward, drivings_backward, predictions_backward,depth_backward = make_animation(source_image, driving_backward, generator, kp_detector, relative=True, adapt_movement_scale=True, cpu=False)
#     predictions = predictions_backward[::-1] + predictions_forward[1:]
#     sources = sources_backward[::-1] + sources_forward[1:]
#     drivings = drivings_backward[::-1] + drivings_forward[1:]
#     depth_gray = depth_backward[::-1] + depth_forward[1:]
#     imageio.mimsave(args.output, [np.concatenate((img_as_ubyte(s),img_as_ubyte(d),img_as_ubyte(p)),1) for (s,d,p) in zip(sources, drivings, predictions)], fps=fps)
#     imageio.mimsave("gray.mp4", depth_gray, fps=fps)
#     # merge the gray video
#     animation = np.array(imageio.mimread(args.output,memtest=False))
#     gray = np.array(imageio.mimread("gray.mp4",memtest=False))
#     src_dst = animation[:,:,:512,:]
#     animate = animation[:,:,512:,:]
#     merge = np.concatenate((src_dst,gray,animate),2)
#     imageio.mimsave(args.output, merge, fps=fps)
#     print(f"\nRestored images are saved at {out_dir}")