diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..c02ae14 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,6 @@ +charset = utf-8 +indent_size = 4 +indent_style = space +insert_final_newline = true +root = true +trim_trailing_whitespace = true diff --git a/mjpeg_streamer/__init__.py b/mjpeg_streamer/__init__.py index a74aec0..ee5b55e 100644 --- a/mjpeg_streamer/__init__.py +++ b/mjpeg_streamer/__init__.py @@ -1,4 +1,5 @@ -from .mjpeg_streamer import MjpegServer, Stream +from .server import MjpegServer, Server +from .stream import ManagedStream, Stream, StreamBase -__all__ = ["MjpegServer", "Stream"] +__all__ = ["StreamBase", "ManagedStream", "MjpegServer", "Stream", "Server"] __version__ = "2024.2.8" diff --git a/mjpeg_streamer/cli.py b/mjpeg_streamer/cli.py index 5f70a4d..f8b777c 100644 --- a/mjpeg_streamer/cli.py +++ b/mjpeg_streamer/cli.py @@ -1,11 +1,10 @@ import argparse import re -import threading -from typing import List, Tuple, Union +import time +from typing import Dict, List, Tuple, Union -import cv2 - -from mjpeg_streamer import MjpegServer, Stream +from .server import Server +from .stream import ManagedStream def parse_args() -> argparse.Namespace: @@ -13,7 +12,7 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--host", type=str, default="localhost") parser.add_argument("--port", type=int, default=8080) parser.add_argument( - "--prefix", type=str, default="source", help="Name prefix for the streams" + "--prefix", type=str, default="", help="Name prefix for the streams" ) parser.add_argument( "--source", @@ -34,84 +33,58 @@ def parse_args() -> argparse.Namespace: ) args: argparse.Namespace = parser.parse_args() args.prefix = re.sub("[^0-9a-zA-Z]+", "_", args.prefix) - args.source: List[Union[int, str],] = [[0]] if args.source is None else args.source + args.source = [[0]] if args.source is None else args.source args.source = [item for sublist in args.source for item in sublist] args.source = list(set(args.source)) return args -def run( - cap: cv2.VideoCapture, - stream: Stream, - stop_event: threading.Event, - show_bandwidth: bool, -) -> None: - while not stop_event.is_set(): - ret, frame = cap.read() - if not ret: - stop_event.set() - break - stream.set_frame(frame) - if show_bandwidth: - global bandwidth - bandwidth[stream.name] = stream.get_bandwidth() - cap.release() - - def main() -> None: args = parse_args() size: Tuple[int, int] = (args.width, args.height) - server = MjpegServer(args.host, args.port) - threads: List[threading.Thread,] = [] - stop_events: List[threading.Event,] = [] + streams: List[ManagedStream] = [] + server = Server(args.host, args.port) if args.show_bandwidth: - global bandwidth - bandwidth = {} # dict[str, int] + bandwidth: Dict[str, int] = {} for source in args.source: source: Union[int, str] = int(source) if str(source).isdigit() else source - cap = cv2.VideoCapture(source) source_display = ( re.sub("[^0-9a-zA-Z]+", "_", source) if isinstance(source, str) else source ) - stream = Stream( - f"{args.prefix}_{source_display!s}", + stream = ManagedStream( + f"{args.prefix}{'_' if args.prefix else ''}{source_display!s}", + source=source, size=size, quality=args.quality, fps=args.fps, ) server.add_stream(stream) - stop_event = threading.Event() - stop_events.append(stop_event) - thread = threading.Thread( - target=run, args=(cap, stream, stop_event, args.show_bandwidth) - ) - threads.append(thread) + streams.append(stream) try: - for thread in threads: - thread.start() + for stream in streams: + stream.start() server.start() while True: if args.show_bandwidth: + for stream in streams: + bandwidth[stream.name] = stream.get_bandwidth() print( f"{' | '.join([f'{k}: {round(v / 1024, 2)} KB/s' for k, v in bandwidth.items()])}", end="\r", ) + else: + time.sleep(1) # Keep the main thread alive, but don't consume CPU except KeyboardInterrupt: - for stop_event in stop_events: - stop_event.set() - server.stop() - for thread in threads: - thread.join() + print("\nExiting...") except Exception as e: - print(e) - for stop_event in stop_events: - stop_event.set() + print("Error:", e) + finally: + for stream in streams: + stream.stop() server.stop() - for thread in threads: - thread.join() if __name__ == "__main__": diff --git a/mjpeg_streamer/mjpeg_streamer.py b/mjpeg_streamer/mjpeg_streamer.py deleted file mode 100644 index fdc5859..0000000 --- a/mjpeg_streamer/mjpeg_streamer.py +++ /dev/null @@ -1,164 +0,0 @@ -import asyncio -import threading -import time -from collections import deque -from typing import List, Optional, Tuple, Union - -import aiohttp -import cv2 -import netifaces -import numpy as np -from aiohttp import MultipartWriter, web -from aiohttp.web_runner import GracefulExit - - -class Stream: - def __init__( - self, - name: str, - size: Optional[Tuple[int, int]] = None, - quality: int = 50, - fps: int = 30, - ) -> None: - self.name = name.lower().casefold().replace(" ", "_") - self.size = size - self.quality = max(1, min(quality, 100)) - self.fps = fps - self._frame = np.zeros((320, 240, 1), dtype=np.uint8) - self._lock = asyncio.Lock() - self._byte_frame_window = deque(maxlen=30) - self._bandwidth_last_modified_time = time.time() - - def set_frame(self, frame: np.ndarray) -> None: - self._frame = frame - - def get_bandwidth(self) -> float: - if ( - len(self._byte_frame_window) > 0 - and time.time() - self._bandwidth_last_modified_time >= 1 - ): - deque.clear(self._byte_frame_window) - return sum(self._byte_frame_window) - - def __process_current_frame(self) -> np.ndarray: - frame = cv2.resize( - self._frame, self.size or (self._frame.shape[1], self._frame.shape[0]) - ) - val, frame = cv2.imencode( - ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality] - ) - if not val: - raise ValueError("Error encoding frame") - self._byte_frame_window.append(len(frame.tobytes())) - self._bandwidth_last_modified_time = time.time() - return frame - - async def get_frame(self) -> np.ndarray: - async with self._lock: - return self._frame - - async def get_frame_processed(self) -> np.ndarray: - async with self._lock: - return self.__process_current_frame() - - -class _StreamHandler: - def __init__(self, stream: Stream) -> None: - self._stream = stream - - async def __call__(self, request: web.Request) -> web.StreamResponse: - response = web.StreamResponse( - status=200, - reason="OK", - headers={ - "Content-Type": "multipart/x-mixed-replace;boundary=image-boundary" - }, - ) - await response.prepare(request) - - while True: - await asyncio.sleep(1 / self._stream.fps) - frame = await self._stream.get_frame_processed() - with MultipartWriter("image/jpeg", boundary="image-boundary") as mpwriter: - mpwriter.append(frame.tobytes(), {"Content-Type": "image/jpeg"}) - try: - await mpwriter.write(response, close_boundary=False) - except (ConnectionResetError, ConnectionAbortedError): - return web.Response(status=499, text="Client closed the connection") - await response.write(b"\r\n") - - -class MjpegServer: - def __init__(self, host: Union[str, int] = "localhost", port: int = 8080) -> None: - if isinstance(host, str) and host != "0.0.0.0": - self._host = [host] - elif isinstance(host, list): - if "0.0.0.0" in host: - host.remove("0.0.0.0") - host = host + [ - netifaces.ifaddresses(iface)[netifaces.AF_INET][0]["addr"] - for iface in netifaces.interfaces() - if netifaces.AF_INET in netifaces.ifaddresses(iface) - ] - self._host = list(dict.fromkeys(host)) - else: - self._host = [ - netifaces.ifaddresses(iface)[netifaces.AF_INET][0]["addr"] - for iface in netifaces.interfaces() - if netifaces.AF_INET in netifaces.ifaddresses(iface) - ] - self._port = port - self._app = web.Application() - self._app.is_running = False - self._cap_routes: List[str,] = [] - - def is_running(self) -> bool: - return self._app.is_running - - async def __root_handler(self, _) -> web.Response: - text = "

Available streams:

" - for route in self._cap_routes: - text += f"{route}\n
\n" - return aiohttp.web.Response(text=text, content_type="text/html") - - def add_stream(self, stream: Stream) -> None: - if self.is_running(): - raise RuntimeError("Cannot add stream after the server has started") - route = f"/{stream.name}" - if route in self._cap_routes: - raise ValueError(f"A stream with the name {route} already exists") - self._cap_routes.append(route) - self._app.router.add_route("GET", route, _StreamHandler(stream)) - - def __start_func(self) -> None: - self._app.router.add_route("GET", "/", self.__root_handler) - runner = web.AppRunner(self._app) - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(runner.setup()) - site = web.TCPSite(runner, self._host, self._port) - loop.run_until_complete(site.start()) - loop.run_forever() - - def start(self) -> None: - if not self.is_running(): - thread = threading.Thread(target=self.__start_func, daemon=True) - thread.start() - self._app.is_running = True - else: - print("\nServer is already running\n") - - for addr in self._host: - print(f"\nStreams index: http://{addr}:{self._port!s}") - print("Available streams:\n") - for route in self._cap_routes: # route has a leading slash - print(f"http://{addr}:{self._port!s}{route}") - print("--------------------------------\n") - print("\nPress Ctrl+C to stop the server\n") - - def stop(self) -> None: - if self.is_running(): - self._app.is_running = False - print("\nStopping...\n") - raise GracefulExit - print("\nServer is not running\n") diff --git a/mjpeg_streamer/server.py b/mjpeg_streamer/server.py new file mode 100644 index 0000000..3f2901f --- /dev/null +++ b/mjpeg_streamer/server.py @@ -0,0 +1,131 @@ +import asyncio +import threading +from typing import List, Union + +import aiohttp +from aiohttp import MultipartWriter, web +from aiohttp.web_runner import GracefulExit +from multidict import MultiDict + +from .stream import StreamBase + + +class _StreamHandler: + def __init__(self, stream: StreamBase) -> None: + self._stream = stream + + async def __call__(self, request: web.Request) -> web.StreamResponse: + viewer_token = request.cookies.get("viewer_token") + response = web.StreamResponse( + status=200, + reason="OK", + headers={ + "Content-Type": "multipart/x-mixed-replace;boundary=image-boundary" + }, + ) + try: + await response.prepare(request) + except (ConnectionResetError, ConnectionAbortedError, ConnectionError): + pass + if not viewer_token: + viewer_token = await self._stream._add_viewer() + response.set_cookie("viewer_token", viewer_token) + elif viewer_token not in self._stream._active_viewers: + await self._stream._add_viewer(viewer_token) + try: + while True: + try: + await asyncio.sleep(1 / self._stream.fps) + frame = await self._stream._get_frame() + with MultipartWriter( + "image/jpeg", boundary="image-boundary" + ) as mpwriter: + mpwriter.append( + frame.tobytes(), + MultiDict({"Content-Type": "image/jpeg"}), + ) + await mpwriter.write(response, close_boundary=False) + await response.write(b"\r\n") + except (ConnectionResetError, ConnectionAbortedError, ConnectionError): + break + finally: + await self._stream._remove_viewer(viewer_token) + return response + + +class Server: + def __init__( + self, host: Union[str, List[str,]] = "localhost", port: int = 8080 + ) -> None: + if isinstance(host, str): + self._host: List[str,] = [ + host, + ] + elif isinstance(host, list): + if "0.0.0.0" in host: + host = ["0.0.0.0"] + if "localhost" in host and "127.0.0.1" in host: + host.remove("localhost") + self._host = list(set(host)) + self._port = port + self._app: web.Application = web.Application() + self._app_is_running: bool = False + self._cap_routes: List[str,] = [] + + def is_running(self) -> bool: + return self._app_is_running + + async def __root_handler(self, _) -> web.Response: + text = "

Available streams:

" + for route in self._cap_routes: + text += f"{route}\n
\n" + return aiohttp.web.Response(text=text, content_type="text/html") + + def add_stream(self, stream: StreamBase) -> None: + if self.is_running(): + raise RuntimeError("Cannot add stream after the server has started") + route = f"/{stream.name}" + if route in self._cap_routes: + raise ValueError(f"A stream with the name {route} already exists") + self._cap_routes.append(route) + self._app.router.add_route("GET", route, _StreamHandler(stream)) + + def __start_func(self) -> None: + self._app.router.add_route("GET", "/", self.__root_handler) + runner = web.AppRunner(self._app) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(runner.setup()) + site = web.TCPSite(runner, self._host, self._port) + loop.run_until_complete(site.start()) + loop.run_forever() + + def start(self) -> None: + if not self.is_running(): + thread = threading.Thread(target=self.__start_func, daemon=True) + thread.start() + self._app_is_running = True + else: + print("\nServer is already running\n") + + for addr in self._host: + print(f"\nStreams index: http://{addr}:{self._port!s}") + print("Available streams:\n") + for route in self._cap_routes: # route has a leading slash + print(f"http://{addr}:{self._port!s}{route}") + print("--------------------------------\n") + print("\nPress Ctrl+C to stop the server\n") + + def stop(self) -> None: + if self.is_running(): + self._app_is_running = False + print("\nStopping...\n") + GracefulExit() + print("\nServer stopped\n") + else: + print("\nServer is not running\n") + + +class MjpegServer(Server): + # Alias for Server, to maintain backwards compatibility + pass diff --git a/mjpeg_streamer/stream.py b/mjpeg_streamer/stream.py new file mode 100644 index 0000000..894bc02 --- /dev/null +++ b/mjpeg_streamer/stream.py @@ -0,0 +1,272 @@ +import asyncio +import time +import uuid +from collections import deque +from typing import Deque, Dict, List, Optional, Set, Tuple, Union + +import cv2 +import numpy as np + + +class StreamBase: + def __init__( + self, + name: str, + fps: int = 30, + ) -> None: + self.name = name.casefold().replace(" ", "_") + self.fps = fps + self._frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8) + self._last_processed_frame: np.ndarray = cv2.imencode( + ".jpg", self._frame, [cv2.IMWRITE_JPEG_QUALITY, 1] + )[1] + self._lock: asyncio.Lock = asyncio.Lock() + self._frames_buffer: Deque[int] = deque(maxlen=fps) + self._bandwidth_last_modified_time: float = time.time() + self._active_viewers: Set[str] = set() + self._tasks: Dict[str, asyncio.Task] = {"_clear_bandwidth": None} + + async def _ensure_background_tasks(self) -> None: + for task_name, task in self._tasks.items(): + if task is None or task.done(): + self._tasks[task_name] = asyncio.create_task( + eval(f"self.{task_name}()") + ) + + async def _clear_bandwidth(self) -> None: + while True: + await asyncio.sleep(1.0 / self.fps) + if ( + len(self._frames_buffer) > 0 + and time.time() - self._bandwidth_last_modified_time >= 1 + ): + self._frames_buffer.clear() + + async def _add_viewer(self, viewer_token: Optional[str] = None) -> str: + viewer_token = viewer_token or str(uuid.uuid4()) + async with self._lock: + self._active_viewers.add(viewer_token) + return viewer_token + + async def _remove_viewer(self, viewer_token: str) -> None: + async with self._lock: + self._active_viewers.discard(viewer_token) + + async def _process_current_frame(self) -> np.ndarray: + self._last_processed_frame = self._frame + return self._frame + + async def __check_encoding(self, frame: np.ndarray) -> str: + if isinstance(frame, np.ndarray) and frame.ndim == 1 and frame.size > 2: + # Check JPG header (0xFFD8) and footer (0xFFD9) + if ( + frame[0] == 255 + and frame[1] == 216 + and frame[-2] == 255 + and frame[-1] == 217 + ): + return "jpg" + return "one-dim-non-jpg" + if isinstance(frame, np.ndarray): + return "multi-dim" + return "unknown" + + async def _resize_and_encode_frame( + self, frame: np.ndarray, size: Tuple[int, int], quality: int + ) -> np.ndarray: + resized_frame = cv2.resize(frame, size) + if not await self.__check_encoding(resized_frame) == "jpg": + val, encoded_frame = cv2.imencode( + ".jpg", resized_frame, [cv2.IMWRITE_JPEG_QUALITY, quality] + ) + if not val: + raise ValueError( + f"Error encoding frame. Format/shape: {await self.__check_encoding(resized_frame)}" + ) + return encoded_frame + + def settings(self) -> None: + for key, value in self.__dict__.items(): + if key.startswith("_"): + continue + print(f"{key}: {value}") + + def has_demand(self) -> bool: + return len(self._active_viewers) > 0 + + def active_viewers(self) -> int: + return len(self._active_viewers) + + def get_bandwidth(self) -> float: + return sum(self._frames_buffer) + + def set_fps(self, fps: int) -> None: + self.fps = fps + self._frames_buffer = deque(maxlen=fps) + + async def _get_frame(self) -> np.ndarray: + await self._ensure_background_tasks() # A little hacky + if time.time() - self._bandwidth_last_modified_time <= 1.0 / self.fps: + return self._last_processed_frame # Prevents redundant processing + async with self._lock: + self._frames_buffer.append(len(self._last_processed_frame.tobytes())) + self._bandwidth_last_modified_time = time.time() + return await self._process_current_frame() + + def set_frame(self, frame: np.ndarray) -> None: + self._frame = frame + + +class Stream(StreamBase): + def __init__( + self, + name: str, + fps: int = 30, + size: Optional[Tuple[int, int]] = None, + quality: int = 50, + ) -> None: + super().__init__(name, fps) + self.size = size + self.quality = max(1, min(quality, 100)) + self._last_processed_frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8) + + async def _process_current_frame(self) -> np.ndarray: + frame = await self._resize_and_encode_frame( + self._frame, + self.size or (self._frame.shape[1], self._frame.shape[0]), + self.quality, + ) + self._last_processed_frame = frame + return frame + + def set_size(self, size: Tuple[int, int]) -> None: + self.size = size + + def set_quality(self, quality: int) -> None: + self.quality = max(1, min(quality, 100)) + + +class ManagedStream(StreamBase): + def __init__( + self, + name: str, + source: Union[int, str] = 0, + fps: int = 30, + size: Optional[Tuple[int, int]] = None, + quality: int = 50, + mode: str = "fast-on-demand", + poll_delay_ms: Optional[Union[float, int]] = None, + ) -> None: + super().__init__(name, fps) + self.source = source + self.mode = mode + self._available_modes: List[str,] = ["fast-on-demand", "full-on-demand"] + if self.mode not in self._available_modes: + raise ValueError(f"Invalid mode. Available modes: {self._available_modes}") + self.size = size + self.quality = max(1, min(quality, 100)) + self.poll_delay_seconds = poll_delay_ms / 1000.0 if poll_delay_ms else 1.0 / fps + self._cap_is_open: bool = False + self._cap: cv2.VideoCapture = None + self._is_running: bool = False + self._tasks["_manage_cap_state"] = None + + async def _manage_cap_state(self) -> None: + while True: + await asyncio.sleep(self.poll_delay_seconds) + if self.mode == "full-on-demand": + if self.has_demand() and not self._cap_is_open: + async with self._lock: + await self.__open_cap() + elif not self.has_demand() and self._cap_is_open: + async with self._lock: + await self.__close_cap() + elif not self._cap_is_open: + async with self._lock: + await self.__open_cap() + + async def __open_cap(self) -> None: + if not self._cap_is_open and self._is_running: + self._cap = cv2.VideoCapture(self.source) + if not self._cap.isOpened(): + raise ValueError("Cannot open the capture device") + self._cap_is_open = True + + async def __close_cap(self) -> None: + if self._cap_is_open and self._is_running: + self._cap.release() + self._cap_is_open = False + + async def __read_frame(self) -> None: + if self._cap_is_open and self._is_running: + val, frame = self._cap.read() + if not val: + async with self._lock: + val, frame = self._cap.read() + if not val: + raise RuntimeError("Error reading frame") + self._frame = frame + else: + await self.__open_cap() + + async def _process_current_frame(self) -> np.ndarray: + if not self.has_demand(): + return self._last_processed_frame + await self.__read_frame() + frame = await self._resize_and_encode_frame( + self._frame, + self.size or (self._frame.shape[1], self._frame.shape[0]), + self.quality, + ) + self._last_processed_frame = frame + return frame + + async def _get_frame(self) -> np.ndarray: + if not self._is_running: + raise RuntimeError( + "Stream is not running, please call the start method first." + ) + return await super()._get_frame() + + def set_size(self, size: Tuple[int, int]) -> None: + self.size = size + + def set_quality(self, quality: int) -> None: + self.quality = max(1, min(quality, 100)) + + def set_frame(self, frame: np.ndarray) -> None: + raise NotImplementedError( + "This method is not available for ManagedStream, use Stream or StreamBase instead." + ) + + def change_mode(self, mode: str) -> None: + if mode not in self._available_modes: + print(f"Invalid mode. Available modes: {self._available_modes}") + self.mode = mode + + def change_source(self, source: Union[int, str]) -> None: + self.source = source + self.__close_cap() + self.__open_cap() + + def set_poll_delay_ms(self, poll_delay_ms: float) -> None: + self.poll_delay_seconds = poll_delay_ms / 1000.0 + + def start(self) -> None: + if not self._is_running: + self._is_running = True + else: + print("Stream has already started") + + def stop(self) -> None: + if self._is_running: + for task in self._tasks.values(): + if task and not task.done(): + task.cancel() + loop = asyncio.get_event_loop() + loop.run_until_complete(self.__close_cap()) + loop.close() if loop.is_running() else None + self._is_running = False + self._cap_is_open = False + else: + print("Stream has already stopped") diff --git a/perf/py-spy-results.md b/perf/py-spy-results.md new file mode 100644 index 0000000..13627d6 --- /dev/null +++ b/perf/py-spy-results.md @@ -0,0 +1,39 @@ +# PY-SPY Tests + +Performance results gathered using py-spy. [Test scripts](../tests/) were run with the following command: + +```bash +py-spy top -- python tests/.py +``` + +## Results + +*Python version: 3.11.8* + +### test_streambase: +- no demand: +``GIL: 0% Active: 5-15% Threads: 2`` +- demand: +``GIL: 0% Active: 5-15% Threads: 2`` + +### test_stream: +- no demand: +``GIL: 0% Active: 0-4% Threads: 2`` +- demand: +``GIL: 0-2% Active: 3-10% Threads: 2`` + +### test_full_od: + - initial no demand: + ``GIL: 0% Active: 0-1% Threads: 2`` + - no demand: + ``GIL: 0% Active: 0% Threads: 2`` + - demand: + ``GIL: 0-2% Active: 2-10% Threads: 2`` + +### test_fast_od: + - initial no demand: + ``GIL: 0% Active: 0-1% Threads: 2`` + - no demand: + ``GIL: 0% Active: 0-1% Threads: 2`` + - demand: + ``GIL: 0-2% Active: 2-10% Threads: 2`` diff --git a/perf/usage-tests.md b/perf/usage-tests.md new file mode 100644 index 0000000..fc01ac6 --- /dev/null +++ b/perf/usage-tests.md @@ -0,0 +1,53 @@ +# Usage Tests + +Results gathered from running the tests in the [tests](../tests/) directory. These results are very arbitrary and should not be taken as a definitive measure of performance. They are meant to give a rough idea of how the system behaves under different conditions. + +## Computer Specs + +```bash +$ neofetch --off +OS: Ubuntu 23.10 x86_64 +Host: Victus by HP Laptop 16-e0xxx +Kernel: 6.5.0-28-generic +Shell: zsh 5.9 +DE: GNOME 45.2 +Terminal: gnome-terminal +CPU: AMD Ryzen 7 5800H with Radeon Graphics (16) @ 4.463GHz +GPU: AMD ATI Radeon Vega Series / Radeon Vega Mobile Series +GPU: NVIDIA GeForce RTX 3050 Ti Mobile +Memory: 15313MiB +``` + +*Note:* Irrelevant information has been removed from the output. + +## Results + +*Python version: 3.11.8* + +### test_streambase: +- no demand: +``CPU: 14-16% Memory: 39.2 MB`` +- demand: +``CPU: 15-19% Memory: 39.5 MB`` + +### test_stream: +- no demand: +``CPU: 14-16% Memory: 39.7 MB`` +- demand: +``CPU: 14-17% Memory: 41.7 MB`` + +### test_full_od: + - initial no demand: + ``CPU: 0% Memory: 37 MB`` + - no demand: + ``CPU: 0% Memory: 39.9 MB`` + - demand: + ``CPU: 15-18% Memory: 41 MB`` + +### test_fast_od: + - initial no demand: + ``CPU: 0% Memory: 37.2 MB`` + - no demand: + ``CPU: 0-0.1% Memory: 41.1 MB`` + - demand: + ``CPU: 15-18% Memory: 41.1 MB`` diff --git a/pyproject.toml b/pyproject.toml index 34a047d..5bea27e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,6 @@ dependencies = [ 'aiohttp==3.8.6; python_version >= "3.6" and python_version <= "3.7"', 'aiohttp==3.9.1; python_version == "3.8"', 'aiohttp; python_version >= "3.9"', - "netifaces", 'numpy==1.19.5; python_version == "3.6"', 'numpy==1.21.6; python_version == "3.7"', 'numpy==1.24.4; python_version == "3.8"', @@ -79,9 +78,9 @@ isolated = true path = "mjpeg_streamer/__init__.py" [tool.ruff] -exclude = ["examples"] +exclude = ["examples", "test*"] fix = false -ignore = [ +lint.ignore = [ "TID252", # Relative imports are banned | __init__.py "T201", # `print` found | TODO: Migrate to logging "S104", # Possible binding to all interfaces | False positive diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..82ed91f --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,4 @@ +hatch +mypy +pre-commit +ruff diff --git a/tests/test_fast_od.py b/tests/test_fast_od.py new file mode 100644 index 0000000..7cb9518 --- /dev/null +++ b/tests/test_fast_od.py @@ -0,0 +1,24 @@ +import time + +from mjpeg_streamer import ManagedStream, Server + +server = Server(host="0.0.0.0", port=8080) +stream = ManagedStream( + "test", + source=0, + fps=30, + size=(640, 480), + quality=50, + mode="fast-on-demand", + poll_delay_ms=None, +) + +server.add_stream(stream) +server.start() +stream.start() + +print(stream.settings()) + +while True: + time.sleep(1 / 30) + print(stream.get_bandwidth() / 1024, "KB/s", end="\r") diff --git a/tests/test_full_od.py b/tests/test_full_od.py new file mode 100644 index 0000000..a61dca0 --- /dev/null +++ b/tests/test_full_od.py @@ -0,0 +1,24 @@ +import time + +from mjpeg_streamer import ManagedStream, Server + +server = Server(host="0.0.0.0", port=8080) +stream = ManagedStream( + "test", + source=0, + fps=30, + size=(640, 480), + quality=50, + mode="full-on-demand", + poll_delay_ms=None, +) + +server.add_stream(stream) +server.start() +stream.start() + +print(stream.settings()) + +while True: + time.sleep(1 / 30) + print(stream.get_bandwidth() / 1024, "KB/s", end="\r") diff --git a/tests/test_stream.py b/tests/test_stream.py new file mode 100644 index 0000000..8c382fc --- /dev/null +++ b/tests/test_stream.py @@ -0,0 +1,17 @@ +import cv2 + +from mjpeg_streamer import Server, Stream + +capture = cv2.VideoCapture(0) + +server = Server(host="0.0.0.0", port=8080) +stream = Stream("test", fps=30, size=(640, 480), quality=50) +server.add_stream(stream) +server.start() + +print(stream.settings()) + +while True: + frame = capture.read()[1] + stream.set_frame(frame) + print(stream.get_bandwidth() / 1024, "KB/s", end="\r") diff --git a/tests/test_streambase.py b/tests/test_streambase.py new file mode 100644 index 0000000..bf5a450 --- /dev/null +++ b/tests/test_streambase.py @@ -0,0 +1,19 @@ +import cv2 + +from mjpeg_streamer import Server, StreamBase + +capture = cv2.VideoCapture(0) + +server = Server(host="0.0.0.0", port=8080) +stream = StreamBase("test", fps=30) + +server.add_stream(stream) +server.start() + +print(stream.settings()) + +while True: + frame = capture.read()[1] + frame = cv2.imencode(".jpg", frame)[1] + stream.set_frame(frame) + print(stream.get_bandwidth() / 1024, "KB/s", end="\r")