Inpaint / marigold /marigold_xl_pipeline.py
ZehanWang's picture
Upload folder using huggingface_hub
ebca029 verified
raw
history blame contribute delete
No virus
42.1 kB
# Copyright 2023 Bingxin Ke, ETH Zurich. All rights reserved.
# Last modified: 2024-05-24
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# --------------------------------------------------------------------------
# If you find this code useful, we kindly ask you to cite our paper in your work.
# Please find bibtex at: https://github.com/prs-eth/Marigold#-citation
# More information about the method can be found at https://marigoldmonodepth.github.io
# --------------------------------------------------------------------------
from typing import Any, Callable, Dict, List, Optional, Union
import logging
from diffusers.image_processor import VaeImageProcessor
import pdb
from typing import Dict, Optional, Union
import torchvision.transforms as transforms
import PIL.Image
import numpy as np
import torch
from diffusers import (
AutoencoderKL,
DDIMScheduler,
DiffusionPipeline,
LCMScheduler,
UNet2DConditionModel,
)
from .duplicate_unet import DoubleUNet2DConditionModel
import os
from torch.nn import Conv2d
from PIL import Image, ImageDraw, ImageFont
from torch.nn.parameter import Parameter
from diffusers.utils import BaseOutput
from PIL import Image
from torch.utils.data import DataLoader, TensorDataset
from torchvision.transforms import InterpolationMode
from torchvision.transforms.functional import pil_to_tensor, resize
from tqdm.auto import tqdm
from transformers import CLIPTextModel, CLIPTokenizer, CLIPTextModelWithProjection
from .util.batchsize import find_batch_size
from .util.ensemble import ensemble_depth
from .util.image_util import (
chw2hwc,
colorize_depth_maps,
get_tv_resample_method,
resize_max_res,
)
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
"""
Rescale `noise_cfg` according to `guidance_rescale`. Based on findings of [Common Diffusion Noise Schedules and
Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf). See Section 3.4
"""
std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True)
std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True)
# rescale the results from guidance (fixes overexposure)
noise_pred_rescaled = noise_cfg * (std_text / std_cfg)
# mix with the original results from guidance by factor guidance_rescale to avoid "plain looking" images
noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
return noise_cfg
class MarigoldDepthOutput(BaseOutput):
"""
Output class for Marigold monocular depth prediction pipeline.
Args:
depth_np (`np.ndarray`):
Predicted depth map, with depth values in the range of [0, 1].
depth_colored (`PIL.Image.Image`):
Colorized depth map, with the shape of [3, H, W] and values in [0, 1].
uncertainty (`None` or `np.ndarray`):
Uncalibrated uncertainty(MAD, median absolute deviation) coming from ensembling.
"""
depth_np: np.ndarray
depth_colored: Union[None, Image.Image]
uncertainty: Union[None, np.ndarray]
class MarigoldXLPipeline(DiffusionPipeline):
"""
Pipeline for monocular depth estimation using Marigold: https://marigoldmonodepth.github.io.
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the
library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)
Args:
unet (`UNet2DConditionModel`):
Conditional U-Net to denoise the depth latent, conditioned on image latent.
vae (`AutoencoderKL`):
Variational Auto-Encoder (VAE) Model to encode and decode images and depth maps
to and from latent representations.
scheduler (`DDIMScheduler`):
A scheduler to be used in combination with `unet` to denoise the encoded image latents.
text_encoder (`CLIPTextModel`):
Text-encoder, for empty text embedding.
tokenizer (`CLIPTokenizer`):
CLIP tokenizer.
scale_invariant (`bool`, *optional*):
A model property specifying whether the predicted depth maps are scale-invariant. This value must be set in
the model config. When used together with the `shift_invariant=True` flag, the model is also called
"affine-invariant". NB: overriding this value is not supported.
shift_invariant (`bool`, *optional*):
A model property specifying whether the predicted depth maps are shift-invariant. This value must be set in
the model config. When used together with the `scale_invariant=True` flag, the model is also called
"affine-invariant". NB: overriding this value is not supported.
default_denoising_steps (`int`, *optional*):
The minimum number of denoising diffusion steps that are required to produce a prediction of reasonable
quality with the given model. This value must be set in the model config. When the pipeline is called
without explicitly setting `num_inference_steps`, the default value is used. This is required to ensure
reasonable results with various model flavors compatible with the pipeline, such as those relying on very
short denoising schedules (`LCMScheduler`) and those with full diffusion schedules (`DDIMScheduler`).
default_processing_resolution (`int`, *optional*):
The recommended value of the `processing_resolution` parameter of the pipeline. This value must be set in
the model config. When the pipeline is called without explicitly setting `processing_resolution`, the
default value is used. This is required to ensure reasonable results with various model flavors trained
with varying optimal processing resolution values.
"""
rgb_latent_scale_factor = 0.13025
depth_latent_scale_factor = 0.13025
def __init__(
self,
unet: DoubleUNet2DConditionModel,
vae: AutoencoderKL,
scheduler: Union[DDIMScheduler, LCMScheduler],
text_encoder: CLIPTextModel,
text_encoder_2: CLIPTextModelWithProjection,
tokenizer: CLIPTokenizer,
tokenizer_2: CLIPTokenizer,
scale_invariant: Optional[bool] = True,
shift_invariant: Optional[bool] = True,
default_denoising_steps: Optional[int] = None,
default_processing_resolution: Optional[int] = None,
requires_safety_checker: bool = False,
):
super().__init__()
self.register_modules(
vae=vae,
text_encoder=text_encoder,
text_encoder_2=text_encoder_2,
tokenizer=tokenizer,
tokenizer_2=tokenizer_2,
unet=unet,
scheduler=scheduler,
)
self.register_to_config(
scale_invariant=scale_invariant,
shift_invariant=shift_invariant,
default_denoising_steps=default_denoising_steps,
default_processing_resolution=default_processing_resolution,
)
self.scale_invariant = scale_invariant
self.shift_invariant = shift_invariant
self.default_denoising_steps = default_denoising_steps
self.default_processing_resolution = default_processing_resolution
self.empty_text_embed = None
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
self.register_to_config(requires_safety_checker=requires_safety_checker)
self.separate_list = [3,1,3]
self.default_sample_size = self.unet.config.sample_size
@torch.no_grad()
def __call__(
self,
input_image: Union[Image.Image, torch.Tensor],
denoising_steps: Optional[int] = None,
ensemble_size: int = 5,
processing_res: Optional[int] = None,
match_input_res: bool = True,
resample_method: str = "bilinear",
batch_size: int = 0,
generator: Union[torch.Generator, None] = None,
color_map: str = "Spectral",
show_progress_bar: bool = True,
ensemble_kwargs: Dict = None,
) -> MarigoldDepthOutput:
"""
Function invoked when calling the pipeline.
Args:
input_image (`Image`):
Input RGB (or gray-scale) image.
denoising_steps (`int`, *optional*, defaults to `None`):
Number of denoising diffusion steps during inference. The default value `None` results in automatic
selection. The number of steps should be at least 10 with the full Marigold models, and between 1 and 4
for Marigold-LCM models.
ensemble_size (`int`, *optional*, defaults to `10`):
Number of predictions to be ensembled.
processing_res (`int`, *optional*, defaults to `None`):
Effective processing resolution. When set to `0`, processes at the original image resolution. This
produces crisper predictions, but may also lead to the overall loss of global context. The default
value `None` resolves to the optimal value from the model config.
match_input_res (`bool`, *optional*, defaults to `True`):
Resize depth prediction to match input resolution.
Only valid if `processing_res` > 0.
resample_method: (`str`, *optional*, defaults to `bilinear`):
Resampling method used to resize images and depth predictions. This can be one of `bilinear`, `bicubic` or `nearest`, defaults to: `bilinear`.
batch_size (`int`, *optional*, defaults to `0`):
Inference batch size, no bigger than `num_ensemble`.
If set to 0, the script will automatically decide the proper batch size.
generator (`torch.Generator`, *optional*, defaults to `None`)
Random generator for initial noise generation.
show_progress_bar (`bool`, *optional*, defaults to `True`):
Display a progress bar of diffusion denoising.
color_map (`str`, *optional*, defaults to `"Spectral"`, pass `None` to skip colorized depth map generation):
Colormap used to colorize the depth map.
scale_invariant (`str`, *optional*, defaults to `True`):
Flag of scale-invariant prediction, if True, scale will be adjusted from the raw prediction.
shift_invariant (`str`, *optional*, defaults to `True`):
Flag of shift-invariant prediction, if True, shift will be adjusted from the raw prediction, if False, near plane will be fixed at 0m.
ensemble_kwargs (`dict`, *optional*, defaults to `None`):
Arguments for detailed ensembling settings.
Returns:
`MarigoldDepthOutput`: Output class for Marigold monocular depth prediction pipeline, including:
- **depth_np** (`np.ndarray`) Predicted depth map, with depth values in the range of [0, 1]
- **depth_colored** (`PIL.Image.Image`) Colorized depth map, with the shape of [3, H, W] and values in [0, 1], None if `color_map` is `None`
- **uncertainty** (`None` or `np.ndarray`) Uncalibrated uncertainty(MAD, median absolute deviation)
coming from ensembling. None if `ensemble_size = 1`
"""
# Model-specific optimal default values leading to fast and reasonable results.
if denoising_steps is None:
denoising_steps = self.default_denoising_steps
if processing_res is None:
processing_res = self.default_processing_resolution
assert processing_res >= 0
assert ensemble_size >= 1
# Check if denoising step is reasonable
self._check_inference_step(denoising_steps)
resample_method: InterpolationMode = get_tv_resample_method(resample_method)
# ----------------- Image Preprocess -----------------
# Convert to torch tensor
if isinstance(input_image, Image.Image):
input_image = input_image.convert("RGB")
# convert to torch tensor [H, W, rgb] -> [rgb, H, W]
rgb = pil_to_tensor(input_image)
rgb = rgb.unsqueeze(0) # [1, rgb, H, W]
elif isinstance(input_image, torch.Tensor):
rgb = input_image
else:
raise TypeError(f"Unknown input type: {type(input_image) = }")
input_size = rgb.shape
assert (
4 == rgb.dim() and 3 == input_size[-3]
), f"Wrong input shape {input_size}, expected [1, rgb, H, W]"
# Resize image
if processing_res > 0:
rgb = resize_max_res(
rgb,
max_edge_resolution=processing_res,
resample_method=resample_method,
)
# Normalize rgb values
rgb_norm: torch.Tensor = rgb / 255.0 * 2.0 - 1.0 # [0, 255] -> [-1, 1]
rgb_norm = rgb_norm.to(self.dtype)
assert rgb_norm.min() >= -1.0 and rgb_norm.max() <= 1.0
# ----------------- Predicting depth -----------------
# Batch repeated input image
duplicated_rgb = rgb_norm.expand(ensemble_size, -1, -1, -1)
single_rgb_dataset = TensorDataset(duplicated_rgb)
if batch_size > 0:
_bs = batch_size
else:
_bs = find_batch_size(
ensemble_size=ensemble_size,
input_res=max(rgb_norm.shape[1:]),
dtype=self.dtype,
)
single_rgb_loader = DataLoader(
single_rgb_dataset, batch_size=_bs, shuffle=False
)
# Predict depth maps (batched)
depth_pred_ls = []
if show_progress_bar:
iterable = tqdm(
single_rgb_loader, desc=" " * 2 + "Inference batches", leave=False
)
else:
iterable = single_rgb_loader
for batch in iterable:
(batched_img,) = batch
depth_pred_raw = self.single_infer(
rgb_in=batched_img,
num_inference_steps=denoising_steps,
show_pbar=show_progress_bar,
generator=generator,
)
depth_pred_ls.append(depth_pred_raw.detach())
depth_preds = torch.concat(depth_pred_ls, dim=0)
torch.cuda.empty_cache() # clear vram cache for ensembling
# ----------------- Test-time ensembling -----------------
if ensemble_size > 1:
depth_pred, pred_uncert = ensemble_depth(
depth_preds,
scale_invariant=self.scale_invariant,
shift_invariant=self.shift_invariant,
max_res=50,
**(ensemble_kwargs or {}),
)
else:
depth_pred = depth_preds
pred_uncert = None
# Resize back to original resolution
if match_input_res:
depth_pred = resize(
depth_pred,
input_size[-2:],
interpolation=resample_method,
antialias=True,
)
# Convert to numpy
depth_pred = depth_pred.squeeze()
depth_pred = depth_pred.cpu().numpy()
if pred_uncert is not None:
pred_uncert = pred_uncert.squeeze().cpu().numpy()
# Clip output range
depth_pred = depth_pred.clip(0, 1)
# Colorize
if color_map is not None:
depth_colored = colorize_depth_maps(
depth_pred, 0, 1, cmap=color_map
).squeeze() # [3, H, W], value in (0, 1)
depth_colored = (depth_colored * 255).astype(np.uint8)
depth_colored_hwc = chw2hwc(depth_colored)
depth_colored_img = Image.fromarray(depth_colored_hwc)
else:
depth_colored_img = None
return MarigoldDepthOutput(
depth_np=depth_pred,
depth_colored=depth_colored_img,
uncertainty=pred_uncert,
)
def _replace_unet_conv_in(self):
# replace the first layer to accept 8 in_channels
_weight = self.unet.conv_in.weight.clone() # [320, 4, 3, 3]
_bias = self.unet.conv_in.bias.clone() # [320]
zero_weight = torch.zeros(_weight.shape).to(_weight.device)
_weight = torch.cat([_weight, zero_weight], dim=1)
# _weight = _weight.repeat((1, 2, 1, 1)) # Keep selected channel(s)
# half the activation magnitude
# _weight *= 0.5
# new conv_in channel
_n_convin_out_channel = self.unet.conv_in.out_channels
_new_conv_in = Conv2d(
8, _n_convin_out_channel, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)
)
_new_conv_in.weight = Parameter(_weight)
_new_conv_in.bias = Parameter(_bias)
self.unet.conv_in = _new_conv_in
logging.info("Unet conv_in layer is replaced")
# replace config
self.unet.config["in_channels"] = 8
logging.info("Unet config is updated")
return
def _replace_unet_conv_out(self):
# replace the first layer to accept 8 in_channels
_weight = self.unet.conv_out.weight.clone() # [8, 320, 3, 3]
_bias = self.unet.conv_out.bias.clone() # [320]
_weight = _weight.repeat((2, 1, 1, 1)) # Keep selected channel(s)
_bias = _bias.repeat((2))
# half the activation magnitude
# new conv_in channel
_n_convin_out_channel = self.unet.conv_out.out_channels
_new_conv_out = Conv2d(
_n_convin_out_channel, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)
)
_new_conv_out.weight = Parameter(_weight)
_new_conv_out.bias = Parameter(_bias)
self.unet.conv_out = _new_conv_out
logging.info("Unet conv_out layer is replaced")
# replace config
self.unet.config["out_channels"] = 8
logging.info("Unet config is updated")
return
def _check_inference_step(self, n_step: int) -> None:
"""
Check if denoising step is reasonable
Args:
n_step (`int`): denoising steps
"""
assert n_step >= 1
if isinstance(self.scheduler, DDIMScheduler):
if n_step < 10:
logging.warning(
f"Too few denoising steps: {n_step}. Recommended to use the LCM checkpoint for few-step inference."
)
elif isinstance(self.scheduler, LCMScheduler):
if not 1 <= n_step <= 4:
logging.warning(
f"Non-optimal setting of denoising steps: {n_step}. Recommended setting is 1-4 steps."
)
else:
raise RuntimeError(f"Unsupported scheduler type: {type(self.scheduler)}")
def encode_text(self, prompt):
"""
Encode text embedding for empty prompt
"""
tokenizers = [self.tokenizer, self.tokenizer_2] if self.tokenizer is not None else [self.tokenizer_2]
text_encoders = (
[self.text_encoder, self.text_encoder_2] if self.text_encoder is not None else [self.text_encoder_2]
)
prompts = [prompt, prompt]
prompt_embeds_list = []
for prompt, tokenizer, text_encoder in zip(prompts, tokenizers, text_encoders):
text_inputs = tokenizer(
prompt,
padding="max_length",
max_length=tokenizer.model_max_length,
truncation=True,
return_tensors="pt",
)
text_input_ids = text_inputs.input_ids
untruncated_ids = tokenizer(prompt, padding="longest", return_tensors="pt").input_ids
if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(
text_input_ids, untruncated_ids
):
removed_text = tokenizer.batch_decode(untruncated_ids[:, tokenizer.model_max_length - 1: -1])
print(
"The following part of your input was truncated because CLIP can only handle sequences up to"
f" {tokenizer.model_max_length} tokens: {removed_text}"
)
prompt_embeds = text_encoder(text_input_ids.to(text_encoder.device), output_hidden_states=True)
pooled_prompt_embeds = prompt_embeds[0]
prompt_embeds = prompt_embeds.hidden_states[-2]
prompt_embeds_list.append(prompt_embeds)
prompt_embeds = torch.concat(prompt_embeds_list, dim=-1)
return prompt_embeds, pooled_prompt_embeds
def _get_add_time_ids(self, original_size, crops_coords_top_left, target_size, dtype):
add_time_ids = list(original_size + crops_coords_top_left + target_size)
passed_add_embed_dim = (
self.unet.config.addition_time_embed_dim * len(add_time_ids) + self.text_encoder_2.config.projection_dim
)
expected_add_embed_dim = self.unet.add_embedding.linear_1.in_features
if expected_add_embed_dim != passed_add_embed_dim:
raise ValueError(
f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
)
add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
return add_time_ids
def numpy_to_pil(self, images: np.ndarray) -> PIL.Image.Image:
"""
Convert a numpy image or a batch of images to a PIL image.
"""
if images.ndim == 3:
images = images[None, ...]
images = (images * 255).round().astype("uint8")
if images.shape[-1] == 1:
# special case for grayscale (single channel) images
pil_images = [Image.fromarray(image.squeeze(), mode="L") for image in images]
else:
pil_images = [Image.fromarray(image) for image in images]
return pil_images
@torch.no_grad()
def generate_rgbd(
self,
prompt: str or list,
num_inference_steps: int,
generator: Union[torch.Generator, None],
show_pbar: bool = None,
negative_prompt: str or list = '',
color_map: str = "Spectral",
height: int = 1024,
width: int = 1024,
guidance_scale: float = 5.5
):
"""
Perform an individual depth prediction without ensembling.
Args:
rgb_in (`torch.Tensor`):
Input RGB image.
num_inference_steps (`int`):
Number of diffusion denoisign steps (DDIM) during inference.
show_pbar (`bool`):
Display a progress bar of diffusion denoising.
generator (`torch.Generator`)
Random generator for initial noise generation.
Returns:
`torch.Tensor`: Predicted depth map.
"""
device = self.device
ori_type = self.dtype
height = height or self.default_sample_size * self.vae_scale_factor
width = width or self.default_sample_size * self.vae_scale_factor
original_size = (height, width)
target_size = (height, width)
# Set timesteps
self.scheduler.set_timesteps(num_inference_steps, device=device)
timesteps = self.scheduler.timesteps # [T]
# prepare text embeddings
if isinstance(prompt, list):
bs = len(prompt)
batch_text_embed = []
batch_pooled_text_embed = []
for p in prompt:
prompt_embed, pooled_prompt_embed = self.encode_text(p)
batch_text_embed.append(prompt_embed)
batch_pooled_text_embed.append(pooled_prompt_embed)
batch_text_embed = torch.cat(batch_text_embed, dim=0)
batch_pooled_text_embed = torch.cat(batch_pooled_text_embed, dim=0)
elif isinstance(prompt, str):
bs = 1
batch_text_embed, batch_pooled_text_embed = self.encode_text(prompt)
else:
raise NotImplementedError
batch_empty_text_embed = torch.zeros_like(batch_text_embed).to(device) # [B, 77, d]
batch_pooled_empty_text_embed = torch.zeros_like(batch_pooled_text_embed).to(device)
# prepare added time ids & embeddings
add_time_ids = self._get_add_time_ids(
original_size, (0, 0), target_size, dtype=batch_text_embed.dtype
)
negative_add_time_ids = add_time_ids
prompt_embeds = torch.cat([batch_empty_text_embed, batch_text_embed], dim=0)
add_text_embeds = torch.cat([batch_pooled_empty_text_embed, batch_pooled_text_embed], dim=0)
add_time_ids = torch.cat([negative_add_time_ids, add_time_ids], dim=0)
prompt_embeds = prompt_embeds.to(device).to(torch.bfloat16)
add_text_embeds = add_text_embeds.to(device).to(torch.bfloat16)
add_time_ids = add_time_ids.to(device).repeat(bs, 1)
# Initial depth map (noise)
cat_latent = torch.randn(
[bs, self.unet.config["in_channels"], height // self.vae_scale_factor, width // self.vae_scale_factor],
device=device,
dtype=torch.bfloat16,
generator=generator,
) # [B, 8, h, w]
cat_latent = cat_latent * self.scheduler.init_noise_sigma
# Denoising loop
if show_pbar:
iterable = tqdm(
enumerate(timesteps),
total=len(timesteps),
leave=False,
desc=" " * 4 + "Diffusion denoising",
)
else:
iterable = enumerate(timesteps)
self.to(torch.bfloat16)
for i, t in iterable:
latent_model_input = torch.cat([cat_latent] * 2)
latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)
# predict the noise residual
added_cond_kwargs = {"text_embeds": add_text_embeds, "time_ids": add_time_ids}
with torch.no_grad():
noise_pred = self.unet(
latent_model_input,
t,
t,
encoder_hidden_states=prompt_embeds,
added_cond_kwargs=added_cond_kwargs,
separate_list=self.separate_list,
return_dict=False,
)[0]
# perform guidance
guidance_scale = guidance_scale
noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
# compute the previous noisy sample x_t -> x_t-1
cat_latent = self.scheduler.step(noise_pred, t, cat_latent, generator=generator).prev_sample
# self.unet.to(default_dtype)
# cat_latent.to(default_dtype)
image = self.decode_image(cat_latent[:, 0:4, :, :])
image = self.numpy_to_pil(image)
depth = self.decode_depth(cat_latent[:, 4:, :, :])
depth = torch.clip(depth, -1.0, 1.0)
depth = (depth + 1.0) / 2.0
depth_pred = depth.squeeze()
depth_pred = depth_pred.float().cpu().numpy()
depth_pred = depth_pred.clip(0, 1)
# Colorize
if color_map is not None:
depth_colored_img = []
depth_colored = colorize_depth_maps(
depth_pred, 0, 1, cmap=color_map
).squeeze() # [3, H, W], value in (0, 1)
depth_colored_img = self.numpy_to_pil(np.transpose(depth_colored, (0, 2, 3, 1)))
else:
depth_colored_img = None
rgbd_images = self.post_process_rgbd(prompt, image, depth_colored_img)
self.to(ori_type)
return rgbd_images
@torch.no_grad()
def image2depth(self,
input_image: Union[Image.Image, torch.Tensor],
denoising_steps: Optional[int] = None,
ensemble_size: int = 5,
processing_res: Optional[int] = None,
match_input_res: bool = True,
resample_method: str = "bilinear",
batch_size: int = 0,
generator: Union[torch.Generator, None] = None,
color_map: str = "Spectral",
show_progress_bar: bool = True,
ensemble_kwargs: Dict = None,
):
# Model-specific optimal default values leading to fast and reasonable results.
if denoising_steps is None:
denoising_steps = self.default_denoising_steps
if processing_res is None:
processing_res = self.default_processing_resolution
ori_type = self.dtype
self.to(torch.bfloat16)
assert processing_res >= 0
assert ensemble_size >= 1
# Check if denoising step is reasonable
self._check_inference_step(denoising_steps)
resample_method: InterpolationMode = get_tv_resample_method(resample_method)
# ----------------- Image Preprocess -----------------
# Convert to torch tensor
if isinstance(input_image, Image.Image):
input_image = input_image.convert("RGB")
# convert to torch tensor [H, W, rgb] -> [rgb, H, W]
rgb = pil_to_tensor(input_image)
rgb = rgb.unsqueeze(0) # [1, rgb, H, W]
elif isinstance(input_image, torch.Tensor):
rgb = input_image
else:
raise TypeError(f"Unknown input type: {type(input_image) = }")
input_size = rgb.shape
assert (
4 == rgb.dim() and 3 == input_size[-3]
), f"Wrong input shape {input_size}, expected [1, rgb, H, W]"
# Resize image
if processing_res > 0:
rgb = resize_max_res(
rgb,
max_edge_resolution=processing_res,
resample_method=resample_method,
)
# Normalize rgb values
rgb_norm: torch.Tensor = rgb / 255.0 * 2.0 - 1.0 # [0, 255] -> [-1, 1]
rgb_norm = rgb_norm.to(self.dtype)
assert rgb_norm.min() >= -1.0 and rgb_norm.max() <= 1.0
# ----------------- Predicting depth -----------------
# Batch repeated input image
duplicated_rgb = rgb_norm.expand(ensemble_size, -1, -1, -1)
single_rgb_dataset = TensorDataset(duplicated_rgb)
if batch_size > 0:
_bs = batch_size
else:
_bs = find_batch_size(
ensemble_size=ensemble_size,
input_res=max(rgb_norm.shape[1:]),
dtype=self.dtype,
)
single_rgb_loader = DataLoader(
single_rgb_dataset, batch_size=_bs, shuffle=False
)
# Predict depth maps (batched)
depth_pred_ls = []
if show_progress_bar:
iterable = tqdm(
single_rgb_loader, desc=" " * 2 + "Inference batches", leave=False
)
else:
iterable = single_rgb_loader
for batch in iterable:
(batched_img,) = batch
depth_pred_raw = self.single_image2depth(
rgb_in=batched_img,
num_inference_steps=denoising_steps,
show_pbar=show_progress_bar,
generator=generator,
)
depth_pred_ls.append(depth_pred_raw.detach())
depth_preds = torch.concat(depth_pred_ls, dim=0)
torch.cuda.empty_cache() # clear vram cache for ensembling
depth_preds = depth_preds.to(torch.float32)
# ----------------- Test-time ensembling -----------------
if ensemble_size > 1:
depth_pred, pred_uncert = ensemble_depth(
depth_preds,
scale_invariant=self.scale_invariant,
shift_invariant=self.shift_invariant,
max_res=50,
**(ensemble_kwargs or {}),
)
else:
depth_pred = depth_preds
pred_uncert = None
# Resize back to original resolution
if match_input_res:
depth_pred = resize(
depth_pred,
input_size[-2:],
interpolation=resample_method,
antialias=True,
)
# Convert to numpy
depth_pred = depth_pred.squeeze()
depth_pred = depth_pred.cpu().numpy()
if pred_uncert is not None:
pred_uncert = pred_uncert.squeeze().cpu().numpy()
# Clip output range
depth_pred = depth_pred.clip(0, 1)
# Colorize
if color_map is not None:
depth_colored = colorize_depth_maps(
depth_pred, 0, 1, cmap=color_map
).squeeze() # [3, H, W], value in (0, 1)
depth_colored = (depth_colored * 255).astype(np.uint8)
depth_colored_hwc = chw2hwc(depth_colored)
depth_colored_img = Image.fromarray(depth_colored_hwc)
else:
depth_colored_img = None
self.to(ori_type)
return MarigoldDepthOutput(
depth_np=depth_pred,
depth_colored=depth_colored_img,
uncertainty=pred_uncert,
)
@torch.no_grad()
def single_image2depth(
self,
rgb_in: torch.Tensor,
num_inference_steps: int,
generator: Union[torch.Generator, None],
show_pbar: bool
) -> torch.Tensor:
"""
Perform an individual depth prediction without ensembling.
Args:
rgb_in (`torch.Tensor`):
Input RGB image.
num_inference_steps (`int`):
Number of diffusion denoisign steps (DDIM) during inference.
show_pbar (`bool`):
Display a progress bar of diffusion denoising.
generator (`torch.Generator`)
Random generator for initial noise generation.
Returns:
`torch.Tensor`: Predicted depth map.
"""
device = self.device
rgb_in = rgb_in.to(device)
# Set timesteps
self.scheduler.set_timesteps(num_inference_steps, device=device)
timesteps = self.scheduler.timesteps # [T]
# Encode image
rgb_latent = self.encode_rgb(rgb_in)
# Initial depth map (noise)
depth_latent = torch.randn(
rgb_latent.shape,
device=device,
dtype=self.dtype,
generator=generator,
) # [B, 4, h, w]
# Batched empty text embedding
if self.empty_text_embed is None:
self.encode_empty_text()
batch_empty_text_embed = self.empty_text_embed.repeat(
(rgb_latent.shape[0], 1, 1)
).to(device).to(self.dtype) # [B, 2, 1024]
# Denoising loop
if show_pbar:
iterable = tqdm(
enumerate(timesteps),
total=len(timesteps),
leave=False,
desc=" " * 4 + "Diffusion denoising",
)
else:
iterable = enumerate(timesteps)
for i, t in iterable:
unet_input = torch.cat(
[rgb_latent, depth_latent], dim=1
) # this order is important
# predict the noise residual
noise_pred = self.unet(
unet_input, rgb_timestep=0, depth_timestep=t, encoder_hidden_states=batch_empty_text_embed
).sample # [B, 4, h, w]
# compute the previous noisy sample x_t -> x_t-1
depth_latent = self.scheduler.step(
noise_pred[:, 4:, :, :], t, depth_latent, generator=generator
).prev_sample
depth = self.decode_depth(depth_latent)
# clip prediction
depth = torch.clip(depth, -1.0, 1.0)
# shift to [0, 1]
depth = (depth + 1.0) / 2.0
return depth
def single_depth2image(
self,
depth_in: torch.Tensor,
prompt,
num_inference_steps: int,
generator: Union[torch.Generator, None],
show_pbar: bool
) -> torch.Tensor:
"""
Perform an individual depth prediction without ensembling.
Args:
rgb_in (`torch.Tensor`):
Input RGB image.
num_inference_steps (`int`):
Number of diffusion denoisign steps (DDIM) during inference.
show_pbar (`bool`):
Display a progress bar of diffusion denoising.
generator (`torch.Generator`)
Random generator for initial noise generation.
Returns:
`torch.Tensor`: Predicted depth map.
"""
device = self.device
depth_in = depth_in.to(device)
# Set timesteps
self.scheduler.set_timesteps(num_inference_steps, device=device)
timesteps = self.scheduler.timesteps # [T]
# Encode depth
depth_latent = self.encode_rgb(depth_in)
# Initial rgb map (noise)
rgb_latent = torch.randn(
depth_latent.shape,
device=device,
dtype=self.dtype,
generator=generator,
) # [B, 4, h, w]
# encode text
prompt_embed, pooled_prompt_embed = self.model.encode_text(prompt)
# Denoising loop
if show_pbar:
iterable = tqdm(
enumerate(timesteps),
total=len(timesteps),
leave=False,
desc=" " * 4 + "Diffusion denoising",
)
else:
iterable = enumerate(timesteps)
for i, t in iterable:
unet_input = torch.cat(
[rgb_latent, depth_latent], dim=1
) # this order is important
# predict the noise residual
noise_pred = self.unet(
unet_input, rgb_timestep=t, depth_timestep=0, encoder_hidden_states=batch_text_embed
).sample # [B, 4, h, w]
# compute the previous noisy sample x_t -> x_t-1
rgb_latent = self.scheduler.step(
noise_pred[:, 0:4, :, :], t, rgb_latent, generator=generator
).prev_sample
image = self.decode_image(rgb_latent)
return image
def encode_rgb(self, rgb_in: torch.Tensor) -> torch.Tensor:
"""
Encode RGB image into latent.
Args:
rgb_in (`torch.Tensor`):
Input RGB image to be encoded.
Returns:
`torch.Tensor`: Image latent.
"""
# encode
h = self.vae.encoder(rgb_in)
moments = self.vae.quant_conv(h)
mean, logvar = torch.chunk(moments, 2, dim=1)
# scale latent
rgb_latent = mean * self.rgb_latent_scale_factor
return rgb_latent
def encode_depth(self, depth_in: torch.Tensor) -> torch.Tensor:
"""
Encode RGB image into latent.
Args:
rgb_in (`torch.Tensor`):
Input RGB image to be encoded.
Returns:
`torch.Tensor`: Image latent.
"""
# encode
h = self.vae.encoder(depth_in)
moments = self.vae.quant_conv(h)
mean, logvar = torch.chunk(moments, 2, dim=1)
# scale latent
rgb_latent = mean * self.rgb_latent_scale_factor
return rgb_latent
def decode_image(self, latents):
image = self.vae.decode(latents / self.vae.config.scaling_factor, return_dict=False)[0]
image = self.image_processor.postprocess(image, output_type='np')
return image
def decode_depth(self, depth_latent: torch.Tensor) -> torch.Tensor:
"""
Decode depth latent into depth map.
Args:
depth_latent (`torch.Tensor`):
Depth latent to be decoded.
Returns:
`torch.Tensor`: Decoded depth map.
"""
# scale latent
depth_latent = depth_latent / self.depth_latent_scale_factor
# decode
z = self.vae.post_quant_conv(depth_latent)
stacked = self.vae.decoder(z)
# mean of output channels
depth_mean = stacked.mean(dim=1, keepdim=True)
return depth_mean
def post_process_rgbd(self, prompts, rgb_image, depth_image):
rgbd_images = []
for idx, p in enumerate(prompts):
image1, image2 = rgb_image[idx], depth_image[idx]
width1, height1 = image1.size
width2, height2 = image2.size
font = ImageFont.load_default(size=20)
text = p
draw = ImageDraw.Draw(image1)
text_bbox = draw.textbbox((0, 0), text, font=font)
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
new_image = Image.new('RGB', (width1 + width2, max(height1, height2) + text_height), (255, 255, 255))
text_x = (new_image.width - text_width) // 2
text_y = 0
draw = ImageDraw.Draw(new_image)
draw.text((text_x, text_y), text, fill="black", font=font)
new_image.paste(image1, (0, text_height))
new_image.paste(image2, (width1, text_height))
rgbd_images.append(pil_to_tensor(new_image))
return rgbd_images