# Copyright (c) 2024 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
from __future__ import annotations
import contextlib
import logging
from pathlib import Path
from queue import Empty, Full, Queue
from threading import Thread
import cv2
import numpy as np
from typing_extensions import Self
_log = logging.getLogger(__name__)
[docs]
class IterableVideo:
def __init__(
self: Self,
filename: Path | str | int,
channels: int = 3,
buffersize: int = 8,
*,
use_thread: bool | None = None,
) -> None:
"""
Create a new instance of the video.
Parameters
----------
filename : Path | str | int
Path to the video file or device number.
channels : int
The number of channels in the video.
This defaults to 3, and is used to pre-allocate a frame,
such that the first frame is not empty.
Use 1 for grayscale videos.
buffersize : int
The size of the buffer for the thread.
This is only used if `use_thread` is True.
Defaults to 8.
use_thread : bool
If True, the frames will be loaded in a separate thread.
This can help speedup iteration times.
Defaults to None, in which case the thread is used.
Raises
------
FileNotFoundError
If the file does not exist.
Examples
--------
>>> from cv2ext.io import IterableVideo
>>> video = IterableVideo("video.mp4")
>>> for i, frame in video:
... print(f"Frame {i} has {frame.shape} shape")
>>> video.stop()
"""
# resolve path to a string
if isinstance(filename, Path):
filename = str(filename.resolve())
# only check path if it is a path
if isinstance(filename, str) and not Path(filename).exists():
err_msg = f"File {filename} does not exist."
raise FileNotFoundError(err_msg)
# it is called filename, but may be interger
self._cap = cv2.VideoCapture(filename)
# assign rest of attributes
self._frame_num = 0
self._consumed = 0
self._got = False
self._length = int(self._cap.get(cv2.CAP_PROP_FRAME_COUNT))
self._fps = float(self._cap.get(cv2.CAP_PROP_FPS))
self._width = int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self._height = int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
self._channels = channels
self._buffersize = buffersize
self._frame: np.ndarray = np.zeros(
(self._height, self._width, self._channels),
dtype=np.uint8,
)
# info for the thread
if use_thread is None:
use_thread = True
self._thread_loads = use_thread
if self._thread_loads:
self._thread = Thread(target=self._run, daemon=True)
self._queue: Queue[tuple[int, bool, np.ndarray]] = Queue(
maxsize=self._buffersize,
)
self._closed = False
self._thread.start()
def _run(self: Self) -> None:
"""Read the VideoCapture object."""
while not self._closed:
if self._frame_num == self._length:
break
got, frame = self._cap.read()
if not got:
self._queue.put(
(
self._frame_num,
False,
np.zeros(
(self._height, self._width, self._channels),
dtype=np.uint8,
),
),
)
break
while not self._closed:
with contextlib.suppress(Full):
self._queue.put((self._frame_num, got, frame), timeout=0.1)
self._frame_num += 1
break
if self._closed:
return
self._closed = True
@property
def frame(self: Self) -> np.ndarray:
"""
Get the current frame.
When using threading this value will be out of sync from the iterator.
Returns
-------
numpy.ndarray
The current frame.
"""
return self._frame
@property
def frame_num(self: Self) -> int:
"""
Get the current frame number.
When using threading this value will be out of sync from the iterator.
Returns
-------
int
The current frame number.
"""
return self._frame_num
@property
def success(self: Self) -> bool:
"""
Get the success of the last frame read.
When using threading this value will be out of sync from the iterator.
Returns
-------
bool
True if the frame was successfully loaded.
"""
return self._got
@property
def length(self: Self) -> int:
"""
Get the length of the video.
Returns
-------
int
The number of frames in the video.
"""
return self._length
@property
def fps(self: Self) -> float:
"""
Get the frames per second of the video.
Returns
-------
float
The frames per second of the video.
"""
return self._fps
@property
def size(self: Self) -> tuple[int, int]:
"""
Get the size of the video.
Returns
-------
tuple
The width and height of the video.
"""
return (self._width, self._height)
@property
def channels(self: Self) -> int:
self._consumed = 0
"""
Get the number of channels in the video.
Returns
-------
int
The number of channels in the video.
"""
return self._channels
@property
def width(self: Self) -> int:
"""
Get the width of the video.
Returns
-------
int
The width of the video.
"""
return self._width
@property
def height(self: Self) -> int:
"""
Get the height of the video.
Returns
-------
int
The height of the video.
"""
return self._height
def __len__(self: Self) -> int:
"""
Get the length of the video.
Returns
-------
int
The number of frames in the video.
"""
return self.length
def __iter__(self: Self) -> Self:
"""
Get the iterator.
Returns
-------
IterableVideo
The current instance.
"""
return self
def __next__(self: Self) -> tuple[int, np.ndarray]:
"""
Read the next frame from the video.
Returns
-------
bool
True if the frame was successfully loaded.
numpy.ndarray
The current frame.
Raises
------
StopIteration
If the video has ended
"""
if not self._thread_loads:
self._got, self._frame = self._cap.read()
num = self._frame_num
self._frame_num += 1
if not self._got:
self._stop()
raise StopIteration
return num, self._frame
# otherwise use threading
if self._consumed == self._length:
self._stop()
raise StopIteration
num, got, frame = self._queue.get()
self._consumed += 1
if not got:
self._stop()
raise StopIteration
return num, frame
def _stop(self: Self) -> None:
"""Stop the video."""
if self._thread_loads:
self._closed = True
for _ in range(self._buffersize):
with contextlib.suppress(Empty):
self._queue.get_nowait()
self._thread.join()
self._cap.release()
else:
self._cap.release()
[docs]
def stop(self: Self) -> None:
"""Stop the video."""
self._stop()
[docs]
def read(self: Self) -> tuple[bool, np.ndarray]:
"""
Read the next frame from the video.
Returns
-------
bool
True if the frame was successfully loaded.
numpy.ndarray
The current frame.
"""
if not self._thread_loads:
self._got, self._frame = self._cap.read()
self._frame_num += 1
return self._got, self._frame
# otherwise use threading
try:
_, frame = next(self)
except StopIteration:
return False, np.zeros(
(self._height, self._width, self._channels),
dtype=np.uint8,
)
else:
return True, frame