# Copyright (c) 2024 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
# ruff: noqa: S311
from __future__ import annotations
import math
import operator
import random
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
import numpy as np
from cv2ext._jit import register_jit
if TYPE_CHECKING:
from typing_extensions import Self
[docs]
class AbstractFramePacker(ABC):
    """
    Pack regions of a frame together based on detection activity.

    Detections are represented as a list of bounding boxes with
    scores and class id labels optional.

    Concrete packers must implement ``pack``, ``unpack``, ``update``
    and ``reset``.
    """

    @abstractmethod
    def pack(
        self: Self,
        image: np.ndarray,
        exclude: tuple[int, int, int, int]
        | list[tuple[int, int, int, int]]
        | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        """
        Pack regions of a frame together.

        Parameters
        ----------
        image : np.ndarray
            The image to pack.
        exclude : tuple[int, int, int, int] | list[tuple[int, int, int, int]], optional
            Regions of the image to exclude from the packing.
            By default None.

        Returns
        -------
        tuple[np.ndarray, np.ndarray]
            The packed image and the transform information.

        """

    @abstractmethod
    def unpack(
        self: Self,
        detections: list[tuple[int, int, int, int]]
        | list[tuple[tuple[int, int, int, int], float, int]],
        transform: np.ndarray,
    ) -> (
        list[tuple[int, int, int, int]]
        | list[tuple[tuple[int, int, int, int], float, int]]
    ):
        """
        Unpack regions of a frame.

        Parameters
        ----------
        detections : list[tuple[int, int, int, int]] | list[tuple[tuple[int, int, int, int], float, int]]
            The regions to unpack.
        transform : np.ndarray
            The transform information generated by the pack method.

        Returns
        -------
        list[tuple[int, int, int, int]] | list[tuple[tuple[int, int, int, int], float, int]]
            The unpacked regions, in the same form as `detections`.

        """

    @abstractmethod
    def update(
        self: Self,
        detections: list[tuple[int, int, int, int]]
        | list[tuple[tuple[int, int, int, int], float, int]],
    ) -> None:
        """
        Update the packer with new detections.

        Parameters
        ----------
        detections : list[tuple[int, int, int, int]] | list[tuple[tuple[int, int, int, int], float, int]]
            The detections to update the packer with.

        """

    @abstractmethod
    def reset(
        self: Self,
        image_shape: tuple[int, int] | None = None,
        gridsize: int | None = None,
    ) -> None:
        """
        Reset the packer.

        Parameters
        ----------
        image_shape : tuple[int, int], optional
            The shape of the image in form (width, height).
        gridsize : int, optional
            The new gridsize value to use.

        """
@register_jit()
def _bbox_to_gridcells(
    bbox: tuple[int, int, int, int],
    row_step: int,
    col_step: int,
    n_rows: int,
    n_cols: int,
) -> list[tuple[int, int]]:
    """
    List the grid cells covered by a bounding box.

    Parameters
    ----------
    bbox : tuple[int, int, int, int]
        The bounding box in form (x1, y1, x2, y2).
    row_step : int
        The vertical distance between consecutive grid rows.
    col_step : int
        The horizontal distance between consecutive grid columns.
    n_rows : int
        The number of rows in the grid.
    n_cols : int
        The number of columns in the grid.

    Returns
    -------
    list[tuple[int, int]]
        The (row, col) index of each covered cell, in row-major order.

    """
    left, top, right, bottom = bbox
    # half-open index ranges along each axis, clamped to the grid extent
    row_start = max(0, math.floor(top / row_step))
    row_stop = min(n_rows, math.ceil(bottom / row_step))
    col_start = max(0, math.floor(left / col_step))
    col_stop = min(n_cols, math.ceil(right / col_step))
    covered: list[tuple[int, int]] = []
    for row in range(row_start, row_stop):
        for col in range(col_start, col_stop):
            covered.append((row, col))
    return covered
@register_jit()
def _bboxes_to_gridcells(
    bboxes: list[tuple[int, int, int, int]],
    row_step: int,
    col_step: int,
    n_rows: int,
    n_cols: int,
) -> set[tuple[int, int]]:
    """
    Collect the unique grid cells covered by any of the bounding boxes.

    Parameters
    ----------
    bboxes : list[tuple[int, int, int, int]]
        The bounding boxes in form (x1, y1, x2, y2).
    row_step : int
        The vertical distance between consecutive grid rows.
    col_step : int
        The horizontal distance between consecutive grid columns.
    n_rows : int
        The number of rows in the grid.
    n_cols : int
        The number of columns in the grid.

    Returns
    -------
    set[tuple[int, int]]
        The (row, col) indices of all covered cells, without duplicates.

    """
    covered: list[tuple[int, int]] = []
    for box in bboxes:
        box_cells = _bbox_to_gridcells(box, row_step, col_step, n_rows, n_cols)
        covered.extend(box_cells)
    # overlapping boxes can report the same cell; the set removes repeats
    return set(covered)
@register_jit()
def _unpack_grid_single_bbox(
    bbox: tuple[int, int, int, int],
    transform: np.ndarray,
    gridsize: int,
) -> tuple[int, int, int, int]:
    """
    Map a bounding box from packed-image coordinates to the original image.

    The packed cell is chosen from the center of the box. The box is then
    shifted by the difference between that cell's packed origin and the
    offset stored for it in `transform`.

    Parameters
    ----------
    bbox : tuple[int, int, int, int]
        The bounding box in the packed image, in form (x1, y1, x2, y2).
    transform : np.ndarray
        Per-cell offsets indexed as ``transform[row][col] -> (x, y)``.
    gridsize : int
        The side length of a packed grid cell.

    Returns
    -------
    tuple[int, int, int, int]
        The bounding box in original image coordinates.

    """
    x1, y1, x2, y2 = bbox
    # the box center decides which packed cell the box belongs to
    center_x = (x1 + x2) / 2.0
    center_y = (y1 + y2) / 2.0
    col = int(center_x // gridsize)
    row = int(center_y // gridsize)
    # keep the indices inside the transform table
    last_row = transform.shape[0] - 1
    last_col = transform.shape[1] - 1
    row = max(0, min(row, last_row))
    col = max(0, min(col, last_col))
    # top-left corner of the source cell in the original image
    o_x, o_y = transform[row][col]
    # top-left corner of the same cell in the packed image
    packed_x = col * gridsize
    packed_y = row * gridsize
    return (
        int(x1 - packed_x + o_x),
        int(y1 - packed_y + o_y),
        int(x2 - packed_x + o_x),
        int(y2 - packed_y + o_y),
    )
@register_jit()
def _unpack_grid_bboxes_conf_classid(
    detections: list[tuple[tuple[int, int, int, int], float, int]],
    transform: np.ndarray,
    gridsize: int,
) -> list[tuple[tuple[int, int, int, int], float, int]]:
    """
    Unpack detections given as (bbox, confidence, class id) entries.

    Parameters
    ----------
    detections : list[tuple[tuple[int, int, int, int], float, int]]
        The detections in packed-image coordinates.
    transform : np.ndarray
        The transform information generated when packing.
    gridsize : int
        The side length of a packed grid cell.

    Returns
    -------
    list[tuple[tuple[int, int, int, int], float, int]]
        The detections with bounding boxes in original image coordinates.
        Confidence and class id are passed through unchanged.

    """
    return [
        (_unpack_grid_single_bbox(box, transform, gridsize), score, label)
        for box, score, label in detections
    ]
@register_jit()
def _unpack_grid_bboxes(
    detections: list[tuple[int, int, int, int]],
    transform: np.ndarray,
    gridsize: int,
) -> list[tuple[int, int, int, int]]:
    """
    Unpack detections given as plain bounding boxes.

    Parameters
    ----------
    detections : list[tuple[int, int, int, int]]
        The bounding boxes in packed-image coordinates.
    transform : np.ndarray
        The transform information generated when packing.
    gridsize : int
        The side length of a packed grid cell.

    Returns
    -------
    list[tuple[int, int, int, int]]
        The bounding boxes in original image coordinates.

    """
    return [
        _unpack_grid_single_bbox(box, transform, gridsize) for box in detections
    ]
@register_jit()
def _simple_grid_repack(
    image: np.ndarray,
    cells: list[tuple[tuple[int, int, int, int], tuple[int, int]]],
    gridsize: int,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Repack cells into a near-square image in the order they are given.

    Cell ``i`` is placed at row ``i // n_cols`` and column ``i % n_cols`` of
    the packed grid. Slots that receive no cell stay zero in both the image
    and the transform.

    Parameters
    ----------
    image : np.ndarray
        The source image. The packed image is allocated with three channels.
    cells : list[tuple[tuple[int, int, int, int], tuple[int, int]]]
        The cells to pack, each as ((x1, y1, x2, y2), (row, col)).
        Only the bounding box is used here.
    gridsize : int
        The side length of a packed grid cell.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        The packed uint8 image and an integer array of shape
        (n_rows, n_cols, 2) holding the original (x1, y1) of each slot.

    """
    num_cells = len(cells)
    # near-square layout; always at least one slot so the output is never empty
    n_cols = max(1, math.ceil(math.sqrt(num_cells)))
    n_rows = max(1, math.ceil(num_cells / n_cols))
    new_image: np.ndarray = np.zeros(
        (n_rows * gridsize, n_cols * gridsize, 3),
        dtype=np.uint8,
    )
    new_grids: np.ndarray = np.zeros((n_rows, n_cols, 2), dtype=int)
    for i, (bbox, _) in enumerate(cells):
        x1, y1, x2, y2 = bbox
        # slot of this cell in the packed grid
        n_row = i // n_cols
        n_col = i % n_cols
        n_x1 = n_col * gridsize
        n_y1 = n_row * gridsize
        # A cell that overruns the image border yields a slice smaller than
        # the slot. Copy only what exists so the assignment cannot fail on a
        # shape mismatch; the rest of the slot stays zero.
        patch = image[y1:y2, x1:x2]
        p_height = min(patch.shape[0], gridsize)
        p_width = min(patch.shape[1], gridsize)
        new_image[n_y1 : n_y1 + p_height, n_x1 : n_x1 + p_width] = patch[
            :p_height,
            :p_width,
        ]
        # the offset of a slot is the original top-left corner of its cell
        new_grids[n_row, n_col] = (x1, y1)
    return new_image, new_grids
@register_jit()
def _shelf_grid_repack(
    image: np.ndarray,
    cells: list[tuple[tuple[int, int, int, int], tuple[int, int]]],
    gridsize: int,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Repack cells into a new image, keeping adjacent cells together.

    The algorithm has two steps. First, cells are grouped into 2x2 blocks,
    then 1x2 and 2x1 pairs, and finally single cells. Second, the groups are
    stacked into shelves: each shelf is a column of groups sharing the same
    width, and shelves are laid out side by side in the packed image.

    Parameters
    ----------
    image : np.ndarray
        The source image. The packed image is allocated with three channels.
    cells : list[tuple[tuple[int, int, int, int], tuple[int, int]]]
        The cells to pack, each as ((x1, y1, x2, y2), (row, col)).
    gridsize : int
        The side length of a packed grid cell.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        The packed uint8 image and an integer array of shape
        (n_rows, n_cols, 2) holding the original (x1, y1) of each slot.
        Slots that receive no cell stay zero in both arrays.

    """
    # ======================
    # STEP 1: Grouping cells
    # ======================
    # for each cell, identify if it belongs to a 2x2 square
    # a square is only kept as a group if all four cells still need matching
    to_match: set[tuple[int, int]] = {loc for _, loc in cells}
    matched: set[tuple[int, int]] = set()
    groups: list[list[tuple[int, int]]] = []
    # neighbour offsets as (row, col) deltas, tried in order of preference
    offset_groups: list[list[tuple[int, int]]] = [
        # 2x2 square sampling sets
        [(0, -1), (-1, 0), (-1, -1)],  # previous row / previous col
        [(0, 1), (-1, 0), (-1, 1)],  # previous row / next col
        [(0, 1), (1, 0), (1, 1)],  # next row / next col
        [(0, -1), (1, 0), (1, -1)],  # next row / previous col
        # 1x2 sampling sets
        [(0, -1)],
        [(0, 1)],
        # 2x1 sampling sets
        [(-1, 0)],
        [(1, 0)],
    ]
    # construct lookup for bboxes from loc
    height, width = image.shape[:2]
    rows = math.ceil(height / gridsize)
    cols = math.ceil(width / gridsize)
    lookup: list[list[tuple[int, int, int, int]]] = [
        [(0, 0, 0, 0) for _ in range(cols)] for _ in range(rows)
    ]
    for bbox1, loc1 in cells:
        # fill the lookup before the matched check so that cells already
        # absorbed into an earlier group still get their bbox recorded
        lookup[loc1[0]][loc1[1]] = bbox1
        if loc1 in matched:
            continue
        for offsets in offset_groups:
            # compute the candidate neighbour locations
            new_locs = [
                (loc1[0] + offset[0], loc1[1] + offset[1]) for offset in offsets
            ]
            # count the neighbours which still need to be matched
            found = 0
            for new_loc in new_locs:
                if new_loc not in matched and new_loc in to_match:
                    found += 1
            # every neighbour in the set must be available
            if found != len(offsets):
                continue
            new_group = [loc1, *new_locs]
            groups.append(new_group)
            for new_loc in new_group:
                matched.add(new_loc)
                to_match.remove(new_loc)
            # do not check any more offset sets
            break
        # the else block runs only when no offset set matched (no break)
        else:
            groups.append([loc1])
            matched.add(loc1)
            to_match.remove(loc1)
    # ===============================
    # STEP 2: Repacking using shelves
    # ===============================
    # sort each group by (row, col) so that simple iteration later places
    # the cells right-then-down in image coordinates
    groups = [sorted(group, key=operator.itemgetter(0, 1)) for group in groups]
    # largest groups first so they are allocated greedily
    groups.sort(key=len, reverse=True)
    # heuristic bound on the height of any shelf
    target_size = math.ceil(math.sqrt(len(cells)))
    # each entry is (cells, height, width), measured in grid cells
    shelves: list[tuple[list[tuple[int, int]], int, int]] = []
    for group in groups:
        # compute height/width of group based on its corner cell
        corner = group[0]
        g_height = 1
        g_width = 1
        for cell in group:
            g_height = max(g_height, cell[0] - corner[0] + 1)
            g_width = max(g_width, cell[1] - corner[1] + 1)
        # take the first shelf of equal width where the group still fits
        # below target_size; with no shelves yet the search finds nothing
        shelf_id = -1
        for s_idx, (_, s_height, s_width) in enumerate(shelves):
            if g_width != s_width:
                continue
            if g_height + s_height > target_size:
                continue
            shelf_id = s_idx
            break
        if shelf_id == -1:
            # could not find a shelf, need to make a new one
            shelves.append(
                (group, g_height, g_width),
            )
            continue
        s_boxes, s_height, s_width = shelves[shelf_id]
        s_boxes.extend(group)
        shelves[shelf_id] = (
            s_boxes,
            s_height + g_height,
            s_width,
        )
    # compute the overall dimensions of the new image and grid
    new_height = 0  # maximum height of any shelf
    new_width = 0  # sum of all shelf widths
    for _, s_height, s_width in shelves:
        new_height = max(new_height, s_height)
        new_width += s_width
    # always allocate at least one slot so the output is never empty
    dim1 = max(new_height, 1)
    dim2 = max(new_width, 1)
    new_image: np.ndarray = np.zeros(
        (dim1 * gridsize, dim2 * gridsize, 3),
        dtype=np.uint8,
    )
    new_grid: np.ndarray = np.zeros((dim1, dim2, 2), dtype=int)
    # rolling column offset of the current shelf in the packed grid
    frontier = 0
    for s_boxes, s_height, s_width in shelves:
        for i in range(s_height):
            for j in range(s_width):
                # every group in a shelf is a full rectangle of the shelf
                # width, so the flat list is row-major over the shelf
                b_idx = i * s_width + j
                o_r, o_c = s_boxes[b_idx]
                x1, y1, x2, y2 = lookup[o_r][o_c]
                n_c = j + frontier
                new_grid[i][n_c] = (x1, y1)
                n_y = i * gridsize
                n_x = n_c * gridsize
                # A cell that overruns the image border yields a slice
                # smaller than the slot. Copy only what exists so the
                # assignment cannot fail on a shape mismatch.
                patch = image[y1:y2, x1:x2]
                p_height = min(patch.shape[0], gridsize)
                p_width = min(patch.shape[1], gridsize)
                new_image[n_y : n_y + p_height, n_x : n_x + p_width] = patch[
                    :p_height,
                    :p_width,
                ]
        # once the shelf is placed, advance the frontier past it
        frontier += s_width
    return new_image, new_grid
[docs]
class AbstractGridFramePacker(AbstractFramePacker):
    """
    Pack regions of a frame together based on a grid.

    The image is covered by a grid of square cells. Subclasses decide, per
    cell and per frame, whether the cell is included in the packed image by
    implementing ``_should_explore``.
    """

    def __init__(
        self: Self,
        image_shape: tuple[int, int],
        gridsize: int = 128,
        detection_buffer: int = 30,
        method: str = "shelf",
    ) -> None:
        """
        Create a new GridFramePacker.

        Parameters
        ----------
        image_shape : tuple[int, int]
            The shape of the image in form (width, height).
        gridsize : int, optional
            The size of each cell in the overlaid grid.
            Default is 128.
        detection_buffer : int, optional
            The number of frames to consider for detection activity.
            Used instead of current frame count once frame count exceeds buffer size.
            Allows more recent detections to have more influence.
            Default is 30.
        method : str, optional
            The method to use for repacking grid cells into new images.
            By default, 'shelf'
            Options are: ['simple', 'shelf']
            Simple will place tiles of the grid FCFS basis in the new image,
            while shelf will attempt to place connected regions together.

        """
        super().__init__()
        self._width, self._height = image_shape
        self._gridsize = gridsize
        self._detection_buffer = detection_buffer
        self._method = method

        # assign type hints to variables used in initialize_cells
        self._n_cols: int
        self._n_rows: int
        self._col_step: int
        self._row_step: int
        self._num_dets: np.ndarray
        self._cells: np.ndarray
        self._initialize_cells()

        # tracking variables
        self._counter: int = 0
        # last frame passed to pack; declared here so it exists before the
        # first pack call (it is not read anywhere in this class)
        self._prev_image: np.ndarray | None = None

    def reset(
        self: Self,
        image_shape: tuple[int, int] | None = None,
        gridsize: int | None = None,
    ) -> None:
        """
        Reset the packer.

        Rebuilds the grid, clears the detection counts and frame counter,
        and drops the stored previous frame.

        Parameters
        ----------
        image_shape : tuple[int, int], optional
            The shape of the image in form (width, height).
        gridsize : int, optional
            The new gridsize value to use.

        """
        if image_shape:
            self._width, self._height = image_shape
        if gridsize:
            self._gridsize = gridsize
        self._initialize_cells()
        self._counter = 0
        # do not keep a frame from before the reset alive
        self._prev_image = None

    def _initialize_cells(self: Self) -> None:
        """Initialize the grid cells and related parameters."""
        # num rows/cols
        self._n_cols = math.ceil(self._width / self._gridsize)
        self._n_rows = math.ceil(self._height / self._gridsize)

        # step size between grid cells; with more than one cell along an axis
        # the step is chosen so the last cell ends at or before the image edge
        self._col_step = (
            int((self._width - self._gridsize) / (self._n_cols - 1))
            if self._n_cols > 1
            else self._width
        )
        self._row_step = (
            int((self._height - self._gridsize) / (self._n_rows - 1))
            if self._n_rows > 1
            else self._height
        )

        # detection count info
        self._num_dets = np.zeros((self._n_rows, self._n_cols), dtype=int)

        # create the cell setup, one row per cell: (x1, y1, x2, y2, row, col)
        self._cells = np.zeros((self._n_rows * self._n_cols, 6), dtype=int)
        index: int = 0
        for i in range(self._n_rows):
            for j in range(self._n_cols):
                v_off = i * self._row_step
                h_off = j * self._col_step
                bbox = (
                    h_off,
                    v_off,
                    h_off + self._gridsize,
                    v_off + self._gridsize,
                )
                self._cells[index, :4] = bbox
                self._cells[index, 4:] = (i, j)
                index += 1

    @abstractmethod
    def _should_explore(
        self: Self,
        image: np.ndarray,
        bbox: tuple[int, int, int, int],
        row: int,
        col: int,
        detections: int,
    ) -> bool:
        """
        Whether to explore a grid cell based on detection activity.

        Parameters
        ----------
        image : np.ndarray
            The image to be packed.
        bbox : tuple[int, int, int, int]
            The bounding box of the cell.
        row : int
            The row of the cell.
        col : int
            The column of the cell.
        detections : int
            The number of detections in the cell.

        Returns
        -------
        bool
            Whether to explore the cell.

        """

    def pack(
        self: Self,
        image: np.ndarray,
        exclude: tuple[int, int, int, int]
        | list[tuple[int, int, int, int]]
        | None = None,
        method: str | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        """
        Pack regions of a frame together.

        Parameters
        ----------
        image : np.ndarray
            The image to pack.
        exclude : tuple[int, int, int, int] | list[tuple[int, int, int, int]], optional
            Regions of the image to exclude from the packing.
            By default None.
        method : str, optional
            The method to pack the bounding boxes with.
            By default, None, which uses the method given at construction.
            Options are: ['simple', 'shelf']
            Any value other than 'simple' selects the shelf method.

        Returns
        -------
        tuple[np.ndarray, np.ndarray]
            The packed image and the transform information.

        Raises
        ------
        ValueError
            If the image shape does not match the packer shape.

        """
        height, width = image.shape[:2]
        if height != self._height or width != self._width:
            err_msg = f"Image shape {image.shape} does not match packer shape {self._height, self._width}."
            raise ValueError(err_msg)

        # get the excluded cells
        excluded_cells: set[tuple[int, int]] = set()
        if exclude is not None and len(exclude) > 0:
            # a bare tuple is a single bbox, anything else is a list of them
            if isinstance(exclude, tuple):
                excluded_cells = set(
                    _bbox_to_gridcells(
                        exclude,
                        self._row_step,
                        self._col_step,
                        self._n_rows,
                        self._n_cols,
                    ),
                )
            else:
                excluded_cells = _bboxes_to_gridcells(
                    exclude,
                    self._row_step,
                    self._col_step,
                    self._n_rows,
                    self._n_cols,
                )

        # get all cells which are not in the excluded region
        included_cells = [
            (x1, y1, x2, y2, r, c)
            for (x1, y1, x2, y2, r, c) in self._cells
            if (r, c) not in excluded_cells
        ]

        # let the subclass policy decide which remaining cells to keep
        filtered_cells: list[tuple[tuple[int, int, int, int], tuple[int, int]]] = []
        for x1, y1, x2, y2, row, col in included_cells:
            bbox = (x1, y1, x2, y2)
            if self._should_explore(
                image,
                bbox,
                row,
                col,
                self._num_dets[row][col],
            ):
                filtered_cells.append((bbox, (row, col)))

        # repack with the requested method, defaulting to the configured one
        method = method or self._method
        if method == "simple":
            new_image, new_grids = _simple_grid_repack(
                image,
                filtered_cells,
                self._gridsize,
            )
        else:
            new_image, new_grids = _shelf_grid_repack(
                image,
                filtered_cells,
                self._gridsize,
            )

        # remember the frame and advance the frame counter
        self._prev_image = image
        self._counter += 1

        return new_image, new_grids

    def unpack(
        self: Self,
        detections: list[tuple[int, int, int, int]]
        | list[tuple[tuple[int, int, int, int], float, int]],
        transform: np.ndarray,
    ) -> (
        list[tuple[int, int, int, int]]
        | list[tuple[tuple[int, int, int, int], float, int]]
    ):
        """
        Unpack regions of a frame.

        Parameters
        ----------
        detections : list[tuple[int, int, int, int]] | list[tuple[tuple[int, int, int, int], float, int]]
            The regions to unpack.
        transform : np.ndarray
            The transform information generated by the pack method.

        Returns
        -------
        list[tuple[int, int, int, int]] | list[tuple[tuple[int, int, int, int], float, int]]
            The regions in original image coordinates, in the same form as
            `detections`.

        """
        if len(detections) == 0:
            return []

        # a 3-element entry is (bbox, conf, classid); otherwise a bare bbox
        if len(detections[0]) == 3:
            return _unpack_grid_bboxes_conf_classid(
                detections,  # type: ignore[arg-type]
                transform,
                self._gridsize,
            )
        return _unpack_grid_bboxes(detections, transform, self._gridsize)  # type: ignore[arg-type]

    def update(
        self: Self,
        detections: list[tuple[int, int, int, int]]
        | list[tuple[tuple[int, int, int, int], float, int]],
    ) -> None:
        """
        Update the packer with new detections.

        Each detection adds one to the count of every grid cell it covers.
        Afterwards every count is reduced by one, never dropping below zero.

        Parameters
        ----------
        detections : list[tuple[int, int, int, int]] | list[tuple[tuple[int, int, int, int], float, int]]
            The detections to update the packer with.

        """
        # add det counts to all possible grids with detections
        for elements in detections:
            if len(elements) == 3:
                bbox, _, _ = elements
            else:
                bbox = elements

            # get the grid cells that the bbox intersects
            grids = _bbox_to_gridcells(
                bbox,
                self._row_step,
                self._col_step,
                self._n_rows,
                self._n_cols,
            )

            # increment the detection count for each grid cell
            for o_row, o_col in grids:
                # defensive bounds check on the returned indices
                if 0 <= o_row < self._n_rows and 0 <= o_col < self._n_cols:
                    self._num_dets[o_row][o_col] += 1

        # decrement all counters by 1
        # NOTE(review): this also cancels the increment above, so a cell
        # covered by exactly one detection in this call ends unchanged.
        # Confirm this decay is intended.
        self._num_dets = np.maximum(0, self._num_dets - 1)
[docs]
class AnnealingFramePacker(AbstractGridFramePacker):
    """
    Grid packer whose exploration probability anneals over time.

    A cell is explored with a probability made of a term that decays
    exponentially with the number of packed frames, plus a term that grows
    with the detection count of the cell. Detections are represented as a
    list of bounding boxes with scores and class id labels optional.
    """

    def __init__(
        self: Self,
        image_shape: tuple[int, int],
        gridsize: int = 128,
        alpha: float = 0.01,
        min_prob: float = 0.1,
        detection_buffer: int = 30,
        method: str = "shelf",
    ) -> None:
        """
        Create a new AnnealingFramePacker.

        Parameters
        ----------
        image_shape : tuple[int, int]
            The shape of the image in form (width, height).
        gridsize : int, optional
            The size of each cell in the overlaid grid.
            Default is 128.
        alpha : float, optional
            The decay rate of the time-based exploration term,
            applied as ``exp(-alpha * frame_count)``.
            Default is 0.01.
        min_prob : float, optional
            The lower bound on the exploration probability of a cell.
            Default is 0.1.
        detection_buffer : int, optional
            The number of frames to consider for detection activity.
            Used instead of current frame count once frame count exceeds buffer size.
            Default is 30.
        method : str, optional
            The method to use for repacking grid cells into new images.
            By default, 'shelf'
            Options are: ['simple', 'shelf']

        """
        super().__init__(
            image_shape=image_shape,
            gridsize=gridsize,
            detection_buffer=detection_buffer,
            method=method,
        )

        # annealing-specific settings
        self._min_prob = min_prob
        self._alpha = alpha

    def _should_explore(
        self: Self,
        image: np.ndarray,  # noqa: ARG002
        bbox: tuple[int, int, int, int],  # noqa: ARG002
        row: int,  # noqa: ARG002
        col: int,  # noqa: ARG002
        detections: int,
    ) -> bool:
        """Randomly decide whether to explore a cell; see the class docstring."""
        # decays towards zero as more frames are packed
        time_factor = math.exp(-self._alpha * self._counter)
        # frames used to normalise the count, capped at the buffer size
        window = min(self._counter, self._detection_buffer - 1) + 1
        detection_factor = min(1.0, detections / window)
        # never fall below the configured minimum probability
        combined = time_factor + detection_factor
        explore_probability = max(self._min_prob, combined)
        return random.random() < explore_probability
[docs]
class RandomFramePacker(AbstractGridFramePacker):
    """Grid packer which explores each cell with a fixed probability."""

    def __init__(
        self: Self,
        image_shape: tuple[int, int],
        gridsize: int = 128,
        threshold: float = 0.1,
        detection_buffer: int = 30,
        method: str = "shelf",
    ) -> None:
        """
        Create a new RandomFramePacker.

        Parameters
        ----------
        image_shape : tuple[int, int]
            The shape of the image in form (width, height).
        gridsize : int, optional
            The size of each cell in the overlaid grid.
            Default is 128.
        threshold : float, optional
            The probability with which a grid cell is explored.
            A cell is explored when a uniform random draw is below this value,
            so 0.1 means a cell is explored 10% of the time.
            Default is 0.1.
        detection_buffer : int, optional
            The number of frames to consider for detection activity.
            Passed to the base class; not used by the random policy itself.
            Default is 30.
        method : str, optional
            The method to use for repacking grid cells into new images.
            By default, 'shelf'
            Options are: ['simple', 'shelf']

        """
        super().__init__(
            image_shape=image_shape,
            gridsize=gridsize,
            detection_buffer=detection_buffer,
            method=method,
        )

        # probability of exploring any single cell
        self._threshold = threshold

    def _should_explore(
        self: Self,
        image: np.ndarray,  # noqa: ARG002
        bbox: tuple[int, int, int, int],  # noqa: ARG002
        row: int,  # noqa: ARG002
        col: int,  # noqa: ARG002
        detections: int,  # noqa: ARG002
    ) -> bool:
        """Explore the cell when a uniform random draw is below the threshold."""
        draw = random.random()
        return draw < self._threshold