Skip to content

Commit 71c11e3

Browse files
authored
Merge pull request #103 from dknowles2/diagnostics
Add support for returning redacted diagnostics
2 parents 50c2e57 + c4932f1 commit 71c11e3

File tree

4 files changed

+222
-3
lines changed

4 files changed

+222
-3
lines changed

pyschlage/common.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
from __future__ import annotations
44

5+
from copy import deepcopy
56
from dataclasses import dataclass, field, fields
67
from threading import Lock as Mutex
8+
from typing import Any
79

810
from .auth import Auth
911

@@ -29,3 +31,31 @@ def _update_with(self, json, *args, **kwargs):
2931
with self._mu:
3032
for f in fields(new_obj):
3133
setattr(self, f.name, getattr(new_obj, f.name))
34+
35+
36+
def redact(json: dict[Any, Any], *, allowed: list[str]) -> dict[Any, Any]:
37+
"""Returns a copy of the given JSON dict with non-allowed keys redacted."""
38+
if len(allowed) == 1 and allowed[0] == "*":
39+
return deepcopy(json)
40+
41+
allowed_here = {}
42+
for allow in allowed:
43+
k, _, children = allow.partition(".")
44+
if k not in allowed_here:
45+
allowed_here[k] = []
46+
if not children:
47+
children = "*"
48+
allowed_here[k].append(children)
49+
50+
ret = {}
51+
for k, v in json.items():
52+
if isinstance(v, dict):
53+
ret[k] = redact(v, allowed=allowed_here.get(k, []))
54+
elif k in allowed_here:
55+
ret[k] = v
56+
else:
57+
if isinstance(v, list):
58+
ret[k] = ["<REDACTED>"]
59+
else:
60+
ret[k] = "<REDACTED>"
61+
return ret

pyschlage/lock.py

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22

33
from __future__ import annotations
44

5-
from dataclasses import dataclass
5+
from dataclasses import dataclass, field
6+
from typing import Any
67

78
from .auth import Auth
89
from .code import AccessCode
9-
from .common import Mutable
10+
from .common import Mutable, redact
1011
from .exceptions import NotAuthenticatedError
1112
from .log import LockLog
1213
from .user import User
@@ -95,7 +96,9 @@ class Lock(Mutable):
9596
access_codes: dict[str, AccessCode] | None = None
9697
"""Access codes for this lock, keyed by their ID."""
9798

98-
_cat: str = ""
99+
_cat: str = field(default="", repr=False)
100+
101+
_json: dict[Any, Any] = field(default_factory=dict, repr=False)
99102

100103
@staticmethod
101104
def request_path(device_id: str | None = None) -> str:
@@ -149,6 +152,53 @@ def from_json(cls, auth: Auth, json: dict) -> Lock:
149152
mac_address=attributes.get("macAddress"),
150153
users=users,
151154
_cat=json["CAT"],
155+
_json=json,
156+
)
157+
158+
def get_diagnostics(self) -> dict[Any, Any]:
159+
"""Returns a redacted dict of the raw JSON for diagnostics purposes."""
160+
return redact(
161+
self._json,
162+
allowed=[
163+
"attributes.accessCodeLength",
164+
"attributes.actAlarmBuzzerEnabled",
165+
"attributes.actAlarmState",
166+
"attributes.actuationCurrentMax",
167+
"attributes.alarmSelection",
168+
"attributes.alarmSensitivity",
169+
"attributes.alarmState",
170+
"attributes.autoLockTime",
171+
"attributes.batteryChangeDate",
172+
"attributes.batteryLevel",
173+
"attributes.batteryLowState",
174+
"attributes.batterySaverConfig",
175+
"attributes.batterySaverState",
176+
"attributes.beeperEnabled",
177+
"attributes.bleFirmwareVersion",
178+
"attributes.firmwareUpdate",
179+
"attributes.homePosCurrentMax",
180+
"attributes.keypadFirmwareVersion",
181+
"attributes.lockAndLeaveEnabled",
182+
"attributes.lockState",
183+
"attributes.lockStateMetadata",
184+
"attributes.mainFirmwareVersion",
185+
"attributes.mode",
186+
"attributes.modelName",
187+
"attributes.periodicDeepQueryTimeSetting",
188+
"attributes.psPollEnabled",
189+
"attributes.timezone",
190+
"attributes.wifiFirmwareVersion",
191+
"attributes.wifiRssi",
192+
"connected",
193+
"connectivityUpdated",
194+
"created",
195+
"devicetypeId",
196+
"lastUpdated",
197+
"modelName",
198+
"name",
199+
"role",
200+
"timezone",
201+
],
152202
)
153203

154204
def _is_wifi_lock(self) -> bool:

tests/test_common.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
from __future__ import annotations
2+
3+
from typing import Any
4+
5+
import pytest
6+
7+
from pyschlage import common
8+
9+
10+
@pytest.fixture
11+
def json_dict() -> dict[Any, Any]:
12+
return {
13+
"a": "foo",
14+
"b": 1,
15+
"c": {
16+
"c0": "foo",
17+
"c1": 1,
18+
"c2": {
19+
"c20": "foo",
20+
},
21+
"c3": ["foo"],
22+
},
23+
"d": ["foo"],
24+
}
25+
26+
27+
def test_redact_allow_asterisk(json_dict: dict[Any, Any]):
28+
assert common.redact(json_dict, allowed=["*"]) == json_dict
29+
30+
31+
def test_redact_allow_all(json_dict: dict[Any, Any]):
32+
assert common.redact(json_dict, allowed=["a", "b", "c.*", "d"]) == json_dict
33+
assert (
34+
common.redact(
35+
json_dict, allowed=["a", "b", "c.c0", "c.c1", "c.c2", "c.c3", "d"]
36+
)
37+
== json_dict
38+
)
39+
assert common.redact(json_dict, allowed=["a", "b", "c", "d"]) == json_dict
40+
41+
42+
def test_redact_all(json_dict: dict[Any, Any]):
43+
want = {
44+
"a": "<REDACTED>",
45+
"b": "<REDACTED>",
46+
"c": {
47+
"c0": "<REDACTED>",
48+
"c1": "<REDACTED>",
49+
"c2": {
50+
"c20": "<REDACTED>",
51+
},
52+
"c3": ["<REDACTED>"],
53+
},
54+
"d": ["<REDACTED>"],
55+
}
56+
assert common.redact(json_dict, allowed=[]) == want
57+
58+
59+
def test_redact_partial(json_dict: dict[Any, Any]):
60+
want = {
61+
"a": "foo",
62+
"b": 1,
63+
"c": {
64+
"c0": "foo",
65+
"c1": "<REDACTED>",
66+
"c2": {
67+
"c20": "<REDACTED>",
68+
},
69+
"c3": ["<REDACTED>"],
70+
},
71+
"d": ["<REDACTED>"],
72+
}
73+
assert common.redact(json_dict, allowed=["a", "b", "c.c0"])

tests/test_lock.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,72 @@ def test_from_json_wifi_lock_unavailable(
5858
assert lock.is_locked is None
5959
assert lock.is_jammed is None
6060

61+
def test_diagnostics(self, mock_auth: mock.Mock, lock_json: dict) -> None:
62+
lock = Lock.from_json(mock_auth, lock_json)
63+
want = {
64+
"CAT": "<REDACTED>",
65+
"SAT": "<REDACTED>",
66+
"attributes": {
67+
"CAT": "<REDACTED>",
68+
"SAT": "<REDACTED>",
69+
"accessCodeLength": 4,
70+
"actAlarmBuzzerEnabled": 0,
71+
"actAlarmState": 0,
72+
"actuationCurrentMax": 226,
73+
"alarmSelection": 0,
74+
"alarmSensitivity": 0,
75+
"alarmState": 0,
76+
"autoLockTime": 0,
77+
"batteryChangeDate": 1669017530,
78+
"batteryLevel": 95,
79+
"batteryLowState": 0,
80+
"batterySaverConfig": {"activePeriod": [], "enabled": 0},
81+
"batterySaverState": 0,
82+
"beeperEnabled": 1,
83+
"bleFirmwareVersion": "0118.000103.015",
84+
"diagnostics": {},
85+
"firmwareUpdate": {
86+
"status": {"additionalInfo": None, "updateStatus": None}
87+
},
88+
"homePosCurrentMax": 153,
89+
"keypadFirmwareVersion": "03.00.00250052",
90+
"lockAndLeaveEnabled": 1,
91+
"lockState": 1,
92+
"lockStateMetadata": {
93+
"UUID": None,
94+
"actionType": "periodicDeepQuery",
95+
"clientId": None,
96+
"name": None,
97+
},
98+
"macAddress": "<REDACTED>",
99+
"mainFirmwareVersion": "10.00.00264232",
100+
"mode": 2,
101+
"modelName": "__model_name__",
102+
"periodicDeepQueryTimeSetting": 60,
103+
"psPollEnabled": 1,
104+
"serialNumber": "<REDACTED>",
105+
"timezone": -20,
106+
"wifiFirmwareVersion": "03.15.00.01",
107+
"wifiRssi": -42,
108+
},
109+
"connected": True,
110+
"connectivityUpdated": "2022-12-04T20:58:22.000Z",
111+
"created": "2020-04-05T21:53:11.000Z",
112+
"deviceId": "<REDACTED>",
113+
"devicetypeId": "be489wifi",
114+
"lastUpdated": "2022-12-04T20:58:22.000Z",
115+
"macAddress": "<REDACTED>",
116+
"modelName": "__model_name__",
117+
"name": "Door Lock",
118+
"physicalId": "<REDACTED>",
119+
"relatedDevices": ["<REDACTED>"],
120+
"role": "owner",
121+
"serialNumber": "<REDACTED>",
122+
"timezone": -20,
123+
"users": ["<REDACTED>"],
124+
}
125+
assert lock.get_diagnostics() == want
126+
61127
def test_refresh(
62128
self, mock_auth: mock.Mock, lock_json: dict, access_code_json: dict
63129
) -> None:

0 commit comments

Comments
 (0)