Skip to content

Commit 4d7bcea

Browse files
committed
feat: screen recorder
1 parent 2e23e41 commit 4d7bcea

File tree

4 files changed

+135
-6
lines changed

4 files changed

+135
-6
lines changed

hmdriver2/_gesture.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import List, Union
55
from . import logger
66
from .utils import delay
7+
from .driver import Driver
78
from .proto import HypiumResponse, Point
89
from .exception import InjectGestureError
910

@@ -13,7 +14,7 @@ class _Gesture:
1314
SAMPLE_TIME_NORMAL = 50
1415
SAMPLE_TIME_MAX = 100
1516

16-
def __init__(self, driver: "Driver", sampling_ms=50): # type: ignore
17+
def __init__(self, driver: Driver, sampling_ms=50):
1718
"""
1819
Initialize a gesture object.
1920

hmdriver2/_screenrecord.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# -*- coding: utf-8 -*-
2+
3+
import socket
4+
import json
5+
import time
6+
import os
7+
import typing
8+
import subprocess
9+
import atexit
10+
import hashlib
11+
import threading
12+
import numpy as np
13+
from queue import Queue
14+
from datetime import datetime
15+
16+
import cv2
17+
18+
from . import logger
19+
from .driver import Driver
20+
from .exception import ScreenRecordError
21+
22+
23+
# Developing
24+
25+
26+
class _ScreenRecord:
27+
def __init__(self, driver: Driver):
28+
self._client = driver._client
29+
self.video_path = None
30+
31+
self.jpeg_queue = Queue()
32+
self.threads: typing.List[threading.Thread] = []
33+
self.stop_event = threading.Event()
34+
35+
def _send_message(self, api: str, args: list):
36+
_msg = {
37+
"module": "com.ohos.devicetest.hypiumApiHelper",
38+
"method": "Captures",
39+
"params": {
40+
"api": api,
41+
"args": args
42+
},
43+
"request_id": datetime.now().strftime("%Y%m%d%H%M%S%f")
44+
}
45+
self._client._send_msg(_msg)
46+
47+
def start(self, video_path: str):
48+
49+
self.video_path = video_path
50+
51+
self._send_message("startCaptureScreen", [])
52+
53+
reply: str = self._client._recv_msg(1024, decode=True)
54+
if "true" in reply:
55+
record_th = threading.Thread(target=self._record_worker)
56+
writer_th = threading.Thread(target=self._video_writer)
57+
record_th.start()
58+
writer_th.start()
59+
self.threads.extend([record_th, writer_th])
60+
else:
61+
raise ScreenRecordError("Failed to start device screen capture.")
62+
63+
def _record_worker(self):
64+
"""Capture screen frames and save current frames."""
65+
66+
# JPEG start and end markers.
67+
start_flag = b'\xff\xd8'
68+
end_flag = b'\xff\xd9'
69+
buffer = bytearray()
70+
while not self.stop_event.is_set():
71+
try:
72+
buffer += self._client._recv_msg(4096 * 1024)
73+
except Exception as e:
74+
print(f"Error receiving data: {e}")
75+
break
76+
77+
start_idx = buffer.find(start_flag)
78+
end_idx = buffer.find(end_flag)
79+
while start_idx != -1 and end_idx != -1 and end_idx > start_idx:
80+
# Extract one JPEG image
81+
jpeg_image: bytearray = buffer[start_idx:end_idx + 2]
82+
self.jpeg_queue.put(jpeg_image)
83+
84+
buffer = buffer[end_idx + 2:]
85+
86+
# Search for the next JPEG image in the buffer
87+
start_idx = buffer.find(start_flag)
88+
end_idx = buffer.find(end_flag)
89+
90+
def _video_writer(self):
91+
"""Write frames to video file."""
92+
cv2_instance = None
93+
while not self.stop_event.is_set():
94+
if not self.jpeg_queue.empty():
95+
jpeg_image = self.jpeg_queue.get(timeout=0.1)
96+
img = cv2.imdecode(np.frombuffer(jpeg_image, np.uint8), cv2.IMREAD_COLOR)
97+
if cv2_instance is None:
98+
height, width = img.shape[:2]
99+
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
100+
cv2_instance = cv2.VideoWriter(self.video_path, fourcc, 10, (width, height))
101+
102+
cv2_instance.write(img)
103+
104+
if cv2_instance:
105+
cv2_instance.release()
106+
107+
def stop(self) -> str:
108+
try:
109+
self.stop_event.set()
110+
for t in self.threads:
111+
t.join()
112+
113+
self._send_message("stopCaptureScreen", [])
114+
self._client._recv_msg(1024, decode=True)
115+
except Exception as e:
116+
logger.error(f"An error occurred: {e}")
117+
118+
return self.video_path

hmdriver2/driver.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
from ._uiobject import UiObject
1616
from .hdc import list_devices
1717
from .exception import DeviceNotFoundError
18-
from ._gesture import _Gesture
1918
from .proto import HypiumResponse, KeyCode, Point, DisplayRotation, DeviceInfo
2019

2120

@@ -270,10 +269,6 @@ def input_text(self, x, y, text: str):
270269
point = self._to_abs_pos(x, y)
271270
self.hdc.input_text(point.x, point.y, text)
272271

273-
@cached_property
274-
def gesture(self):
275-
return _Gesture(self)
276-
277272
def dump_hierarchy(self) -> Dict:
278273
"""
279274
Dump the UI hierarchy of the device screen.
@@ -282,3 +277,14 @@ def dump_hierarchy(self) -> Dict:
282277
Dict: The dumped UI hierarchy as a dictionary.
283278
"""
284279
return self.hdc.dump_hierarchy()
280+
281+
@cached_property
282+
def gesture(self):
283+
from ._gesture import _Gesture
284+
return _Gesture(self)
285+
286+
# @cached_property
287+
# def screenrecord(self):
288+
# # FIXME
289+
# from ._screenrecord import _ScreenRecord
290+
# return _ScreenRecord(self)

hmdriver2/exception.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,7 @@ class InvokeHypiumError(Exception):
2626

2727
class InjectGestureError(Exception):
2828
pass
29+
30+
31+
class ScreenRecordError(Exception):
32+
pass

0 commit comments

Comments
 (0)