# Source code for cv2ext.cli.convert_annotations._convert

# Copyright (c) 2024 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
# ruff: noqa: S311
from __future__ import annotations

import argparse
import csv
import json
import random
from pathlib import Path

import cv2

from cv2ext import IterableVideo


def _read_csv(
    csvfile: Path,
) -> tuple[list[list[tuple[int, int, int, int]]], str]:
    bboxes: list[list[tuple[int, int, int, int]]] = []
    formatstr = "xywh"
    with Path(csvfile).open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            frame = int(row["frame"])
            if len(bboxes) <= frame:
                bboxes.append([])
            try:
                x = int(row["x"])
                y = int(row["y"])
                w = int(row["w"])
                h = int(row["h"])
                formatstr = "xywh"
                bboxes[frame].append((x, y, w, h))
            except KeyError:
                x1 = int(row["x1"])
                y1 = int(row["y1"])
                x2 = int(row["x2"])
                y2 = int(row["y2"])
                formatstr = "xyxy"
                bboxes[frame].append((x1, y1, x2, y2))
    return bboxes, formatstr


def _read_json(
    jsonfile: Path,
) -> tuple[list[list[tuple[int, int, int, int]]], str]:
    with Path(jsonfile).open("r", encoding="utf-8") as f:
        dictdata: dict[str, dict[str, dict[str, int]]] = json.load(f)
    bboxes: list[list[tuple[int, int, int, int]]] = []
    formatstr = "xywh"
    found_format = False
    for bids in dictdata.values():
        local_bboxes: list[tuple[int, int, int, int]] = []
        for bbox in bids.values():
            if not found_format:
                if "x1" in bbox:
                    formatstr = "xyxy"
                found_format = True
            if formatstr == "xywh":
                x = bbox["x"]
                y = bbox["y"]
                w = bbox["w"]
                h = bbox["h"]
                formatstr = "xywh"
                local_bboxes.append((x, y, w, h))
            else:
                x1 = bbox["x1"]
                y1 = bbox["y1"]
                x2 = bbox["x2"]
                y2 = bbox["y2"]
                formatstr = "xyxy"
                local_bboxes.append((x1, y1, x2, y2))
        bboxes.append(local_bboxes)
    return bboxes, formatstr


def _write_yolo(
    bboxes: list[list[tuple[int, int, int, int]]],
    output_dir: Path,
    input_video: Path,
    formatstr: str,
    split: float = 0.8,
    classid: int = 0,
    classname: str = "object",
) -> None:
    """
    Write annotated video frames as a YOLO-style train/val dataset.

    Creates ``train/`` and ``val/`` directories (each with ``images/`` and
    ``labels/``) under ``output_dir``, saves every frame that has at least
    one bounding box as a JPEG with a matching label file, and writes a
    ``data.yaml`` describing the dataset.

    Parameters
    ----------
    bboxes : list[list[tuple[int, int, int, int]]]
        The bounding boxes for each frame, in pixel coordinates.
    output_dir : Path
        The directory to create the dataset in. Must not exist yet.
    input_video : Path
        The video the annotations refer to.
    formatstr : str
        ``"xyxy"`` if boxes are ``(x1, y1, x2, y2)``; any other value is
        treated as ``(x, y, w, h)`` with a top-left origin.
    split : float
        Fraction of annotated frames assigned to the training set.
    classid : int
        The class id written on every label line.
    classname : str
        The class name written to ``data.yaml``.

    Raises
    ------
    FileExistsError
        If ``output_dir`` already exists.

    """
    if output_dir.exists():
        err_msg = f"Output directory {output_dir} already exists."
        raise FileExistsError(err_msg)
    output_dir.mkdir(parents=True)

    val_dir = output_dir / "val"
    train_dir = output_dir / "train"
    val_image_dir = val_dir / "images"
    train_image_dir = train_dir / "images"
    val_label_dir = val_dir / "labels"
    train_label_dir = train_dir / "labels"
    for directory in (
        val_image_dir,
        train_image_dir,
        val_label_dir,
        train_label_dir,
    ):
        directory.mkdir(parents=True)

    vidname = input_video.stem

    ids = [idx for idx, b in enumerate(bboxes) if b]
    # Sample WITHOUT replacement so the validation set has exactly the
    # requested number of distinct frames; clamp so an out-of-range split
    # cannot ask for more frames than exist. A set keeps lookups O(1).
    num_val = max(0, min(len(ids), int(len(ids) * (1 - split))))
    val_ids = set(random.sample(ids, k=num_val))

    # NOTE(review): assumes IterableVideo yields (frame_index, frame) pairs
    # whose indices line up with positions in bboxes - confirm.
    for boxes, (fidx, frame) in zip(bboxes, IterableVideo(input_video)):
        if not boxes:
            continue
        imagename = f"{vidname}_{fidx}.jpg"
        labelname = f"{vidname}_{fidx}.txt"

        if fidx in val_ids:
            imagepath = val_image_dir / imagename
            labelpath = val_label_dir / labelname
        else:
            imagepath = train_image_dir / imagename
            labelpath = train_label_dir / labelname

        cv2.imwrite(str(imagepath), frame)

        height, width = frame.shape[0], frame.shape[1]
        lines: list[str] = []
        for x1, y1, e1, e2 in boxes:
            if formatstr == "xyxy":
                box_w = e1 - x1
                box_h = e2 - y1
            else:
                box_w = e1
                box_h = e2
            # YOLO labels are "class x_center y_center width height", all
            # normalized by the image size, so shift the top-left corner
            # to the box center before normalizing.
            xc = (x1 + box_w / 2) / width
            yc = (y1 + box_h / 2) / height
            w = box_w / width
            h = box_h / height
            lines.append(f"{classid} {xc} {yc} {w} {h}\n")
        with labelpath.open("w", encoding="utf-8") as f:
            f.writelines(lines)

    yamlpath = output_dir / "data.yaml"
    # The file does not exist yet, so it must be opened for writing.
    with yamlpath.open("w", encoding="utf-8") as f:
        f.write(f"train: {train_image_dir!s}\n")
        f.write(f"val: {val_image_dir!s}\n")
        f.write("nc: 1\n")
        f.write(f"names: ['{classname}']\n")


def convert_annotations_cli() -> None:
    """
    Run the command-line interface for converting annotations.

    Parses the command-line arguments, reads bounding boxes from the input
    CSV or JSON file, and writes them in the requested output format.
    Only the ``yolo`` format is currently available.

    Raises
    ------
    FileNotFoundError
        If the input file does not exist.
    ValueError
        If the input file type is unknown, if the requested format is
        unknown, or if the input video is missing for the YOLO format.

    """
    parser = argparse.ArgumentParser(
        description="Convert annotations to other formats.",
    )
    parser.add_argument(
        "--input",
        required=True,
        type=Path,
        help="The input file to convert.",
    )
    parser.add_argument(
        "--format",
        required=True,
        type=str,
        # argparse restricts values with `choices`; `options` is not a
        # valid keyword and makes add_argument raise TypeError.
        choices=["yolo"],
        help="The format to convert to.",
    )
    parser.add_argument(
        "--output_dir",
        required=True,
        type=Path,
        default=None,
        help="The output file to write.",
    )
    parser.add_argument(
        "--input_video",
        type=Path,
        default=None,
        help="The input video file used for annotations.",
    )
    parser.add_argument(
        "--split",
        type=float,
        default=0.8,
        help="The split ratio for the train/test split.",
    )
    parser.add_argument(
        "--classid",
        type=int,
        default=0,
        help="The class id to use for the annotations.",
    )
    parser.add_argument(
        "--classname",
        type=str,
        default="object",
        help="The class name to use.",
    )
    args = parser.parse_args()

    inputfile = args.input
    if not inputfile.exists():
        err_msg = f"Input file {inputfile} does not exist."
        raise FileNotFoundError(err_msg)

    bboxes: list[list[tuple[int, int, int, int]]]
    formatstr: str = "xyxy"
    if inputfile.suffix == ".csv":
        bboxes, formatstr = _read_csv(inputfile)
    elif inputfile.suffix == ".json":
        bboxes, formatstr = _read_json(inputfile)
    else:
        err_msg = f"Unknown file type: {inputfile.suffix}"
        raise ValueError(err_msg)

    if args.format == "yolo":
        if args.output_dir is None:
            err_msg = "Output directory is required for YOLO format."
            raise ValueError(err_msg)
        if args.input_video is None:
            err_msg = (
                "Input video for annotation source is required for YOLO format."
            )
            raise ValueError(err_msg)
        _write_yolo(
            bboxes,
            args.output_dir,
            args.input_video,
            formatstr,
            args.split,
            args.classid,
            args.classname,
        )
    else:
        err_msg = f"Unknown format for conversion: {args.format}"
        raise ValueError(err_msg)