File size: 8,347 Bytes
8aa4f1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
from mmengine.fileio import FileClient
from mmengine.dist import master_only
from einops import rearrange
import torch
import mmcv
import numpy as np
import os.path as osp
import cv2
from typing import Optional, Sequence
import torch.nn as nn
from mmdet.apis import inference_detector
from mmcv.transforms import Compose
from mmdet.engine import DetVisualizationHook
from mmdet.registry import HOOKS
from mmdet.structures import DetDataSample

from utils.io_utils import find_all_imgs, square_pad_resize, imglist2grid

def inference_detector(
    model: nn.Module,
    imgs,
    test_pipeline
):

    if isinstance(imgs, (list, tuple)):
        is_batch = True
    else:
        imgs = [imgs]
        is_batch = False

    if len(imgs) == 0:
        return []

    test_pipeline = test_pipeline.copy()
    if isinstance(imgs[0], np.ndarray):
        # Calling this method across libraries will result
        # in module unregistered error if not prefixed with mmdet.
        test_pipeline[0].type = 'mmdet.LoadImageFromNDArray'

    test_pipeline = Compose(test_pipeline)

    result_list = []
    for img in imgs:
        # prepare data
        if isinstance(img, np.ndarray):
            # TODO: remove img_id.
            data_ = dict(img=img, img_id=0)
        else:
            # TODO: remove img_id.
            data_ = dict(img_path=img, img_id=0)
        # build the data pipeline
        data_ = test_pipeline(data_)

        data_['inputs'] = [data_['inputs']]
        data_['data_samples'] = [data_['data_samples']]

        # forward the model
        with torch.no_grad():
            results = model.test_step(data_)[0]

        result_list.append(results)

    if not is_batch:
        return result_list[0]
    else:
        return result_list


@HOOKS.register_module()
class InstanceSegVisualizationHook(DetVisualizationHook):

    def __init__(self, visualize_samples: str = '', 
                 read_rgb: bool = False,
                 draw: bool = False,
                 interval: int = 50,
                 score_thr: float = 0.3,
                 show: bool = False,
                 wait_time: float = 0.,
                 test_out_dir: Optional[str] = None,
                 file_client_args: dict = dict(backend='disk')):
        super().__init__(draw, interval, score_thr, show, wait_time, test_out_dir, file_client_args)
        self.vis_samples = []

        if osp.exists(visualize_samples):
            self.channel_order = channel_order = 'rgb' if read_rgb else 'bgr' 
            samples = find_all_imgs(visualize_samples, abs_path=True)
            for imgp in samples:
                img = mmcv.imread(imgp, channel_order=channel_order)
                img, _, _, _ = square_pad_resize(img, 640)
                self.vis_samples.append(img)

    def before_val(self, runner) -> None:
        total_curr_iter = runner.iter
        self._visualize_data(total_curr_iter, runner)
        return super().before_val(runner)

    # def after_val_iter(self, runner: Runner, batch_idx: int, data_batch: dict,
    #                    outputs: Sequence[DetDataSample]) -> None:
    #     """Run after every ``self.interval`` validation iterations.

    #     Args:
    #         runner (:obj:`Runner`): The runner of the validation process.
    #         batch_idx (int): The index of the current batch in the val loop.
    #         data_batch (dict): Data from dataloader.
    #         outputs (Sequence[:obj:`DetDataSample`]]): A batch of data samples
    #             that contain annotations and predictions.
    #     """
    #     # if self.draw is False:
    #     #     return

    #     if self.file_client is None:
    #         self.file_client = FileClient(**self.file_client_args)


    #     # There is no guarantee that the same batch of images
    #     # is visualized for each evaluation.
    #     total_curr_iter = runner.iter + batch_idx

    #     # # Visualize only the first data
    #     # img_path = outputs[0].img_path
    #     # img_bytes = self.file_client.get(img_path)
    #     # img = mmcv.imfrombytes(img_bytes, channel_order='rgb')
    #     if total_curr_iter % self.interval == 0 and self.vis_samples:
    #         self._visualize_data(total_curr_iter, runner)


    @master_only
    def _visualize_data(self, total_curr_iter, runner):

        tgt_size = 384

        runner.model.eval()
        outputs = inference_detector(runner.model, self.vis_samples, test_pipeline=runner.cfg.test_pipeline)
        vis_results = []
        for img, output in zip(self.vis_samples, outputs):
            vis_img = self.add_datasample(
                    'val_img',
                    img,
                    data_sample=output,
                    show=self.show,
                    wait_time=self.wait_time,
                    pred_score_thr=self.score_thr,
                    draw_gt=False,
                    step=total_curr_iter)
            vis_results.append(cv2.resize(vis_img, (tgt_size, tgt_size), interpolation=cv2.INTER_AREA))

        drawn_img = imglist2grid(vis_results, tgt_size)
        if drawn_img is None:
            return
        drawn_img = cv2.cvtColor(drawn_img, cv2.COLOR_BGR2RGB)
        visualizer = self._visualizer
        visualizer.set_image(drawn_img)
        visualizer.add_image('val_img', drawn_img, total_curr_iter)


    @master_only
    def add_datasample(
            self,
            name: str,
            image: np.ndarray,
            data_sample: Optional['DetDataSample'] = None,
            draw_gt: bool = True,
            draw_pred: bool = True,
            show: bool = False,
            wait_time: float = 0,
            # TODO: Supported in mmengine's Viusalizer.
            out_file: Optional[str] = None,
            pred_score_thr: float = 0.3,
            step: int = 0) -> np.ndarray:
        image = image.clip(0, 255).astype(np.uint8)
        visualizer = self._visualizer
        classes = visualizer.dataset_meta.get('classes', None)
        palette = visualizer.dataset_meta.get('palette', None)

        gt_img_data = None
        pred_img_data = None

        if data_sample is not None:
            data_sample = data_sample.cpu()

        if draw_gt and data_sample is not None:
            gt_img_data = image
            if 'gt_instances' in data_sample:
                gt_img_data = visualizer._draw_instances(image,
                                                   data_sample.gt_instances,
                                                   classes, palette)

            if 'gt_panoptic_seg' in data_sample:
                assert classes is not None, 'class information is ' \
                                            'not provided when ' \
                                            'visualizing panoptic ' \
                                            'segmentation results.'
                gt_img_data = visualizer._draw_panoptic_seg(
                    gt_img_data, data_sample.gt_panoptic_seg, classes)

        if draw_pred and data_sample is not None:
            pred_img_data = image
            if 'pred_instances' in data_sample:
                pred_instances = data_sample.pred_instances
                pred_instances = pred_instances[
                    pred_instances.scores > pred_score_thr]
                pred_img_data = visualizer._draw_instances(image, pred_instances,
                                                     classes, palette)
            if 'pred_panoptic_seg' in data_sample:
                assert classes is not None, 'class information is ' \
                                            'not provided when ' \
                                            'visualizing panoptic ' \
                                            'segmentation results.'
                pred_img_data = visualizer._draw_panoptic_seg(
                    pred_img_data, data_sample.pred_panoptic_seg.numpy(),
                    classes)

        if gt_img_data is not None and pred_img_data is not None:
            drawn_img = np.concatenate((gt_img_data, pred_img_data), axis=1)
        elif gt_img_data is not None:
            drawn_img = gt_img_data
        elif pred_img_data is not None:
            drawn_img = pred_img_data
        else:
            # Display the original image directly if nothing is drawn.
            drawn_img = image

        return drawn_img