Skip to content

Commit 6b27706

Browse files
committed
Add support for user remote buttons
1 parent 55fba30 commit 6b27706

File tree

5 files changed

+149
-11
lines changed

5 files changed

+149
-11
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ peripherals = [
2525
zones = [1, 2, 4, 6] # list of zones in use
2626
areas = [1, 2] # list of areas in use
2727
outputs = [1] # list of outputs in use
28+
remotes = [1, 3] # list of remotes in use
29+
remote_buttons = { # list of user remotes associated with buttons
30+
1: [a] # use remote id `0` for "any remote"
31+
3: [b, c]
32+
}
33+
remote_buttons_timeout = 1000 # time, in milliseconds, that button ports stay `true` after pressed
2834
serial_port = "/dev/ttyUSB0"
2935
serial_baud = 9600 # this is the default
3036
ip_host = "192.168.1.2" # specify either this or serial_port, not both

qtoggleserver/paradox/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,5 @@
1818
PGM_ACTION_OFF_OVERRIDE = 'off_override'
1919
PGM_ACTION_ON_OVERRIDE = 'on_override'
2020
PGM_ACTION_PULSE = 'pulse'
21+
22+
DEFAULT_REMOTE_BUTTONS_TIMEOUT = 1000

qtoggleserver/paradox/output.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ async def read_value(self) -> bool:
4242

4343

4444
class OutputTamperPort(OutputPort):
45+
# TODO: this should actually be an output port
4546
TYPE = 'boolean'
4647
WRITABLE = False
4748

qtoggleserver/paradox/paradoxalarm.py

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ def __init__(
2727
areas: list[int] = None,
2828
zones: list[int] = None,
2929
outputs: list[int] = None,
30+
remotes: list[int] = None,
31+
remote_buttons: dict[str, str] = None,
32+
remote_buttons_timeout: int = constants.DEFAULT_REMOTE_BUTTONS_TIMEOUT,
3033
serial_port: Optional[str] = None,
3134
serial_baud: int = constants.DEFAULT_SERIAL_BAUD,
3235
ip_host: Optional[str] = None,
@@ -38,16 +41,19 @@ def __init__(
3841
self.setup_config()
3942
self.setup_logging()
4043

41-
self._areas = areas or []
42-
self._zones = zones or []
43-
self._outputs = outputs or []
44+
self._areas: list[int] = areas or []
45+
self._zones: list[int] = zones or []
46+
self._outputs: list[int] = outputs or []
47+
self._remotes: list[int] = remotes or []
48+
self._remote_buttons: dict[int, str] = {int(k): v for k, v in (remote_buttons or {}).items()}
49+
self._remote_buttons_timeout: int = remote_buttons_timeout
4450

45-
self._serial_port = serial_port
46-
self._serial_baud = serial_baud
47-
self._ip_host = ip_host
48-
self._ip_port = ip_port
49-
self._ip_password = ip_password
50-
self._panel_password = panel_password
51+
self._serial_port: Optional[str] = serial_port
52+
self._serial_baud: int = serial_baud
53+
self._ip_host: Optional[str] = ip_host
54+
self._ip_port: int = ip_port
55+
self._ip_password: str = ip_password
56+
self._panel_password: str = panel_password
5157

5258
self._paradox = None
5359
self._panel_task = None
@@ -130,8 +136,9 @@ async def connect(self) -> None:
130136
async def make_port_args(self) -> list[dict[str, Any]]:
131137
from .area import AreaAlarmPort, AreaArmedPort
132138
from .output import OutputTamperPort, OutputTroublePort
133-
from .zone import ZoneAlarmPort, ZoneOpenPort, ZoneTamperPort, ZoneTroublePort
139+
from .remote import AnyRemoteButtonPort, RemoteButtonPort
134140
from .system import SystemTroublePort
141+
from .zone import ZoneAlarmPort, ZoneOpenPort, ZoneTamperPort, ZoneTroublePort
135142

136143
port_args = []
137144
port_args += [{'driver': AreaAlarmPort, 'area': area} for area in self._areas]
@@ -144,6 +151,28 @@ async def make_port_args(self) -> list[dict[str, Any]]:
144151
port_args += [{'driver': ZoneTroublePort, 'zone': zone} for zone in self._zones]
145152
port_args += [{'driver': SystemTroublePort}]
146153

154+
for remote in self._remotes:
155+
for button in self._remote_buttons.get(remote, []):
156+
port_args.append(
157+
{
158+
'driver': RemoteButtonPort,
159+
'remote': remote,
160+
'button': button,
161+
'timeout': self._remote_buttons_timeout,
162+
}
163+
)
164+
165+
any_remote_buttons = self._remote_buttons.get(0, [])
166+
for button in any_remote_buttons:
167+
port_args.append(
168+
{
169+
'driver': AnyRemoteButtonPort,
170+
'remotes': self._remotes,
171+
'button': button,
172+
'timeout': self._remote_buttons_timeout,
173+
}
174+
)
175+
147176
return port_args
148177

149178
async def handle_connected(self) -> None:
@@ -254,7 +283,7 @@ def handle_paradox_property_change(self, change: Any) -> None:
254283
except Exception as e:
255284
self.error('property change handler execution failed: %s', e, exc_info=True)
256285

257-
def get_property(self, type_: str, id_: Optional[Union[str, int]], name: str) -> Property:
286+
def get_property(self, type_: str, id_: Optional[Union[str, int]], name: str) -> Optional[Property]:
258287
if type_ == 'system':
259288
return self._properties.get(type_, {}).get(name)
260289
else:

qtoggleserver/paradox/remote.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import time
2+
3+
from abc import ABCMeta
4+
5+
from .paradoxport import ParadoxPort
6+
from .typing import Property
7+
8+
9+
class RemotePort(ParadoxPort, metaclass=ABCMeta):
10+
def __init__(self, remote: int, *args, **kwargs) -> None:
11+
self.remote: int = remote
12+
13+
super().__init__(*args, **kwargs)
14+
15+
def make_id(self) -> str:
16+
return f'remote{self.remote}.{self.ID}'
17+
18+
def get_remote_label(self) -> str:
19+
return self.get_property('label') or f'Remote {self.remote}'
20+
21+
def get_property(self, name: str) -> Property:
22+
return self.get_peripheral().get_property('user', self.remote, name)
23+
24+
def get_properties(self) -> dict[str, Property]:
25+
return self.get_peripheral().get_properties('user', self.remote)
26+
27+
28+
class RemoteButtonPort(RemotePort):
29+
TYPE = 'boolean'
30+
WRITABLE = False
31+
32+
ID = 'button'
33+
34+
def __init__(self, button: str, timeout: int, *args, **kwargs) -> None:
35+
self.button: str = button
36+
self.timeout: int = timeout
37+
self.last_button_value: int = 0
38+
self.change_timestamp: int = 0
39+
40+
super().__init__(*args, **kwargs)
41+
42+
def make_id(self) -> str:
43+
return f'{super().make_id()}_{self.button}'
44+
45+
async def attr_get_default_display_name(self) -> str:
46+
return f'{super().get_remote_label()} Button {self.button.upper()}'
47+
48+
async def read_value(self) -> bool:
49+
now = int(time.time() * 1000)
50+
value = self.get_button_value()
51+
if value and value != self.last_button_value:
52+
self.debug('button value changed from %s to %s', self.last_button_value, value)
53+
self.last_button_value = value
54+
self.change_timestamp = now
55+
56+
return now - self.change_timestamp <= self.timeout
57+
58+
def get_button_value(self) -> int:
59+
return self.get_property(f'button_{self.button}') or 0
60+
61+
62+
class AnyRemoteButtonPort(RemotePort):
63+
TYPE = 'boolean'
64+
WRITABLE = False
65+
66+
ID = 'button'
67+
68+
def __init__(self, remotes: list[int], button: str, timeout: int, *args, **kwargs) -> None:
69+
self.remotes: list[int] = remotes
70+
self.button: str = button
71+
self.timeout: int = timeout
72+
self.last_button_values: dict[int, int] = {}
73+
self.change_timestamps: dict[int, int] = {}
74+
75+
super().__init__(remote=0, *args, **kwargs)
76+
77+
def make_id(self) -> str:
78+
return f'remote.{self.ID}_{self.button}'
79+
80+
async def attr_get_default_display_name(self) -> str:
81+
return f'Remote Button {self.button.upper()}'
82+
83+
async def read_value(self) -> bool:
84+
now = int(time.time() * 1000)
85+
for remote in self.remotes:
86+
value = self.get_button_value(remote)
87+
last_value = self.last_button_values.get(remote, 0)
88+
if value and value != last_value:
89+
self.debug('button value changed from %s to %s on remote %s', last_value, value, remote)
90+
self.last_button_values[remote] = value
91+
self.change_timestamps[remote] = now
92+
93+
for remote in self.remotes:
94+
if now - self.change_timestamps.get(remote, 0) <= self.timeout:
95+
return True
96+
97+
return False
98+
99+
def get_button_value(self, remote: int) -> int:
100+
return self.get_peripheral().get_property('user', remote, f'button_{self.button}')

0 commit comments

Comments
 (0)