Skip to content

Commit 1342a95

Browse files
committed
Clean up a bit
1 parent 568ae1d commit 1342a95

File tree

1 file changed

+85
-106
lines changed

1 file changed

+85
-106
lines changed

mjpeg_streamer/stream.py

Lines changed: 85 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -14,41 +14,51 @@ def __init__(
1414
name: str,
1515
fps: int = 30,
1616
) -> None:
17-
if type(self) is StreamBase:
18-
raise TypeError(
19-
"StreamBase is an abstract class and cannot be instantiated"
20-
)
21-
self.name = name.lower().casefold().replace(" ", "_")
17+
self.name = name.casefold().replace(" ", "_")
2218
self.fps = fps
2319
self._frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8)
2420
self._lock: asyncio.Lock = asyncio.Lock()
25-
self._frame_buffer: Deque[int,] = deque(maxlen=fps)
21+
self._frames_buffer: Deque[int] = deque(maxlen=fps)
2622
self._bandwidth_last_modified_time: float = time.time()
27-
self._deque_background_task: Optional[asyncio.Task] = None
28-
self._active_viewers: Set[str,] = set()
23+
self._active_viewers: Set[str] = set()
24+
self._bandwidth_background_task: Optional[asyncio.Task] = None
25+
26+
def __new__(self, *args, **kwargs):
27+
raise TypeError("Cannot instantiate an abstract class")
28+
29+
async def _ensure_background_tasks(self) -> None:
30+
if (
31+
self._bandwidth_background_task is None
32+
or self._bandwidth_background_task.done()
33+
):
34+
self._bandwidth_background_task = asyncio.create_task(
35+
self.__clear_bandwidth()
36+
)
2937

30-
async def __clear_deque(self) -> None:
38+
async def __clear_bandwidth(self) -> None:
3139
while True:
32-
await asyncio.sleep(1 / self.fps)
40+
await asyncio.sleep(1.0 / self.fps)
3341
if (
34-
len(self._frame_buffer) > 0
42+
len(self._frames_buffer) > 0
3543
and time.time() - self._bandwidth_last_modified_time >= 1
3644
):
37-
self._frame_buffer.clear()
45+
self._frames_buffer.clear()
3846

3947
async def _add_viewer(self, viewer_token: Optional[str] = None) -> str:
4048
viewer_token = viewer_token or str(uuid.uuid4())
41-
self._active_viewers.add(viewer_token)
49+
async with self._lock:
50+
self._active_viewers.add(viewer_token)
4251
return viewer_token
4352

4453
async def _remove_viewer(self, viewer_token: str) -> None:
45-
self._active_viewers.discard(viewer_token)
54+
async with self._lock:
55+
self._active_viewers.discard(viewer_token)
4656

47-
async def _ensure_background_tasks(self) -> None:
48-
if self._deque_background_task is None or self._deque_background_task.done():
49-
self._deque_background_task = asyncio.create_task(self.__clear_deque())
57+
async def _process_current_frame(self) -> np.ndarray:
58+
self._last_processed_frame = self._frame
59+
return self._frame
5060

51-
def _check_encoding(self, frame: np.ndarray) -> str:
61+
async def __check_encoding(self, frame: np.ndarray) -> str:
5262
if isinstance(frame, np.ndarray) and frame.ndim == 1 and frame.size > 2:
5363
# Check JPEG header (0xFFD8) and footer (0xFFD9)
5464
if (
@@ -63,6 +73,20 @@ def _check_encoding(self, frame: np.ndarray) -> str:
6373
return "multi-dim"
6474
return "unknown"
6575

76+
async def _resize_and_encode_frame(
77+
self, frame: np.ndarray, size: Tuple[int, int], quality: int
78+
) -> np.ndarray:
79+
resized_frame = cv2.resize(frame, size)
80+
if not await self.__check_encoding(resized_frame) == "jpeg":
81+
val, encoded_frame = cv2.imencode(
82+
".jpg", resized_frame, [cv2.IMWRITE_JPEG_QUALITY, quality]
83+
)
84+
if not val:
85+
raise ValueError(
86+
f"Error encoding frame. Format/shape: {await self.__check_encoding(resized_frame)}"
87+
)
88+
return encoded_frame
89+
6690
def settings(self) -> None:
6791
for key, value in self.__dict__.items():
6892
if key.startswith("_"):
@@ -76,32 +100,26 @@ def active_viewers(self) -> int:
76100
return len(self._active_viewers)
77101

78102
def get_bandwidth(self) -> float:
79-
return sum(self._frame_buffer)
103+
return sum(self._frames_buffer)
80104

81105
def set_fps(self, fps: int) -> None:
82106
self.fps = fps
107+
self._frames_buffer = deque(maxlen=fps)
83108

84-
# Method for delivering the frame to the StreamHandler
85109
async def _get_frame(self) -> np.ndarray:
86-
# A little hacky, if you have a better way, please let me know
87-
await self._ensure_background_tasks()
88-
# Checking the encoding here instead of set_frame
89-
# to avoid continous polling
90-
if self._check_encoding(self._frame) != "jpeg":
91-
raise ValueError(
92-
"Input is not an encoded JPEG frame. Use OpenCV's imencode method to encode the frame to JPEG."
93-
)
94-
self._frame_buffer.append(len(self._frame.tobytes()))
95-
self._bandwidth_last_modified_time = time.time()
110+
await self._ensure_background_tasks() # A little hacky
111+
if time.time() - self._bandwidth_last_modified_time <= 1.0 / self.fps:
112+
return self._last_processed_frame
96113
async with self._lock:
97-
return self._frame
114+
self._frames_buffer.append(len(self._frame.tobytes()))
115+
self._bandwidth_last_modified_time = time.time()
116+
return await self._process_current_frame()
98117

99118
def set_frame(self, frame: np.ndarray) -> None:
100119
self._frame = frame
101120

102-
# Not very useful, but it's here for the sake of completeness
103-
def get_frame(self) -> np.ndarray:
104-
return self._frame
121+
122+
CustomStream = StreamBase
105123

106124

107125
class Stream(StreamBase):
@@ -112,59 +130,26 @@ def __init__(
112130
size: Optional[Tuple[int, int]] = None,
113131
quality: int = 50,
114132
) -> None:
133+
super().__init__(name, fps)
115134
self.size = size
116135
self.quality = max(1, min(quality, 100))
117136
self._last_processed_frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8)
118-
super().__init__(name, fps)
119137

120-
async def __process_current_frame(self) -> np.ndarray:
121-
frame = self._frame
122-
if not self._check_encoding(frame) == "jpeg":
123-
frame = cv2.resize(frame, self.size or (frame.shape[1], frame.shape[0]))
124-
val, frame = cv2.imencode(
125-
".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality]
126-
)
127-
if not val:
128-
raise ValueError("Error encoding frame")
129-
else:
130-
print(
131-
"The frame is already encoded, I will not encode nor resize it again. \
132-
Consider using CustomStream if you want to handle the processing yourself."
133-
)
134-
self._frame_buffer.append(len(frame.tobytes()))
135-
self._bandwidth_last_modified_time = time.time()
138+
async def _process_current_frame(self) -> np.ndarray:
139+
frame = await self._resize_and_encode_frame(
140+
self._frame,
141+
self.size or (self._frame.shape[1], self._frame.shape[0]),
142+
self.quality,
143+
)
136144
self._last_processed_frame = frame
137145
return frame
138146

139-
async def _get_frame(self) -> np.ndarray:
140-
if time.time() - self._bandwidth_last_modified_time <= 1 / self.fps:
141-
return self._last_processed_frame
142-
await self._ensure_background_tasks()
143-
async with self._lock:
144-
return await self.__process_current_frame()
145-
146147
def set_size(self, size: Tuple[int, int]) -> None:
147148
self.size = size
148149

149150
def set_quality(self, quality: int) -> None:
150151
self.quality = max(1, min(quality, 100))
151152

152-
def set_frame(self, frame: np.ndarray) -> None:
153-
self._frame = frame
154-
155-
def get_frame(self) -> np.ndarray:
156-
return super().get_frame()
157-
158-
159-
class CustomStream(StreamBase):
160-
# Same as StreamBase, but with a friendly name
161-
def __init__(
162-
self,
163-
name: str,
164-
fps: int = 30,
165-
) -> None:
166-
super().__init__(name, fps)
167-
168153

169154
class ManagedStream(StreamBase):
170155
def __init__(
@@ -184,7 +169,7 @@ def __init__(
184169
raise ValueError(f"Invalid mode. Available modes: {self._available_modes}")
185170
self.size = size
186171
self.quality = max(1, min(quality, 100))
187-
self.poll_delay_seconds = poll_delay_ms / 1000 if poll_delay_ms else 1 / fps
172+
self.poll_delay_seconds = poll_delay_ms / 1000.0 if poll_delay_ms else 1.0 / fps
188173
self._cap_is_open: bool = False
189174
self._cap: cv2.VideoCapture = None
190175
self._cap_background_task: Optional[asyncio.Task] = None
@@ -201,20 +186,23 @@ async def __manage_cap_state(self) -> None:
201186
await asyncio.sleep(self.poll_delay_seconds)
202187
if self.mode == "full-on-demand":
203188
if self.has_demand() and not self._cap_is_open:
204-
self.__open_cap()
189+
async with self._lock:
190+
await self.__open_cap()
205191
elif not self.has_demand() and self._cap_is_open:
206-
self.__close_cap()
192+
async with self._lock:
193+
await self.__close_cap()
207194
elif not self._cap_is_open:
208-
self.__open_cap()
195+
async with self._lock:
196+
await self.__open_cap()
209197

210-
def __open_cap(self) -> None:
198+
async def __open_cap(self) -> None:
211199
if not self._cap_is_open and self._is_running:
212200
self._cap = cv2.VideoCapture(self.source)
213201
if not self._cap.isOpened():
214202
raise ValueError("Cannot open the capture device")
215203
self._cap_is_open = True
216204

217-
def __close_cap(self) -> None:
205+
async def __close_cap(self) -> None:
218206
if self._cap_is_open and self._is_running:
219207
self._cap.release()
220208
self._cap_is_open = False
@@ -223,42 +211,33 @@ async def __read_frame(self) -> None:
223211
if self._cap_is_open and self._is_running:
224212
val, frame = self._cap.read()
225213
if not val:
226-
raise ValueError("Error reading frame")
214+
async with self._lock:
215+
val, frame = self._cap.read()
216+
if not val:
217+
raise RuntimeError("Error reading frame")
227218
self._frame = frame
228219
else:
229-
self.__open_cap()
220+
await self.__open_cap()
230221

231-
async def __process_current_frame(self) -> np.ndarray:
222+
async def _process_current_frame(self) -> np.ndarray:
232223
if not self.has_demand():
233-
return self._frame
224+
return self._last_processed_frame
225+
print("reading frame")
234226
await self.__read_frame()
235-
frame = cv2.resize(
236-
self._frame, self.size or (self._frame.shape[1], self._frame.shape[0])
237-
)
238-
val, frame = cv2.imencode(
239-
".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality]
227+
frame = await self._resize_and_encode_frame(
228+
self._frame,
229+
self.size or (self._frame.shape[1], self._frame.shape[0]),
230+
self.quality,
240231
)
241-
if not val:
242-
if self._cap.getBackendName() == "FFMPEG":
243-
raise ValueError(
244-
"Seems like you are using a video file as the source. \
245-
The media might have ended."
246-
)
247-
raise ValueError("Error encoding frame")
248-
self._frame_buffer.append(len(frame.tobytes()))
249-
self._bandwidth_last_modified_time = time.time()
250232
self._last_processed_frame = frame
251233
return frame
252234

253235
async def _get_frame(self) -> np.ndarray:
254236
if not self._is_running:
255-
print("Stream is not running, please call the start method first.")
256-
return self._frame
257-
if time.time() - self._bandwidth_last_modified_time <= 1 / self.fps:
258-
return self._last_processed_frame
259-
await self._ensure_background_tasks()
260-
async with self._lock:
261-
return await self.__process_current_frame()
237+
raise RuntimeError(
238+
"Stream is not running, please call the start method first."
239+
)
240+
return await super()._get_frame()
262241

263242
def set_size(self, size: Tuple[int, int]) -> None:
264243
self.size = size
@@ -282,7 +261,7 @@ def change_source(self, source: Union[int, str]) -> None:
282261
self.__open_cap()
283262

284263
def set_poll_delay_ms(self, poll_delay_ms: float) -> None:
285-
self.poll_delay_seconds = poll_delay_ms / 1000
264+
self.poll_delay_seconds = poll_delay_ms / 1000.0
286265

287266
def start(self) -> None:
288267
if not self._is_running:

0 commit comments

Comments
 (0)