# Codes are borrowed from
# https://github.com/ZHKKKe/MODNet/blob/master/src/trainer.py
# https://github.com/ZHKKKe/MODNet/blob/master/src/models/backbones/mobilenetv2.py
# https://github.com/ZHKKKe/MODNet/blob/master/src/models/modnet.py

import math
import os

import numpy as np
import scipy
import torch
import torch.nn as nn
import torch.nn.functional as F
from scipy.ndimage import gaussian_filter


# ----------------------------------------------------------------------------------
# Loss Functions
# ----------------------------------------------------------------------------------


class GaussianBlurLayer(nn.Module):
    """ Add Gaussian Blur to a 4D tensor

    This layer takes a 4D tensor of {N, C, H, W} as input.
    The Gaussian blur is performed on each of the given channels (C)
    separately (the convolution is grouped per channel).
    """

    def __init__(self, channels, kernel_size):
        """
        Arguments:
            channels (int): Channel for input tensor
            kernel_size (int): Size of the kernel used in blurring (must be odd)
        """

        super(GaussianBlurLayer, self).__init__()
        self.channels = channels
        self.kernel_size = kernel_size
        assert self.kernel_size % 2 != 0

        # Reflection padding keeps the spatial size unchanged after the
        # un-padded depthwise convolution.
        self.op = nn.Sequential(
            nn.ReflectionPad2d(math.floor(self.kernel_size / 2)),
            nn.Conv2d(channels, channels, self.kernel_size,
                      stride=1, padding=0, bias=False, groups=channels)
        )

        self._init_kernel()

    def forward(self, x):
        """
        Arguments:
            x (torch.Tensor): input 4D tensor
        Returns:
            torch.Tensor: Blurred version of the input
        Raises:
            ValueError: if `x` is not 4D or its channel count differs from
                the one this layer was built for
        """

        if x.dim() != 4:
            raise ValueError(
                '\'GaussianBlurLayer\' requires a 4D tensor as input')
        if x.shape[1] != self.channels:
            raise ValueError(
                'In \'GaussianBlurLayer\', the required channel ({0}) is '
                'not the same as input ({1})'.format(self.channels, x.shape[1]))

        return self.op(x)

    def _init_kernel(self):
        """ Fill the convolution weights with a fixed Gaussian kernel """

        sigma = 0.3 * ((self.kernel_size - 1) * 0.5 - 1) + 0.8

        # Filtering a unit impulse yields the Gaussian kernel itself.
        n = np.zeros((self.kernel_size, self.kernel_size))
        i = math.floor(self.kernel_size / 2)
        n[i, i] = 1
        kernel = gaussian_filter(n, sigma)

        # The only parameter is the conv weight (bias is disabled); the kernel
        # is broadcast into it and frozen.
        for name, param in self.named_parameters():
            param.data.copy_(torch.from_numpy(kernel))
            param.requires_grad = False


# Module-level blur layer used by `loss_func`; `MODNet._apply` keeps it on the
# same device/dtype as the model.
blurer = GaussianBlurLayer(1, 3)


def loss_func(pred_semantic, pred_detail, pred_matte, image, trimap, gt_matte,
              semantic_scale=10.0, detail_scale=10.0, matte_scale=1.0):
    """ loss of MODNet

    The semantic target is built with the module-level `blurer`.

    Arguments:
        pred_semantic: model output
        pred_detail: model output
        pred_matte: model output
        image : input RGB image
                its pixel values should be normalized
        trimap : trimap used to calculate the losses
                its pixel values can be 0, 0.5, or 1
                (foreground=1, background=0, unknown=0.5)
        gt_matte: ground truth alpha matte
                  its pixel values are between [0, 1]
        semantic_scale (float): scale of the semantic loss
                                NOTE: please adjust according to your dataset
        detail_scale (float): scale of the detail loss
                              NOTE: please adjust according to your dataset
        matte_scale (float): scale of the matte loss
                             NOTE: please adjust according to your dataset

    Returns:
        semantic_loss (torch.Tensor): loss of the semantic estimation [Low-Resolution (LR) Branch]
        detail_loss (torch.Tensor): loss of the detail prediction [High-Resolution (HR) Branch]
        matte_loss (torch.Tensor): loss of the semantic-detail fusion [Fusion Branch]
    """

    trimap = trimap.float()
    # calculate the boundary mask from the trimap
    # (True wherever the trimap is NOT the "unknown" value 0.5)
    boundaries = (trimap < 0.5) | (trimap > 0.5)

    # calculate the semantic loss
    gt_semantic = F.interpolate(gt_matte, scale_factor=1 / 16,
                                mode='bilinear', align_corners=False)
    gt_semantic = blurer(gt_semantic)
    semantic_loss = semantic_scale * F.mse_loss(pred_semantic, gt_semantic)

    # calculate the detail loss
    # (known trimap pixels are overwritten on both sides, so only the unknown
    # region contributes)
    pred_boundary_detail = torch.where(boundaries, trimap, pred_detail.float())
    gt_detail = torch.where(boundaries, trimap, gt_matte.float())
    detail_loss = detail_scale * F.l1_loss(pred_boundary_detail, gt_detail)

    # calculate the matte loss
    pred_boundary_matte = torch.where(boundaries, trimap, pred_matte.float())
    matte_l1_loss = F.l1_loss(pred_matte, gt_matte) \
        + 4.0 * F.l1_loss(pred_boundary_matte, gt_matte)
    matte_compositional_loss = F.l1_loss(image * pred_matte, image * gt_matte) \
        + 4.0 * F.l1_loss(image * pred_boundary_matte, image * gt_matte)
    matte_loss = matte_scale * (matte_l1_loss + matte_compositional_loss)

    return semantic_loss, detail_loss, matte_loss


# ------------------------------------------------------------------------------
#  Useful functions
# ------------------------------------------------------------------------------

def _make_divisible(v, divisor, min_value=None):
    """ Round `v` to a multiple of `divisor`, not below `min_value`

    `min_value` defaults to `divisor`. If rounding lands below 90% of `v`,
    one more `divisor` is added.
    """
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v


def conv_bn(inp, oup, stride):
    """ 3x3 convolution (padding 1, no bias) + BatchNorm + ReLU6 """
    return nn.Sequential(
        nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
        nn.BatchNorm2d(oup),
        nn.ReLU6(inplace=True)
    )


def conv_1x1_bn(inp, oup):
    """ 1x1 convolution (no bias) + BatchNorm + ReLU6 """
    return nn.Sequential(
        nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
        nn.BatchNorm2d(oup),
        nn.ReLU6(inplace=True)
    )


# ------------------------------------------------------------------------------
#  Class of Inverted Residual block
# ------------------------------------------------------------------------------

class InvertedResidual(nn.Module):
    """ Inverted residual block: (pointwise expand) -> depthwise -> pointwise-linear

    The pointwise expansion is skipped when `expansion == 1`. A residual
    connection is used only when `stride == 1` and `inp == oup`.
    """

    def __init__(self, inp, oup, stride, expansion, dilation=1):
        super(InvertedResidual, self).__init__()
        self.stride = stride
        assert stride in [1, 2]

        hidden_dim = round(inp * expansion)
        self.use_res_connect = self.stride == 1 and inp == oup

        if expansion == 1:
            self.conv = nn.Sequential(
                # dw
                nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1,
                          groups=hidden_dim, dilation=dilation, bias=False),
                nn.BatchNorm2d(hidden_dim),
                nn.ReLU6(inplace=True),
                # pw-linear
                nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
                nn.BatchNorm2d(oup),
            )
        else:
            self.conv = nn.Sequential(
                # pw
                nn.Conv2d(inp, hidden_dim, 1, 1, 0, bias=False),
                nn.BatchNorm2d(hidden_dim),
                nn.ReLU6(inplace=True),
                # dw
                nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1,
                          groups=hidden_dim, dilation=dilation, bias=False),
                nn.BatchNorm2d(hidden_dim),
                nn.ReLU6(inplace=True),
                # pw-linear
                nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
                nn.BatchNorm2d(oup),
            )

    def forward(self, x):
        if self.use_res_connect:
            return x + self.conv(x)
        else:
            return self.conv(x)


# ------------------------------------------------------------------------------
#  Class of MobileNetV2
# ------------------------------------------------------------------------------

class MobileNetV2(nn.Module):
    """ MobileNetV2 feature extractor with an optional classifier head

    The classifier is only built (and applied) when `num_classes` is not None.
    """

    def __init__(self, in_channels, alpha=1.0, expansion=6, num_classes=1000):
        super(MobileNetV2, self).__init__()
        self.in_channels = in_channels
        self.num_classes = num_classes
        input_channel = 32
        last_channel = 1280
        interverted_residual_setting = [
            # t, c, n, s
            [1, 16, 1, 1],
            [expansion, 24, 2, 2],
            [expansion, 32, 3, 2],
            [expansion, 64, 4, 2],
            [expansion, 96, 3, 1],
            [expansion, 160, 3, 2],
            [expansion, 320, 1, 1],
        ]

        # building first layer
        input_channel = _make_divisible(input_channel * alpha, 8)
        self.last_channel = _make_divisible(last_channel * alpha, 8) if alpha > 1.0 else last_channel
        features = [conv_bn(self.in_channels, input_channel, 2)]

        # building inverted residual blocks
        for t, c, n, s in interverted_residual_setting:
            output_channel = _make_divisible(int(c * alpha), 8)
            for i in range(n):
                # only the first block of each group uses the group's stride
                block_stride = s if i == 0 else 1
                features.append(
                    InvertedResidual(input_channel, output_channel, block_stride, expansion=t))
                input_channel = output_channel

        # building last several layers
        features.append(conv_1x1_bn(input_channel, self.last_channel))

        # make it nn.Sequential
        self.features = nn.Sequential(*features)

        # building classifier
        if self.num_classes is not None:
            self.classifier = nn.Sequential(
                nn.Dropout(0.2),
                nn.Linear(self.last_channel, num_classes),
            )

        # Initialize weights
        self._init_weights()

    def forward(self, x):
        # Stage1
        x = self.features[0](x)
        x = self.features[1](x)
        # Stage2
        x = self.features[2](x)
        x = self.features[3](x)
        # Stage3
        x = self.features[4](x)
        x = self.features[5](x)
        x = self.features[6](x)
        # Stage4
        x = self.features[7](x)
        x = self.features[8](x)
        x = self.features[9](x)
        x = self.features[10](x)
        x = self.features[11](x)
        x = self.features[12](x)
        x = self.features[13](x)
        # Stage5
        x = self.features[14](x)
        x = self.features[15](x)
        x = self.features[16](x)
        x = self.features[17](x)
        x = self.features[18](x)

        # Classification
        if self.num_classes is not None:
            x = x.mean(dim=(2, 3))
            x = self.classifier(x)

        # Output
        return x

    def _load_pretrained_model(self, pretrained_file):
        """ Load matching entries of a checkpoint; keys unknown to this model are skipped """
        pretrain_dict = torch.load(pretrained_file, map_location='cpu')
        model_dict = {}
        state_dict = self.state_dict()
        print("[MobileNetV2] Loading pretrained model...")
        for k, v in pretrain_dict.items():
            if k in state_dict:
                model_dict[k] = v
            else:
                print(k, "is ignored")
        state_dict.update(model_dict)
        self.load_state_dict(state_dict)

    def _init_weights(self):
        """ Initialise conv, batch-norm and linear layers in place """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
                if m.bias is not None:
                    m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                m.weight.data.normal_(0, 0.01)
                m.bias.data.zero_()


class BaseBackbone(nn.Module):
    """ Superclass of Replaceable Backbone Model for Semantic Estimation
    """

    def __init__(self, in_channels):
        super(BaseBackbone, self).__init__()

        self.in_channels = in_channels

        self.model = None
        self.enc_channels = []

    def forward(self, x):
        raise NotImplementedError

    def load_pretrained_ckpt(self):
        raise NotImplementedError


class MobileNetV2Backbone(BaseBackbone):
    """ MobileNetV2 Backbone
    """

    def __init__(self, in_channels):
        super(MobileNetV2Backbone, self).__init__(in_channels)

        self.model = MobileNetV2(self.in_channels, alpha=1.0, expansion=6, num_classes=None)
        self.enc_channels = [16, 24, 32, 96, 1280]

    def forward(self, x):
        """ Return the feature maps captured after feature indices 1, 3, 6, 13 and 18 """
        x = self.model.features[0](x)
        x = self.model.features[1](x)
        enc2x = x

        x = self.model.features[2](x)
        x = self.model.features[3](x)
        enc4x = x

        x = self.model.features[4](x)
        x = self.model.features[5](x)
        x = self.model.features[6](x)
        enc8x = x

        x = self.model.features[7](x)
        x = self.model.features[8](x)
        x = self.model.features[9](x)
        x = self.model.features[10](x)
        x = self.model.features[11](x)
        x = self.model.features[12](x)
        x = self.model.features[13](x)
        enc16x = x

        x = self.model.features[14](x)
        x = self.model.features[15](x)
        x = self.model.features[16](x)
        x = self.model.features[17](x)
        x = self.model.features[18](x)
        enc32x = x
        return [enc2x, enc4x, enc8x, enc16x, enc32x]

    def load_pretrained_ckpt(self):
        """ Load the pre-trained backbone weights from a fixed relative path

        Raises:
            FileNotFoundError: if the checkpoint file does not exist
        """
        # the pre-trained model is provided by https://github.com/thuyngch/Human-Segmentation-PyTorch
        ckpt_path = './pretrained/mobilenetv2_human_seg.ckpt'
        if not os.path.exists(ckpt_path):
            raise FileNotFoundError(
                'cannot find the pretrained mobilenetv2 backbone: {0}'.format(ckpt_path))

        # map to CPU so GPU-saved checkpoints also load on CPU-only hosts;
        # load_state_dict copies into the model's existing parameters.
        ckpt = torch.load(ckpt_path, map_location='cpu')
        self.model.load_state_dict(ckpt)


SUPPORTED_BACKBONES = {
    'mobilenetv2': MobileNetV2Backbone,
}


# ------------------------------------------------------------------------------
#  MODNet Basic Modules
# ------------------------------------------------------------------------------

class IBNorm(nn.Module):
    """ Combine Instance Norm and Batch Norm into One Layer

    The first `in_channels // 2` channels go through BatchNorm, the remaining
    channels through InstanceNorm; the results are concatenated in that order.
    """

    def __init__(self, in_channels):
        super(IBNorm, self).__init__()
        self.bnorm_channels = int(in_channels / 2)
        self.inorm_channels = in_channels - self.bnorm_channels

        self.bnorm = nn.BatchNorm2d(self.bnorm_channels, affine=True)
        self.inorm = nn.InstanceNorm2d(self.inorm_channels, affine=False)

    def forward(self, x):
        bn_x = self.bnorm(x[:, :self.bnorm_channels, ...].contiguous())
        in_x = self.inorm(x[:, self.bnorm_channels:, ...].contiguous())

        return torch.cat((bn_x, in_x), 1)


class Conv2dIBNormRelu(nn.Module):
    """ Convolution + IBNorm + ReLu

    IBNorm and ReLU are each optional (`with_ibn`, `with_relu`).
    """

    def __init__(self, in_channels, out_channels, kernel_size,
                 stride=1, padding=0, dilation=1, groups=1, bias=True,
                 with_ibn=True, with_relu=True):
        super(Conv2dIBNormRelu, self).__init__()

        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size,
                      stride=stride, padding=padding, dilation=dilation,
                      groups=groups, bias=bias)
        ]

        if with_ibn:
            layers.append(IBNorm(out_channels))
        if with_relu:
            layers.append(nn.ReLU(inplace=True))

        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        return self.layers(x)


class SEBlock(nn.Module):
    """ SE Block Proposed in https://arxiv.org/pdf/1709.01507.pdf
    """

    def __init__(self, in_channels, out_channels, reduction=1):
        super(SEBlock, self).__init__()
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(in_channels, int(in_channels // reduction), bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(int(in_channels // reduction), out_channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        w = self.pool(x).view(b, c)
        # the reshape below requires the fc output width to equal c
        w = self.fc(w).view(b, c, 1, 1)

        return x * w.expand_as(x)


# ------------------------------------------------------------------------------
#  MODNet Branches
# ------------------------------------------------------------------------------

class LRBranch(nn.Module):
    """ Low Resolution Branch of MODNet
    """

    def __init__(self, backbone):
        super(LRBranch, self).__init__()

        enc_channels = backbone.enc_channels

        self.backbone = backbone
        self.se_block = SEBlock(enc_channels[4], enc_channels[4], reduction=4)
        self.conv_lr16x = Conv2dIBNormRelu(enc_channels[4], enc_channels[3], 5, stride=1, padding=2)
        self.conv_lr8x = Conv2dIBNormRelu(enc_channels[3], enc_channels[2], 5, stride=1, padding=2)
        self.conv_lr = Conv2dIBNormRelu(enc_channels[2], 1, kernel_size=3, stride=2, padding=1,
                                        with_ibn=False, with_relu=False)

    def forward(self, img, inference):
        """
        Returns:
            (pred_semantic, lr8x, [enc2x, enc4x]); `pred_semantic` is None
            when `inference` is truthy.
        """
        enc_features = self.backbone.forward(img)
        enc2x, enc4x, enc32x = enc_features[0], enc_features[1], enc_features[4]

        enc32x = self.se_block(enc32x)
        lr16x = F.interpolate(enc32x, scale_factor=2, mode='bilinear', align_corners=False)
        lr16x = self.conv_lr16x(lr16x)
        lr8x = F.interpolate(lr16x, scale_factor=2, mode='bilinear', align_corners=False)
        lr8x = self.conv_lr8x(lr8x)

        pred_semantic = None
        if not inference:
            lr = self.conv_lr(lr8x)
            pred_semantic = torch.sigmoid(lr)

        return pred_semantic, lr8x, [enc2x, enc4x]


class HRBranch(nn.Module):
    """ High Resolution Branch of MODNet
    """

    def __init__(self, hr_channels, enc_channels):
        super(HRBranch, self).__init__()

        self.tohr_enc2x = Conv2dIBNormRelu(enc_channels[0], hr_channels, 1, stride=1, padding=0)
        self.conv_enc2x = Conv2dIBNormRelu(hr_channels + 3, hr_channels, 3, stride=2, padding=1)

        self.tohr_enc4x = Conv2dIBNormRelu(enc_channels[1], hr_channels, 1, stride=1, padding=0)
        self.conv_enc4x = Conv2dIBNormRelu(2 * hr_channels, 2 * hr_channels, 3, stride=1, padding=1)

        self.conv_hr4x = nn.Sequential(
            Conv2dIBNormRelu(3 * hr_channels + 3, 2 * hr_channels, 3, stride=1, padding=1),
            Conv2dIBNormRelu(2 * hr_channels, 2 * hr_channels, 3, stride=1, padding=1),
            Conv2dIBNormRelu(2 * hr_channels, hr_channels, 3, stride=1, padding=1),
        )

        self.conv_hr2x = nn.Sequential(
            Conv2dIBNormRelu(2 * hr_channels, 2 * hr_channels, 3, stride=1, padding=1),
            Conv2dIBNormRelu(2 * hr_channels, hr_channels, 3, stride=1, padding=1),
            Conv2dIBNormRelu(hr_channels, hr_channels, 3, stride=1, padding=1),
            Conv2dIBNormRelu(hr_channels, hr_channels, 3, stride=1, padding=1),
        )

        self.conv_hr = nn.Sequential(
            Conv2dIBNormRelu(hr_channels + 3, hr_channels, 3, stride=1, padding=1),
            Conv2dIBNormRelu(hr_channels, 1, kernel_size=1, stride=1, padding=0,
                             with_ibn=False, with_relu=False),
        )

    def forward(self, img, enc2x, enc4x, lr8x, inference):
        """
        Returns:
            (pred_detail, hr2x); `pred_detail` is None when `inference` is truthy.
        """
        img2x = F.interpolate(img, scale_factor=1 / 2, mode='bilinear', align_corners=False)
        img4x = F.interpolate(img, scale_factor=1 / 4, mode='bilinear', align_corners=False)

        enc2x = self.tohr_enc2x(enc2x)
        hr4x = self.conv_enc2x(torch.cat((img2x, enc2x), dim=1))

        enc4x = self.tohr_enc4x(enc4x)
        hr4x = self.conv_enc4x(torch.cat((hr4x, enc4x), dim=1))

        lr4x = F.interpolate(lr8x, scale_factor=2, mode='bilinear', align_corners=False)
        hr4x = self.conv_hr4x(torch.cat((hr4x, lr4x, img4x), dim=1))

        hr2x = F.interpolate(hr4x, scale_factor=2, mode='bilinear', align_corners=False)
        hr2x = self.conv_hr2x(torch.cat((hr2x, enc2x), dim=1))

        pred_detail = None
        if not inference:
            hr = F.interpolate(hr2x, scale_factor=2, mode='bilinear', align_corners=False)
            hr = self.conv_hr(torch.cat((hr, img), dim=1))
            pred_detail = torch.sigmoid(hr)

        return pred_detail, hr2x


class FusionBranch(nn.Module):
    """ Fusion Branch of MODNet
    """

    def __init__(self, hr_channels, enc_channels):
        super(FusionBranch, self).__init__()
        self.conv_lr4x = Conv2dIBNormRelu(enc_channels[2], hr_channels, 5, stride=1, padding=2)

        self.conv_f2x = Conv2dIBNormRelu(2 * hr_channels, hr_channels, 3, stride=1, padding=1)
        self.conv_f = nn.Sequential(
            Conv2dIBNormRelu(hr_channels + 3, int(hr_channels / 2), 3, stride=1, padding=1),
            Conv2dIBNormRelu(int(hr_channels / 2), 1, 1, stride=1, padding=0,
                             with_ibn=False, with_relu=False),
        )

    def forward(self, img, lr8x, hr2x):
        """ Fuse the LR and HR features with the image and return the sigmoid matte """
        lr4x = F.interpolate(lr8x, scale_factor=2, mode='bilinear', align_corners=False)
        lr4x = self.conv_lr4x(lr4x)
        lr2x = F.interpolate(lr4x, scale_factor=2, mode='bilinear', align_corners=False)

        f2x = self.conv_f2x(torch.cat((lr2x, hr2x), dim=1))
        f = F.interpolate(f2x, scale_factor=2, mode='bilinear', align_corners=False)
        f = self.conv_f(torch.cat((f, img), dim=1))
        pred_matte = torch.sigmoid(f)

        return pred_matte


# ------------------------------------------------------------------------------
#  MODNet
# ------------------------------------------------------------------------------

class MODNet(nn.Module):
    """ Architecture of MODNet
    """

    def __init__(self, in_channels=3, hr_channels=32, backbone_arch='mobilenetv2',
                 backbone_pretrained=False):
        super(MODNet, self).__init__()

        self.in_channels = in_channels
        self.hr_channels = hr_channels
        self.backbone_arch = backbone_arch
        self.backbone_pretrained = backbone_pretrained

        self.backbone = SUPPORTED_BACKBONES[self.backbone_arch](self.in_channels)

        self.lr_branch = LRBranch(self.backbone)
        self.hr_branch = HRBranch(self.hr_channels, self.backbone.enc_channels)
        self.f_branch = FusionBranch(self.hr_channels, self.backbone.enc_channels)

        # Re-initialise every conv / norm layer (backbone included); pretrained
        # weights, if requested, are loaded afterwards and overwrite the backbone.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                self._init_conv(m)
            elif isinstance(m, (nn.BatchNorm2d, nn.InstanceNorm2d)):
                self._init_norm(m)

        if self.backbone_pretrained:
            self.backbone.load_pretrained_ckpt()

    def forward(self, img, inference):
        """
        Returns:
            (pred_semantic, pred_detail, pred_matte); the first two are None
            when `inference` is truthy.
        """
        pred_semantic, lr8x, [enc2x, enc4x] = self.lr_branch(img, inference)
        pred_detail, hr2x = self.hr_branch(img, enc2x, enc4x, lr8x, inference)
        pred_matte = self.f_branch(img, lr8x, hr2x)

        return pred_semantic, pred_detail, pred_matte

    @staticmethod
    def compute_loss(args):
        """ Compute the MODNet losses with the default scales of `loss_func`

        Arguments:
            args: sequence of (pred_semantic, pred_detail, pred_matte,
                  image, trimap, gt_matte)
        Returns:
            (matte_loss, loss) where `loss` is the sum of the semantic,
            detail and matte losses
        """
        pred_semantic, pred_detail, pred_matte, image, trimap, gt_matte = args
        semantic_loss, detail_loss, matte_loss = loss_func(pred_semantic, pred_detail, pred_matte,
                                                           image, trimap, gt_matte)
        loss = semantic_loss + detail_loss + matte_loss
        return matte_loss, loss

    def freeze_norm(self):
        """ Put every BatchNorm2d / InstanceNorm2d submodule into eval mode """
        for m in self.modules():
            if isinstance(m, (nn.BatchNorm2d, nn.InstanceNorm2d)):
                m.eval()

    def _init_conv(self, conv):
        nn.init.kaiming_uniform_(
            conv.weight, a=0, mode='fan_in', nonlinearity='relu')
        if conv.bias is not None:
            nn.init.constant_(conv.bias, 0)

    def _init_norm(self, norm):
        # norms built with affine=False have no weight/bias to initialise
        if norm.weight is not None:
            nn.init.constant_(norm.weight, 1)
            nn.init.constant_(norm.bias, 0)

    def _apply(self, fn, *args, **kwargs):
        # Extra arguments (e.g. `recurse`, passed by some nn.Module helpers in
        # newer PyTorch releases) are forwarded untouched.
        super(MODNet, self)._apply(fn, *args, **kwargs)
        blurer._apply(fn, *args, **kwargs)  # let blurer's device same as modnet
        return self