# Copyright 2023 Bingxin Ke, ETH Zurich. All rights reserved. # Last modified: 2024-05-24 # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # -------------------------------------------------------------------------- # If you find this code useful, we kindly ask you to cite our paper in your work. # Please find bibtex at: https://github.com/prs-eth/Marigold#-citation # More information about the method can be found at https://marigoldmonodepth.github.io # -------------------------------------------------------------------------- from typing import Any, Callable, Dict, List, Optional, Union import logging from diffusers.image_processor import VaeImageProcessor import pdb from diffusers.utils import load_image, make_image_grid from typing import Dict, Optional, Union import torchvision.transforms as transforms import PIL.Image import numpy as np import torch from diffusers import ( AutoencoderKL, DDIMScheduler, DiffusionPipeline, LCMScheduler, UNet2DConditionModel, ) from .duplicate_unet import DoubleUNet2DConditionModel import os from torch.nn import Conv2d from PIL import Image, ImageDraw, ImageFont from torch.nn.parameter import Parameter from diffusers.utils import BaseOutput from PIL import Image from torch.utils.data import DataLoader, TensorDataset from torchvision.transforms import InterpolationMode from torchvision.transforms.functional import pil_to_tensor, resize from tqdm.auto import tqdm from transformers import CLIPTextModel, CLIPTokenizer, CLIPTextModelWithProjection from .util.batchsize import find_batch_size from .util.ensemble import ensemble_depth from .util.image_util import ( chw2hwc, colorize_depth_maps, get_tv_resample_method, resize_max_res, ) def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0): """ Rescale `noise_cfg` according to `guidance_rescale`. Based on findings of [Common Diffusion Noise Schedules and Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf). See Section 3.4 """ std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True) std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True) # rescale the results from guidance (fixes overexposure) noise_pred_rescaled = noise_cfg * (std_text / std_cfg) # mix with the original results from guidance by factor guidance_rescale to avoid "plain looking" images noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg return noise_cfg class MarigoldDepthOutput(BaseOutput): """ Output class for Marigold monocular depth prediction pipeline. Args: depth_np (`np.ndarray`): Predicted depth map, with depth values in the range of [0, 1]. depth_colored (`PIL.Image.Image`): Colorized depth map, with the shape of [3, H, W] and values in [0, 1]. uncertainty (`None` or `np.ndarray`): Uncalibrated uncertainty(MAD, median absolute deviation) coming from ensembling. """ depth_np: np.ndarray depth_colored: Union[None, Image.Image] uncertainty: Union[None, np.ndarray] class MarigoldPipeline(DiffusionPipeline): """ Pipeline for monocular depth estimation using Marigold: https://marigoldmonodepth.github.io. This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.) Args: unet (`UNet2DConditionModel`): Conditional U-Net to denoise the depth latent, conditioned on image latent. vae (`AutoencoderKL`): Variational Auto-Encoder (VAE) Model to encode and decode images and depth maps to and from latent representations. scheduler (`DDIMScheduler`): A scheduler to be used in combination with `unet` to denoise the encoded image latents. text_encoder (`CLIPTextModel`): Text-encoder, for empty text embedding. tokenizer (`CLIPTokenizer`): CLIP tokenizer. scale_invariant (`bool`, *optional*): A model property specifying whether the predicted depth maps are scale-invariant. This value must be set in the model config. When used together with the `shift_invariant=True` flag, the model is also called "affine-invariant". NB: overriding this value is not supported. shift_invariant (`bool`, *optional*): A model property specifying whether the predicted depth maps are shift-invariant. This value must be set in the model config. When used together with the `scale_invariant=True` flag, the model is also called "affine-invariant". NB: overriding this value is not supported. default_denoising_steps (`int`, *optional*): The minimum number of denoising diffusion steps that are required to produce a prediction of reasonable quality with the given model. This value must be set in the model config. When the pipeline is called without explicitly setting `num_inference_steps`, the default value is used. This is required to ensure reasonable results with various model flavors compatible with the pipeline, such as those relying on very short denoising schedules (`LCMScheduler`) and those with full diffusion schedules (`DDIMScheduler`). default_processing_resolution (`int`, *optional*): The recommended value of the `processing_resolution` parameter of the pipeline. This value must be set in the model config. When the pipeline is called without explicitly setting `processing_resolution`, the default value is used. This is required to ensure reasonable results with various model flavors trained with varying optimal processing resolution values. """ rgb_latent_scale_factor = 0.18215 depth_latent_scale_factor = 0.18215 def __init__( self, unet: DoubleUNet2DConditionModel, vae: AutoencoderKL, scheduler: Union[DDIMScheduler, LCMScheduler], text_encoder: CLIPTextModel, tokenizer: CLIPTokenizer, scale_invariant: Optional[bool] = True, shift_invariant: Optional[bool] = True, default_denoising_steps: Optional[int] = None, default_processing_resolution: Optional[int] = None, requires_safety_checker: bool = False, ): super().__init__() self.register_modules( unet=unet, vae=vae, scheduler=scheduler, text_encoder=text_encoder, tokenizer=tokenizer, ) self.register_to_config( scale_invariant=scale_invariant, shift_invariant=shift_invariant, default_denoising_steps=default_denoising_steps, default_processing_resolution=default_processing_resolution, ) self.scale_invariant = scale_invariant self.shift_invariant = shift_invariant self.default_denoising_steps = default_denoising_steps self.default_processing_resolution = default_processing_resolution self.empty_text_embed = None self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1) self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor) self.register_to_config(requires_safety_checker=requires_safety_checker) self.separate_list = [0,0] @torch.no_grad() def __call__( self, input_image: Union[Image.Image, torch.Tensor], denoising_steps: Optional[int] = None, ensemble_size: int = 5, processing_res: Optional[int] = None, match_input_res: bool = True, resample_method: str = "bilinear", batch_size: int = 0, generator: Union[torch.Generator, None] = None, color_map: str = "Spectral", show_progress_bar: bool = True, ensemble_kwargs: Dict = None, ) -> MarigoldDepthOutput: """ Function invoked when calling the pipeline. Args: input_image (`Image`): Input RGB (or gray-scale) image. denoising_steps (`int`, *optional*, defaults to `None`): Number of denoising diffusion steps during inference. The default value `None` results in automatic selection. The number of steps should be at least 10 with the full Marigold models, and between 1 and 4 for Marigold-LCM models. ensemble_size (`int`, *optional*, defaults to `10`): Number of predictions to be ensembled. processing_res (`int`, *optional*, defaults to `None`): Effective processing resolution. When set to `0`, processes at the original image resolution. This produces crisper predictions, but may also lead to the overall loss of global context. The default value `None` resolves to the optimal value from the model config. match_input_res (`bool`, *optional*, defaults to `True`): Resize depth prediction to match input resolution. Only valid if `processing_res` > 0. resample_method: (`str`, *optional*, defaults to `bilinear`): Resampling method used to resize images and depth predictions. This can be one of `bilinear`, `bicubic` or `nearest`, defaults to: `bilinear`. batch_size (`int`, *optional*, defaults to `0`): Inference batch size, no bigger than `num_ensemble`. If set to 0, the script will automatically decide the proper batch size. generator (`torch.Generator`, *optional*, defaults to `None`) Random generator for initial noise generation. show_progress_bar (`bool`, *optional*, defaults to `True`): Display a progress bar of diffusion denoising. color_map (`str`, *optional*, defaults to `"Spectral"`, pass `None` to skip colorized depth map generation): Colormap used to colorize the depth map. scale_invariant (`str`, *optional*, defaults to `True`): Flag of scale-invariant prediction, if True, scale will be adjusted from the raw prediction. shift_invariant (`str`, *optional*, defaults to `True`): Flag of shift-invariant prediction, if True, shift will be adjusted from the raw prediction, if False, near plane will be fixed at 0m. ensemble_kwargs (`dict`, *optional*, defaults to `None`): Arguments for detailed ensembling settings. Returns: `MarigoldDepthOutput`: Output class for Marigold monocular depth prediction pipeline, including: - **depth_np** (`np.ndarray`) Predicted depth map, with depth values in the range of [0, 1] - **depth_colored** (`PIL.Image.Image`) Colorized depth map, with the shape of [3, H, W] and values in [0, 1], None if `color_map` is `None` - **uncertainty** (`None` or `np.ndarray`) Uncalibrated uncertainty(MAD, median absolute deviation) coming from ensembling. None if `ensemble_size = 1` """ # Model-specific optimal default values leading to fast and reasonable results. if denoising_steps is None: denoising_steps = self.default_denoising_steps if processing_res is None: processing_res = self.default_processing_resolution assert processing_res >= 0 assert ensemble_size >= 1 # Check if denoising step is reasonable self._check_inference_step(denoising_steps) resample_method: InterpolationMode = get_tv_resample_method(resample_method) # ----------------- Image Preprocess ----------------- # Convert to torch tensor if isinstance(input_image, Image.Image): input_image = input_image.convert("RGB") # convert to torch tensor [H, W, rgb] -> [rgb, H, W] rgb = pil_to_tensor(input_image) rgb = rgb.unsqueeze(0) # [1, rgb, H, W] elif isinstance(input_image, torch.Tensor): rgb = input_image else: raise TypeError(f"Unknown input type: {type(input_image) = }") input_size = rgb.shape assert ( 4 == rgb.dim() and 3 == input_size[-3] ), f"Wrong input shape {input_size}, expected [1, rgb, H, W]" # Resize image if processing_res > 0: rgb = resize_max_res( rgb, max_edge_resolution=processing_res, resample_method=resample_method, ) # Normalize rgb values rgb_norm: torch.Tensor = rgb / 255.0 * 2.0 - 1.0 # [0, 255] -> [-1, 1] rgb_norm = rgb_norm.to(self.dtype) assert rgb_norm.min() >= -1.0 and rgb_norm.max() <= 1.0 # ----------------- Predicting depth ----------------- # Batch repeated input image duplicated_rgb = rgb_norm.expand(ensemble_size, -1, -1, -1) single_rgb_dataset = TensorDataset(duplicated_rgb) if batch_size > 0: _bs = batch_size else: _bs = find_batch_size( ensemble_size=ensemble_size, input_res=max(rgb_norm.shape[1:]), dtype=self.dtype, ) single_rgb_loader = DataLoader( single_rgb_dataset, batch_size=_bs, shuffle=False ) # Predict depth maps (batched) depth_pred_ls = [] if show_progress_bar: iterable = tqdm( single_rgb_loader, desc=" " * 2 + "Inference batches", leave=False ) else: iterable = single_rgb_loader for batch in iterable: (batched_img,) = batch depth_pred_raw = self.single_infer( rgb_in=batched_img, num_inference_steps=denoising_steps, show_pbar=show_progress_bar, generator=generator, ) depth_pred_ls.append(depth_pred_raw.detach()) depth_preds = torch.concat(depth_pred_ls, dim=0) torch.cuda.empty_cache() # clear vram cache for ensembling # ----------------- Test-time ensembling ----------------- if ensemble_size > 1: depth_pred, pred_uncert = ensemble_depth( depth_preds, scale_invariant=self.scale_invariant, shift_invariant=self.shift_invariant, max_res=50, **(ensemble_kwargs or {}), ) else: depth_pred = depth_preds pred_uncert = None # Resize back to original resolution if match_input_res: depth_pred = resize( depth_pred, input_size[-2:], interpolation=resample_method, antialias=True, ) # Convert to numpy depth_pred = depth_pred.squeeze() depth_pred = depth_pred.cpu().numpy() if pred_uncert is not None: pred_uncert = pred_uncert.squeeze().cpu().numpy() # Clip output range depth_pred = depth_pred.clip(0, 1) # Colorize if color_map is not None: depth_colored = colorize_depth_maps( depth_pred, 0, 1, cmap=color_map ).squeeze() # [3, H, W], value in (0, 1) depth_colored = (depth_colored * 255).astype(np.uint8) depth_colored_hwc = chw2hwc(depth_colored) depth_colored_img = Image.fromarray(depth_colored_hwc) else: depth_colored_img = None return MarigoldDepthOutput( depth_np=depth_pred, depth_colored=depth_colored_img, uncertainty=pred_uncert, ) def _replace_unet_conv_in(self): # replace the first layer to accept 8 in_channels _weight = self.unet.conv_in.weight.clone() # [320, 4, 3, 3] _bias = self.unet.conv_in.bias.clone() # [320] zero_weight = torch.zeros(_weight.shape).to(_weight.device) _weight = torch.cat([_weight, zero_weight], dim=1) # _weight = _weight.repeat((1, 2, 1, 1)) # Keep selected channel(s) # half the activation magnitude # _weight *= 0.5 # new conv_in channel _n_convin_out_channel = self.unet.conv_in.out_channels _new_conv_in = Conv2d( 8, _n_convin_out_channel, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1) ) _new_conv_in.weight = Parameter(_weight) _new_conv_in.bias = Parameter(_bias) self.unet.conv_in = _new_conv_in logging.info("Unet conv_in layer is replaced") # replace config self.unet.config["in_channels"] = 8 logging.info("Unet config is updated") return def _replace_unet_conv_out(self): # replace the first layer to accept 8 in_channels _weight = self.unet.conv_out.weight.clone() # [8, 320, 3, 3] _bias = self.unet.conv_out.bias.clone() # [320] _weight = _weight.repeat((2, 1, 1, 1)) # Keep selected channel(s) _bias = _bias.repeat((2)) # half the activation magnitude # new conv_in channel _n_convin_out_channel = self.unet.conv_out.out_channels _new_conv_out = Conv2d( _n_convin_out_channel, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1) ) _new_conv_out.weight = Parameter(_weight) _new_conv_out.bias = Parameter(_bias) self.unet.conv_out = _new_conv_out logging.info("Unet conv_out layer is replaced") # replace config self.unet.config["out_channels"] = 8 logging.info("Unet config is updated") return def _check_inference_step(self, n_step: int) -> None: """ Check if denoising step is reasonable Args: n_step (`int`): denoising steps """ assert n_step >= 1 # if isinstance(self.scheduler, DDIMScheduler): # if n_step < 10: # logging.warning( # f"Too few denoising steps: {n_step}. Recommended to use the LCM checkpoint for few-step inference." # ) # elif isinstance(self.scheduler, LCMScheduler): # if not 1 <= n_step <= 4: # logging.warning( # f"Non-optimal setting of denoising steps: {n_step}. Recommended setting is 1-4 steps." # ) # else: # raise RuntimeError(f"Unsupported scheduler type: {type(self.scheduler)}") def encode_empty_text(self): """ Encode text embedding for empty prompt """ prompt = "" text_inputs = self.tokenizer( prompt, padding="max_length", max_length=self.tokenizer.model_max_length, truncation=True, return_tensors="pt", ) text_input_ids = text_inputs.input_ids.to(self.text_encoder.device) self.empty_text_embed = self.text_encoder(text_input_ids)[0].to(self.dtype) def encode_text(self, prompt): """ Encode text embedding for empty prompt """ text_inputs = self.tokenizer( prompt, padding="max_length", max_length=self.tokenizer.model_max_length, truncation=True, return_tensors="pt", ) text_input_ids = text_inputs.input_ids.to(self.text_encoder.device) text_embed = self.text_encoder(text_input_ids)[0].to(self.dtype) return text_embed def numpy_to_pil(self, images: np.ndarray) -> PIL.Image.Image: """ Convert a numpy image or a batch of images to a PIL image. """ if images.ndim == 3: images = images[None, ...] images = (images * 255).round().astype("uint8") if images.shape[-1] == 1: # special case for grayscale (single channel) images pil_images = [Image.fromarray(image.squeeze(), mode="L") for image in images] else: pil_images = [Image.fromarray(image) for image in images] return pil_images @torch.no_grad() def generate_rgbd( self, prompt: str or list, num_inference_steps: int, generator: Union[torch.Generator, None], show_pbar: bool = None, color_map: str = "Spectral", height: int = 60, width: int = 80 ): """ Perform an individual depth prediction without ensembling. Args: rgb_in (`torch.Tensor`): Input RGB image. num_inference_steps (`int`): Number of diffusion denoisign steps (DDIM) during inference. show_pbar (`bool`): Display a progress bar of diffusion denoising. generator (`torch.Generator`) Random generator for initial noise generation. Returns: `torch.Tensor`: Predicted depth map. """ device = self.device ori_type = self.dtype # Set timesteps self.scheduler.set_timesteps(num_inference_steps, device=device) timesteps = self.scheduler.timesteps # [T] if isinstance(prompt, list): bs = len(prompt) batch_text_embed = [] for p in prompt: batch_text_embed.append(self.encode_text(p)) batch_text_embed = torch.cat(batch_text_embed, dim=0) elif isinstance(prompt, str): bs = 1 batch_text_embed = self.encode_text(prompt).unsqueeze(0) else: raise NotImplementedError if self.empty_text_embed is None: self.encode_empty_text() batch_empty_text_embed = self.empty_text_embed.repeat( (batch_text_embed.shape[0], 1, 1) ).to(device) # [B, 2, 1024] text_embed = torch.cat([batch_empty_text_embed, batch_text_embed], dim=0) # Initial depth map (noise) cat_latent = torch.randn( [bs, self.unet.config["in_channels"], height, width], device=device, dtype=torch.bfloat16, generator=generator, ) * self.scheduler.init_noise_sigma # [B, 8, h, w] # Denoising loop if show_pbar: iterable = tqdm( enumerate(timesteps), total=len(timesteps), leave=False, desc=" " * 4 + "Diffusion denoising", ) else: iterable = enumerate(timesteps) self.to(torch.bfloat16) for i, t in iterable: latent_model_input = torch.cat([cat_latent] * 2) latent_model_input = self.scheduler.scale_model_input(latent_model_input, t) # predict the noise residual with torch.no_grad(): noise_pred = self.unet( latent_model_input, t, t, encoder_hidden_states=text_embed.to(torch.bfloat16), return_dict=False, # separate_list=self.separate_list )[0] # perform guidance guidance_scale = 7.5 noise_pred_uncond, noise_pred_text = noise_pred.chunk(2) noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond) # compute the previous noisy sample x_t -> x_t-1 cat_latent = self.scheduler.step(noise_pred, t, cat_latent).prev_sample # self.unet.to(default_dtype) # cat_latent.to(default_dtype) image = self.decode_image(cat_latent[:, 0:4, :, :]) image = self.numpy_to_pil(image) # depth_pred = depth depth = self.decode_depth(cat_latent[:, 4:, :, :]) depth = (depth - depth.min()) / (depth.max() - depth.min()) # depth = torch.clip(depth, -1.0, 1.0) # depth = (depth + 1.0) / 2.0 depth_pred = depth.squeeze() depth_pred = depth_pred.float().cpu().numpy() depth_pred = depth_pred.clip(0, 1) # Colorize if color_map is not None: depth_colored_img = [] depth_colored = colorize_depth_maps( depth_pred, 0, 1, cmap=color_map ).squeeze() # [3, H, W], value in (0, 1) depth_colored_img = self.numpy_to_pil(np.transpose(depth_colored, (0, 2, 3, 1))) else: depth_colored_img = None rgbd_images = self.post_process_rgbd(prompt, image, depth_colored_img) self.to(ori_type) return rgbd_images @torch.no_grad() def image2depth(self, input_image: Union[Image.Image, torch.Tensor], denoising_steps: Optional[int] = None, ensemble_size: int = 5, processing_res: Optional[int] = None, match_input_res: bool = True, resample_method: str = "bilinear", batch_size: int = 0, generator: Union[torch.Generator, None] = None, color_map: str = "Spectral", show_progress_bar: bool = True, ensemble_kwargs: Dict = None, cfg_scale: float = 1.0 ): # Model-specific optimal default values leading to fast and reasonable results. if denoising_steps is None: denoising_steps = self.default_denoising_steps if processing_res is None: processing_res = self.default_processing_resolution ori_type = self.dtype self.to(torch.bfloat16) assert processing_res >= 0 assert ensemble_size >= 1 # Check if denoising step is reasonable self._check_inference_step(denoising_steps) resample_method: InterpolationMode = get_tv_resample_method(resample_method) # ----------------- Image Preprocess ----------------- # Convert to torch tensor if isinstance(input_image, Image.Image): input_image = input_image.convert("RGB") # convert to torch tensor [H, W, rgb] -> [rgb, H, W] rgb = pil_to_tensor(input_image) rgb = rgb.unsqueeze(0) # [1, rgb, H, W] elif isinstance(input_image, torch.Tensor): rgb = input_image else: raise TypeError(f"Unknown input type: {type(input_image) = }") input_size = rgb.shape assert ( 4 == rgb.dim() and 3 == input_size[-3] ), f"Wrong input shape {input_size}, expected [1, rgb, H, W]" # Resize image if processing_res > 0: rgb = resize_max_res( rgb, max_edge_resolution=processing_res, resample_method=resample_method, ) # Normalize rgb values rgb_norm: torch.Tensor = rgb / 255.0 * 2.0 - 1.0 # [0, 255] -> [-1, 1] rgb_norm = rgb_norm.to(self.dtype) assert rgb_norm.min() >= -1.0 and rgb_norm.max() <= 1.0 # ----------------- Predicting depth ----------------- # Batch repeated input image duplicated_rgb = rgb_norm.expand(ensemble_size, -1, -1, -1) single_rgb_dataset = TensorDataset(duplicated_rgb) if batch_size > 0: _bs = batch_size else: _bs = find_batch_size( ensemble_size=ensemble_size, input_res=max(rgb_norm.shape[1:]), dtype=self.dtype, ) single_rgb_loader = DataLoader( single_rgb_dataset, batch_size=_bs, shuffle=False ) # Predict depth maps (batched) depth_pred_ls = [] if show_progress_bar: iterable = tqdm( single_rgb_loader, desc=" " * 2 + "Inference batches", leave=False ) else: iterable = single_rgb_loader for batch in iterable: (batched_img,) = batch depth_pred_raw = self.single_image2depth( rgb_in=batched_img, num_inference_steps=denoising_steps, show_pbar=show_progress_bar, generator=generator, cfg_scale=cfg_scale ) depth_pred_ls.append(depth_pred_raw.detach()) depth_preds = torch.concat(depth_pred_ls, dim=0) torch.cuda.empty_cache() # clear vram cache for ensembling depth_preds = depth_preds.to(torch.float32) # ----------------- Test-time ensembling ----------------- if ensemble_size > 1: depth_pred, pred_uncert = ensemble_depth( depth_preds, scale_invariant=self.scale_invariant, shift_invariant=self.shift_invariant, max_res=50, **(ensemble_kwargs or {}), ) else: depth_pred = depth_preds pred_uncert = None # Resize back to original resolution if match_input_res: depth_pred = resize( depth_pred, input_size[-2:], interpolation=resample_method, antialias=True, ) # Convert to numpy depth_pred = depth_pred.squeeze() depth_pred = depth_pred.cpu().numpy() if pred_uncert is not None: pred_uncert = pred_uncert.squeeze().cpu().numpy() # Clip output range depth_pred = depth_pred.clip(0, 1) # Colorize if color_map is not None: depth_colored = colorize_depth_maps( depth_pred, 0, 1, cmap=color_map ).squeeze() # [3, H, W], value in (0, 1) depth_colored = (depth_colored * 255).astype(np.uint8) depth_colored_hwc = chw2hwc(depth_colored) depth_colored_img = Image.fromarray(depth_colored_hwc) else: depth_colored_img = None self.to(ori_type) return MarigoldDepthOutput( depth_np=depth_pred, depth_colored=depth_colored_img, uncertainty=pred_uncert, ) @torch.no_grad() def single_image2depth( self, rgb_in: torch.Tensor, num_inference_steps: int, generator: Union[torch.Generator, None], show_pbar: bool, cfg_scale: float = 1.0 ) -> torch.Tensor: """ Perform an individual depth prediction without ensembling. Args: rgb_in (`torch.Tensor`): Input RGB image. num_inference_steps (`int`): Number of diffusion denoisign steps (DDIM) during inference. show_pbar (`bool`): Display a progress bar of diffusion denoising. generator (`torch.Generator`) Random generator for initial noise generation. Returns: `torch.Tensor`: Predicted depth map. """ device = self.device rgb_in = rgb_in.to(device) # Set timesteps self.scheduler.set_timesteps(num_inference_steps, device=device) timesteps = self.scheduler.timesteps # [T] # Encode image rgb_latent = self.encode_rgb(rgb_in) # Initial depth map (noise) depth_latent = torch.randn( rgb_latent.shape, device=device, dtype=self.dtype, generator=generator, ) * self.scheduler.init_noise_sigma # [B, 4, h, w] # Batched empty text embedding if self.empty_text_embed is None: self.encode_empty_text() batch_empty_text_embed = self.empty_text_embed.repeat( (rgb_latent.shape[0], 1, 1) ).to(device).to(self.dtype) # [B, 2, 1024] # Denoising loop if show_pbar: iterable = tqdm( enumerate(timesteps), total=len(timesteps), leave=False, desc=" " * 4 + "Diffusion denoising", ) else: iterable = enumerate(timesteps) for i, t in iterable: unet_input = torch.cat( [rgb_latent, depth_latent], dim=1 ) # this order is important # predict the noise residual noise_pred = self.unet( unet_input, rgb_timestep=0, depth_timestep=t, encoder_hidden_states=batch_empty_text_embed ).sample # [B, 4, h, w] if cfg_scale > 1: uncond_noise_pred = self.unet( unet_input, rgb_timestep=0, depth_timestep=t, encoder_hidden_states=batch_empty_text_embed, rgb2depth_scale=0.3 ).sample # [B, 4, h, w] uncond_pred = uncond_noise_pred[:, 4:, :, :] cond_pred = noise_pred[:, 4:, :, :] cond_pred = uncond_pred + cfg_scale * (cond_pred - uncond_pred) else: cond_pred = noise_pred[:, 4:, :, :] # compute the previous noisy sample x_t -> x_t-1 depth_latent = self.scheduler.step( cond_pred, t, depth_latent ).prev_sample depth = self.decode_depth(depth_latent) # clip prediction depth = torch.clip(depth, -1.0, 1.0) # shift to [0, 1] depth = (depth + 1.0) / 2.0 return depth @torch.no_grad() def rgbd2rgbd(self, input_image:[torch.Tensor, PIL.Image.Image], depth_image:[torch.Tensor, PIL.Image.Image], prompt: str = '', guidance_scale: float = 7.5, strength: float = 0.75, generator: Union[torch.Generator, None] = None, num_inference_steps: int = 50, show_pbar: bool = False, resample_method: str = "bilinear", processing_res: int = 768 ) -> torch.Tensor: self._check_inference_step(num_inference_steps) resample_method: InterpolationMode = get_tv_resample_method(resample_method) # ----------------- encoder prompt ----------------- if isinstance(prompt, list): bs = len(prompt) batch_text_embed = [] for p in prompt: batch_text_embed.append(self.encode_text(p)) batch_text_embed = torch.cat(batch_text_embed, dim=0) elif isinstance(prompt, str): bs = 1 batch_text_embed = self.encode_text(prompt).unsqueeze(0) else: raise NotImplementedError if self.empty_text_embed is None: self.encode_empty_text() batch_empty_text_embed = self.empty_text_embed.repeat( (batch_text_embed.shape[0], 1, 1) ).to(self.device) # [B, 2, 1024] text_embed = torch.cat([batch_empty_text_embed, batch_text_embed], dim=0) # ----------------- Image Preprocess ----------------- # Convert to torch tensor rgb = input_image input_size = rgb.shape assert ( 4 == rgb.dim() and 3 == input_size[-3] ), f"Wrong input shape {input_size}, expected [1, rgb, H, W]" if processing_res > 0: rgb = resize_max_res( rgb, max_edge_resolution=processing_res, resample_method=resample_method, ) rgb_norm: torch.Tensor = rgb / 255.0 * 2.0 - 1.0 # [0, 255] -> [-1, 1] rgb_in = rgb_norm.to(self.dtype).to(self.device) assert rgb_norm.min() >= -1.0 and rgb_norm.max() <= 1.0 depth = depth_image depth = depth.repeat(1, 3, 1, 1) input_size = depth.shape assert ( 4 == depth.dim() and 3 == input_size[-3] ), f"Wrong input shape {input_size}, expected [1, rgb, H, W]" if processing_res > 0: depth = resize_max_res( depth, max_edge_resolution=processing_res, resample_method=resample_method, ) depth_norm: torch.Tensor = (depth - depth.min()) / (depth.max() - depth.min()) * 2.0 - 1.0 # [0, 255] -> [-1, 1] depth_in = depth_norm.to(self.dtype).to(self.device) assert depth_norm.min() >= -1.0 and depth_norm.max() <= 1.0 # Set timesteps self.scheduler.set_timesteps(num_inference_steps, device=self.device) init_timestep = min(int(num_inference_steps * strength), num_inference_steps) t_start = max(num_inference_steps - init_timestep, 0) timesteps = self.scheduler.timesteps[t_start * self.scheduler.order:] num_inference_steps = num_inference_steps - t_start latent_timestep = timesteps[:1] # Encode depth rgb_latent = self.encode_rgb(rgb_in) depth_latent = self.encode_depth(depth_in) input_latent = torch.cat([rgb_latent, depth_latent], dim=1) noise = torch.randn( input_latent.shape, device=self.device, dtype=self.dtype, generator=generator, ) cat_latent = self.scheduler.add_noise(input_latent, noise, latent_timestep) # noisy_latent = self.scheduler.add_noise(rgb_latent, noise, latent_timestep) # cat_latent = torch.cat([noisy_latent, depth_latent], dim=1) for i, t in enumerate(timesteps): latent_model_input = torch.cat([cat_latent] * 2) latent_model_input = self.scheduler.scale_model_input(latent_model_input, t) # predict the noise residual with torch.no_grad(): noise_pred = self.unet( latent_model_input, rgb_timestep=t, depth_timestep=t, encoder_hidden_states=text_embed, return_dict=False, # separate_list=self.separate_list )[0] # perform guidance noise_pred_uncond, noise_pred_text = noise_pred.chunk(2) noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond) # compute the previous noisy sample x_t -> x_t-1 cat_latent = self.scheduler.step(noise_pred, t, cat_latent).prev_sample image = self.decode_image(cat_latent[:, :4, :, :]) image = self.numpy_to_pil(image) d_image = self.decode_depth(cat_latent[:, 4:, :, :]) d_image = d_image.cpu().permute(0, 2, 3, 1).numpy() for i in range(len(prompt)): d_image[i] = (d_image[i] - d_image[i].min()) / (d_image[i].max() - d_image[i].min()) d_image = self.numpy_to_pil(d_image) cat_image = make_image_grid([image[0], d_image[0]], rows=1, cols=2) return cat_image @torch.no_grad() def single_depth2image( self, depth_image: [torch.Tensor, PIL.Image.Image], prompt: str = '', generator: Union[torch.Generator, None] = None, num_inference_steps: int = 50, show_pbar: bool = False, resample_method: str = "bilinear", processing_res: int = 640 ) -> torch.Tensor: """ Perform an individual depth prediction without ensembling. Args: rgb_in (`torch.Tensor`): Input RGB image. num_inference_steps (`int`): Number of diffusion denoisign steps (DDIM) during inference. show_pbar (`bool`): Display a progress bar of diffusion denoising. generator (`torch.Generator`) Random generator for initial noise generation. Returns: `torch.Tensor`: Predicted depth map. """ device = self.device ori_type = self.dtype # Check if denoising step is reasonable self._check_inference_step(num_inference_steps) resample_method: InterpolationMode = get_tv_resample_method(resample_method) # ----------------- Image Preprocess ----------------- # Convert to torch tensor if isinstance(depth_image, Image.Image): depth = pil_to_tensor(depth_image) depth = depth.unsqueeze(0) # [1, rgb, H, W] elif isinstance(depth_image, torch.Tensor): depth = depth_image else: raise TypeError(f"Unknown input type: {type(depth_image) = }") depth = depth.repeat(1, 3, 1, 1) input_size = depth.shape assert ( 4 == depth.dim() and 3 == input_size[-3] ), f"Wrong input shape {input_size}, expected [1, rgb, H, W]" # Resize image if processing_res > 0: depth = resize_max_res( depth, max_edge_resolution=processing_res, resample_method=resample_method, ) # Normalize rgb values depth_norm: torch.Tensor = depth / 255.0 * 2.0 - 1.0 # [0, 255] -> [-1, 1] assert depth_norm.min() >= -1.0 and depth_norm.max() <= 1.0 depth_in = depth_norm.to(ori_type).to(device) # Set timesteps self.scheduler.set_timesteps(num_inference_steps, device=device) timesteps = self.scheduler.timesteps # [T] # Encode depth depth_latent = self.encode_depth(depth_in) # Initial rgb map (noise) rgb_latent = torch.randn( depth_latent.shape, device=device, dtype=ori_type, generator=generator, ) * self.scheduler.init_noise_sigma # [B, 4, h, w] # encode text input_ids if isinstance(prompt, list): bs = len(prompt) batch_text_embed = [] for p in prompt: batch_text_embed.append(self.encode_text(p)) batch_text_embed = torch.cat(batch_text_embed, dim=0) elif isinstance(prompt, str): bs = 1 batch_text_embed = self.encode_text(prompt) else: raise NotImplementedError if self.empty_text_embed is None: self.encode_empty_text() batch_empty_text_embed = self.empty_text_embed.repeat((batch_text_embed.shape[0], 1, 1)).to(device) # [B, 2, 1024] text_embed = torch.cat([batch_empty_text_embed, batch_text_embed], dim=0) # Denoising loop if show_pbar: iterable = tqdm( enumerate(timesteps), total=len(timesteps), leave=False, desc=" " * 4 + "Diffusion denoising", ) else: iterable = enumerate(timesteps) self.unet.to(torch.bfloat16) for i, t in iterable: cat_latent = torch.cat( [rgb_latent, depth_latent], dim=1 ) # this order is important latent_model_input = torch.cat([cat_latent] * 2) # predict the noise residual with torch.no_grad(): noise_pred = self.unet( latent_model_input.to(torch.bfloat16), rgb_timestep=t, depth_timestep=0, encoder_hidden_states=text_embed.to(torch.bfloat16), return_dict=False, )[0] # perform guidance guidance_scale = 7.5 noise_pred_uncond, noise_pred_text = noise_pred.chunk(2) noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond) # compute the previous noisy sample x_t -> x_t-1 rgb_latent = self.scheduler.step(noise_pred[:, :4, :, :], t, rgb_latent).prev_sample image = self.decode_image(rgb_latent) image = self.numpy_to_pil(image)[0] image = image.resize((input_size[-1], input_size[-2]), Image.BILINEAR) self.unet.to(ori_type) return image def encode_rgb(self, rgb_in: torch.Tensor) -> torch.Tensor: """ Encode RGB image into latent. Args: rgb_in (`torch.Tensor`): Input RGB image to be encoded. Returns: `torch.Tensor`: Image latent. """ # encode h = self.vae.encoder(rgb_in) moments = self.vae.quant_conv(h) mean, logvar = torch.chunk(moments, 2, dim=1) # scale latent rgb_latent = mean * self.rgb_latent_scale_factor return rgb_latent def encode_depth(self, depth_in: torch.Tensor) -> torch.Tensor: """ Encode RGB image into latent. Args: rgb_in (`torch.Tensor`): Input RGB image to be encoded. Returns: `torch.Tensor`: Image latent. """ # encode h = self.vae.encoder(depth_in) moments = self.vae.quant_conv(h) mean, logvar = torch.chunk(moments, 2, dim=1) # scale latent depth_latent = mean * self.depth_latent_scale_factor return depth_latent def decode_image(self, latents): latents = 1 / self.vae.config.scaling_factor * latents z = self.vae.post_quant_conv(latents) image = self.vae.decoder(z) image = (image / 2 + 0.5).clamp(0, 1) # we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16 image = image.cpu().permute(0, 2, 3, 1).float().numpy() return image def decode_depth(self, depth_latent: torch.Tensor) -> torch.Tensor: """ Decode depth latent into depth map. Args: depth_latent (`torch.Tensor`): Depth latent to be decoded. Returns: `torch.Tensor`: Decoded depth map. """ # scale latent depth_latent = depth_latent / self.depth_latent_scale_factor # decode z = self.vae.post_quant_conv(depth_latent) stacked = self.vae.decoder(z) # mean of output channels depth_mean = stacked.mean(dim=1, keepdim=True) return depth_mean def post_process_rgbd(self, prompts, rgb_image, depth_image): rgbd_images = [] for idx, p in enumerate(prompts): image1, image2 = rgb_image[idx], depth_image[idx] width1, height1 = image1.size width2, height2 = image2.size font = ImageFont.load_default(size=20) text = p draw = ImageDraw.Draw(image1) text_bbox = draw.textbbox((0, 0), text, font=font) text_width = text_bbox[2] - text_bbox[0] text_height = text_bbox[3] - text_bbox[1] new_image = Image.new('RGB', (width1 + width2, max(height1, height2) + text_height), (255, 255, 255)) text_x = (new_image.width - text_width) // 2 text_y = 0 draw = ImageDraw.Draw(new_image) draw.text((text_x, text_y), text, fill="black", font=font) new_image.paste(image1, (0, text_height)) new_image.paste(image2, (width1, text_height)) rgbd_images.append(pil_to_tensor(new_image)) return rgbd_images