Source code for cv2ext.io._display
# Copyright (c) 2024 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
from __future__ import annotations
import contextlib
import logging
import time
from queue import Empty, Full, Queue
from threading import Condition, Thread
from typing import TYPE_CHECKING
import cv2
import numpy as np
from cv2ext import _WINDOW_MANAGER
if TYPE_CHECKING:
from types import TracebackType
from typing_extensions import Self
_log = logging.getLogger(__name__)
[docs]
class Display:
"""Class for displaying images using a separate thread."""
def __init__(
self: Self,
windowname: str,
stopkey: str = "q",
nextkey: str | None = None,
buffersize: int = 1,
fps: int | None = None,
*,
show: bool | None = None,
) -> None:
"""
Create a new display.
Parameters
----------
windowname : str
The name of the window to display the images in.
stopkey : str, optional
The key to press to stop the display.
By default, this is "q".
nextkey : str, optional
The key to press to move to the next frame or stop waiting threads.
By default None, so no key will trigger such behavior.
buffersize : int
The size of the buffer for the display.
By default, this is 1.
fps : int | None
The frames per second to display the images at.
If None, the display will be as fast as possible.
By default, this is None.
show : bool | None
If True, the window will be shown.
If False, the window will not be shown.
If None, the window will be shown.
Primarily used for debugging purposes, with show being
False, the display class does not do anything except
store the current image.
"""
if show is None:
show = True
self._windowname = windowname
self._stopkey = stopkey
self._nextkey = nextkey
self._next = Condition()
self._buffersize = buffersize
self._fps = 1 / fps if fps is not None else None
self._show = show
# allocate runtime variables
self._image: np.ndarray = np.zeros((100, 100, 3), dtype=np.uint8)
self._last_image = self._image.copy()
self._frameid = -1 # no frame yet
self._stopped = False
self._running = True
self._queue: Queue[np.ndarray] = Queue(maxsize=self._buffersize)
# thread allocation
_WINDOW_MANAGER.logwindow(self._windowname)
self._thread = Thread(target=self._display, daemon=True)
self._thread.start()
@property
def frame(self: Self) -> np.ndarray:
"""
The most recent frame.
Returns
-------
np.ndarray
The most recent frame.
"""
return self._image
@property
def frameid(self: Self) -> int:
"""
The current frame id.
Returns
-------
int
The current frame id.
"""
return self._frameid
@property
def stopped(self: Self) -> bool:
"""
Whether the stop key has been pressed.
If it has been pressed, this property will be reset.
Should be used for control loops on user side.
Returns
-------
bool
Whether the display is stopped.
"""
val = self._stopped
if val:
self._stopped = False
return val
@property
def is_alive(self: Self) -> bool:
"""
Whether the display thread is running.
Returns
-------
bool
Whether the display is running.
"""
return self._thread.is_alive()
def __call__(self: Self, frame: np.ndarray) -> None:
"""
Update the frame being displayed.
Parameters
----------
frame : np.ndarray
The frame to display.
Raises
------
queue.Full
If timeout is provided, and the queue if full at the end of timeout.
"""
self.update(frame)
def __del__(self: Self) -> None:
self._stop()
def __enter__(self: Self) -> Self:
return self
def __exit__(
self: Self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
self.stop()
def _display(self: Self) -> None:
if self._show:
cv2.namedWindow(self._windowname, cv2.WINDOW_AUTOSIZE)
# cv2.startWindowThread()
while self._running:
t0 = time.perf_counter()
_log.debug(f"Display {self._windowname} thread starting new loop @ {t0}")
# get frame
image: np.ndarray | None = None
with contextlib.suppress(Empty):
image = self._queue.get(timeout=0.1)
self._last_image = image.copy()
# display image if show
if self._show:
image = image if image is not None else self._last_image
cv2.imshow(self._windowname, image)
keypress = cv2.waitKey(1) & 0xFF
_log.debug(f"Display {self._windowname} received keypress: {keypress}")
if keypress == ord(self._stopkey):
self._stopped = True
continue
if self._nextkey and keypress == ord(self._nextkey):
with self._next:
self._next.notify_all()
# handle rough FPS sync
if self._fps is not None:
t1 = time.perf_counter()
dt = t1 - t0
if dt < self._fps:
time.sleep(self._fps - dt)
# cleanup on thread stop
_log.debug(f"Display {self._windowname} thread stopped")
# if self._show:
# _log.debug(f"Destroying window {self._windowname}")
# cv2.destroyWindow(self._windowname)
# cv2.waitKey(1)
def _stop(self: Self) -> None:
"""Stop the display."""
self._running = False
while self._thread.is_alive():
_log.debug(f"Attempting join for display thread {self._windowname}")
self._thread.join(timeout=0.01)
with contextlib.suppress(RuntimeError), self._next:
self._next.notify_all()
[docs]
def stop(self: Self) -> None:
"""Stop the display."""
self._stop()
[docs]
def update(self: Self, frame: np.ndarray) -> None:
"""
Update the frame being displayed.
Parameters
----------
frame : np.ndarray
The frame to display.
"""
self._image = frame
self._frameid += 1
with contextlib.suppress(Full):
self._queue.put_nowait(frame)
_log.debug(f"Sent frame to dispaly: {self._windowname}")
[docs]
def wait(self: Self, timeout: float | None = None) -> None:
"""
Wait for the next press of nextkey if specified.
If nextkey has not been specified, will instead wait for the timeout
amount. This will mimic a set framerate (although the underlying thread may
still not update as fast.)
Parameters
----------
timeout : float, optional
The maximum amount of time to wait.
"""
if self._nextkey:
with self._next:
self._next.wait(timeout=timeout)
elif timeout:
time.sleep(timeout)