# Source code for cv2ext.detection._packer

# Copyright (c) 2024 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
# ruff: noqa: S311
from __future__ import annotations

import math
import operator
import random
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

import numpy as np

from cv2ext._jit import register_jit

if TYPE_CHECKING:
    from typing_extensions import Self


class AbstractFramePacker(ABC):
    """
    Interface for packers that condense a frame using detection activity.

    A packer builds a smaller image from selected regions of a frame and
    can later map detections found in that packed image back to the
    original frame. Detections are bounding boxes, optionally accompanied
    by a score and a class id.
    """

    @abstractmethod
    def pack(
        self: Self,
        image: np.ndarray,
        exclude: tuple[int, int, int, int] | list[tuple[int, int, int, int]],
    ) -> tuple[np.ndarray, np.ndarray]:
        """
        Build a packed image from regions of a frame.

        Parameters
        ----------
        image : np.ndarray
            The frame to pack.
        exclude : tuple[int, int, int, int] | list[tuple[int, int, int, int]]
            One bounding box, or a list of bounding boxes, marking regions
            which must be left out of the packed image.

        Returns
        -------
        tuple[np.ndarray, np.ndarray]
            The packed image, followed by the transform information
            which ``unpack`` needs.

        """

    @abstractmethod
    def unpack(
        self: Self,
        detections: list[tuple[int, int, int, int]]
        | list[tuple[tuple[int, int, int, int], float, int]],
        transform: np.ndarray,
    ) -> (
        list[tuple[int, int, int, int]]
        | list[tuple[tuple[int, int, int, int], float, int]]
    ):
        """
        Map detections from a packed image back to the original frame.

        Parameters
        ----------
        detections : list[tuple[int, int, int, int]] | list[tuple[tuple[int, int, int, int], float, int]]
            Detections expressed in packed-image coordinates, either as
            plain bounding boxes or as (bounding box, score, class id).
        transform : np.ndarray
            The transform information returned by ``pack``.

        Returns
        -------
        list[tuple[int, int, int, int]] | list[tuple[tuple[int, int, int, int], float, int]]
            The unpacked detections.

        """

    @abstractmethod
    def update(
        self: Self,
        detections: list[tuple[int, int, int, int]]
        | list[tuple[tuple[int, int, int, int], float, int]],
    ) -> None:
        """
        Feed new detections to the packer.

        Parameters
        ----------
        detections : list[tuple[int, int, int, int]] | list[tuple[tuple[int, int, int, int], float, int]]
            The detections to update the packer with, either as plain
            bounding boxes or as (bounding box, score, class id).

        """

    @abstractmethod
    def reset(
        self: Self,
        image_shape: tuple[int, int] | None = None,
        gridsize: int | None = None,
    ) -> None:
        """
        Reset the packer, optionally changing its configuration.

        Parameters
        ----------
        image_shape : tuple[int, int], optional
            The shape of the image in form (width, height).
        gridsize : int, optional
            The new gridsize value to use.

        """
@register_jit()
def _bbox_to_gridcells(
    bbox: tuple[int, int, int, int],
    row_step: int,
    col_step: int,
    n_rows: int,
    n_cols: int,
) -> list[tuple[int, int]]:
    """
    Get the grid cells spanned by a bounding box.

    Parameters
    ----------
    bbox : tuple[int, int, int, int]
        The bounding box in form (x1, y1, x2, y2).
    row_step : int
        The vertical step between grid rows.
    col_step : int
        The horizontal step between grid columns.
    n_rows : int
        The number of rows in the grid.
    n_cols : int
        The number of columns in the grid.

    Returns
    -------
    list[tuple[int, int]]
        The (row, col) index of every spanned cell, in row-major order.
        Indices are clamped to the range [0, n_rows) x [0, n_cols).

    """
    x1, y1, x2, y2 = bbox
    # floor the start / ceil the end so partially covered cells count
    min_row = max(0, math.floor(y1 / row_step))
    max_row = min(n_rows, math.ceil(y2 / row_step))
    min_col = max(0, math.floor(x1 / col_step))
    max_col = min(n_cols, math.ceil(x2 / col_step))
    cells: list[tuple[int, int]] = []
    for i in range(min_row, max_row):
        for j in range(min_col, max_col):
            cells.extend([(i, j)])
    return cells


@register_jit()
def _bboxes_to_gridcells(
    bboxes: list[tuple[int, int, int, int]],
    row_step: int,
    col_step: int,
    n_rows: int,
    n_cols: int,
) -> set[tuple[int, int]]:
    """
    Get the unique grid cells spanned by any of several bounding boxes.

    Parameters
    ----------
    bboxes : list[tuple[int, int, int, int]]
        The bounding boxes, each in form (x1, y1, x2, y2).
    row_step : int
        The vertical step between grid rows.
    col_step : int
        The horizontal step between grid columns.
    n_rows : int
        The number of rows in the grid.
    n_cols : int
        The number of columns in the grid.

    Returns
    -------
    set[tuple[int, int]]
        The (row, col) indices of all spanned cells, without duplicates.

    """
    cells: list[tuple[int, int]] = []
    for bbox in bboxes:
        cells.extend(_bbox_to_gridcells(bbox, row_step, col_step, n_rows, n_cols))
    return set(cells)


@register_jit()
def _unpack_grid_single_bbox(
    bbox: tuple[int, int, int, int],
    transform: np.ndarray,
    gridsize: int,
) -> tuple[int, int, int, int]:
    """
    Map one bounding box from packed-image to original-image coordinates.

    The packed cell is chosen from the center of the bounding box, so the
    whole box is translated with the offset of that single cell.

    Parameters
    ----------
    bbox : tuple[int, int, int, int]
        The bounding box in the packed image, in form (x1, y1, x2, y2).
    transform : np.ndarray
        Per packed cell, the (x, y) top-left offset of its source region,
        indexed as transform[row][col].
    gridsize : int
        The side length of a packed cell in pixels.

    Returns
    -------
    tuple[int, int, int, int]
        The bounding box translated into the original image.

    """
    x1, y1, x2, y2 = bbox

    # determine the packed grid cell from the center of the bounding box
    n_col = int(((x1 + x2) / 2.0) // gridsize)
    n_row = int(((y1 + y2) / 2.0) // gridsize)

    # clamp indices so boxes centered outside the packed image stay valid
    n_row = max(0, min(n_row, transform.shape[0] - 1))
    n_col = max(0, min(n_col, transform.shape[1] - 1))

    # retrieve the original top-left offset for the grid cell
    o_x, o_y = transform[n_row][n_col]

    # remove the packed cell origin, then add the original offset
    abs_x1 = int(x1 - (n_col * gridsize) + o_x)
    abs_y1 = int(y1 - (n_row * gridsize) + o_y)
    abs_x2 = int(x2 - (n_col * gridsize) + o_x)
    abs_y2 = int(y2 - (n_row * gridsize) + o_y)

    return abs_x1, abs_y1, abs_x2, abs_y2


@register_jit()
def _unpack_grid_bboxes_conf_classid(
    detections: list[tuple[tuple[int, int, int, int], float, int]],
    transform: np.ndarray,
    gridsize: int,
) -> list[tuple[tuple[int, int, int, int], float, int]]:
    """
    Unpack detections given as (bounding box, score, class id).

    Parameters
    ----------
    detections : list[tuple[tuple[int, int, int, int], float, int]]
        The detections in packed-image coordinates.
    transform : np.ndarray
        The transform information generated when packing.
    gridsize : int
        The side length of a packed cell in pixels.

    Returns
    -------
    list[tuple[tuple[int, int, int, int], float, int]]
        The detections with bounding boxes in original-image coordinates.
        Score and class id are passed through unchanged.

    """
    unpacked_dets: list[tuple[tuple[int, int, int, int], float, int]] = []
    for bbox, conf, classid in detections:
        abs_x1, abs_y1, abs_x2, abs_y2 = _unpack_grid_single_bbox(
            bbox,
            transform,
            gridsize,
        )
        unpacked_dets.append(((abs_x1, abs_y1, abs_x2, abs_y2), conf, classid))
    return unpacked_dets


@register_jit()
def _unpack_grid_bboxes(
    detections: list[tuple[int, int, int, int]],
    transform: np.ndarray,
    gridsize: int,
) -> list[tuple[int, int, int, int]]:
    """
    Unpack detections given as plain bounding boxes.

    Parameters
    ----------
    detections : list[tuple[int, int, int, int]]
        The bounding boxes in packed-image coordinates.
    transform : np.ndarray
        The transform information generated when packing.
    gridsize : int
        The side length of a packed cell in pixels.

    Returns
    -------
    list[tuple[int, int, int, int]]
        The bounding boxes in original-image coordinates.

    """
    unpacked_dets: list[tuple[int, int, int, int]] = []
    for bbox in detections:
        abs_x1, abs_y1, abs_x2, abs_y2 = _unpack_grid_single_bbox(
            bbox,
            transform,
            gridsize,
        )
        unpacked_dets.append((abs_x1, abs_y1, abs_x2, abs_y2))
    return unpacked_dets


@register_jit()
def _simple_grid_repack(
    image: np.ndarray,
    cells: list[tuple[tuple[int, int, int, int], tuple[int, int]]],
    gridsize: int,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Repack cells into a near-square image in the order they are given.

    Parameters
    ----------
    image : np.ndarray
        The original image the cells are cut from.
    cells : list[tuple[tuple[int, int, int, int], tuple[int, int]]]
        The cells to pack, each as ((x1, y1, x2, y2), (row, col)).
        Only the bounding box is used by this method.
    gridsize : int
        The side length of a packed cell in pixels.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        The packed image, allocated as a 3 channel uint8 array, and the
        transform array holding the (x1, y1) source offset for each
        packed cell. Unused packed cells stay zero in both arrays.

    """
    # near-square layout: dim1 columns, dim2 rows (at least 1x1)
    num_cells = len(cells)
    dim1 = max(1, math.ceil(math.sqrt(num_cells)))
    dim2 = max(1, math.ceil(num_cells / dim1))

    # allocate new data for the patches
    new_image: np.ndarray = np.zeros(
        (dim2 * gridsize, dim1 * gridsize, 3),
        dtype=np.uint8,
    )
    new_grids: np.ndarray = np.zeros((dim2, dim1, 2), dtype=int)

    for i, (bbox, _) in enumerate(cells):
        x1, y1, x2, y2 = bbox

        # row-major placement in the packed grid
        n_row = math.floor(i / dim1)
        n_col = i % dim1

        # top-left corner of the packed cell
        n_x1 = n_col * gridsize
        n_y1 = n_row * gridsize

        # A cell can reach past the image border when the image is smaller
        # than gridsize along a dimension. The source slice is then
        # truncated, so copy only the region which actually exists and
        # leave the remainder of the packed cell zeroed.
        patch = image[y1:y2, x1:x2]
        p_h = min(gridsize, patch.shape[0])
        p_w = min(gridsize, patch.shape[1])
        new_image[n_y1 : n_y1 + p_h, n_x1 : n_x1 + p_w] = patch[:p_h, :p_w]

        # the transform entry is the source offset, same as old x1, y1
        new_grids[n_row, n_col] = (x1, y1)

    return new_image, new_grids


@register_jit()
def _shelf_grid_repack(
    image: np.ndarray,
    cells: list[tuple[tuple[int, int, int, int], tuple[int, int]]],
    gridsize: int,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Repack cells while keeping neighbouring cells next to each other.

    Cells are first grouped into 2x2, 1x2 or 2x1 blocks of grid
    neighbours (or left alone), then the groups are stacked into shelves.
    A shelf is a vertical strip holding groups of one width; shelves are
    laid out side by side in the packed image.

    Parameters
    ----------
    image : np.ndarray
        The original image the cells are cut from.
    cells : list[tuple[tuple[int, int, int, int], tuple[int, int]]]
        The cells to pack, each as ((x1, y1, x2, y2), (row, col)).
    gridsize : int
        The side length of a packed cell in pixels.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        The packed image, allocated as a 3 channel uint8 array, and the
        transform array holding the (x1, y1) source offset for each
        packed cell. Unused packed cells stay zero in both arrays.

    """
    # ======================
    # STEP 1: Grouping cells
    # ======================
    # for each cell, identify if it belongs to a block of neighbours
    # a block is only kept if all of its cells still need to be matched
    to_match: set[tuple[int, int]] = {loc for _, loc in cells}
    matched: set[tuple[int, int]] = set()
    groups: list[list[tuple[int, int]]] = []

    # (row, col) offsets of the other cells in each candidate block
    # ordered largest first so 2x2 blocks are preferred over pairs
    offset_groups: list[list[tuple[int, int]]] = [
        # 2x2 square sampling sets, current cell is one of the corners
        [(0, -1), (-1, 0), (-1, -1)],
        [(0, 1), (-1, 0), (-1, 1)],
        [(0, 1), (1, 0), (1, 1)],
        [(0, -1), (1, 0), (1, -1)],
        # 1x2 sampling sets
        [(0, -1)],
        [(0, 1)],
        # 2x1 sampling sets
        [(-1, 0)],
        [(1, 0)],
    ]

    # construct lookup for bboxes from loc
    height, width = image.shape[:2]
    rows = math.ceil(height / gridsize)
    cols = math.ceil(width / gridsize)
    lookup: list[list[tuple[int, int, int, int]]] = [
        [(0, 0, 0, 0) for _ in range(cols)] for _ in range(rows)
    ]

    for bbox1, loc1 in cells:
        # fill out the lookup, must happen even for already matched cells
        lookup[loc1[0]][loc1[1]] = bbox1

        if loc1 in matched:
            continue

        for offsets in offset_groups:
            found = 0
            new_locs = [
                (loc1[0] + offset[0], loc1[1] + offset[1]) for offset in offsets
            ]
            for new_loc in new_locs:
                # new location is not matched and needs to be matched
                if new_loc not in matched and new_loc in to_match:
                    found += 1

            # every cell of the candidate block must be available
            if found != len(offsets):
                continue

            new_group = [loc1, *new_locs]
            groups.append(new_group)
            for new_loc in new_group:
                matched.add(new_loc)
                to_match.remove(new_loc)

            # do not check any more candidate blocks
            break
        # the else block runs when the loop finished without a break
        # meaning no block was found, so the cell forms its own group
        else:
            groups.append([loc1])
            matched.add(loc1)
            to_match.remove(loc1)

    # ===============================
    # STEP 2: Repacking using shelves
    # ===============================
    # sort each group by (row, col) so that simple iteration
    # places its cells in row-major order later
    groups = [sorted(group, key=operator.itemgetter(0, 1)) for group in groups]

    # sort in descending size so the largest groups are allocated first
    groups.sort(key=len, reverse=True)

    # heuristic cap on shelf height to keep the packed image near square
    target_size = math.ceil(math.sqrt(len(cells)))

    # each entry is (cells in row-major order, height, width)
    shelves: list[tuple[list[tuple[int, int]], int, int]] = []

    for group in groups:
        # compute height/width of group based on its first (corner) cell
        corner = group[0]
        g_height = 1
        g_width = 1
        for cell in group:
            g_height = max(g_height, cell[0] - corner[0] + 1)
            g_width = max(g_width, cell[1] - corner[1] + 1)

        # case 1:
        # first shelf with equal width where the group
        # still fits below target_size in height
        shelf_id = -1
        for s_idx, (_, s_height, s_width) in enumerate(shelves):
            if g_width != s_width:
                continue
            if g_height + s_height > target_size:
                continue
            shelf_id = s_idx
            break

        if shelf_id != -1:
            s_boxes, s_height, s_width = shelves[shelf_id]
            s_boxes.extend(group)
            s_height += g_height
            shelves[shelf_id] = (
                s_boxes,
                s_height,
                s_width,
            )
            continue

        # case 2:
        # no fitting shelf (or no shelves yet), open a new one
        shelves.append(
            (group, g_height, g_width),
        )

    # compute the overall dimensions of the new image and grid
    new_height = 0  # maximum height of any shelf
    new_width = 0  # sum of all shelf widths
    for _, s_height, s_width in shelves:
        new_height = max(new_height, s_height)
        new_width += s_width

    # always allocate at least one cell, even without any input cells
    dim1 = max(new_height, 1)
    dim2 = max(new_width, 1)
    new_image: np.ndarray = np.zeros(
        (dim1 * gridsize, dim2 * gridsize, 3),
        dtype=np.uint8,
    )
    new_grid: np.ndarray = np.zeros((dim1, dim2, 2), dtype=int)

    # rolling x offset (in cells) of the current shelf
    frontier = 0
    for s_boxes, s_height, s_width in shelves:
        for i in range(s_height):
            for j in range(s_width):
                # shelf cells are stored row-major with row length s_width
                b_idx = i * s_width + j

                # compute the original coords from loc
                o_r, o_c = s_boxes[b_idx]
                x1, y1, x2, y2 = lookup[o_r][o_c]

                # record the original image offset in new_grid
                n_c = j + frontier
                new_grid[i][n_c] = (x1, y1)

                # A cell can reach past the image border when the image is
                # smaller than gridsize along a dimension. The source slice
                # is then truncated, so copy only the region which exists.
                n_y = i * gridsize
                n_x = n_c * gridsize
                patch = image[y1:y2, x1:x2]
                p_h = min(gridsize, patch.shape[0])
                p_w = min(gridsize, patch.shape[1])
                new_image[n_y : n_y + p_h, n_x : n_x + p_w] = patch[:p_h, :p_w]

        # once shelf is added, update the frontier (x coord tracker)
        frontier += s_width

    return new_image, new_grid
class AbstractGridFramePacker(AbstractFramePacker):
    """Pack regions of a frame together based on a grid."""

    def __init__(
        self: Self,
        image_shape: tuple[int, int],
        gridsize: int = 128,
        detection_buffer: int = 30,
        method: str = "shelf",
    ) -> None:
        """
        Create a new GridFramePacker.

        Parameters
        ----------
        image_shape : tuple[int, int]
            The shape of the image in form (width, height).
        gridsize : int, optional
            The size of each cell in the overlaid grid.
            Default is 128.
        detection_buffer : int, optional
            The number of frames to consider for detection activity.
            Stored for use by subclasses.
            Default is 30.
        method : str, optional
            The method to use for repacking grid cells into new images.
            By default, 'shelf'
            Options are: ['simple', 'shelf']
            'simple' places cells in the order they are selected, while
            'shelf' attempts to keep connected cells together.

        """
        super().__init__()
        self._width, self._height = image_shape
        self._gridsize = gridsize
        self._detection_buffer = detection_buffer
        self._method = method

        # attributes populated by _initialize_cells, declared for typing
        self._n_rows: int
        self._n_cols: int
        self._row_step: int
        self._col_step: int
        self._num_dets: np.ndarray
        self._cells: np.ndarray
        self._initialize_cells()

        # number of pack calls since construction or the last reset
        self._counter: int = 0

    def reset(
        self: Self,
        image_shape: tuple[int, int] | None = None,
        gridsize: int | None = None,
    ) -> None:
        """
        Reset the packer.

        The grid and detection counts are rebuilt and the pack counter
        returns to zero. Falsy arguments keep the current configuration.

        Parameters
        ----------
        image_shape : tuple[int, int], optional
            The shape of the image in form (width, height).
        gridsize : int, optional
            The new gridsize value to use.

        """
        if image_shape:
            self._width, self._height = image_shape
        self._gridsize = gridsize or self._gridsize
        self._initialize_cells()
        self._counter = 0

    def _initialize_cells(self: Self) -> None:
        """Initialize the grid cells and related parameters."""
        size = self._gridsize

        # enough rows/cols to cover the whole image
        self._n_cols = math.ceil(self._width / size)
        self._n_rows = math.ceil(self._height / size)

        # step between cell origins; the last cell ends on the image border
        # which makes cells overlap when the image is not a multiple of size
        if self._n_cols > 1:
            self._col_step = int((self._width - size) / (self._n_cols - 1))
        else:
            self._col_step = self._width
        if self._n_rows > 1:
            self._row_step = int((self._height - size) / (self._n_rows - 1))
        else:
            self._row_step = self._height

        # detection count per grid cell
        self._num_dets = np.zeros((self._n_rows, self._n_cols), dtype=int)

        # one entry per cell in row-major order: (x1, y1, x2, y2, row, col)
        total = self._n_rows * self._n_cols
        table = np.zeros((total, 6), dtype=int)
        for index in range(total):
            row, col = divmod(index, self._n_cols)
            top = row * self._row_step
            left = col * self._col_step
            table[index, :4] = (left, top, left + size, top + size)
            table[index, 4:] = (row, col)
        self._cells = table

    @abstractmethod
    def _should_explore(
        self: Self,
        image: np.ndarray,
        bbox: tuple[int, int, int, int],
        row: int,
        col: int,
        detections: int,
    ) -> bool:
        """
        Whether to explore a grid cell based on detection activity.

        Parameters
        ----------
        image : np.ndarray
            The image to be packed.
        bbox : tuple[int, int, int, int]
            The bounding box of the cell.
        row : int
            The row of the cell.
        col : int
            The column of the cell.
        detections : int
            The number of detections in the cell.

        Returns
        -------
        bool
            Whether to explore the cell.

        """

    def pack(
        self,
        image: np.ndarray,
        exclude: tuple[int, int, int, int]
        | list[tuple[int, int, int, int]]
        | None = None,
        method: str | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        """
        Pack regions of a frame together.

        Parameters
        ----------
        image : np.ndarray
            The image to pack.
        exclude : tuple[int, int, int, int] | list[tuple[int, int, int, int]], optional
            Regions of the image to exclude from the packing.
            By default None.
        method : str, optional
            The method to pack the cells with, overriding the method
            given at construction for this call only.
            By default, None
            Options are: ['simple', 'shelf']
            Any value other than 'simple' selects the shelf method.

        Returns
        -------
        tuple[np.ndarray, np.ndarray]
            The packed image and the transform information.

        Raises
        ------
        ValueError
            If the image shape does not match the packer shape.

        """
        height, width = image.shape[:2]
        if (height, width) != (self._height, self._width):
            err_msg = f"Image shape {image.shape} does not match packer shape {self._height, self._width}."
            raise ValueError(err_msg)

        # resolve the excluded regions to grid cells
        excluded_cells: set[tuple[int, int]] = set()
        if exclude is not None and len(exclude) > 0:
            grid_args = (self._row_step, self._col_step, self._n_rows, self._n_cols)
            if isinstance(exclude, tuple):
                excluded_cells = set(_bbox_to_gridcells(exclude, *grid_args))
            else:
                excluded_cells = _bboxes_to_gridcells(exclude, *grid_args)

        # walk the cells in row-major order, skipping excluded ones
        # the subclass decides which of the remaining cells to keep
        filtered_cells: list[tuple[tuple[int, int, int, int], tuple[int, int]]] = []
        for x1, y1, x2, y2, row, col in self._cells:
            if (row, col) in excluded_cells:
                continue
            bbox = (x1, y1, x2, y2)
            if self._should_explore(image, bbox, row, col, self._num_dets[row][col]):
                filtered_cells.append((bbox, (row, col)))

        # per-call method wins over the configured one
        chosen = method or self._method
        repack = _simple_grid_repack if chosen == "simple" else _shelf_grid_repack
        new_image, new_grids = repack(image, filtered_cells, self._gridsize)

        # remember the frame and advance the pack counter
        self._prev_image = image
        self._counter += 1

        return new_image, new_grids

    def unpack(
        self: Self,
        detections: list[tuple[int, int, int, int]]
        | list[tuple[tuple[int, int, int, int], float, int]],
        transform: np.ndarray,
    ) -> (
        list[tuple[int, int, int, int]]
        | list[tuple[tuple[int, int, int, int], float, int]]
    ):
        """
        Unpack regions of a frame.

        Parameters
        ----------
        detections : list[tuple[int, int, int, int]] | list[tuple[tuple[int, int, int, int], float, int]]
            The regions to unpack.
        transform : np.ndarray
            The transform information generated by the pack method.

        Returns
        -------
        list[tuple[int, int, int, int]] | list[tuple[tuple[int, int, int, int], float, int]]
            The detections in original image coordinates, in the same
            form as they were given.

        """
        if len(detections) == 0:
            return []

        # a 3 element entry is (bbox, score, class id), otherwise a bbox
        has_metadata = len(detections[0]) == 3
        if not has_metadata:
            return _unpack_grid_bboxes(detections, transform, self._gridsize)  # type: ignore[arg-type]
        return _unpack_grid_bboxes_conf_classid(
            detections,  # type: ignore[arg-type]
            transform,
            self._gridsize,
        )

    def update(
        self: Self,
        detections: list[tuple[int, int, int, int]]
        | list[tuple[tuple[int, int, int, int], float, int]],
    ) -> None:
        """
        Update the packer with new detections.

        Parameters
        ----------
        detections : list[tuple[int, int, int, int]] | list[tuple[tuple[int, int, int, int], float, int]]
            The detections to update the packer with.

        """
        for entry in detections:
            # a 3 element entry is (bbox, score, class id), otherwise a bbox
            bbox = entry[0] if len(entry) == 3 else entry

            # get the grid cells that the bbox intersects
            touched = _bbox_to_gridcells(
                bbox,
                self._row_step,
                self._col_step,
                self._n_rows,
                self._n_cols,
            )

            # increment the detection count for each valid grid cell
            for o_row, o_col in touched:
                if not (0 <= o_row < self._n_rows and 0 <= o_col < self._n_cols):
                    continue
                self._num_dets[o_row][o_col] += 1

        # decay all counters by 1, never dropping below zero
        # NOTE(review): a cell hit by exactly one detection in this call
        # ends up unchanged after the decay - confirm this is intended
        self._num_dets = np.maximum(0, self._num_dets - 1)
class AnnealingFramePacker(AbstractGridFramePacker):
    """
    Pack regions of a frame together based on detection activity.

    Every cell starts out likely to be explored. The exploration
    probability decays as more frames are packed, unless the cell keeps
    receiving detections.

    Detections are represented as a list of bounding boxes
    with scores and class id labels optional.
    """

    def __init__(
        self: Self,
        image_shape: tuple[int, int],
        gridsize: int = 128,
        alpha: float = 0.01,
        min_prob: float = 0.1,
        detection_buffer: int = 30,
        method: str = "shelf",
    ) -> None:
        """
        Create a new AnnealingFramePacker.

        Parameters
        ----------
        image_shape : tuple[int, int]
            The shape of the image in form (width, height).
        gridsize : int, optional
            The size of each cell in the overlaid grid.
            Default is 128.
        alpha : float, optional
            The decay rate of the time based exploration term.
            Default is 0.01.
        min_prob : float, optional
            The lower bound on the probability of exploring a cell.
            Default is 0.1.
        detection_buffer : int, optional
            The number of frames to consider for detection activity.
            Used instead of current frame count once frame count exceeds buffer size.
            Allows more recent detections to have more influence.
            Default is 30.
        method : str, optional
            The method to use for repacking grid cells into new images.
            By default, 'shelf'
            Options are: ['simple', 'shelf']
            Simple will place tiles of the grid FCFS basis in the new image,
            while shelf will attempt to place connected regions together.

        """
        super().__init__(image_shape, gridsize, detection_buffer, method)

        # specific annealing parameters
        self._alpha = alpha
        self._min_prob = min_prob

    def _should_explore(
        self: Self,
        image: np.ndarray,  # noqa: ARG002
        bbox: tuple[int, int, int, int],  # noqa: ARG002
        row: int,  # noqa: ARG002
        col: int,  # noqa: ARG002
        detections: int,
    ) -> bool:
        """
        Randomly decide whether a cell is explored.

        The probability is the sum of an exponentially decaying time term
        and the detection count relative to the frame window, bounded
        below by the minimum probability.

        Parameters
        ----------
        image : np.ndarray
            The image to be packed. Unused.
        bbox : tuple[int, int, int, int]
            The bounding box of the cell. Unused.
        row : int
            The row of the cell. Unused.
        col : int
            The column of the cell. Unused.
        detections : int
            The number of detections in the cell.

        Returns
        -------
        bool
            Whether to explore the cell.

        """
        # decays from 1.0 as the pack counter grows
        time_factor = math.exp(-self._alpha * self._counter)

        # frames seen so far, capped at the detection buffer size
        # clamp to at least 1 so that a detection_buffer of zero (or less)
        # cannot cause a division by zero or a negative factor
        window = max(1, min(self._counter, self._detection_buffer - 1) + 1)
        detection_factor = min(1.0, detections / window)

        explore_probability = max(self._min_prob, time_factor + detection_factor)
        return random.random() < explore_probability
class RandomFramePacker(AbstractGridFramePacker):
    """Grid packer which selects every cell with a fixed probability."""

    def __init__(
        self: Self,
        image_shape: tuple[int, int],
        gridsize: int = 128,
        threshold: float = 0.1,
        detection_buffer: int = 30,
        method: str = "shelf",
    ) -> None:
        """
        Create a new RandomFramePacker.

        Parameters
        ----------
        image_shape : tuple[int, int]
            The shape of the image in form (width, height).
        gridsize : int, optional
            The size of each cell in the overlaid grid.
            Default is 128.
        threshold : float, optional
            The probability with which a grid cell is explored.
            A cell is explored when a uniform random draw from [0, 1)
            is below the threshold, so 0.1 explores a cell roughly
            10% of the time.
            Default is 0.1.
        detection_buffer : int, optional
            The number of frames to consider for detection activity.
            Forwarded to the base class.
            Default is 30.
        method : str, optional
            The method to use for repacking grid cells into new images.
            By default, 'shelf'
            Options are: ['simple', 'shelf']
            Forwarded to the base class.

        """
        super().__init__(image_shape, gridsize, detection_buffer, method)

        # probability of exploring any single cell
        self._threshold = threshold

    def _should_explore(
        self: Self,
        image: np.ndarray,  # noqa: ARG002
        bbox: tuple[int, int, int, int],  # noqa: ARG002
        row: int,  # noqa: ARG002
        col: int,  # noqa: ARG002
        detections: int,  # noqa: ARG002
    ) -> bool:
        """
        Randomly decide whether a cell is explored.

        None of the arguments influence the decision; only the
        configured threshold does.

        Returns
        -------
        bool
            True when the random draw falls below the threshold.

        """
        draw = random.random()
        return draw < self._threshold