Skip to content

Commit e57fa88

Browse files
committed
This fixes #12 but only tried on Windows
1 parent cc76948 commit e57fa88

File tree

7 files changed

+91
-33
lines changed

7 files changed

+91
-33
lines changed

mjpeg_streamer/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from .server import MjpegServer, Server
2-
from .stream import CustomStream, ManagedStream, Stream
2+
from .stream import ManagedStream, Stream, StreamBase
33

4-
__all__ = ["CustomStream", "ManagedStream", "MjpegServer", "Stream", "Server"]
4+
__all__ = ["StreamBase", "ManagedStream", "MjpegServer", "Stream", "Server"]
55
__version__ = "2024.2.8"

mjpeg_streamer/server.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ async def __call__(self, request: web.Request) -> web.StreamResponse:
2323
"Content-Type": "multipart/x-mixed-replace;boundary=image-boundary"
2424
},
2525
)
26-
await response.prepare(request)
26+
try:
27+
await response.prepare(request)
28+
except (ConnectionResetError, ConnectionAbortedError):
29+
pass
2730
if not viewer_token:
2831
viewer_token = await self._stream._add_viewer()
2932
response.set_cookie("viewer_token", viewer_token)
@@ -125,7 +128,4 @@ def stop(self) -> None:
125128

126129
class MjpegServer(Server):
127130
# Alias for Server, to maintain backwards compatibility
128-
def __init__(
129-
self, host: Union[str, List[str,]] = "localhost", port: int = 8080
130-
) -> None:
131-
super().__init__(host, port)
131+
pass

mjpeg_streamer/stream.py

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import time
33
import uuid
44
from collections import deque
5-
from typing import Deque, List, Optional, Set, Tuple, Union
5+
from typing import Deque, Dict, List, Optional, Set, Tuple, Union
66

77
import cv2
88
import numpy as np
@@ -14,29 +14,23 @@ def __init__(
1414
name: str,
1515
fps: int = 30,
1616
) -> None:
17-
if type(self) is StreamBase:
18-
raise TypeError(
19-
"StreamBase is an abstract class and cannot be instantiated."
20-
)
2117
self.name = name.casefold().replace(" ", "_")
2218
self.fps = fps
2319
self._frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8)
2420
self._lock: asyncio.Lock = asyncio.Lock()
2521
self._frames_buffer: Deque[int] = deque(maxlen=fps)
2622
self._bandwidth_last_modified_time: float = time.time()
2723
self._active_viewers: Set[str] = set()
28-
self._bandwidth_background_task: Optional[asyncio.Task] = None
24+
self._tasks: Dict[str, asyncio.Task] = {"_clear_bandwidth": None}
2925

3026
async def _ensure_background_tasks(self) -> None:
31-
if (
32-
self._bandwidth_background_task is None
33-
or self._bandwidth_background_task.done()
34-
):
35-
self._bandwidth_background_task = asyncio.create_task(
36-
self.__clear_bandwidth()
37-
)
27+
for task_name, task in self._tasks.items():
28+
if task is None or task.done():
29+
self._tasks[task_name] = asyncio.create_task(
30+
eval(f"self.{task_name}()")
31+
)
3832

39-
async def __clear_bandwidth(self) -> None:
33+
async def _clear_bandwidth(self) -> None:
4034
while True:
4135
await asyncio.sleep(1.0 / self.fps)
4236
if (
@@ -120,9 +114,6 @@ def set_frame(self, frame: np.ndarray) -> None:
120114
self._frame = frame
121115

122116

123-
CustomStream = StreamBase
124-
125-
126117
class Stream(StreamBase):
127118
def __init__(
128119
self,
@@ -163,6 +154,7 @@ def __init__(
163154
mode: str = "fast-on-demand",
164155
poll_delay_ms: Optional[Union[float, int]] = None,
165156
) -> None:
157+
super().__init__(name, fps)
166158
self.source = source
167159
self.mode = mode
168160
self._available_modes: List[str,] = ["fast-on-demand", "full-on-demand"]
@@ -175,14 +167,9 @@ def __init__(
175167
self._cap: cv2.VideoCapture = None
176168
self._cap_background_task: Optional[asyncio.Task] = None
177169
self._is_running: bool = False
178-
super().__init__(name, fps)
179-
180-
async def _ensure_background_tasks(self) -> None:
181-
await super()._ensure_background_tasks()
182-
if self._cap_background_task is None or self._cap_background_task.done():
183-
self._cap_background_task = asyncio.create_task(self.__manage_cap_state())
170+
self._tasks["_manage_cap_state"] = None
184171

185-
async def __manage_cap_state(self) -> None:
172+
async def _manage_cap_state(self) -> None:
186173
while True:
187174
await asyncio.sleep(self.poll_delay_seconds)
188175
if self.mode == "full-on-demand":
@@ -223,7 +210,6 @@ async def __read_frame(self) -> None:
223210
async def _process_current_frame(self) -> np.ndarray:
224211
if not self.has_demand():
225212
return self._last_processed_frame
226-
print("reading frame")
227213
await self.__read_frame()
228214
frame = await self._resize_and_encode_frame(
229215
self._frame,
@@ -248,7 +234,7 @@ def set_quality(self, quality: int) -> None:
248234

249235
def set_frame(self, frame: np.ndarray) -> None:
250236
raise NotImplementedError(
251-
"This method is not available for ManagedStream, use Stream or CustomStream instead."
237+
"This method is not available for ManagedStream, use Stream or StreamBase instead."
252238
)
253239

254240
def change_mode(self, mode: str) -> None:

tests/test_fast_od.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import time
2+
3+
from mjpeg_streamer import ManagedStream, Server
4+
5+
server = Server()
6+
stream = ManagedStream(
7+
"test",
8+
source=0,
9+
fps=30,
10+
size=(640, 480),
11+
quality=50,
12+
mode="fast-on-demand",
13+
poll_delay_ms=None,
14+
)
15+
16+
server.add_stream(stream)
17+
server.start()
18+
stream.start()
19+
20+
while True:
21+
time.sleep(1 / 30)

tests/test_full_od.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import time
2+
3+
from mjpeg_streamer import ManagedStream, Server
4+
5+
server = Server()
6+
stream = ManagedStream(
7+
"test",
8+
source=0,
9+
fps=30,
10+
size=(640, 480),
11+
quality=50,
12+
mode="full-on-demand",
13+
poll_delay_ms=None,
14+
)
15+
16+
server.add_stream(stream)
17+
server.start()
18+
stream.start()
19+
20+
while True:
21+
time.sleep(1 / 30)

tests/test_stream.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import cv2
2+
3+
from mjpeg_streamer import Server, Stream
4+
5+
capture = cv2.VideoCapture(0)
6+
7+
server = Server()
8+
stream = Stream("test", fps=30, size=(640, 480), quality=50)
9+
server.add_stream(stream)
10+
server.start()
11+
12+
while True:
13+
frame = capture.read()[1]
14+
stream.set_frame(frame)

tests/test_streambase.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import cv2
2+
3+
from mjpeg_streamer import Server, StreamBase
4+
5+
capture = cv2.VideoCapture(0)
6+
7+
server = Server()
8+
stream = StreamBase("test", fps=30)
9+
10+
server.add_stream(stream)
11+
server.start()
12+
13+
while True:
14+
frame = capture.read()[1]
15+
frame = cv2.imencode(".jpg", frame)[1]
16+
stream.set_frame(frame)

0 commit comments

Comments
 (0)