# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""Utility functions for the data module."""
from __future__ import annotations
import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Any
import cv2
import numpy as np
from datumaro.components.annotation import AnnotationType, Bbox, Polygon
from datumaro.components.annotation import Shape as _Shape
from otx.types import OTXTaskType
if TYPE_CHECKING:
from datumaro import Dataset, DatasetSubset
from otx.config.data import TileConfig
logger = logging.getLogger(__name__)
# Annotation type for each task
TASK_ANNO_TYPE = {
OTXTaskType.INSTANCE_SEGMENTATION: Polygon,
OTXTaskType.SEMANTIC_SEGMENTATION: Polygon,
OTXTaskType.DETECTION: Bbox,
}


def compute_robust_statistics(values: np.ndarray) -> dict[str, float]:
    """Compute robust statistics of the given samples.

    Args:
        values (np.ndarray): Array of samples.

    Returns:
        dict[str, float]: Robust avg, std, min, max values.
    """
    stat: dict = {}
    if values.size == 0:
        return stat

    avg_value = np.mean(values)
    std_value = np.std(values)
    avg_3std_min_value = avg_value - 3 * std_value
    avg_3std_max_value = avg_value + 3 * std_value
    min_value = np.min(values)
    max_value = np.max(values)

    # Refine min/max to reduce the effect of outliers
    robust_min_value = max(min_value, avg_3std_min_value)
    robust_max_value = min(max_value, avg_3std_max_value)

    stat["avg"] = float(avg_value)
    stat["std"] = float(std_value)
    stat["min"] = float(min_value)
    stat["max"] = float(max_value)
    stat["robust_min"] = float(robust_min_value)
    stat["robust_max"] = float(robust_max_value)
    return stat
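
# Illustrative example (not part of the original module; values are assumed):
# with many samples near 10 and a single extreme outlier, the raw max keeps the
# outlier while the robust max is clipped to roughly avg + 3 * std.
#
#     samples = np.array([10.0] * 100 + [1000.0])
#     stat = compute_robust_statistics(samples)
#     # stat["max"] == 1000.0, while stat["robust_max"] is ~314 (avg + 3 * std)
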


def compute_robust_scale_statistics(values: np.ndarray) -> dict[str, float]:
    """Compute robust statistics of scale values.

    Statistics are computed in log scale so that, for example, the average of a
    0.5x scale and a 2x scale is 1x.

    Args:
        values (np.ndarray): Array of positive scale values.

    Returns:
        dict[str, float]: Robust avg, std, min, max values.
    """
    if values.size == 0:
        return {}

    # Compute statistics in log scale, then convert back to the original scale
    stat = compute_robust_statistics(np.log(values))
    stat = {k: float(np.exp(v)) for k, v in stat.items()}
    # Keep std in the original (linear) scale, which is easier to interpret
    stat["std"] = float(np.std(values))
    return stat
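
# Illustrative example (not part of the original module): because statistics are
# computed in log scale, a 0.5x and a 2x scale average to 1x (the geometric mean)
# rather than 1.25x (the arithmetic mean).
#
#     stat = compute_robust_scale_statistics(np.array([0.5, 2.0]))
#     # stat["avg"] == 1.0; stat["std"] stays in linear scale (0.75 here)
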


def compute_robust_dataset_statistics(
    dataset: DatasetSubset,
    task: OTXTaskType = OTXTaskType.DETECTION,
    max_samples: int = 1000,
) -> dict[str, Any]:
    """Compute robust statistics of image and annotation sizes.

    Args:
        dataset (DatasetSubset): Input dataset.
        task (OTXTaskType, optional): Task type of the model. Defaults to OTXTaskType.DETECTION.
        max_samples (int, optional): Maximum number of dataset subsamples to analyze. Defaults to 1000.

    Returns:
        dict[str, Any]: Robust avg, min, max values for images and, when available, annotations.
        e.g. stat = {
            "image": {
                "height": {"avg": ...},
                "width": {"avg": ...},
            },
            "annotation": {
                "num_per_image": {"avg": ...},
                "size_of_shape": {"avg": ...},
            },
        }
    """
    stat: dict = {"image": {}, "annotation": {}}
    if len(dataset) == 0 or max_samples <= 0:
        return stat

    data_ids = [item.id for item in dataset]
    max_image_samples = min(max_samples, len(dataset))
    rng = np.random.default_rng(42)
    data_ids = rng.choice(data_ids, max_image_samples, replace=False)

    height_arr = []
    width_arr = []
    for idx in data_ids:
        data = dataset.get(id=idx, subset=dataset.name)
        height, width = data.media.size
        height_arr.append(height)
        width_arr.append(width)

    stat["image"]["height"] = compute_robust_scale_statistics(np.array(height_arr))
    stat["image"]["width"] = compute_robust_scale_statistics(np.array(width_arr))

    label_names = dataset.as_dataset().categories()
    num_per_images: list[int] = []
    size_of_shapes: dict[str, list] = defaultdict(list)
    for idx in data_ids:
        data = dataset.get(id=idx, subset=dataset.name)
        annotations: dict[str, list] = defaultdict(list)

        for ann in data.annotations:
            if task is OTXTaskType.SEMANTIC_SEGMENTATION:
                # Skip the background class
                if label_names and label_names[AnnotationType.label][ann.label].name == "background":
                    continue
                # Convert the foreground mask into multiple polygons
                contours, _ = cv2.findContours(ann.image.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                annotations[Polygon].extend(
                    [Polygon(contour.flatten()) for contour in contours if len(contour) > 2],
                )
            else:
                annotations[ann.__class__].append(ann)

        num_per_images.append(max(len(val) for val in annotations.values()) if annotations else 0)

        if size_of_shapes and max(len(val) for val in size_of_shapes.values()) >= max_samples:
            continue

        for ann_type, anns in annotations.items():
            size_of_shapes[ann_type].extend(
                np.sqrt(area) for val in anns if isinstance(val, _Shape) and (area := val.get_area()) >= 1
            )

    stat["annotation"]["num_per_image"] = compute_robust_statistics(np.array(num_per_images))

    if not size_of_shapes:
        # No shape annotations were collected, so shape-size statistics cannot be computed.
        return stat

    target_ann_type = TASK_ANNO_TYPE.get(task)
    if target_ann_type not in size_of_shapes:
        msg = (
            f"Task type {task} is not supported for computing annotation statistics. "
            "OTX will try to continue with the annotation types found in the dataset."
        )
        logger.warning(msg)
        target_ann_type = sorted(size_of_shapes.keys(), key=lambda x: len(size_of_shapes[x]), reverse=True)[0]
        logger.warning(f"Selected annotation type: {target_ann_type}")

    stat["annotation"]["size_of_shape"] = compute_robust_scale_statistics(np.array(size_of_shapes[target_ann_type]))
    return stat
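
# Illustrative usage (assumed setup, not part of the original module): `dm_dataset`
# is a placeholder name for a datumaro Dataset that contains a "train" subset.
#
#     train_subset = dm_dataset.subsets()["train"]
#     stat = compute_robust_dataset_statistics(train_subset, task=OTXTaskType.DETECTION)
#     # e.g. stat["image"]["height"]["avg"] and stat["annotation"]["size_of_shape"]["avg"]
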
_MIN_RECOGNIZABLE_OBJECT_SIZE = 32 # Minimum object size recognizable by NNs: typically 16 ~ 32
# meaning NxN input pixels being downscaled to 1x1 on feature map
_MIN_DETECTION_INPUT_SIZE = 256 # Minimum input size for object detection


def adapt_tile_config(tile_config: TileConfig, dataset: Dataset, task: OTXTaskType) -> None:
    """Adapt tiling parameters based on annotation statistics.

    Updates the tile size, tile overlap, and max number of objects per sample
    using the object-to-tile ratio and the dataset's annotation statistics.

    Args:
        tile_config (TileConfig): Tiling parameters of the model.
        dataset (Dataset): Datumaro dataset including all subsets.
        task (OTXTaskType): Task type of the model.
    """
    if (train_dataset := dataset.subsets().get("train") or dataset.subsets().get("TRAINING")) is not None:
        stat = compute_robust_dataset_statistics(train_dataset, task=task)
        max_num_objects = round(stat["annotation"]["num_per_image"]["max"])
        avg_size = stat["annotation"]["size_of_shape"]["avg"]
        min_size = stat["annotation"]["size_of_shape"]["robust_min"]
        max_size = stat["annotation"]["size_of_shape"]["robust_max"]
        logger.info(f"----> [stat] scale avg: {avg_size}")
        logger.info(f"----> [stat] scale min: {min_size}")
        logger.info(f"----> [stat] scale max: {max_size}")

        logger.warning("[Adaptive tiling params]")
        object_tile_ratio = tile_config.object_tile_ratio
        tile_size = int(avg_size / object_tile_ratio)
        tile_overlap = max_size / tile_size
        logger.info(f"----> avg_object_size: {avg_size}")
        logger.info(f"----> max_object_size: {max_size}")
        logger.warning(f"----> object_tile_ratio: {object_tile_ratio}")
        logger.warning(f"----> tile_size: {avg_size} / {object_tile_ratio} = {tile_size}")
        logger.warning(f"----> tile_overlap: {max_size} / {tile_size} = {tile_overlap}")

        if tile_overlap >= 0.9:
            # Use the average object size if the tile overlap is too large, to prevent a zero stride.
            tile_overlap = min(avg_size / tile_size, 0.9)
            logger.warning(f"----> (too big) tile_overlap: min({avg_size} / {tile_size}, 0.9) = {tile_overlap}")

        # TODO(Eugene): how to validate lower/upper_bound? dataclass? pydantic?
        # https://github.com/openvinotoolkit/training_extensions/pull/2903
        tile_config.tile_size = (tile_size, tile_size)
        tile_config.max_num_instances = max_num_objects
        tile_config.overlap = tile_overlap
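
# Illustrative usage (assumed setup, not part of the original module): `tile_config`
# and `dm_dataset` are placeholder names for an otx TileConfig and a datumaro Dataset.
# With an average object size of 64 px, an object_tile_ratio of 0.05, and a robust max
# object size of 256 px, the adapted values would be tile_size = int(64 / 0.05) = 1280
# and overlap = 256 / 1280 = 0.2.
#
#     adapt_tile_config(tile_config, dm_dataset, task=OTXTaskType.DETECTION)
#     # tile_config.tile_size, tile_config.overlap, and tile_config.max_num_instances
#     # are updated in place from the training subset statistics.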