Skip to content

Commit 11da4eb

Browse files
committed
Improve mqtt client
1 parent bef55f7 commit 11da4eb

File tree

5 files changed

+131
-99
lines changed

5 files changed

+131
-99
lines changed

pyhon/appliance.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import re
44
from datetime import datetime, timedelta
55
from pathlib import Path
6-
from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload, Callable
6+
from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload
77

88
from pyhon import diagnose, exceptions
99
from pyhon.appliances.base import ApplianceBase
@@ -43,7 +43,6 @@ def __init__(
4343
self._additional_data: Dict[str, Any] = {}
4444
self._last_update: Optional[datetime] = None
4545
self._default_setting = HonParameter("", {}, "")
46-
self._notify_function: Optional[Callable[[Any], None]] = None
4746

4847
try:
4948
self._extra: Optional[ApplianceBase] = importlib.import_module(
@@ -313,11 +312,3 @@ def sync_parameter(self, main: Parameter, target: Parameter) -> None:
313312
elif isinstance(target, HonParameterEnum):
314313
target.values = main.values
315314
target.value = main.value
316-
317-
def subscribe(self, notify_function: Callable[[Any], None]) -> None:
318-
self._notify_function = notify_function
319-
320-
def notify(self) -> None:
321-
self.sync_params_to_command("settings")
322-
if self._notify_function:
323-
self._notify_function(self.attributes)

pyhon/connection/api.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@
77
from typing import Dict, Optional, Any, List, no_type_check, Type
88

99
from aiohttp import ClientSession
10-
from awscrt import mqtt5
1110
from typing_extensions import Self
1211

1312
from pyhon import const, exceptions
1413
from pyhon.appliance import HonAppliance
15-
from pyhon.connection import mqtt
1614
from pyhon.connection.auth import HonAuth
1715
from pyhon.connection.handler.anonym import HonAnonymousConnectionHandler
1816
from pyhon.connection.handler.hon import HonConnectionHandler
@@ -40,7 +38,6 @@ def __init__(
4038
self._hon_handler: Optional[HonConnectionHandler] = None
4139
self._hon_anonymous_handler: Optional[HonAnonymousConnectionHandler] = None
4240
self._session: Optional[ClientSession] = session
43-
self._mqtt_client: mqtt5.Client | None = None
4441

4542
async def __aenter__(self) -> Self:
4643
return await self.create()
@@ -269,10 +266,6 @@ async def translation_keys(self, language: str = "en") -> Dict[str, Any]:
269266
result: Dict[str, Any] = await response.json()
270267
return result
271268

272-
async def subscribe_mqtt(self, appliances: list[HonAppliance]) -> None:
273-
if not self._mqtt_client:
274-
self._mqtt_client = await mqtt.start(self, appliances)
275-
276269
async def close(self) -> None:
277270
if self._hon_handler is not None:
278271
await self._hon_handler.close()

pyhon/connection/mqtt.py

Lines changed: 116 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import json
23
import logging
34
import secrets
@@ -10,91 +11,127 @@
1011
from pyhon.appliance import HonAppliance
1112

1213
if TYPE_CHECKING:
13-
from pyhon import HonAPI
14+
from pyhon import Hon
1415

1516
_LOGGER = logging.getLogger(__name__)
1617

17-
appliances: list[HonAppliance] = []
18-
19-
20-
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData) -> None:
21-
_LOGGER.info("Lifecycle Stopped: %s", str(lifecycle_stopped_data))
22-
23-
24-
def on_lifecycle_connection_success(
25-
lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData,
26-
) -> None:
27-
_LOGGER.info(
28-
"Lifecycle Connection Success: %s", str(lifecycle_connect_success_data)
29-
)
30-
31-
32-
def on_lifecycle_attempting_connect(
33-
lifecycle_attempting_connect_data: mqtt5.LifecycleAttemptingConnectData,
34-
) -> None:
35-
_LOGGER.info(
36-
"Lifecycle Attempting Connect - %s", str(lifecycle_attempting_connect_data)
37-
)
38-
39-
40-
def on_lifecycle_connection_failure(
41-
lifecycle_connection_failure_data: mqtt5.LifecycleConnectFailureData,
42-
) -> None:
43-
_LOGGER.info(
44-
"Lifecycle Connection Failure - %s", str(lifecycle_connection_failure_data)
45-
)
4618

19+
class MQTTClient:
20+
def __init__(self, hon: "Hon"):
21+
self._client: mqtt5.Client | None = None
22+
self._hon = hon
23+
self._api = hon.api
24+
self._appliances = hon.appliances
25+
self._connection = False
26+
self._watchdog_task: asyncio.Task[None] | None = None
27+
28+
@property
29+
def client(self) -> mqtt5.Client:
30+
if self._client is not None:
31+
return self._client
32+
raise AttributeError("Client is not set")
33+
34+
async def create(self) -> "MQTTClient":
35+
await self._start()
36+
self._subscribe_appliances()
37+
return self
38+
39+
def _on_lifecycle_stopped(
40+
self, lifecycle_stopped_data: mqtt5.LifecycleStoppedData
41+
) -> None:
42+
_LOGGER.info("Lifecycle Stopped: %s", str(lifecycle_stopped_data))
43+
44+
def _on_lifecycle_connection_success(
45+
self,
46+
lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData,
47+
) -> None:
48+
self._connection = True
49+
_LOGGER.info(
50+
"Lifecycle Connection Success: %s", str(lifecycle_connect_success_data)
51+
)
4752

48-
def on_lifecycle_disconnection(
49-
lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData,
50-
) -> None:
51-
_LOGGER.info("Lifecycle Disconnection - %s", str(lifecycle_disconnect_data))
53+
def _on_lifecycle_attempting_connect(
54+
self,
55+
lifecycle_attempting_connect_data: mqtt5.LifecycleAttemptingConnectData,
56+
) -> None:
57+
_LOGGER.info(
58+
"Lifecycle Attempting Connect - %s", str(lifecycle_attempting_connect_data)
59+
)
5260

61+
def _on_lifecycle_connection_failure(
62+
self,
63+
lifecycle_connection_failure_data: mqtt5.LifecycleConnectFailureData,
64+
) -> None:
65+
_LOGGER.info(
66+
"Lifecycle Connection Failure - %s", str(lifecycle_connection_failure_data)
67+
)
5368

54-
def on_publish_received(data: mqtt5.PublishReceivedData) -> None:
55-
if not (data and data.publish_packet and data.publish_packet.payload):
56-
return
57-
payload = json.loads(data.publish_packet.payload.decode())
58-
topic = data.publish_packet.topic
59-
if topic and "appliancestatus" in topic:
69+
def _on_lifecycle_disconnection(
70+
self,
71+
lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData,
72+
) -> None:
73+
self._connection = False
74+
_LOGGER.info("Lifecycle Disconnection - %s", str(lifecycle_disconnect_data))
75+
76+
def _on_publish_received(self, data: mqtt5.PublishReceivedData) -> None:
77+
if not (data and data.publish_packet and data.publish_packet.payload):
78+
return
79+
payload = json.loads(data.publish_packet.payload.decode())
80+
topic = data.publish_packet.topic
6081
appliance = next(
61-
a for a in appliances if topic in a.info["topics"]["subscribe"]
82+
a for a in self._appliances if topic in a.info["topics"]["subscribe"]
83+
)
84+
if topic and "appliancestatus" in topic:
85+
for parameter in payload["parameters"]:
86+
appliance.attributes["parameters"][parameter["parName"]].update(
87+
parameter
88+
)
89+
appliance.sync_params_to_command("settings")
90+
self._hon.notify()
91+
elif topic and "connected" in topic:
92+
_LOGGER.info("Connected %s", appliance.nick_name)
93+
elif topic and "disconnected" in topic:
94+
_LOGGER.info("Disconnected %s", appliance.nick_name)
95+
elif topic and "discovery" in topic:
96+
_LOGGER.info("Discovered %s", appliance.nick_name)
97+
_LOGGER.info("%s - %s", topic, payload)
98+
99+
async def _start(self) -> None:
100+
self._client = mqtt5_client_builder.websockets_with_custom_authorizer(
101+
endpoint=const.AWS_ENDPOINT,
102+
auth_authorizer_name=const.AWS_AUTHORIZER,
103+
auth_authorizer_signature=await self._api.load_aws_token(),
104+
auth_token_key_name="token",
105+
auth_token_value=self._api.auth.id_token,
106+
client_id=f"{const.MOBILE_ID}_{secrets.token_hex(8)}",
107+
on_lifecycle_stopped=self._on_lifecycle_stopped,
108+
on_lifecycle_connection_success=self._on_lifecycle_connection_success,
109+
on_lifecycle_attempting_connect=self._on_lifecycle_attempting_connect,
110+
on_lifecycle_connection_failure=self._on_lifecycle_connection_failure,
111+
on_lifecycle_disconnection=self._on_lifecycle_disconnection,
112+
on_publish_received=self._on_publish_received,
62113
)
63-
for parameter in payload["parameters"]:
64-
appliance.attributes["parameters"][parameter["parName"]].update(parameter)
65-
appliance.notify()
66-
_LOGGER.info("%s - %s", topic, payload)
67-
68-
69-
async def create_mqtt_client(api: "HonAPI") -> mqtt5.Client:
70-
client: mqtt5.Client = mqtt5_client_builder.websockets_with_custom_authorizer(
71-
endpoint=const.AWS_ENDPOINT,
72-
auth_authorizer_name=const.AWS_AUTHORIZER,
73-
auth_authorizer_signature=await api.load_aws_token(),
74-
auth_token_key_name="token",
75-
auth_token_value=api.auth.id_token,
76-
client_id=f"{const.MOBILE_ID}_{secrets.token_hex(8)}",
77-
on_lifecycle_stopped=on_lifecycle_stopped,
78-
on_lifecycle_connection_success=on_lifecycle_connection_success,
79-
on_lifecycle_attempting_connect=on_lifecycle_attempting_connect,
80-
on_lifecycle_connection_failure=on_lifecycle_connection_failure,
81-
on_lifecycle_disconnection=on_lifecycle_disconnection,
82-
on_publish_received=on_publish_received,
83-
)
84-
client.start()
85-
return client
86-
87-
88-
def subscribe(client: mqtt5.Client, appliance: HonAppliance) -> None:
89-
for topic in appliance.info.get("topics", {}).get("subscribe", []):
90-
client.subscribe(mqtt5.SubscribePacket([mqtt5.Subscription(topic)])).result(10)
91-
_LOGGER.info("Subscribed to topic %s", topic)
92-
93-
94-
async def start(api: "HonAPI", app: list[HonAppliance]) -> mqtt5.Client:
95-
client = await create_mqtt_client(api)
96-
global appliances # pylint: disable=global-statement
97-
appliances = app
98-
for appliance in appliances:
99-
subscribe(client, appliance)
100-
return client
114+
self.client.start()
115+
116+
def _subscribe_appliances(self) -> None:
117+
for appliance in self._appliances:
118+
self._subscribe(appliance)
119+
120+
def _subscribe(self, appliance: HonAppliance) -> None:
121+
for topic in appliance.info.get("topics", {}).get("subscribe", []):
122+
self.client.subscribe(
123+
mqtt5.SubscribePacket([mqtt5.Subscription(topic)])
124+
).result(10)
125+
_LOGGER.info("Subscribed to topic %s", topic)
126+
127+
async def start_watchdog(self) -> None:
128+
if not self._watchdog_task or self._watchdog_task.done():
129+
await asyncio.create_task(self._watchdog())
130+
131+
async def _watchdog(self) -> None:
132+
while True:
133+
await asyncio.sleep(5)
134+
if not self._connection:
135+
_LOGGER.info("Restart mqtt connection")
136+
await self._start()
137+
self._subscribe_appliances()

pyhon/hon.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22
import logging
33
from pathlib import Path
44
from types import TracebackType
5-
from typing import List, Optional, Dict, Any, Type
5+
from typing import List, Optional, Dict, Any, Type, Callable
66

77
from aiohttp import ClientSession
88
from typing_extensions import Self
99

1010
from pyhon.appliance import HonAppliance
1111
from pyhon.connection.api import HonAPI
1212
from pyhon.connection.api import TestAPI
13+
from pyhon.connection.mqtt import MQTTClient
1314
from pyhon.exceptions import NoAuthenticationException
1415

1516
_LOGGER = logging.getLogger(__name__)
@@ -33,6 +34,8 @@ def __init__(
3334
self._test_data_path: Path = test_data_path or Path().cwd()
3435
self._mobile_id: str = mobile_id
3536
self._refresh_token: str = refresh_token
37+
self._mqtt_client: MQTTClient | None = None
38+
self._notify_function: Optional[Callable[[Any], None]] = None
3639

3740
async def __aenter__(self) -> Self:
3841
return await self.create()
@@ -120,7 +123,15 @@ async def setup(self) -> None:
120123
api = TestAPI(test_data)
121124
for appliance in await api.load_appliances():
122125
await self._create_appliance(appliance, api)
123-
await self.api.subscribe_mqtt(self.appliances)
126+
if not self._mqtt_client:
127+
self._mqtt_client = await MQTTClient(self).create()
128+
129+
def subscribe_updates(self, notify_function: Callable[[Any], None]) -> None:
130+
self._notify_function = notify_function
131+
132+
def notify(self) -> None:
133+
if self._notify_function:
134+
self._notify_function(None)
124135

125136
async def close(self) -> None:
126137
await self.api.close()

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
setup(
99
name="pyhOn",
10-
version="0.17.1",
10+
version="0.17.2",
1111
author="Andre Basche",
1212
description="Control hOn devices with python",
1313
long_description=long_description,

0 commit comments

Comments
 (0)