# Source code for otx.data.utils.utils

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""Utility functions for the data module."""

from __future__ import annotations

import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Any

import cv2
import numpy as np
from datumaro.components.annotation import AnnotationType, Bbox, Polygon
from datumaro.components.annotation import Shape as _Shape

from otx.types import OTXTaskType

if TYPE_CHECKING:
    from datumaro import Dataset, DatasetSubset

    from otx.config.data import TileConfig


logger = logging.getLogger(__name__)

# Datumaro annotation class whose shape sizes are analyzed for each task.
# For a task missing from this mapping, compute_robust_dataset_statistics() falls back
# to the annotation type with the most collected shape sizes.
TASK_ANNO_TYPE = {
    OTXTaskType.INSTANCE_SEGMENTATION: Polygon,
    OTXTaskType.SEMANTIC_SEGMENTATION: Polygon,
    OTXTaskType.DETECTION: Bbox,
}


def compute_robust_statistics(values: np.array) -> dict[str, float]:
    """Summarizes samples with outlier-resistant bounds.

    Besides the plain mean, standard deviation, minimum and maximum, the result
    carries "robust" extremes: the minimum/maximum clamped so that they never
    leave the ``mean +/- 3 * std`` interval.

    Args:
        values (np.array): Array of samples

    Returns:
        dict[str, float]: Values for the keys "avg", "std", "min", "max",
            "robust_min" and "robust_max". An empty dict if ``values`` has no element.
    """
    if values.size == 0:
        return {}

    mean = np.mean(values)
    spread = np.std(values)
    lowest = np.min(values)
    highest = np.max(values)

    return {
        "avg": float(mean),
        "std": float(spread),
        "min": float(lowest),
        "max": float(highest),
        # Pull the extremes inside the 3-sigma band to reduce outlier effect
        "robust_min": float(max(lowest, mean - 3 * spread)),
        "robust_max": float(min(highest, mean + 3 * spread)),
    }


def compute_robust_scale_statistics(values: np.array) -> dict[str, float]:
    """Computes robust statistics of scale (ratio-like) values.

    The statistics are taken on the logarithm of the values and mapped back
    with the exponential, so that e.g. the average of a 0.5x scale and a 2x
    scale comes out as 1x. Only "std" is reported on the original scale.

    Args:
        values (np.array): Array of positive scale values

    Returns:
        dict[str, float]: Robust avg, min, max values. An empty dict if ``values`` has no element.
    """
    if not values.size:
        return {}

    # Statistics in log scale ...
    log_stat = compute_robust_statistics(np.log(values))

    # ... converted back to the original scale
    scale_stat: dict[str, float] = {}
    for name, log_value in log_stat.items():
        scale_stat[name] = float(np.exp(log_value))

    # Normal scale std is easier to understand
    scale_stat["std"] = float(np.std(values))
    return scale_stat


def compute_robust_dataset_statistics(
    dataset: DatasetSubset,
    task: OTXTaskType = OTXTaskType.DETECTION,
    max_samples: int = 1000,
) -> dict[str, Any]:
    """Computes robust statistics of image & annotation sizes.

    A reproducible random subsample of at most ``max_samples`` items is analyzed.
    Image heights/widths and annotation shape sizes (square root of the shape area)
    are summarized with ``compute_robust_scale_statistics``, the number of
    annotations per image with ``compute_robust_statistics``.

    Args:
        dataset (DatasetSubset): Input dataset.
        task (OTXTaskType, optional): Task type of the model. Defaults to OTXTaskType.DETECTION.
        max_samples (int, optional): Maximum number of dataset subsamples to analyze. Defaults to 1000.

    Returns:
        Dict[str, Any]: Robust avg, min, max values for images, and annotations optionally.
            Both "image" and "annotation" are empty dicts for an empty dataset or a
            non-positive ``max_samples``. "size_of_shape" is an empty dict when no
            annotation is found in the sampled items.
            ex) stat = {
                    "image": {
                        "height" : {"avg": ...},
                        "width" : {"avg": ...},
                    }
                    "annotation": {
                       "num_per_image": {"avg": ...},
                       "size_of_shape": {"avg": ...},
                    }
                }
    """
    stat: dict = {"image": {}, "annotation": {}}
    if len(dataset) == 0 or max_samples <= 0:
        return stat

    data_ids = [item.id for item in dataset]
    max_image_samples = min(max_samples, len(dataset))
    # Fixed seed: the same items are sampled on every call
    rng = np.random.default_rng(42)
    data_ids = rng.choice(data_ids, max_image_samples, replace=False)

    height_arr = []
    width_arr = []
    for idx in data_ids:
        data = dataset.get(id=idx, subset=dataset.name)
        # The media size is unpacked as (height, width)
        height, width = data.media.size
        height_arr.append(height)
        width_arr.append(width)
    stat["image"]["height"] = compute_robust_scale_statistics(np.array(height_arr))
    stat["image"]["width"] = compute_robust_scale_statistics(np.array(width_arr))
    label_names = dataset.as_dataset().categories()

    num_per_images: list[int] = []
    size_of_shapes: dict[str, list] = defaultdict(list)
    for idx in data_ids:
        data = dataset.get(id=idx, subset=dataset.name)
        # Annotations of this item, grouped by annotation class
        annotations: dict[str, list] = defaultdict(list)
        for ann in data.annotations:
            if task is OTXTaskType.SEMANTIC_SEGMENTATION:
                # Skip background class
                if label_names and label_names[AnnotationType.label][ann.label].name == "background":
                    continue

                # convert foreground mask to multiple polygons
                # NOTE(review): assumes every annotation of a semantic segmentation item
                # exposes its mask as `ann.image` - confirm against the dataset format.
                contours, _ = cv2.findContours(ann.image.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                annotations[Polygon].extend(
                    [Polygon(contour.flatten()) for contour in contours if len(contour) > 2],
                )
            else:
                annotations[ann.__class__].append(ann)

        # Count of the most frequent annotation class in this item
        num_per_images.append(max(len(val) for val in annotations.values()) if annotations else 0)

        # Stop collecting shape sizes once any annotation class has enough samples
        if size_of_shapes and max(len(val) for val in size_of_shapes.values()) >= max_samples:
            continue

        for ann_type, anns in annotations.items():
            # Shape size = sqrt(area); non-shape annotations and shapes with area < 1 are ignored
            size_of_shapes[ann_type].extend(
                np.sqrt(area) for val in anns if isinstance(val, _Shape) and (area := val.get_area()) >= 1
            )

    stat["annotation"]["num_per_image"] = compute_robust_statistics(np.array(num_per_images))

    if not size_of_shapes:
        # No annotation was collected from the sampled items, so there is no annotation
        # type to fall back to. Report empty size statistics - the same value
        # compute_robust_scale_statistics() gives for an empty array - instead of failing.
        logger.warning("No annotation found in the sampled dataset items. Annotation size statistics are left empty.")
        stat["annotation"]["size_of_shape"] = {}
        return stat

    target_ann_type = TASK_ANNO_TYPE.get(task)
    if target_ann_type not in size_of_shapes:
        msg = (
            f"Task type {task} is not supported for computing annotation statistics. "
            "OTX will try to continue with annotation found in the dataset."
        )
        logger.warning(msg)
        # Fall back to the annotation type with the most collected sizes (first one wins on ties)
        target_ann_type = max(size_of_shapes, key=lambda ann_type: len(size_of_shapes[ann_type]))
        logger.warning(f"Selected annotation type: {target_ann_type}")
    stat["annotation"]["size_of_shape"] = compute_robust_scale_statistics(np.array(size_of_shapes[target_ann_type]))
    return stat


# Minimum object size recognizable by NNs: typically 16 ~ 32,
# meaning NxN input pixels being downscaled to 1x1 on feature map.
_MIN_RECOGNIZABLE_OBJECT_SIZE = 32
# Minimum input size for object detection. adapt_input_size_to_dataset() raises the
# smaller side of the adapted size to this value when either side falls below it.
_MIN_DETECTION_INPUT_SIZE = 256


def adapt_input_size_to_dataset(
    dataset: Dataset,
    task: OTXTaskType = OTXTaskType.DETECTION,
    input_size_multiplier: int | None = None,
) -> tuple[int, int]:
    """Compute appropriate model input size w.r.t. dataset statistics.

    Starts from the robust maximum (height, width) of the train images. If annotation
    size statistics are available, the size is rescaled so that a typical small object
    spans about ``_MIN_RECOGNIZABLE_OBJECT_SIZE`` pixels, restricted to the maximum image
    size and raised to at least ``_MIN_DETECTION_INPUT_SIZE`` on the smaller side.

    Args:
        dataset (Dataset): Datumaro dataset including all subsets.
        task (OTXTaskType, optional): Task type of the model. Defaults to OTXTaskType.DETECTION.
        input_size_multiplier (int | None, optional): Multiplier for input size. If it's set, return the
            input size which can be divisible by the value. Defaults to None.

    Returns:
        tuple[int, int]: Recommended (height, width) input size based on dataset statistics.

    Raises:
        ValueError: If the dataset has no "train" subset.
    """
    if (train_dataset := dataset.subsets().get("train")) is None:
        msg = "Dataset does not have 'train' subset. Cannot compute dataset statistics."
        raise ValueError(msg)

    logger.info("Adapting model input size based on dataset stat")
    stat = compute_robust_dataset_statistics(train_dataset, task)
    max_image_size: list[float] = [
        stat["image"].get("height", {}).get("robust_max", 0),
        stat["image"].get("width", {}).get("robust_max", 0),
    ]

    # Work on a copy: image_size is modified in place below and must not alter max_image_size
    image_size = list(max_image_size)
    logger.info(f"-> Based on typical large image size: {image_size}")

    # Refine using annotation shape size stat
    # Fit to typical small object size (conservative)
    # -> "avg" size might be preferable for efficiency
    min_object_size = stat.get("annotation", {}).get("size_of_shape", {}).get("robust_min", None)
    if min_object_size is not None and min_object_size > 0:
        image_size = [round(val * _MIN_RECOGNIZABLE_OBJECT_SIZE / min_object_size) for val in image_size]
        logger.info(f"-> Based on typical small object size {min_object_size}: {image_size}")
        if image_size[0] > max_image_size[0]:
            image_size = list(max_image_size)
            logger.info(f"-> Restrict to max image size: {image_size}")
        if image_size[0] < _MIN_DETECTION_INPUT_SIZE or image_size[1] < _MIN_DETECTION_INPUT_SIZE:
            # Raise the smaller side to the minimum and scale the bigger side by the same ratio
            big_val_idx = 0 if image_size[0] > image_size[1] else 1
            small_val_idx = 1 - big_val_idx
            image_size[big_val_idx] = image_size[big_val_idx] * _MIN_DETECTION_INPUT_SIZE // image_size[small_val_idx]
            image_size[small_val_idx] = _MIN_DETECTION_INPUT_SIZE
            logger.info(f"-> Based on minimum object detection input size: {image_size}")

    if input_size_multiplier is not None:
        # Round each side up to the next multiple of input_size_multiplier
        for i, val in enumerate(image_size):
            if val % input_size_multiplier != 0:
                image_size[i] = (val // input_size_multiplier + 1) * input_size_multiplier

    adapted_size = (int(image_size[0]), int(image_size[1]))
    logger.info(f"-> Adapted input size: {adapted_size}")
    return adapted_size


def adapt_tile_config(tile_config: TileConfig, dataset: Dataset, task: OTXTaskType) -> None:
    """Config tile parameters.

    Adapt based on annotation statistics.
    i.e. tile size, tile overlap, ratio and max objects per sample

    ``tile_config`` is updated in place (``tile_size``, ``max_num_instances`` and
    ``overlap``). It is left untouched when the dataset has neither a "train" nor a
    "TRAINING" subset, or when no annotation statistics could be computed.

    Args:
        tile_config (TileConfig): tiling parameters of the model
        dataset (Dataset): Datumaro dataset including all subsets
        task (OTXTaskType): task type of the model
    """
    subsets = dataset.subsets()
    train_dataset = subsets.get("train") or subsets.get("TRAINING")
    if train_dataset is None:
        return

    stat = compute_robust_dataset_statistics(train_dataset, task=task)
    annotation_stat = stat.get("annotation", {})
    num_stat = annotation_stat.get("num_per_image", {})
    size_stat = annotation_stat.get("size_of_shape", {})
    if not num_stat or not size_stat:
        # Statistics are empty for an empty subset or when no shape annotation was found;
        # there is nothing to adapt to, so keep the configured values.
        logger.warning("Annotation statistics are not available. Tile config is left unchanged.")
        return

    max_num_objects = round(num_stat["max"])
    avg_size = size_stat["avg"]
    min_size = size_stat["robust_min"]
    max_size = size_stat["robust_max"]
    logger.info(f"----> [stat] scale avg: {avg_size}")
    logger.info(f"----> [stat] scale min: {min_size}")
    logger.info(f"----> [stat] scale max: {max_size}")

    logger.warning("[Adaptive tiling pararms]")
    object_tile_ratio = tile_config.object_tile_ratio
    # Tile size such that an average object takes up object_tile_ratio of the tile side
    tile_size = int(avg_size / object_tile_ratio)
    # Overlap ratio sized after the largest typical object
    tile_overlap = max_size / tile_size
    logger.info(f"----> avg_object_size: {avg_size}")
    logger.info(f"----> max_object_size: {max_size}")
    logger.warning(f"----> object_tile_ratio: {object_tile_ratio}")
    logger.warning(f"----> tile_size: {avg_size} / {object_tile_ratio} = {tile_size}")
    logger.warning(f"----> tile_overlap: {max_size} / {tile_size} = {tile_overlap}")

    if tile_overlap >= 0.9:
        # Use the average object size if the tile overlap is too large to prevent 0 stride.
        tile_overlap = min(avg_size / tile_size, 0.9)
        logger.warning(f"----> (too big) tile_overlap: {avg_size} / {tile_size} = min[{tile_overlap}, 0.9]")

    # TODO(Eugene): how to validate lower/upper_bound? dataclass? pydantic?
    # https://github.com/openvinotoolkit/training_extensions/pull/2903
    tile_config.tile_size = (tile_size, tile_size)
    tile_config.max_num_instances = max_num_objects
    tile_config.overlap = tile_overlap