Skip to content

support harmony remote by mjpeg #39

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ UI Inspector for Android and iOS, help inspector element properties, and auto ge
```bash
pip install uiautodev
```
If you are working with HarmonyOS and need additional dependencies, you can install the harmony version with the following command:
```bash
pip install uiautodev[harmony]
```

# Usage
```bash
Expand Down
8 changes: 8 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ include = [

[tool.poetry.dependencies]
python = "^3.8"
setuptools = "*"
adbutils = ">=2.8.10,<3"
click = "^8.1.7"
pygments = ">=2"
Expand All @@ -28,6 +29,13 @@ httpx = "*"
uvicorn = ">=0.33.0"
rich = "*"
python-multipart = ">=0.0.18"
xdevice = { path = "uiautodev/binaries/xdevice-5.0.7.200.tar.gz" }
xdevice-devicetest = { path = "uiautodev/binaries/xdevice-devicetest-5.0.7.200.tar.gz" }
xdevice-ohos = { path = "uiautodev/binaries/xdevice-ohos-5.0.7.200.tar.gz" }
hypium = { path = "uiautodev/binaries/hypium-5.0.7.200.tar.gz" }

[tool.poetry.extras]
harmony = ["setuptools", "xdevice", "xdevice-devicetest", "xdevice-ohos", "hypium"]

[tool.poetry.scripts]
"uiauto.dev" = "uiautodev.__main__:main"
Expand Down
35 changes: 35 additions & 0 deletions uiautodev/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def get_features(platform: str) -> Dict[str, bool]:
features[feature_name] = True
return features


class InfoResponse(BaseModel):
version: str
description: str
Expand Down Expand Up @@ -171,5 +172,39 @@ async def unified_ws(websocket: WebSocket, serial: str):
logger.info(f"WebSocket closed for serial={serial}")


def get_harmony_mjpeg_server(serial: str):
from uiautodev.remote.harmony_scrcpy import HarmonyMjpegServer
logger.info("create harmony mjpeg server for %s", serial)
from hypium import UiDriver
driver = UiDriver.connect(device_sn=serial)
logger.info(f'device wake_up_display: {driver.wake_up_display()}')
return HarmonyMjpegServer(driver)


@app.websocket("/ws/harmony/mjpeg/{serial}")
async def unified_harmony_ws(websocket: WebSocket, serial: str):
"""
Args:
serial: device serial
websocket: WebSocket
"""
await websocket.accept()

try:
logger.info(f"WebSocket serial: {serial}")

# 获取 HarmonyScrcpyServer 实例
server = get_harmony_mjpeg_server(serial)
server.start()
await server.handle_ws(websocket)
except WebSocketDisconnect:
logger.info(f"WebSocket disconnected by client.")
except Exception as e:
logger.exception(f"WebSocket error for serial={serial}: {e}")
await websocket.close(code=1000, reason=str(e))
finally:
logger.info(f"WebSocket closed for serial={serial}")


if __name__ == '__main__':
uvicorn.run("uiautodev.app:app", port=4000, reload=True, use_colors=True)
Binary file added uiautodev/binaries/hypium-5.0.7.200.tar.gz
Binary file not shown.
Binary file added uiautodev/binaries/xdevice-5.0.7.200.tar.gz
Binary file not shown.
Binary file not shown.
Binary file added uiautodev/binaries/xdevice-ohos-5.0.7.200.tar.gz
Binary file not shown.
197 changes: 197 additions & 0 deletions uiautodev/remote/harmony_scrcpy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import asyncio
import json
import logging
import socket
from datetime import datetime
from threading import Thread

from fastapi import WebSocket
from hypium import KeyCode

logger = logging.getLogger(__name__)


class HarmonyMjpegServer:
"""
HarmonyMjpegServer is responsible for handling screen streaming functionality
for HarmonyOS devices that support ABC proxy (a communication interface).

It manages WebSocket clients, communicates with the ABC server over gRPC, and streams
the device's screen data in real-time to connected clients.

This server is specifically designed for devices running in 'abc mode' and requires that
the target device expose an `abc_proxy` attribute for communication.

Attributes:
device: The HarmonyOS device object.
driver: The controlling driver which may wrap the device.
abc_rpc_addr: Tuple containing the IP and port used to communicate with abc_proxy.
channel: The gRPC communication channel (initialized later).
clients: A set of connected WebSocket clients.
loop: Asyncio event loop used to run asynchronous tasks.
is_running: Boolean flag indicating if the streaming service is active.

Raises:
RuntimeError: If the connected device does not support abc_proxy.

References:
- Huawei HarmonyOS Python Guidelines:
https://developer.huawei.com/consumer/cn/doc/harmonyos-guides/hypium-python-guidelines
"""

def __init__(self, driver):
if hasattr(driver, "_device"):
device = driver._device
else:
device = driver
logger.info(f'device: {device}')
if not hasattr(device, "abc_proxy") or device.abc_proxy is None:
raise RuntimeError("Only abc mode can support screen recorder")
self.device = device
self.driver = driver
self.abc_rpc_addr = ("127.0.0.1", device.abc_proxy.port)
self.channel = None
self.clients = set()
self.loop = asyncio.get_event_loop()
self.is_running = False

def connect(self):
self.channel = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Copy link
Preview

Copilot AI Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider setting a timeout on the socket (e.g., using settimeout) to avoid potential blocking during long network reads in production environments.

Suggested change
self.channel = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.channel = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.channel.settimeout(10) # Set a timeout of 10 seconds for socket operations

Copilot uses AI. Check for mistakes.

self.channel.connect(self.abc_rpc_addr)

def start(self, timeout=3600):
if self.channel is None:
self.connect()
self.is_running = True
self.timeout = timeout
self.stop_capture_if_running()
msg_json = {'api': "startCaptureScreen", 'args': []}
full_msg = {
"module": "com.ohos.devicetest.hypiumApiHelper",
"method": "Captures",
"params": msg_json,
"request_id": datetime.now().strftime("%Y%m%d%H%M%S%f")
}
full_msg_str = json.dumps(full_msg, ensure_ascii=False, separators=(',', ':'))
self.channel.sendall(full_msg_str.encode("utf-8") + b'\n')
reply = self.channel.recv(1024)
logger.info(f'reply: {reply}')
if b"true" in reply:
thread_record = Thread(target=self._record_worker)
thread_record.start()
else:
raise RuntimeError("Fail to start screen capture")

def stop_capture_if_running(self):
msg_json = {'api': "stopCaptureScreen", 'args': []}
full_msg = {
"module": "com.ohos.devicetest.hypiumApiHelper",
"method": "Captures",
"params": msg_json,
"request_id": datetime.now().strftime("%Y%m%d%H%M%S%f")
}
full_msg_str = json.dumps(full_msg, ensure_ascii=False, separators=(',', ':'))
self.channel.sendall(full_msg_str.encode("utf-8") + b'\n')
reply = self.channel.recv(1024)
logger.info(f'stop reply: {reply}')

async def handle_ws(self, websocket: WebSocket):
self.clients.add(websocket)
serial = getattr(self.device, "device_sn", "unknown")
logger.info(f"[{serial}] WebSocket connected")

try:
while True:
message = await websocket.receive_text()
logger.info(f"Received message: {message}")
try:
data = json.loads(message)
if data.get('type') == 'touch':
action = data.get('action')
x, y = data.get('x'), data.get('y')
if action == 'normal':
self.driver.touch((x, y))
elif action == 'long':
self.driver.touch(target=(x, y), mode='long')
elif action == 'double':
self.driver.touch(target=(x, y), mode='double')
elif action == 'move':
self.driver.slide(
start=(data.get('x1'), data.get('y1')),
end=(data.get('x2'), data.get('y2')),
slide_time=0.1
)
elif data.get('type') == 'keyEvent':
event_number = data['eventNumber']
if event_number == 187:
self.driver.swipe_to_recent_task()
elif event_number == 3:
self.driver.go_home()
elif event_number == 4:
self.driver.go_back()
elif event_number == 224:
self.driver.wake_up_display()
elif data.get('type') == 'text':
detail = data.get('detail')
if detail == 'CODE_AC_BACK':
self.driver.press_key(KeyCode.DEL)
elif detail == 'CODE_AC_ENTER':
self.driver.press_key(KeyCode.ENTER)
else:
self.driver.shell(
f"uitest uiInput inputText {data.get('x')} {data.get('y')} {detail}")
except Exception as e:
logger.warning(f"Failed to handle message: {e}")
except Exception as e:
logger.info(f"WebSocket closed: {e}")
finally:
self.clients.discard(websocket)

def _record_worker(self):
tmp_data = b''
start_flag = b'\xff\xd8'
end_flag = b'\xff\xd9'
while self.is_running:
try:
result = self.channel.recv(4096 * 1024)
tmp_data += result
while start_flag in tmp_data and end_flag in tmp_data:
start_index = tmp_data.index(start_flag)
end_index = tmp_data.index(end_flag) + 2
frame = tmp_data[start_index:end_index]
tmp_data = tmp_data[end_index:]
asyncio.run_coroutine_threadsafe(self._broadcast(frame), self.loop)
except Exception as e:
logger.warning(f"Record worker error: {e}")
self.is_running = False
self.channel = None
break

async def _broadcast(self, data):
for client in self.clients.copy():
try:
await client.send_bytes(data)
except Exception as e:
logger.info(f"Send error, removing client: {e}")
self.clients.discard(client)

def stop(self):
self.is_running = False
if self.channel is None:
return
msg_json = {'api': "stopCaptureScreen", 'args': []}
full_msg = {
"module": "com.ohos.devicetest.hypiumApiHelper",
"method": "Captures",
"params": msg_json,
"request_id": datetime.now().strftime("%Y%m%d%H%M%S%f")
}
full_msg_str = json.dumps(full_msg, ensure_ascii=False, separators=(',', ':'))
self.channel.sendall(full_msg_str.encode("utf-8") + b'\n')
reply = self.channel.recv(1024)
if b"true" not in reply:
logger.info("Fail to stop capture")
self.channel.close()
self.channel = None
for client in self.clients:
asyncio.run_coroutine_threadsafe(client.close(), self.loop)
Loading
Loading