ProPainter / model /propainter.py
sczhou's picture
init code
320e465
raw
history blame
22.1 kB
''' Towards An End-to-End Framework for Video Inpainting
'''
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from einops import rearrange
from model.modules.base_module import BaseNetwork
from model.modules.sparse_transformer import TemporalSparseTransformerBlock, SoftSplit, SoftComp
from model.modules.spectral_norm import spectral_norm as _spectral_norm
from model.modules.flow_loss_utils import flow_warp
from model.modules.deformconv import ModulatedDeformConv2d
from .misc import constant_init
def length_sq(x):
return torch.sum(torch.square(x), dim=1, keepdim=True)
def fbConsistencyCheck(flow_fw, flow_bw, alpha1=0.01, alpha2=0.5):
flow_bw_warped = flow_warp(flow_bw, flow_fw.permute(0, 2, 3, 1)) # wb(wf(x))
flow_diff_fw = flow_fw + flow_bw_warped # wf + wb(wf(x))
mag_sq_fw = length_sq(flow_fw) + length_sq(flow_bw_warped) # |wf| + |wb(wf(x))|
occ_thresh_fw = alpha1 * mag_sq_fw + alpha2
# fb_valid_fw = (length_sq(flow_diff_fw) < occ_thresh_fw).float()
fb_valid_fw = (length_sq(flow_diff_fw) < occ_thresh_fw).to(flow_fw)
return fb_valid_fw
class DeformableAlignment(ModulatedDeformConv2d):
"""Second-order deformable alignment module."""
def __init__(self, *args, **kwargs):
# self.max_residue_magnitude = kwargs.pop('max_residue_magnitude', 10)
self.max_residue_magnitude = kwargs.pop('max_residue_magnitude', 3)
super(DeformableAlignment, self).__init__(*args, **kwargs)
self.conv_offset = nn.Sequential(
nn.Conv2d(2*self.out_channels + 2 + 1 + 2, self.out_channels, 3, 1, 1),
nn.LeakyReLU(negative_slope=0.1, inplace=True),
nn.Conv2d(self.out_channels, self.out_channels, 3, 1, 1),
nn.LeakyReLU(negative_slope=0.1, inplace=True),
nn.Conv2d(self.out_channels, self.out_channels, 3, 1, 1),
nn.LeakyReLU(negative_slope=0.1, inplace=True),
nn.Conv2d(self.out_channels, 27 * self.deform_groups, 3, 1, 1),
)
self.init_offset()
def init_offset(self):
constant_init(self.conv_offset[-1], val=0, bias=0)
def forward(self, x, cond_feat, flow):
out = self.conv_offset(cond_feat)
o1, o2, mask = torch.chunk(out, 3, dim=1)
# offset
offset = self.max_residue_magnitude * torch.tanh(torch.cat((o1, o2), dim=1))
offset = offset + flow.flip(1).repeat(1, offset.size(1) // 2, 1, 1)
# mask
mask = torch.sigmoid(mask)
return torchvision.ops.deform_conv2d(x, offset, self.weight, self.bias,
self.stride, self.padding,
self.dilation, mask)
class BidirectionalPropagation(nn.Module):
def __init__(self, channel, learnable=True):
super(BidirectionalPropagation, self).__init__()
self.deform_align = nn.ModuleDict()
self.backbone = nn.ModuleDict()
self.channel = channel
self.prop_list = ['backward_1', 'forward_1']
self.learnable = learnable
if self.learnable:
for i, module in enumerate(self.prop_list):
self.deform_align[module] = DeformableAlignment(
channel, channel, 3, padding=1, deform_groups=16)
self.backbone[module] = nn.Sequential(
nn.Conv2d(2*channel+2, channel, 3, 1, 1),
nn.LeakyReLU(negative_slope=0.2, inplace=True),
nn.Conv2d(channel, channel, 3, 1, 1),
)
self.fuse = nn.Sequential(
nn.Conv2d(2*channel+2, channel, 3, 1, 1),
nn.LeakyReLU(negative_slope=0.2, inplace=True),
nn.Conv2d(channel, channel, 3, 1, 1),
)
def binary_mask(self, mask, th=0.1):
mask[mask>th] = 1
mask[mask<=th] = 0
# return mask.float()
return mask.to(mask)
def forward(self, x, flows_forward, flows_backward, mask, interpolation='bilinear'):
"""
x shape : [b, t, c, h, w]
return [b, t, c, h, w]
"""
# For backward warping
# pred_flows_forward for backward feature propagation
# pred_flows_backward for forward feature propagation
b, t, c, h, w = x.shape
feats, masks = {}, {}
feats['input'] = [x[:, i, :, :, :] for i in range(0, t)]
masks['input'] = [mask[:, i, :, :, :] for i in range(0, t)]
prop_list = ['backward_1', 'forward_1']
cache_list = ['input'] + prop_list
for p_i, module_name in enumerate(prop_list):
feats[module_name] = []
masks[module_name] = []
if 'backward' in module_name:
frame_idx = range(0, t)
frame_idx = frame_idx[::-1]
flow_idx = frame_idx
flows_for_prop = flows_forward
flows_for_check = flows_backward
else:
frame_idx = range(0, t)
flow_idx = range(-1, t - 1)
flows_for_prop = flows_backward
flows_for_check = flows_forward
for i, idx in enumerate(frame_idx):
feat_current = feats[cache_list[p_i]][idx]
mask_current = masks[cache_list[p_i]][idx]
if i == 0:
feat_prop = feat_current
mask_prop = mask_current
else:
flow_prop = flows_for_prop[:, flow_idx[i], :, :, :]
flow_check = flows_for_check[:, flow_idx[i], :, :, :]
flow_vaild_mask = fbConsistencyCheck(flow_prop, flow_check)
feat_warped = flow_warp(feat_prop, flow_prop.permute(0, 2, 3, 1), interpolation)
if self.learnable:
cond = torch.cat([feat_current, feat_warped, flow_prop, flow_vaild_mask, mask_current], dim=1)
feat_prop = self.deform_align[module_name](feat_prop, cond, flow_prop)
mask_prop = mask_current
else:
mask_prop_valid = flow_warp(mask_prop, flow_prop.permute(0, 2, 3, 1))
mask_prop_valid = self.binary_mask(mask_prop_valid)
union_vaild_mask = self.binary_mask(mask_current*flow_vaild_mask*(1-mask_prop_valid))
feat_prop = union_vaild_mask * feat_warped + (1-union_vaild_mask) * feat_current
# update mask
mask_prop = self.binary_mask(mask_current*(1-(flow_vaild_mask*(1-mask_prop_valid))))
# refine
if self.learnable:
feat = torch.cat([feat_current, feat_prop, mask_current], dim=1)
feat_prop = feat_prop + self.backbone[module_name](feat)
# feat_prop = self.backbone[module_name](feat_prop)
feats[module_name].append(feat_prop)
masks[module_name].append(mask_prop)
# end for
if 'backward' in module_name:
feats[module_name] = feats[module_name][::-1]
masks[module_name] = masks[module_name][::-1]
outputs_b = torch.stack(feats['backward_1'], dim=1).view(-1, c, h, w)
outputs_f = torch.stack(feats['forward_1'], dim=1).view(-1, c, h, w)
if self.learnable:
mask_in = mask.view(-1, 2, h, w)
masks_b, masks_f = None, None
outputs = self.fuse(torch.cat([outputs_b, outputs_f, mask_in], dim=1)) + x.view(-1, c, h, w)
else:
masks_b = torch.stack(masks['backward_1'], dim=1)
masks_f = torch.stack(masks['forward_1'], dim=1)
outputs = outputs_f
return outputs_b.view(b, -1, c, h, w), outputs_f.view(b, -1, c, h, w), \
outputs.view(b, -1, c, h, w), masks_f
class Encoder(nn.Module):
def __init__(self):
super(Encoder, self).__init__()
self.group = [1, 2, 4, 8, 1]
self.layers = nn.ModuleList([
nn.Conv2d(5, 64, kernel_size=3, stride=2, padding=1),
nn.LeakyReLU(0.2, inplace=True),
nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
nn.LeakyReLU(0.2, inplace=True),
nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
nn.LeakyReLU(0.2, inplace=True),
nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
nn.LeakyReLU(0.2, inplace=True),
nn.Conv2d(256, 384, kernel_size=3, stride=1, padding=1, groups=1),
nn.LeakyReLU(0.2, inplace=True),
nn.Conv2d(640, 512, kernel_size=3, stride=1, padding=1, groups=2),
nn.LeakyReLU(0.2, inplace=True),
nn.Conv2d(768, 384, kernel_size=3, stride=1, padding=1, groups=4),
nn.LeakyReLU(0.2, inplace=True),
nn.Conv2d(640, 256, kernel_size=3, stride=1, padding=1, groups=8),
nn.LeakyReLU(0.2, inplace=True),
nn.Conv2d(512, 128, kernel_size=3, stride=1, padding=1, groups=1),
nn.LeakyReLU(0.2, inplace=True)
])
def forward(self, x):
bt, c, _, _ = x.size()
# h, w = h//4, w//4
out = x
for i, layer in enumerate(self.layers):
if i == 8:
x0 = out
_, _, h, w = x0.size()
if i > 8 and i % 2 == 0:
g = self.group[(i - 8) // 2]
x = x0.view(bt, g, -1, h, w)
o = out.view(bt, g, -1, h, w)
out = torch.cat([x, o], 2).view(bt, -1, h, w)
out = layer(out)
return out
class deconv(nn.Module):
def __init__(self,
input_channel,
output_channel,
kernel_size=3,
padding=0):
super().__init__()
self.conv = nn.Conv2d(input_channel,
output_channel,
kernel_size=kernel_size,
stride=1,
padding=padding)
def forward(self, x):
x = F.interpolate(x,
scale_factor=2,
mode='bilinear',
align_corners=True)
return self.conv(x)
class InpaintGenerator(BaseNetwork):
def __init__(self, init_weights=True, model_path=None):
super(InpaintGenerator, self).__init__()
channel = 128
hidden = 512
# encoder
self.encoder = Encoder()
# decoder
self.decoder = nn.Sequential(
deconv(channel, 128, kernel_size=3, padding=1),
nn.LeakyReLU(0.2, inplace=True),
nn.Conv2d(128, 64, kernel_size=3, stride=1, padding=1),
nn.LeakyReLU(0.2, inplace=True),
deconv(64, 64, kernel_size=3, padding=1),
nn.LeakyReLU(0.2, inplace=True),
nn.Conv2d(64, 3, kernel_size=3, stride=1, padding=1))
# soft split and soft composition
kernel_size = (7, 7)
padding = (3, 3)
stride = (3, 3)
t2t_params = {
'kernel_size': kernel_size,
'stride': stride,
'padding': padding
}
self.ss = SoftSplit(channel, hidden, kernel_size, stride, padding)
self.sc = SoftComp(channel, hidden, kernel_size, stride, padding)
self.max_pool = nn.MaxPool2d(kernel_size, stride, padding)
# feature propagation module
self.img_prop_module = BidirectionalPropagation(3, learnable=False)
self.feat_prop_module = BidirectionalPropagation(128, learnable=True)
depths = 8
num_heads = 4
window_size = (5, 9)
pool_size = (4, 4)
self.transformers = TemporalSparseTransformerBlock(dim=hidden,
n_head=num_heads,
window_size=window_size,
pool_size=pool_size,
depths=depths,
t2t_params=t2t_params)
if init_weights:
self.init_weights()
if model_path is not None:
print('Pretrained ProPainter has loaded...')
ckpt = torch.load(model_path, map_location='cpu')
self.load_state_dict(ckpt, strict=True)
# print network parameter number
self.print_network()
def img_propagation(self, masked_frames, completed_flows, masks, interpolation='nearest'):
_, _, prop_frames, updated_masks = self.img_prop_module(masked_frames, completed_flows[0], completed_flows[1], masks, interpolation)
return prop_frames, updated_masks
def forward(self, masked_frames, completed_flows, masks_in, masks_updated, num_local_frames, interpolation='bilinear', t_dilation=2):
"""
Args:
masks_in: original mask
masks_updated: updated mask after image propagation
"""
l_t = num_local_frames
b, t, _, ori_h, ori_w = masked_frames.size()
# extracting features
enc_feat = self.encoder(torch.cat([masked_frames.view(b * t, 3, ori_h, ori_w),
masks_in.view(b * t, 1, ori_h, ori_w),
masks_updated.view(b * t, 1, ori_h, ori_w)], dim=1))
_, c, h, w = enc_feat.size()
local_feat = enc_feat.view(b, t, c, h, w)[:, :l_t, ...]
ref_feat = enc_feat.view(b, t, c, h, w)[:, l_t:, ...]
fold_feat_size = (h, w)
ds_flows_f = F.interpolate(completed_flows[0].view(-1, 2, ori_h, ori_w), scale_factor=1/4, mode='bilinear', align_corners=False).view(b, l_t-1, 2, h, w)/4.0
ds_flows_b = F.interpolate(completed_flows[1].view(-1, 2, ori_h, ori_w), scale_factor=1/4, mode='bilinear', align_corners=False).view(b, l_t-1, 2, h, w)/4.0
ds_mask_in = F.interpolate(masks_in.reshape(-1, 1, ori_h, ori_w), scale_factor=1/4, mode='nearest').view(b, t, 1, h, w)
ds_mask_in_local = ds_mask_in[:, :l_t]
ds_mask_updated_local = F.interpolate(masks_updated[:,:l_t].reshape(-1, 1, ori_h, ori_w), scale_factor=1/4, mode='nearest').view(b, l_t, 1, h, w)
if self.training:
mask_pool_l = self.max_pool(ds_mask_in.view(-1, 1, h, w))
mask_pool_l = mask_pool_l.view(b, t, 1, mask_pool_l.size(-2), mask_pool_l.size(-1))
else:
mask_pool_l = self.max_pool(ds_mask_in_local.view(-1, 1, h, w))
mask_pool_l = mask_pool_l.view(b, l_t, 1, mask_pool_l.size(-2), mask_pool_l.size(-1))
prop_mask_in = torch.cat([ds_mask_in_local, ds_mask_updated_local], dim=2)
_, _, local_feat, _ = self.feat_prop_module(local_feat, ds_flows_f, ds_flows_b, prop_mask_in, interpolation)
enc_feat = torch.cat((local_feat, ref_feat), dim=1)
trans_feat = self.ss(enc_feat.view(-1, c, h, w), b, fold_feat_size)
mask_pool_l = rearrange(mask_pool_l, 'b t c h w -> b t h w c').contiguous()
trans_feat = self.transformers(trans_feat, fold_feat_size, mask_pool_l, t_dilation=t_dilation)
trans_feat = self.sc(trans_feat, t, fold_feat_size)
trans_feat = trans_feat.view(b, t, -1, h, w)
enc_feat = enc_feat + trans_feat
if self.training:
output = self.decoder(enc_feat.view(-1, c, h, w))
output = torch.tanh(output).view(b, t, 3, ori_h, ori_w)
else:
output = self.decoder(enc_feat[:, :l_t].view(-1, c, h, w))
output = torch.tanh(output).view(b, l_t, 3, ori_h, ori_w)
return output
# ######################################################################
# Discriminator for Temporal Patch GAN
# ######################################################################
class Discriminator(BaseNetwork):
def __init__(self,
in_channels=3,
use_sigmoid=False,
use_spectral_norm=True,
init_weights=True):
super(Discriminator, self).__init__()
self.use_sigmoid = use_sigmoid
nf = 32
self.conv = nn.Sequential(
spectral_norm(
nn.Conv3d(in_channels=in_channels,
out_channels=nf * 1,
kernel_size=(3, 5, 5),
stride=(1, 2, 2),
padding=1,
bias=not use_spectral_norm), use_spectral_norm),
# nn.InstanceNorm2d(64, track_running_stats=False),
nn.LeakyReLU(0.2, inplace=True),
spectral_norm(
nn.Conv3d(nf * 1,
nf * 2,
kernel_size=(3, 5, 5),
stride=(1, 2, 2),
padding=(1, 2, 2),
bias=not use_spectral_norm), use_spectral_norm),
# nn.InstanceNorm2d(128, track_running_stats=False),
nn.LeakyReLU(0.2, inplace=True),
spectral_norm(
nn.Conv3d(nf * 2,
nf * 4,
kernel_size=(3, 5, 5),
stride=(1, 2, 2),
padding=(1, 2, 2),
bias=not use_spectral_norm), use_spectral_norm),
# nn.InstanceNorm2d(256, track_running_stats=False),
nn.LeakyReLU(0.2, inplace=True),
spectral_norm(
nn.Conv3d(nf * 4,
nf * 4,
kernel_size=(3, 5, 5),
stride=(1, 2, 2),
padding=(1, 2, 2),
bias=not use_spectral_norm), use_spectral_norm),
# nn.InstanceNorm2d(256, track_running_stats=False),
nn.LeakyReLU(0.2, inplace=True),
spectral_norm(
nn.Conv3d(nf * 4,
nf * 4,
kernel_size=(3, 5, 5),
stride=(1, 2, 2),
padding=(1, 2, 2),
bias=not use_spectral_norm), use_spectral_norm),
# nn.InstanceNorm2d(256, track_running_stats=False),
nn.LeakyReLU(0.2, inplace=True),
nn.Conv3d(nf * 4,
nf * 4,
kernel_size=(3, 5, 5),
stride=(1, 2, 2),
padding=(1, 2, 2)))
if init_weights:
self.init_weights()
def forward(self, xs):
# T, C, H, W = xs.shape (old)
# B, T, C, H, W (new)
xs_t = torch.transpose(xs, 1, 2)
feat = self.conv(xs_t)
if self.use_sigmoid:
feat = torch.sigmoid(feat)
out = torch.transpose(feat, 1, 2) # B, T, C, H, W
return out
class Discriminator_2D(BaseNetwork):
def __init__(self,
in_channels=3,
use_sigmoid=False,
use_spectral_norm=True,
init_weights=True):
super(Discriminator_2D, self).__init__()
self.use_sigmoid = use_sigmoid
nf = 32
self.conv = nn.Sequential(
spectral_norm(
nn.Conv3d(in_channels=in_channels,
out_channels=nf * 1,
kernel_size=(1, 5, 5),
stride=(1, 2, 2),
padding=(0, 2, 2),
bias=not use_spectral_norm), use_spectral_norm),
# nn.InstanceNorm2d(64, track_running_stats=False),
nn.LeakyReLU(0.2, inplace=True),
spectral_norm(
nn.Conv3d(nf * 1,
nf * 2,
kernel_size=(1, 5, 5),
stride=(1, 2, 2),
padding=(0, 2, 2),
bias=not use_spectral_norm), use_spectral_norm),
# nn.InstanceNorm2d(128, track_running_stats=False),
nn.LeakyReLU(0.2, inplace=True),
spectral_norm(
nn.Conv3d(nf * 2,
nf * 4,
kernel_size=(1, 5, 5),
stride=(1, 2, 2),
padding=(0, 2, 2),
bias=not use_spectral_norm), use_spectral_norm),
# nn.InstanceNorm2d(256, track_running_stats=False),
nn.LeakyReLU(0.2, inplace=True),
spectral_norm(
nn.Conv3d(nf * 4,
nf * 4,
kernel_size=(1, 5, 5),
stride=(1, 2, 2),
padding=(0, 2, 2),
bias=not use_spectral_norm), use_spectral_norm),
# nn.InstanceNorm2d(256, track_running_stats=False),
nn.LeakyReLU(0.2, inplace=True),
spectral_norm(
nn.Conv3d(nf * 4,
nf * 4,
kernel_size=(1, 5, 5),
stride=(1, 2, 2),
padding=(0, 2, 2),
bias=not use_spectral_norm), use_spectral_norm),
# nn.InstanceNorm2d(256, track_running_stats=False),
nn.LeakyReLU(0.2, inplace=True),
nn.Conv3d(nf * 4,
nf * 4,
kernel_size=(1, 5, 5),
stride=(1, 2, 2),
padding=(0, 2, 2)))
if init_weights:
self.init_weights()
def forward(self, xs):
# T, C, H, W = xs.shape (old)
# B, T, C, H, W (new)
xs_t = torch.transpose(xs, 1, 2)
feat = self.conv(xs_t)
if self.use_sigmoid:
feat = torch.sigmoid(feat)
out = torch.transpose(feat, 1, 2) # B, T, C, H, W
return out
def spectral_norm(module, mode=True):
if mode:
return _spectral_norm(module)
return module