diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..c02ae14
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,6 @@
+charset = utf-8
+indent_size = 4
+indent_style = space
+insert_final_newline = true
+root = true
+trim_trailing_whitespace = true
diff --git a/mjpeg_streamer/__init__.py b/mjpeg_streamer/__init__.py
index a74aec0..ee5b55e 100644
--- a/mjpeg_streamer/__init__.py
+++ b/mjpeg_streamer/__init__.py
@@ -1,4 +1,5 @@
-from .mjpeg_streamer import MjpegServer, Stream
+from .server import MjpegServer, Server
+from .stream import ManagedStream, Stream, StreamBase
-__all__ = ["MjpegServer", "Stream"]
+__all__ = ["StreamBase", "ManagedStream", "MjpegServer", "Stream", "Server"]
__version__ = "2024.2.8"
diff --git a/mjpeg_streamer/cli.py b/mjpeg_streamer/cli.py
index 5f70a4d..f8b777c 100644
--- a/mjpeg_streamer/cli.py
+++ b/mjpeg_streamer/cli.py
@@ -1,11 +1,10 @@
import argparse
import re
-import threading
-from typing import List, Tuple, Union
+import time
+from typing import Dict, List, Tuple, Union
-import cv2
-
-from mjpeg_streamer import MjpegServer, Stream
+from .server import Server
+from .stream import ManagedStream
def parse_args() -> argparse.Namespace:
@@ -13,7 +12,7 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--host", type=str, default="localhost")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument(
- "--prefix", type=str, default="source", help="Name prefix for the streams"
+ "--prefix", type=str, default="", help="Name prefix for the streams"
)
parser.add_argument(
"--source",
@@ -34,84 +33,58 @@ def parse_args() -> argparse.Namespace:
)
args: argparse.Namespace = parser.parse_args()
args.prefix = re.sub("[^0-9a-zA-Z]+", "_", args.prefix)
- args.source: List[Union[int, str],] = [[0]] if args.source is None else args.source
+ args.source = [[0]] if args.source is None else args.source
args.source = [item for sublist in args.source for item in sublist]
args.source = list(set(args.source))
return args
-def run(
- cap: cv2.VideoCapture,
- stream: Stream,
- stop_event: threading.Event,
- show_bandwidth: bool,
-) -> None:
- while not stop_event.is_set():
- ret, frame = cap.read()
- if not ret:
- stop_event.set()
- break
- stream.set_frame(frame)
- if show_bandwidth:
- global bandwidth
- bandwidth[stream.name] = stream.get_bandwidth()
- cap.release()
-
-
def main() -> None:
args = parse_args()
size: Tuple[int, int] = (args.width, args.height)
- server = MjpegServer(args.host, args.port)
- threads: List[threading.Thread,] = []
- stop_events: List[threading.Event,] = []
+ streams: List[ManagedStream] = []
+ server = Server(args.host, args.port)
if args.show_bandwidth:
- global bandwidth
- bandwidth = {} # dict[str, int]
+ bandwidth: Dict[str, int] = {}
for source in args.source:
source: Union[int, str] = int(source) if str(source).isdigit() else source
- cap = cv2.VideoCapture(source)
source_display = (
re.sub("[^0-9a-zA-Z]+", "_", source) if isinstance(source, str) else source
)
- stream = Stream(
- f"{args.prefix}_{source_display!s}",
+ stream = ManagedStream(
+ f"{args.prefix}{'_' if args.prefix else ''}{source_display!s}",
+ source=source,
size=size,
quality=args.quality,
fps=args.fps,
)
server.add_stream(stream)
- stop_event = threading.Event()
- stop_events.append(stop_event)
- thread = threading.Thread(
- target=run, args=(cap, stream, stop_event, args.show_bandwidth)
- )
- threads.append(thread)
+ streams.append(stream)
try:
- for thread in threads:
- thread.start()
+ for stream in streams:
+ stream.start()
server.start()
while True:
if args.show_bandwidth:
+ for stream in streams:
+ bandwidth[stream.name] = stream.get_bandwidth()
print(
f"{' | '.join([f'{k}: {round(v / 1024, 2)} KB/s' for k, v in bandwidth.items()])}",
end="\r",
)
+ else:
+ time.sleep(1) # Keep the main thread alive, but don't consume CPU
except KeyboardInterrupt:
- for stop_event in stop_events:
- stop_event.set()
- server.stop()
- for thread in threads:
- thread.join()
+ print("\nExiting...")
except Exception as e:
- print(e)
- for stop_event in stop_events:
- stop_event.set()
+ print("Error:", e)
+ finally:
+ for stream in streams:
+ stream.stop()
server.stop()
- for thread in threads:
- thread.join()
if __name__ == "__main__":
diff --git a/mjpeg_streamer/mjpeg_streamer.py b/mjpeg_streamer/mjpeg_streamer.py
deleted file mode 100644
index fdc5859..0000000
--- a/mjpeg_streamer/mjpeg_streamer.py
+++ /dev/null
@@ -1,164 +0,0 @@
-import asyncio
-import threading
-import time
-from collections import deque
-from typing import List, Optional, Tuple, Union
-
-import aiohttp
-import cv2
-import netifaces
-import numpy as np
-from aiohttp import MultipartWriter, web
-from aiohttp.web_runner import GracefulExit
-
-
-class Stream:
- def __init__(
- self,
- name: str,
- size: Optional[Tuple[int, int]] = None,
- quality: int = 50,
- fps: int = 30,
- ) -> None:
- self.name = name.lower().casefold().replace(" ", "_")
- self.size = size
- self.quality = max(1, min(quality, 100))
- self.fps = fps
- self._frame = np.zeros((320, 240, 1), dtype=np.uint8)
- self._lock = asyncio.Lock()
- self._byte_frame_window = deque(maxlen=30)
- self._bandwidth_last_modified_time = time.time()
-
- def set_frame(self, frame: np.ndarray) -> None:
- self._frame = frame
-
- def get_bandwidth(self) -> float:
- if (
- len(self._byte_frame_window) > 0
- and time.time() - self._bandwidth_last_modified_time >= 1
- ):
- deque.clear(self._byte_frame_window)
- return sum(self._byte_frame_window)
-
- def __process_current_frame(self) -> np.ndarray:
- frame = cv2.resize(
- self._frame, self.size or (self._frame.shape[1], self._frame.shape[0])
- )
- val, frame = cv2.imencode(
- ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality]
- )
- if not val:
- raise ValueError("Error encoding frame")
- self._byte_frame_window.append(len(frame.tobytes()))
- self._bandwidth_last_modified_time = time.time()
- return frame
-
- async def get_frame(self) -> np.ndarray:
- async with self._lock:
- return self._frame
-
- async def get_frame_processed(self) -> np.ndarray:
- async with self._lock:
- return self.__process_current_frame()
-
-
-class _StreamHandler:
- def __init__(self, stream: Stream) -> None:
- self._stream = stream
-
- async def __call__(self, request: web.Request) -> web.StreamResponse:
- response = web.StreamResponse(
- status=200,
- reason="OK",
- headers={
- "Content-Type": "multipart/x-mixed-replace;boundary=image-boundary"
- },
- )
- await response.prepare(request)
-
- while True:
- await asyncio.sleep(1 / self._stream.fps)
- frame = await self._stream.get_frame_processed()
- with MultipartWriter("image/jpeg", boundary="image-boundary") as mpwriter:
- mpwriter.append(frame.tobytes(), {"Content-Type": "image/jpeg"})
- try:
- await mpwriter.write(response, close_boundary=False)
- except (ConnectionResetError, ConnectionAbortedError):
- return web.Response(status=499, text="Client closed the connection")
- await response.write(b"\r\n")
-
-
-class MjpegServer:
- def __init__(self, host: Union[str, int] = "localhost", port: int = 8080) -> None:
- if isinstance(host, str) and host != "0.0.0.0":
- self._host = [host]
- elif isinstance(host, list):
- if "0.0.0.0" in host:
- host.remove("0.0.0.0")
- host = host + [
- netifaces.ifaddresses(iface)[netifaces.AF_INET][0]["addr"]
- for iface in netifaces.interfaces()
- if netifaces.AF_INET in netifaces.ifaddresses(iface)
- ]
- self._host = list(dict.fromkeys(host))
- else:
- self._host = [
- netifaces.ifaddresses(iface)[netifaces.AF_INET][0]["addr"]
- for iface in netifaces.interfaces()
- if netifaces.AF_INET in netifaces.ifaddresses(iface)
- ]
- self._port = port
- self._app = web.Application()
- self._app.is_running = False
- self._cap_routes: List[str,] = []
-
- def is_running(self) -> bool:
- return self._app.is_running
-
- async def __root_handler(self, _) -> web.Response:
- text = "
Available streams:
"
- for route in self._cap_routes:
- text += f"{route}\n
\n"
- return aiohttp.web.Response(text=text, content_type="text/html")
-
- def add_stream(self, stream: Stream) -> None:
- if self.is_running():
- raise RuntimeError("Cannot add stream after the server has started")
- route = f"/{stream.name}"
- if route in self._cap_routes:
- raise ValueError(f"A stream with the name {route} already exists")
- self._cap_routes.append(route)
- self._app.router.add_route("GET", route, _StreamHandler(stream))
-
- def __start_func(self) -> None:
- self._app.router.add_route("GET", "/", self.__root_handler)
- runner = web.AppRunner(self._app)
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- loop.run_until_complete(runner.setup())
- site = web.TCPSite(runner, self._host, self._port)
- loop.run_until_complete(site.start())
- loop.run_forever()
-
- def start(self) -> None:
- if not self.is_running():
- thread = threading.Thread(target=self.__start_func, daemon=True)
- thread.start()
- self._app.is_running = True
- else:
- print("\nServer is already running\n")
-
- for addr in self._host:
- print(f"\nStreams index: http://{addr}:{self._port!s}")
- print("Available streams:\n")
- for route in self._cap_routes: # route has a leading slash
- print(f"http://{addr}:{self._port!s}{route}")
- print("--------------------------------\n")
- print("\nPress Ctrl+C to stop the server\n")
-
- def stop(self) -> None:
- if self.is_running():
- self._app.is_running = False
- print("\nStopping...\n")
- raise GracefulExit
- print("\nServer is not running\n")
diff --git a/mjpeg_streamer/server.py b/mjpeg_streamer/server.py
new file mode 100644
index 0000000..3f2901f
--- /dev/null
+++ b/mjpeg_streamer/server.py
@@ -0,0 +1,131 @@
+import asyncio
+import threading
+from typing import List, Union
+
+import aiohttp
+from aiohttp import MultipartWriter, web
+from aiohttp.web_runner import GracefulExit
+from multidict import MultiDict
+
+from .stream import StreamBase
+
+
+class _StreamHandler:
+ def __init__(self, stream: StreamBase) -> None:
+ self._stream = stream
+
+ async def __call__(self, request: web.Request) -> web.StreamResponse:
+ viewer_token = request.cookies.get("viewer_token")
+ response = web.StreamResponse(
+ status=200,
+ reason="OK",
+ headers={
+ "Content-Type": "multipart/x-mixed-replace;boundary=image-boundary"
+ },
+ )
+ try:
+ await response.prepare(request)
+ except (ConnectionResetError, ConnectionAbortedError, ConnectionError):
+ pass
+ if not viewer_token:
+ viewer_token = await self._stream._add_viewer()
+ response.set_cookie("viewer_token", viewer_token)
+ elif viewer_token not in self._stream._active_viewers:
+ await self._stream._add_viewer(viewer_token)
+ try:
+ while True:
+ try:
+ await asyncio.sleep(1 / self._stream.fps)
+ frame = await self._stream._get_frame()
+ with MultipartWriter(
+ "image/jpeg", boundary="image-boundary"
+ ) as mpwriter:
+ mpwriter.append(
+ frame.tobytes(),
+ MultiDict({"Content-Type": "image/jpeg"}),
+ )
+ await mpwriter.write(response, close_boundary=False)
+ await response.write(b"\r\n")
+ except (ConnectionResetError, ConnectionAbortedError, ConnectionError):
+ break
+ finally:
+ await self._stream._remove_viewer(viewer_token)
+ return response
+
+
+class Server:
+ def __init__(
+ self, host: Union[str, List[str,]] = "localhost", port: int = 8080
+ ) -> None:
+ if isinstance(host, str):
+ self._host: List[str,] = [
+ host,
+ ]
+ elif isinstance(host, list):
+ if "0.0.0.0" in host:
+ host = ["0.0.0.0"]
+ if "localhost" in host and "127.0.0.1" in host:
+ host.remove("localhost")
+ self._host = list(set(host))
+ self._port = port
+ self._app: web.Application = web.Application()
+ self._app_is_running: bool = False
+ self._cap_routes: List[str,] = []
+
+ def is_running(self) -> bool:
+ return self._app_is_running
+
+ async def __root_handler(self, _) -> web.Response:
+ text = "Available streams:
"
+ for route in self._cap_routes:
+ text += f"{route}\n
\n"
+ return aiohttp.web.Response(text=text, content_type="text/html")
+
+ def add_stream(self, stream: StreamBase) -> None:
+ if self.is_running():
+ raise RuntimeError("Cannot add stream after the server has started")
+ route = f"/{stream.name}"
+ if route in self._cap_routes:
+ raise ValueError(f"A stream with the name {route} already exists")
+ self._cap_routes.append(route)
+ self._app.router.add_route("GET", route, _StreamHandler(stream))
+
+ def __start_func(self) -> None:
+ self._app.router.add_route("GET", "/", self.__root_handler)
+ runner = web.AppRunner(self._app)
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ loop.run_until_complete(runner.setup())
+ site = web.TCPSite(runner, self._host, self._port)
+ loop.run_until_complete(site.start())
+ loop.run_forever()
+
+ def start(self) -> None:
+ if not self.is_running():
+ thread = threading.Thread(target=self.__start_func, daemon=True)
+ thread.start()
+ self._app_is_running = True
+ else:
+ print("\nServer is already running\n")
+
+ for addr in self._host:
+ print(f"\nStreams index: http://{addr}:{self._port!s}")
+ print("Available streams:\n")
+ for route in self._cap_routes: # route has a leading slash
+ print(f"http://{addr}:{self._port!s}{route}")
+ print("--------------------------------\n")
+ print("\nPress Ctrl+C to stop the server\n")
+
+ def stop(self) -> None:
+ if self.is_running():
+ self._app_is_running = False
+ print("\nStopping...\n")
+ GracefulExit()
+ print("\nServer stopped\n")
+ else:
+ print("\nServer is not running\n")
+
+
+class MjpegServer(Server):
+ # Alias for Server, to maintain backwards compatibility
+ pass
diff --git a/mjpeg_streamer/stream.py b/mjpeg_streamer/stream.py
new file mode 100644
index 0000000..894bc02
--- /dev/null
+++ b/mjpeg_streamer/stream.py
@@ -0,0 +1,272 @@
+import asyncio
+import time
+import uuid
+from collections import deque
+from typing import Deque, Dict, List, Optional, Set, Tuple, Union
+
+import cv2
+import numpy as np
+
+
+class StreamBase:
+ def __init__(
+ self,
+ name: str,
+ fps: int = 30,
+ ) -> None:
+ self.name = name.casefold().replace(" ", "_")
+ self.fps = fps
+ self._frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8)
+ self._last_processed_frame: np.ndarray = cv2.imencode(
+ ".jpg", self._frame, [cv2.IMWRITE_JPEG_QUALITY, 1]
+ )[1]
+ self._lock: asyncio.Lock = asyncio.Lock()
+ self._frames_buffer: Deque[int] = deque(maxlen=fps)
+ self._bandwidth_last_modified_time: float = time.time()
+ self._active_viewers: Set[str] = set()
+ self._tasks: Dict[str, asyncio.Task] = {"_clear_bandwidth": None}
+
+ async def _ensure_background_tasks(self) -> None:
+ for task_name, task in self._tasks.items():
+ if task is None or task.done():
+ self._tasks[task_name] = asyncio.create_task(
+ eval(f"self.{task_name}()")
+ )
+
+ async def _clear_bandwidth(self) -> None:
+ while True:
+ await asyncio.sleep(1.0 / self.fps)
+ if (
+ len(self._frames_buffer) > 0
+ and time.time() - self._bandwidth_last_modified_time >= 1
+ ):
+ self._frames_buffer.clear()
+
+ async def _add_viewer(self, viewer_token: Optional[str] = None) -> str:
+ viewer_token = viewer_token or str(uuid.uuid4())
+ async with self._lock:
+ self._active_viewers.add(viewer_token)
+ return viewer_token
+
+ async def _remove_viewer(self, viewer_token: str) -> None:
+ async with self._lock:
+ self._active_viewers.discard(viewer_token)
+
+ async def _process_current_frame(self) -> np.ndarray:
+ self._last_processed_frame = self._frame
+ return self._frame
+
+ async def __check_encoding(self, frame: np.ndarray) -> str:
+ if isinstance(frame, np.ndarray) and frame.ndim == 1 and frame.size > 2:
+ # Check JPG header (0xFFD8) and footer (0xFFD9)
+ if (
+ frame[0] == 255
+ and frame[1] == 216
+ and frame[-2] == 255
+ and frame[-1] == 217
+ ):
+ return "jpg"
+ return "one-dim-non-jpg"
+ if isinstance(frame, np.ndarray):
+ return "multi-dim"
+ return "unknown"
+
+ async def _resize_and_encode_frame(
+ self, frame: np.ndarray, size: Tuple[int, int], quality: int
+ ) -> np.ndarray:
+ resized_frame = cv2.resize(frame, size)
+ if not await self.__check_encoding(resized_frame) == "jpg":
+ val, encoded_frame = cv2.imencode(
+ ".jpg", resized_frame, [cv2.IMWRITE_JPEG_QUALITY, quality]
+ )
+ if not val:
+ raise ValueError(
+ f"Error encoding frame. Format/shape: {await self.__check_encoding(resized_frame)}"
+ )
+ return encoded_frame
+
+ def settings(self) -> None:
+ for key, value in self.__dict__.items():
+ if key.startswith("_"):
+ continue
+ print(f"{key}: {value}")
+
+ def has_demand(self) -> bool:
+ return len(self._active_viewers) > 0
+
+ def active_viewers(self) -> int:
+ return len(self._active_viewers)
+
+ def get_bandwidth(self) -> float:
+ return sum(self._frames_buffer)
+
+ def set_fps(self, fps: int) -> None:
+ self.fps = fps
+ self._frames_buffer = deque(maxlen=fps)
+
+ async def _get_frame(self) -> np.ndarray:
+ await self._ensure_background_tasks() # A little hacky
+ if time.time() - self._bandwidth_last_modified_time <= 1.0 / self.fps:
+ return self._last_processed_frame # Prevents redundant processing
+ async with self._lock:
+ self._frames_buffer.append(len(self._last_processed_frame.tobytes()))
+ self._bandwidth_last_modified_time = time.time()
+ return await self._process_current_frame()
+
+ def set_frame(self, frame: np.ndarray) -> None:
+ self._frame = frame
+
+
+class Stream(StreamBase):
+ def __init__(
+ self,
+ name: str,
+ fps: int = 30,
+ size: Optional[Tuple[int, int]] = None,
+ quality: int = 50,
+ ) -> None:
+ super().__init__(name, fps)
+ self.size = size
+ self.quality = max(1, min(quality, 100))
+ self._last_processed_frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8)
+
+ async def _process_current_frame(self) -> np.ndarray:
+ frame = await self._resize_and_encode_frame(
+ self._frame,
+ self.size or (self._frame.shape[1], self._frame.shape[0]),
+ self.quality,
+ )
+ self._last_processed_frame = frame
+ return frame
+
+ def set_size(self, size: Tuple[int, int]) -> None:
+ self.size = size
+
+ def set_quality(self, quality: int) -> None:
+ self.quality = max(1, min(quality, 100))
+
+
+class ManagedStream(StreamBase):
+ def __init__(
+ self,
+ name: str,
+ source: Union[int, str] = 0,
+ fps: int = 30,
+ size: Optional[Tuple[int, int]] = None,
+ quality: int = 50,
+ mode: str = "fast-on-demand",
+ poll_delay_ms: Optional[Union[float, int]] = None,
+ ) -> None:
+ super().__init__(name, fps)
+ self.source = source
+ self.mode = mode
+ self._available_modes: List[str,] = ["fast-on-demand", "full-on-demand"]
+ if self.mode not in self._available_modes:
+ raise ValueError(f"Invalid mode. Available modes: {self._available_modes}")
+ self.size = size
+ self.quality = max(1, min(quality, 100))
+ self.poll_delay_seconds = poll_delay_ms / 1000.0 if poll_delay_ms else 1.0 / fps
+ self._cap_is_open: bool = False
+ self._cap: cv2.VideoCapture = None
+ self._is_running: bool = False
+ self._tasks["_manage_cap_state"] = None
+
+ async def _manage_cap_state(self) -> None:
+ while True:
+ await asyncio.sleep(self.poll_delay_seconds)
+ if self.mode == "full-on-demand":
+ if self.has_demand() and not self._cap_is_open:
+ async with self._lock:
+ await self.__open_cap()
+ elif not self.has_demand() and self._cap_is_open:
+ async with self._lock:
+ await self.__close_cap()
+ elif not self._cap_is_open:
+ async with self._lock:
+ await self.__open_cap()
+
+ async def __open_cap(self) -> None:
+ if not self._cap_is_open and self._is_running:
+ self._cap = cv2.VideoCapture(self.source)
+ if not self._cap.isOpened():
+ raise ValueError("Cannot open the capture device")
+ self._cap_is_open = True
+
+ async def __close_cap(self) -> None:
+ if self._cap_is_open and self._is_running:
+ self._cap.release()
+ self._cap_is_open = False
+
+ async def __read_frame(self) -> None:
+ if self._cap_is_open and self._is_running:
+ val, frame = self._cap.read()
+ if not val:
+ async with self._lock:
+ val, frame = self._cap.read()
+ if not val:
+ raise RuntimeError("Error reading frame")
+ self._frame = frame
+ else:
+ await self.__open_cap()
+
+ async def _process_current_frame(self) -> np.ndarray:
+ if not self.has_demand():
+ return self._last_processed_frame
+ await self.__read_frame()
+ frame = await self._resize_and_encode_frame(
+ self._frame,
+ self.size or (self._frame.shape[1], self._frame.shape[0]),
+ self.quality,
+ )
+ self._last_processed_frame = frame
+ return frame
+
+ async def _get_frame(self) -> np.ndarray:
+ if not self._is_running:
+ raise RuntimeError(
+ "Stream is not running, please call the start method first."
+ )
+ return await super()._get_frame()
+
+ def set_size(self, size: Tuple[int, int]) -> None:
+ self.size = size
+
+ def set_quality(self, quality: int) -> None:
+ self.quality = max(1, min(quality, 100))
+
+ def set_frame(self, frame: np.ndarray) -> None:
+ raise NotImplementedError(
+ "This method is not available for ManagedStream, use Stream or StreamBase instead."
+ )
+
+ def change_mode(self, mode: str) -> None:
+ if mode not in self._available_modes:
+ print(f"Invalid mode. Available modes: {self._available_modes}")
+ self.mode = mode
+
+ def change_source(self, source: Union[int, str]) -> None:
+ self.source = source
+ self.__close_cap()
+ self.__open_cap()
+
+ def set_poll_delay_ms(self, poll_delay_ms: float) -> None:
+ self.poll_delay_seconds = poll_delay_ms / 1000.0
+
+ def start(self) -> None:
+ if not self._is_running:
+ self._is_running = True
+ else:
+ print("Stream has already started")
+
+ def stop(self) -> None:
+ if self._is_running:
+ for task in self._tasks.values():
+ if task and not task.done():
+ task.cancel()
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(self.__close_cap())
+ loop.close() if loop.is_running() else None
+ self._is_running = False
+ self._cap_is_open = False
+ else:
+ print("Stream has already stopped")
diff --git a/perf/py-spy-results.md b/perf/py-spy-results.md
new file mode 100644
index 0000000..13627d6
--- /dev/null
+++ b/perf/py-spy-results.md
@@ -0,0 +1,39 @@
+# PY-SPY Tests
+
+Performance results gathered using py-spy. [Test scripts](../tests/) were run with the following command:
+
+```bash
+py-spy top -- python tests/.py
+```
+
+## Results
+
+*Python version: 3.11.8*
+
+### test_streambase:
+- no demand:
+``GIL: 0% Active: 5-15% Threads: 2``
+- demand:
+``GIL: 0% Active: 5-15% Threads: 2``
+
+### test_stream:
+- no demand:
+``GIL: 0% Active: 0-4% Threads: 2``
+- demand:
+``GIL: 0-2% Active: 3-10% Threads: 2``
+
+### test_full_od:
+ - initial no demand:
+ ``GIL: 0% Active: 0-1% Threads: 2``
+ - no demand:
+ ``GIL: 0% Active: 0% Threads: 2``
+ - demand:
+ ``GIL: 0-2% Active: 2-10% Threads: 2``
+
+### test_fast_od:
+ - initial no demand:
+ ``GIL: 0% Active: 0-1% Threads: 2``
+ - no demand:
+ ``GIL: 0% Active: 0-1% Threads: 2``
+ - demand:
+ ``GIL: 0-2% Active: 2-10% Threads: 2``
diff --git a/perf/usage-tests.md b/perf/usage-tests.md
new file mode 100644
index 0000000..fc01ac6
--- /dev/null
+++ b/perf/usage-tests.md
@@ -0,0 +1,53 @@
+# Usage Tests
+
+Results gathered from running the tests in the [tests](../tests/) directory. These results are very arbitrary and should not be taken as a definitive measure of performance. They are meant to give a rough idea of how the system behaves under different conditions.
+
+## Computer Specs
+
+```bash
+$ neofetch --off
+OS: Ubuntu 23.10 x86_64
+Host: Victus by HP Laptop 16-e0xxx
+Kernel: 6.5.0-28-generic
+Shell: zsh 5.9
+DE: GNOME 45.2
+Terminal: gnome-terminal
+CPU: AMD Ryzen 7 5800H with Radeon Graphics (16) @ 4.463GHz
+GPU: AMD ATI Radeon Vega Series / Radeon Vega Mobile Series
+GPU: NVIDIA GeForce RTX 3050 Ti Mobile
+Memory: 15313MiB
+```
+
+*Note:* Irrelevant information has been removed from the output.
+
+## Results
+
+*Python version: 3.11.8*
+
+### test_streambase:
+- no demand:
+``CPU: 14-16% Memory: 39.2 MB``
+- demand:
+``CPU: 15-19% Memory: 39.5 MB``
+
+### test_stream:
+- no demand:
+``CPU: 14-16% Memory: 39.7 MB``
+- demand:
+``CPU: 14-17% Memory: 41.7 MB``
+
+### test_full_od:
+ - initial no demand:
+ ``CPU: 0% Memory: 37 MB``
+ - no demand:
+ ``CPU: 0% Memory: 39.9 MB``
+ - demand:
+ ``CPU: 15-18% Memory: 41 MB``
+
+### test_fast_od:
+ - initial no demand:
+ ``CPU: 0% Memory: 37.2 MB``
+ - no demand:
+ ``CPU: 0-0.1% Memory: 41.1 MB``
+ - demand:
+ ``CPU: 15-18% Memory: 41.1 MB``
diff --git a/pyproject.toml b/pyproject.toml
index 34a047d..5bea27e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -54,7 +54,6 @@ dependencies = [
'aiohttp==3.8.6; python_version >= "3.6" and python_version <= "3.7"',
'aiohttp==3.9.1; python_version == "3.8"',
'aiohttp; python_version >= "3.9"',
- "netifaces",
'numpy==1.19.5; python_version == "3.6"',
'numpy==1.21.6; python_version == "3.7"',
'numpy==1.24.4; python_version == "3.8"',
@@ -79,9 +78,9 @@ isolated = true
path = "mjpeg_streamer/__init__.py"
[tool.ruff]
-exclude = ["examples"]
+exclude = ["examples", "test*"]
fix = false
-ignore = [
+lint.ignore = [
"TID252", # Relative imports are banned | __init__.py
"T201", # `print` found | TODO: Migrate to logging
"S104", # Possible binding to all interfaces | False positive
diff --git a/requirements-dev.txt b/requirements-dev.txt
new file mode 100644
index 0000000..82ed91f
--- /dev/null
+++ b/requirements-dev.txt
@@ -0,0 +1,4 @@
+hatch
+mypy
+pre-commit
+ruff
diff --git a/tests/test_fast_od.py b/tests/test_fast_od.py
new file mode 100644
index 0000000..7cb9518
--- /dev/null
+++ b/tests/test_fast_od.py
@@ -0,0 +1,24 @@
+import time
+
+from mjpeg_streamer import ManagedStream, Server
+
+server = Server(host="0.0.0.0", port=8080)
+stream = ManagedStream(
+ "test",
+ source=0,
+ fps=30,
+ size=(640, 480),
+ quality=50,
+ mode="fast-on-demand",
+ poll_delay_ms=None,
+)
+
+server.add_stream(stream)
+server.start()
+stream.start()
+
+print(stream.settings())
+
+while True:
+ time.sleep(1 / 30)
+ print(stream.get_bandwidth() / 1024, "KB/s", end="\r")
diff --git a/tests/test_full_od.py b/tests/test_full_od.py
new file mode 100644
index 0000000..a61dca0
--- /dev/null
+++ b/tests/test_full_od.py
@@ -0,0 +1,24 @@
+import time
+
+from mjpeg_streamer import ManagedStream, Server
+
+server = Server(host="0.0.0.0", port=8080)
+stream = ManagedStream(
+ "test",
+ source=0,
+ fps=30,
+ size=(640, 480),
+ quality=50,
+ mode="full-on-demand",
+ poll_delay_ms=None,
+)
+
+server.add_stream(stream)
+server.start()
+stream.start()
+
+print(stream.settings())
+
+while True:
+ time.sleep(1 / 30)
+ print(stream.get_bandwidth() / 1024, "KB/s", end="\r")
diff --git a/tests/test_stream.py b/tests/test_stream.py
new file mode 100644
index 0000000..8c382fc
--- /dev/null
+++ b/tests/test_stream.py
@@ -0,0 +1,17 @@
+import cv2
+
+from mjpeg_streamer import Server, Stream
+
+capture = cv2.VideoCapture(0)
+
+server = Server(host="0.0.0.0", port=8080)
+stream = Stream("test", fps=30, size=(640, 480), quality=50)
+server.add_stream(stream)
+server.start()
+
+print(stream.settings())
+
+while True:
+ frame = capture.read()[1]
+ stream.set_frame(frame)
+ print(stream.get_bandwidth() / 1024, "KB/s", end="\r")
diff --git a/tests/test_streambase.py b/tests/test_streambase.py
new file mode 100644
index 0000000..bf5a450
--- /dev/null
+++ b/tests/test_streambase.py
@@ -0,0 +1,19 @@
+import cv2
+
+from mjpeg_streamer import Server, StreamBase
+
+capture = cv2.VideoCapture(0)
+
+server = Server(host="0.0.0.0", port=8080)
+stream = StreamBase("test", fps=30)
+
+server.add_stream(stream)
+server.start()
+
+print(stream.settings())
+
+while True:
+ frame = capture.read()[1]
+ frame = cv2.imencode(".jpg", frame)[1]
+ stream.set_frame(frame)
+ print(stream.get_bandwidth() / 1024, "KB/s", end="\r")