diff --git a/cli.py b/cli.py index 93c3548..c54f6e9 100755 --- a/cli.py +++ b/cli.py @@ -5,6 +5,8 @@ import argparse import asyncio import logging +import re +from types import ModuleType from aiocmd import aiocmd @@ -27,7 +29,6 @@ Baudrate, ModbusEvents, Parity, - ProductId, ResetMode, StopBits, VMDBypassMode, @@ -47,17 +48,29 @@ from pyairios.models.brdg_02r13 import ( DEFAULT_SLAVE_ID as BRDG02R13_DEFAULT_SLAVE_ID, ) -from pyairios.models.vmd_02rps78 import VMD02RPS78 -from pyairios.models.vmn_05lm02 import VMN05LM02 + +LOGGER = logging.getLogger(__name__) + +# global variables available in all classes: +MODULES: dict[str, ModuleType] = {} +# dict with imported modules by class name +PRIDS: dict[str, int] = {} +# dict with pr_id's (expected productid) by class name +# DESCRIPTIONS: dict[str, str] = {} +# # dict with label description model, for use in UI class AiriosVMN05LM02CLI(aiocmd.PromptToolkitCmd): - """The VMN05LM02 CLI interface.""" + """The VMN_ modules common CLI interface.""" - def __init__(self, vmn: VMN05LM02) -> None: + def __init__(self, vmn) -> None: # TODO(eb): subclass aiocmd_type + """ + :param vmn: contains all details of this model + """ super().__init__() - self.prompt = f"[VMN-05LM02@{vmn.slave_id}]>> " self.vmn = vmn + self.class_pointer = str(vmn) + self.prompt = f"[{str(vmn)}]>> " async def do_received_product_id(self) -> None: """Print the received product ID from the device.""" @@ -72,37 +85,26 @@ async def do_requested_ventilation_speed(self) -> None: print(f"\t{res.status}") async def do_status(self) -> None: - """Print the device status.""" - res = await self.vmn.fetch_vmn_data() - print("Node data") - print("---------") - print(f" {'Product ID:': <25}{res['product_id']}") - print(f" {'Product Name:': <25}{res['product_name']}") - print(f" {'Software version:': <25}{res['sw_version']}") - print(f" {'RF address:': <25}{res['rf_address']}") - print("") - - print("Device data") - print("---------") - print(f" {'RF comm status:': <25}{res['rf_comm_status']}") - print(f" {'Battery status:': <25}{res['battery_status']}") - print(f" {'Fault status:': <25}{res['fault_status']}") - print(f" {'Bound status:': <25}{res['bound_status']}") - print(f" {'Value error status:': <25}{res['value_error_status']}") - print("") - - print("VMN-02LM11 data") - print("----------------") - print(f" {'Requested ventilation speed:': <40}{res['requested_ventilation_speed']}") + """Print the complete device status.""" + await self.vmn.print_data() class AiriosVMD02RPS78CLI(aiocmd.PromptToolkitCmd): """The VMD02RPS78 CLI interface.""" - def __init__(self, vmd: VMD02RPS78) -> None: + def __init__(self, vmd) -> None: + """ + :param vmd: contains all details of this model + """ super().__init__() - self.prompt = f"[VMD-02RPS78@{vmd.slave_id}]>> " self.vmd = vmd + self.class_pointer = str(vmd) + self.prompt = f"[{str(vmd)}]>> " + + async def do_received_product_id(self) -> None: + """Print the received product ID from the device.""" + res = await self.vmd.node_received_product_id() + print(f"0x{res.value:08X}") async def do_capabilities(self) -> None: """Print the device RF capabilities.""" @@ -110,91 +112,8 @@ async def do_capabilities(self) -> None: print(f"{res.value} ({res.status})") async def do_status(self) -> None: # pylint: disable=too-many-statements - """Print the device status.""" - res = await self.vmd.fetch_vmd_data() - print("Node data") - print("---------") - print(f" {'Product ID:': <25}{res['product_id']}") - print(f" {'Product Name:': <25}{res['product_name']}") - print(f" {'Software version:': <25}{res['sw_version']}") - print(f" {'RF address:': <25}{res['rf_address']}") - print("") - - print("Device data") - print("---------") - print(f" {'RF comm status:': <25}{res['rf_comm_status']}") - print(f" {'Battery status:': <25}{res['battery_status']}") - print(f" {'Fault status:': <25}{res['fault_status']}") - print(f" {'Bound status:': <25}{res['bound_status']}") - print(f" {'Value error status:': <25}{res['value_error_status']}") - print("") - - print("VMD-02RPS78 data") - print("----------------") - print(f" {'Error code:': <25}{res['error_code']}") - - print(f" {'Ventilation speed:': <25}{res['ventilation_speed']}") - print(f" {'Override remaining time:': <25}{res['override_remaining_time']}") - - print( - f" {'Supply fan speed:': <25}{res['supply_fan_speed']}% " - f"({res['supply_fan_rpm']} RPM)" - ) - print( - f" {'Exhaust fan speed:': <25}{res['exhaust_fan_speed']}% " - f"({res['exhaust_fan_rpm']} RPM)" - ) - - print(f" {'Indoor temperature:': <25}{res['indoor_air_temperature']}") - print(f" {'Outdoor temperature:': <25}{res['outdoor_air_temperature']}") - print(f" {'Exhaust temperature:': <25}{res['exhaust_air_temperature']}") - print(f" {'Supply temperature:': <25}{res['supply_air_temperature']}") - - print(f" {'Filter dirty:': <25}{res['filter_dirty']}") - print(f" {'Filter remaining:': <25}{res['filter_remaining_percent']} %") - print(f" {'Filter duration:': <25}{res['filter_duration_days']} days") - - print(f" {'Bypass position:': <25}{res['bypass_position']}") - print(f" {'Bypass status:': <25}{res['bypass_status']}") - print(f" {'Bypass mode:': <25}{res['bypass_mode']}") - - print(f" {'Defrost:': <25}{res['defrost']}") - print(f" {'Preheater:': <25}{res['preheater']}") - print(f" {'Postheater:': <25}{res['postheater']}") - print("") - - print(f" {'Preset speeds':<25}{'Supply':<10}{'Exhaust':<10}") - print(f" {'-------------':<25}") - print( - f" {'High':<25}{str(res['preset_high_fan_speed_supply']) + ' %':<10}" - f"{str(res['preset_high_fan_speed_exhaust']) + ' %':<10}" - ) - print( - f" {'Mid':<25}{str(res['preset_medium_fan_speed_supply']) + ' %':<10}" - f"{str(res['preset_medium_fan_speed_exhaust']) + ' %':<10}" - ) - print( - f" {'Low':<25}{str(res['preset_low_fan_speed_supply']) + ' %':<10}" - f"{str(res['preset_low_fan_speed_exhaust']) + ' %':<10}" - ) - print( - f" {'Standby':<25}{str(res['preset_standby_fan_speed_supply']) + ' %':<10}" - f"{str(res['preset_standby_fan_speed_exhaust']) + ' %':<10}" - ) - print("") - - print(" Setpoints") - print(" ---------") - print( - f" {'Frost protection preheater setpoint:':<40}" - f"{res['frost_protection_preheater_setpoint']} ºC" - ) - print(f" {'Preheater setpoint:': <40}{res['preheater_setpoint']} ºC") - print(f" {'Free ventilation setpoint:':<40}{res['free_ventilation_setpoint']} ºC") - print( - f" {'Free ventilation cooling offset:':<40}" - f"{res['free_ventilation_cooling_offset']} K" - ) + """Print the complete device status.""" + await self.vmd.print_data() async def do_error_code(self) -> None: """Print the current error code.""" @@ -303,43 +222,222 @@ async def do_filter_reset(self): await self.vmd.filter_reset() +class AiriosVMD07RPS13CLI(aiocmd.PromptToolkitCmd): + """The VMD07RPS13 ClimaRad Ventura V1 CLI interface.""" + + def __init__(self, vmd) -> None: + """ + :param vmd: contains all details of this model + """ + super().__init__() + self.vmd = vmd + self.class_pointer = str(vmd) + self.prompt = f"[{str(vmd)}]>> " + + async def do_received_product_id(self) -> None: + """Print the received product ID from the device.""" + res = await self.vmd.node_received_product_id() + print(f"0x{res.value:08X}") + + async def do_capabilities(self) -> None: + """Print the device RF capabilities.""" + res = await self.vmd.capabilities() + if res is not None: + print(f"{res.value} ({res.status})") + else: + print("N/A") + + async def do_status(self) -> None: # pylint: disable=too-many-statements + """Print the complete device status.""" + await self.vmd.print_data() + + async def do_error_code(self) -> None: + """Print the current error code.""" + res = await self.vmd.error_code() + print(f"{res}") + + async def do_vent_mode(self) -> None: + """Print the current ventilation mode.""" + res = await self.vmd.vent_mode() + print(f"{res}") + if res.status is not None: + print(f"{res.status}") + + async def do_rq_vent_mode(self) -> None: # failed + """Print the current requested ventilation mode.""" + res = await self.vmd.rq_vent_mode() + print(f"{res}") + + async def do_rq_vent_sub_mode(self) -> None: # works! + """Print the current requested ventilation sub mode.""" + res = await self.vmd.rq_vent_sub_mode() + print(f"{res}") + + async def do_rq_temp_vent_mode(self) -> None: + """Print the requested temp. ventilation mode.""" + res = await self.vmd.rq_temp_vent_mode() + print(f"{res}") + + async def do_rq_temp_vent_mode_set(self, preset: int) -> None: + """Change the requested ventilation mode. 0=Off, 1=Pause, 2=On, 3=Man1, 5=Man3, 8=Service""" + # s = VMDRequestedVentilationSpeed.parse(preset) + res = await self.vmd.set_rq_temp_vent_mode(preset) # (s) lookup? + print(f"{res}") + + async def do_rq_temp_vent_sub_mode(self) -> None: + """Print the requested temp. ventilation sub mode.""" + res = await self.vmd.rq_temp_vent_sub_mode() + print(f"{res}") + + async def do_rq_vent_mode_set(self, preset: int) -> None: + """Change the requested ventilation mode. 0=Off, 1=Pause, 2=On, 3=Man1, 5=Man3, 8=Service""" + # s = VMDRequestedVentilationSpeed.parse(preset) + res = await self.vmd.set_rq_vent_mode(preset) # (s) lookup? + print(f"{res}") + + async def do_rq_vent_sub_mode_set(self, preset: int) -> None: + """Change the requested ventilation sub mode. 0=Off, 201 - 205""" + # s = VMDRequestedVentilationSpeed.parse(preset) + res = await self.vmd.set_rq_vent_sub_mode(preset) # (s) lookup? + print(f"{res}") + + async def do_temp_vent_sub_mode(self) -> None: + """Print the current temp. ventilation sub mode""" + # s = VMDRequestedVentilationSpeed.parse(preset) + res = await self.vmd.temp_vent_sub_mode() # (s) lookup? + print(f"{res}") + + async def do_rq_temp_vent_sub_mode_set(self, preset: int) -> None: + """Change the requested ventilation sub mode. 0=Off, 201 - 205""" + # s = VMDRequestedVentilationSpeed.parse(preset) + res = await self.vmd.set_rq_temp_vent_sub_mode(preset) # (s) lookup? + print(f"{res}") + + async def do_ventilation_speed(self) -> None: + """Print the current ventilation speed.""" + res = await self.vmd.ventilation_speed() + print(f"{res}") + if res.status is not None: + print(f"{res.status}") + + async def do_indoor_hum(self): + """Print the indoor humidity level in %.""" + res = await self.vmd.indoor_humidity() + print(f"{res} %") + + async def do_outdoor_hum(self): + """Print the outdoor humidity level in %.""" + res = await self.vmd.outdoor_humidity() + print(f"{res} %") + + async def do_bypass_pos(self): + """Print the bypass position.""" + res = await self.vmd.bypass_position() + print(f"{res} {'Open' if res == 1 else 'Closed'}") + + async def do_base_vent_enabled(self): + """Print the base ventilation enabled: On/Off = 1/0.""" + res = await self.vmd.basic_vent_enable() + print(f"{res} {'On' if res.value == 1 else 'Off'}") + + async def do_base_vent_enabled_set(self, state: bool) -> None: + """Set the base ventilation enabled: on/off = 1/0.""" + if await self.vmd.set_basic_vent_enable(state): + await self.do_base_vent_enabled() + else: + print("Error setting base_vent_enabled") + + async def do_base_vent_level(self): + """Print the base ventilation level.""" + res = await self.vmd.basic_vent_level() + print(f"base_vent_level: {res}") + + async def do_base_vent_level_set(self, lvl: int) -> None: + """Set the base ventilation level.""" + if await self.vmd.set_basic_vent_level(lvl): + res = await self.vmd.basic_vent_level() + print(f"base_vent_level set to: {res.value}") + else: + print("Error setting base_vent_level") + + async def do_filter_remaining(self): + """Print the filter remaining.""" + r1 = await self.vmd.filter_remaining_percent() + r2 = await self.vmd.filter_remaining_days() + print(f"{r1.value} % ({r2.value} days)") + + async def do_co2_setpoint(self): + """Print the CO2 setpoint in ppm.""" + res = await self.vmd.co2_setpoint() + print(f"{res.value} ppm") + + async def do_co2_setpoint_set(self, setp: int) -> None: + """Change the CO2 setpoint in ppm. Factory default = 1000.""" + if await self.vmd.set_co2_setpoint(setp): + res = await self.vmd.co2_setpoint() + print(f"CO2 setpoint set to: {res.value} ppm") + else: + print("Error setting CO2 setpoint") + + # actions + + async def do_filter_reset(self): + """Reset the filter change timer.""" + await self.vmd.filter_reset() + + class AiriosBridgeCLI(aiocmd.PromptToolkitCmd): """The bridge CLI interface.""" def __init__(self, bridge: BRDG02R13) -> None: super().__init__() - self.prompt = f"[BRDG-02R13@{bridge.slave_id}]>> " + self.prompt = f"[{str(bridge)}]>> " self.bridge = bridge async def do_nodes(self) -> None: """Print the list of bound nodes.""" + LOGGER.debug("do_node starting") res = await self.bridge.nodes() for n in res: print(f"{n}") async def do_node(self, slave_id: str) -> None: """Manage a bound node.""" + LOGGER.debug("do_node fetch nodes") nodes = await self.bridge.nodes() + LOGGER.debug("do_node starting") node_info = None for n in nodes: + LOGGER.debug("do_node match slave_id") if int(slave_id) == int(n.slave_id): node_info = n break - + LOGGER.debug("node_info starting") if node_info is None: raise AiriosIOException(f"Node with address {slave_id} not bound") - if node_info.product_id == ProductId.VMD_02RPS78: - vmd = VMD02RPS78(node_info.slave_id, self.bridge.client) - await AiriosVMD02RPS78CLI(vmd).run() - return - - if node_info.product_id == ProductId.VMN_05LM02: - vmn = VMN05LM02(node_info.slave_id, self.bridge.client) - await AiriosVMN05LM02CLI(vmn).run() - return - - raise AiriosNotImplemented(f"{node_info.product_id} not implemented") + # find by product_id: {'VMD-07RPS13': 116867, 'VMD-02RPS78': 116882, 'VMN-05LM02': 116798} + # fetch models etc. from bridge. compare to src/pyairios/_init_.py + for key, _id in PRIDS.items(): + LOGGER.debug("Fetch _id for item: %s", key) + if node_info.product_id == _id: + # Can we use node["product_name"] as key? + _node = MODULES[key].Node(node_info.slave_id, self.bridge.client) + if key == "VMD-02RPS78": # dedicated CLI for each model + await AiriosVMD02RPS78CLI(_node).run() + return + if key == "VMD-07RPS13": # ClimaRad Ventura + LOGGER.debug("Node Ventura starts") + await AiriosVMD07RPS13CLI(_node).run() + return + if key == "VMN-05LM02": # Remote accessory + await AiriosVMN05LM02CLI(_node).run() + return + # add new models AiriosXXXXXXXXXCLI here to use them in CLI + + raise AiriosNotImplemented( + f"{node_info.product_id} not implemented. Drop new definitions in models/" + ) async def do_rf_sent_messages(self) -> None: """Print the RF sent messages.""" @@ -404,9 +502,13 @@ async def do_bind_controller( ) -> None: """Bind a new controller.""" slave_id = int(slave_id) - pid = ProductId(int(product_id)) + if product_id.startswith("0X") and len(product_id) > 2: + product_id = str(re.sub(r"0X", "0x", product_id)) + pid = int(product_id) # should recognize HEX 0x123 psn = None if product_serial is not None: + if product_serial.startswith("0X") and len(product_serial) > 2: + product_serial = str(re.sub(r"0X", "0x", product_serial)) psn = int(product_serial) await self.bridge.bind_controller(slave_id, pid, psn) @@ -414,7 +516,9 @@ async def do_bind_accessory(self, ctrl_slave_id, slave_id, product_id) -> None: """Bind a new accessory.""" ctrl_slave_id = int(ctrl_slave_id) slave_id = int(slave_id) - pid = ProductId(int(product_id)) + if product_id.startswith("0X") and len(product_id) > 2: + product_id = str(re.sub(r"0X", "0x", product_id)) + pid = int(product_id) # should recognize HEX 0x123 await self.bridge.bind_accessory(ctrl_slave_id, slave_id, pid) async def do_software_build_date(self) -> None: @@ -442,30 +546,8 @@ async def do_set_oem_code(self, number: int) -> None: await self.bridge.set_oem_code(int(number)) async def do_status(self) -> None: - """Print the device status.""" - res = await self.bridge.fetch_bridge() - print("Node data") - print("---------") - print(f" {'Product ID:': <25}{res['product_id']}") - print(f" {'Product Name:': <25}{res['product_name']}") - print(f" {'Software version:': <25}{res['sw_version']}") - print(f" {'RF address:': <25}{res['rf_address']}") - print("") - - print("Device data") - print("---------") - print(f" {'RF comm status:': <25}{res['rf_comm_status']}") - print(f" {'Battery status:': <25}{res['battery_status']}") - print(f" {'Fault status:': <25}{res['fault_status']}") - print("") - - print("BRDG-02R13 data") - print("----------------") - print(f" {'RF sent messages last hour': <40}{res['rf_sent_messages_last_hour']}") - print(f" {'RF sent messages current hour:': <40}{res['rf_sent_messages_current_hour']}") - print(f" {'RF load last hour:': <40}{res['rf_load_last_hour']}") - print(f" {'RF load current hour:': <40}{res['rf_load_current_hour']}") - print(f" {'Uptime:': <40}{res['power_on_time']}") + """Print the complete device status.""" + await self.bridge.print_data() class AiriosClientCLI(aiocmd.PromptToolkitCmd): # pylint: disable=too-few-public-methods @@ -488,6 +570,16 @@ async def do_bridge(self, address: str | None = None) -> None: else: _address = int(address) bridge = BRDG02R13(_address, self.client) + + # bridge.load_models() # is lazy loaded + global MODULES, PRIDS # pylint: disable=global-statement + MODULES = await bridge.models() + print(f"Loaded modules: {MODULES}") + PRIDS = await bridge.product_ids() + print(f"Loaded product_id's: {PRIDS}") + # DESCRIPTIONS = await bridge.model_descriptions() + # print(f"Supported models by key: {DESCRIPTIONS}") + await AiriosBridgeCLI(bridge).run() @@ -533,9 +625,12 @@ async def do_disconnect(self) -> None: self.client = None async def do_set_log_level(self, level: str) -> None: - "Set the log level: critical, fatal, error, warning, info or debug." + """Set the log level: critical, fatal, error, warning, info or debug.""" + if level is None: + return logging.basicConfig() log = logging.getLogger() + if level.casefold() == "critical".casefold(): log.setLevel(logging.CRITICAL) elif level.casefold() == "fatal".casefold(): diff --git a/src/pyairios/__init__.py b/src/pyairios/__init__.py index e2acce9..b2ba56a 100644 --- a/src/pyairios/__init__.py +++ b/src/pyairios/__init__.py @@ -1,9 +1,10 @@ """The Airios RF bridge API entrypoint.""" +import logging +from types import ModuleType + from pyairios.models.brdg_02r13 import BRDG02R13 from pyairios.models.brdg_02r13 import DEFAULT_SLAVE_ID as BRDG02R13_DEFAULT_SLAVE_ID -from pyairios.models.vmd_02rps78 import VMD02RPS78 -from pyairios.models.vmn_05lm02 import VMN05LM02 from .client import ( AiriosBaseTransport, @@ -13,11 +14,18 @@ AsyncAiriosModbusRtuClient, AsyncAiriosModbusTcpClient, ) -from .constants import BindingStatus, ProductId -from .data_model import AiriosBoundNodeInfo, AiriosData, AiriosNodeData +from .constants import BindingStatus +from .data_model import ( + AiriosBoundNodeInfo, + AiriosData, + AiriosDeviceData, + BRDG02R13Data, +) from .exceptions import AiriosException from .node import AiriosNode +LOGGER = logging.getLogger(__name__) + class Airios: """The Airios RF bridge API.""" @@ -47,6 +55,18 @@ async def node(self, slave_id: int) -> AiriosNode: """Get a node instance by its Modbus slave ID.""" return await self.bridge.node(slave_id) + async def airios_models(self) -> dict[str, ModuleType] | None: + """Get the list of supported models.""" + return await self.bridge.models() + + async def airios_prids(self) -> dict[str, int] | None: + """Get the list of supported product_id's.""" + return await self.bridge.product_ids() + + async def airios_model_descr(self) -> dict[str, str] | None: + """Get the list of supported model descriptions.""" + return await self.bridge.model_descriptions() + async def bind_status(self) -> BindingStatus: """Get the bind status.""" return await self.bridge.bind_status() @@ -54,7 +74,7 @@ async def bind_status(self) -> BindingStatus: async def bind_controller( self, slave_id: int, - product_id: ProductId, + product_id: int, product_serial: int | None = None, ) -> bool: """Bind a new controller to the bridge.""" @@ -64,7 +84,7 @@ async def bind_accessory( self, controller_slave_id: int, slave_id: int, - product_id: ProductId, + product_id: int, ) -> bool: """Bind a new accessory to the bridge.""" return await self.bridge.bind_accessory(controller_slave_id, slave_id, product_id) @@ -75,23 +95,24 @@ async def unbind(self, slave_id: int) -> bool: async def fetch(self) -> AiriosData: """Get the data from all nodes at once.""" - data: dict[int, AiriosNodeData] = {} + data: dict[int, AiriosDeviceData | BRDG02R13Data] = {} - brdg_data = await self.bridge.fetch_bridge() - if brdg_data["rf_address"] is None: + brdg_data = await self.bridge.fetch_bridge_data() + if brdg_data is None or brdg_data["rf_address"] is None: raise AiriosException("Failed to fetch node RF address") bridge_rf_address = brdg_data["rf_address"].value data[self.bridge.slave_id] = brdg_data - for node in await self.bridge.nodes(): - if node.product_id == ProductId.VMD_02RPS78: - vmd = VMD02RPS78(node.slave_id, self.bridge.client) - vmd_data = await vmd.fetch_vmd_data() - data[node.slave_id] = vmd_data - if node.product_id == ProductId.VMN_05LM02: - vmn = VMN05LM02(node.slave_id, self.bridge.client) - vmn_data = await vmn.fetch_vmn_data() - data[node.slave_id] = vmn_data + prids = await self.bridge.product_ids() + models = await self.bridge.models() + if prids is not None and models is not None: + for _node in await self.bridge.nodes(): # for each bound node (slow) + for key, _id in prids.items(): # find a matching model (quick) + if _id == _node.product_id and models[key] is not None: + LOGGER.debug("fetch_node_data for key: %s", key) + node_module = models[key].Node(_node.slave_id, self.bridge.client) + node_data = await node_module.fetch_node_data() # extended DeviceData + data[_node.slave_id] = node_data return AiriosData(bridge_rf_address=bridge_rf_address, nodes=data) diff --git a/src/pyairios/client.py b/src/pyairios/client.py index 9b988ce..0609105 100644 --- a/src/pyairios/client.py +++ b/src/pyairios/client.py @@ -1,4 +1,4 @@ -"""Async client for the Airios BRDB-02R13 Modbus gateway.""" +"""Async client for the Airios BRDG-02R13 Modbus gateway.""" from __future__ import annotations diff --git a/src/pyairios/constants.py b/src/pyairios/constants.py index 0530807..fcb8b8c 100644 --- a/src/pyairios/constants.py +++ b/src/pyairios/constants.py @@ -6,29 +6,18 @@ class ProductId(IntEnum): - """The product ID is a unique product identifier. - - The value is composed by three fields, product type + sub ID + manufacturer ID. + """ + The product ID is a unique product identifier. + The value is composed of three fields: product type + sub ID + manufacturer ID. + Replaced by pr_id dynamic dict, created during _init_ in BRDG """ - BRDG_02R13 = 0x0001C849 - VMD_02RPS78 = 0x0001C892 - VMN_05LM02 = 0x0001C83E - VMN_02LM11 = 0x0001C852 - VMD_07RPS13 = 0x0001C883 # ClimaRad VenturaV1X - - def __str__(self) -> str: - if self.value == self.BRDG_02R13: - return "BRDG-02R13" - if self.value == self.VMD_02RPS78: - return "VMD-02RPS78" - if self.value == self.VMN_05LM02: - return "VMN-05LM02" - if self.value == self.VMN_02LM11: - return "VMN-02LM11" - if self.value == self.VMD_07RPS13: - return "VMD-07RPS13" - raise ValueError(f"Unknown product ID value {self.value}") + # this info was moved to the models/ class files as pr_id + # get the dict from bridge by calling bridge.product_ids + # they will be unique as long as all files are in flat models/ dir + # new definitions will be picked up automatically when dropped there + # only Bridge product_id required as const, used during api init + BRDG_02R13 = 0x0001C849 # RF Bridge class BoundStatus(IntEnum): @@ -115,7 +104,7 @@ class Record: # pylint: disable=too-many-instance-attributes """RF statistic record.""" device_id: int - averate: int + average: int """Average received signal strength margin of RF beacon (dB).""" stddev: float """Standard deviation of received signal strength margin of RF beacon (.1 dB).""" @@ -368,6 +357,7 @@ class VMDHeater: class VMDCapabilities(Flag): """Ventilation unit capabilities.""" + NO_CAPABLE = 0x0000 PRE_HEATER_AVAILABLE = 0x0001 POST_HEATER_AVAILABLE = 0x0002 RESERVED = 0x0004 @@ -393,6 +383,53 @@ class VMDFaultStatus(IntEnum): FAN_FAILURE = 1 +class VMDOffOnMode(IntEnum): + """General use binary state.""" + + OFF = 0 + ON = 1 + UNKNOWN = 10 # not in specs + + +class VMDVentilationMode(IntEnum): + """Ventilation unit (Ventura) mode preset.""" + + OFF = 0 + PAUSE = 1 + ON = 2 + OVERRIDE_1 = 3 + OVERRIDE_2 = 4 + OVERRIDE_3 = 5 + OVERRIDE_4 = 6 + OVERRIDE_5 = 7 + SERVICE = 8 + RETYPE = 9 + UNKNOWN = 10 # not in specs + + def __str__(self) -> str: # pylint: disable=too-many-return-statements + if self.value == self.OFF: + return "Off" + if self.value == self.PAUSE: + return "Pause" + if self.value == self.ON: + return "On/Auto" + if self.value == self.OVERRIDE_1: + return "I (temporary override)" + if self.value == self.OVERRIDE_2: + return "II (temporary override)" + if self.value == self.OVERRIDE_3: + return "III (temporary override)" + if self.value == self.OVERRIDE_4: + return "IV (temporary override)" + if self.value == self.OVERRIDE_5: + return "V (temporary override)" + if self.value == self.SERVICE: + return "Service Mode" + if self.value == self.RETYPE: + return "Retype (see manual)" + raise ValueError(f"Unknown ventilation mode value {self.value}") + + class VMDVentilationSpeed(IntEnum): """Ventilation unit speed preset.""" @@ -590,5 +627,6 @@ def parse(cls, value: str): class VMDBypassPosition: """VMD bypass position sample.""" + # Ventura bp_position: 0 = closed, 100 = open position: int error: bool diff --git a/src/pyairios/data_model.py b/src/pyairios/data_model.py index f316d00..6c7b478 100644 --- a/src/pyairios/data_model.py +++ b/src/pyairios/data_model.py @@ -8,16 +8,8 @@ BatteryStatus, BoundStatus, FaultStatus, - ProductId, RFCommStatus, ValueErrorStatus, - VMDBypassMode, - VMDBypassPosition, - VMDErrorCode, - VMDHeater, - VMDRequestedVentilationSpeed, - VMDTemperature, - VMDVentilationSpeed, ) from pyairios.registers import Result @@ -27,7 +19,7 @@ class AiriosBoundNodeInfo: """Bridge bound node information.""" slave_id: int - product_id: ProductId + product_id: int rf_address: int @@ -36,7 +28,7 @@ class AiriosNodeData(TypedDict): slave_id: int rf_address: Result[int] | None - product_id: Result[ProductId] | None + product_id: Result[int] | None product_name: Result[str] | None sw_version: Result[int] | None rf_comm_status: Result[RFCommStatus] | None @@ -51,41 +43,8 @@ class AiriosDeviceData(AiriosNodeData): value_error_status: Result[ValueErrorStatus] | None -class VMD02RPS78Data(AiriosDeviceData): - """VMD-02RPS78 node data.""" - - error_code: Result[VMDErrorCode] | None - ventilation_speed: Result[VMDVentilationSpeed] | None - override_remaining_time: Result[int] | None - exhaust_fan_speed: Result[int] | None - supply_fan_speed: Result[int] | None - exhaust_fan_rpm: Result[int] | None - supply_fan_rpm: Result[int] | None - indoor_air_temperature: Result[VMDTemperature] | None - outdoor_air_temperature: Result[VMDTemperature] | None - exhaust_air_temperature: Result[VMDTemperature] | None - supply_air_temperature: Result[VMDTemperature] | None - filter_dirty: Result[int] | None - filter_remaining_percent: Result[int] | None - filter_duration_days: Result[int] | None - bypass_position: Result[VMDBypassPosition] | None - bypass_mode: Result[VMDBypassMode] | None - bypass_status: Result[int] | None - defrost: Result[int] | None - preheater: Result[VMDHeater] | None - postheater: Result[VMDHeater] | None - preheater_setpoint: Result[float] | None - free_ventilation_setpoint: Result[float] | None - free_ventilation_cooling_offset: Result[float] | None - frost_protection_preheater_setpoint: Result[float] | None - preset_high_fan_speed_supply: Result[int] | None - preset_high_fan_speed_exhaust: Result[int] | None - preset_medium_fan_speed_supply: Result[int] | None - preset_medium_fan_speed_exhaust: Result[int] | None - preset_low_fan_speed_supply: Result[int] | None - preset_low_fan_speed_exhaust: Result[int] | None - preset_standby_fan_speed_supply: Result[int] | None - preset_standby_fan_speed_exhaust: Result[int] | None +# Here only data_models for special devices. To simplify adding new models, +# 'normal' node data_models, all named DeviceData, are in their respective models/module file class BRDG02R13Data(AiriosNodeData): @@ -98,15 +57,9 @@ class BRDG02R13Data(AiriosNodeData): power_on_time: Result[timedelta] | None -class VMN05LM02Data(AiriosDeviceData): - """VMN-05LM02 node data.""" - - requested_ventilation_speed: Result[VMDRequestedVentilationSpeed] | None - - @dataclass class AiriosData: """Data from all bridge bound nodes.""" bridge_rf_address: int - nodes: dict[int, AiriosNodeData] + nodes: dict[int, AiriosDeviceData | BRDG02R13Data] diff --git a/src/pyairios/device.py b/src/pyairios/device.py index 06fb46d..5fa79ae 100644 --- a/src/pyairios/device.py +++ b/src/pyairios/device.py @@ -2,12 +2,13 @@ from __future__ import annotations +import re from typing import List from .client import AsyncAiriosModbusClient from .constants import BoundStatus, ValueErrorStatus from .data_model import AiriosDeviceData -from .node import AiriosNode +from .node import AiriosNode, _safe_fetch from .registers import ( I16Register, RegisterAccess, @@ -41,6 +42,10 @@ def __init__(self, slave_id: int, client: AsyncAiriosModbusClient) -> None: ] self._add_registers(dev_registers) + def __str__(self) -> str: + prompt = str(re.sub(r"_", "-", self.__module__.upper())) + return f"{prompt}@{self.slave_id}" + async def device_bound_status(self) -> Result[BoundStatus]: """Get the device bound status.""" result = await self.client.get_register(self.regmap[Reg.BOUND_STATUS], self.slave_id) @@ -56,13 +61,13 @@ async def fetch_device(self) -> AiriosDeviceData: # pylint: disable=duplicate-c return AiriosDeviceData( slave_id=self.slave_id, - rf_address=await self._safe_fetch(self.node_rf_address), - product_id=await self._safe_fetch(self.node_product_id), - sw_version=await self._safe_fetch(self.node_software_version), - product_name=await self._safe_fetch(self.node_product_name), - rf_comm_status=await self._safe_fetch(self.node_rf_comm_status), - battery_status=await self._safe_fetch(self.node_battery_status), - fault_status=await self._safe_fetch(self.node_fault_status), - bound_status=await self._safe_fetch(self.device_bound_status), - value_error_status=await self._safe_fetch(self.device_value_error_status), + rf_address=await _safe_fetch(self.node_rf_address), + product_id=await _safe_fetch(self.node_product_id), + sw_version=await _safe_fetch(self.node_software_version), + product_name=await _safe_fetch(self.node_product_name), + rf_comm_status=await _safe_fetch(self.node_rf_comm_status), + battery_status=await _safe_fetch(self.node_battery_status), + fault_status=await _safe_fetch(self.node_fault_status), + bound_status=await _safe_fetch(self.device_bound_status), + value_error_status=await _safe_fetch(self.device_value_error_status), ) diff --git a/src/pyairios/models/brdg_02r13.py b/src/pyairios/models/brdg_02r13.py index 25ed33e..22898b9 100644 --- a/src/pyairios/models/brdg_02r13.py +++ b/src/pyairios/models/brdg_02r13.py @@ -12,7 +12,6 @@ BindingStatus, ModbusEvents, Parity, - ProductId, ResetMode, StopBits, ) @@ -22,9 +21,8 @@ AiriosException, AiriosInvalidArgumentException, ) -from pyairios.models.vmd_02rps78 import VMD02RPS78 -from pyairios.models.vmn_05lm02 import VMN05LM02 -from pyairios.node import AiriosNode +from pyairios.models.brdg_base import BrdgBase +from pyairios.node import AiriosNode, _safe_fetch from pyairios.node import Reg as NodeReg from pyairios.registers import ( DateTimeRegister, @@ -40,7 +38,7 @@ DEFAULT_SLAVE_ID = 207 -_LOGGER = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) @dataclass @@ -135,13 +133,33 @@ class Reg(RegisterAddress): ADDRESS_NODE_32 = 43933 -class BRDG02R13(AiriosNode): +def pr_id() -> int: + """ + Get product_id for model BRDG-02R13. + Named as is to discern from node.product_id register. + :return: unique int + """ + return 0x0001C849 + + +def product_descr() -> str | tuple[str, ...]: + """ + Get description of product(s) using BRDG-02R13. + Human-readable text, used in e.g. HomeAssistant Binding UI. + :return: string or tuple of strings, starting with manufacturer + """ + return "Airios RS485 RF Gateway" + + +class BRDG02R13(BrdgBase): """Represents a BRDG-02R13 RF bridge.""" def __init__(self, slave_id: int, client: AsyncAiriosModbusClient) -> None: """Initialize the BRDG-02R13 RF bridge instance.""" super().__init__(slave_id, client) + LOGGER.debug("Init RF Bridge") + brdg_registers: List[RegisterBase] = [ U16Register(Reg.CUSTOMER_PRODUCT_ID, RegisterAccess.READ | RegisterAccess.WRITE), DateTimeRegister(Reg.UTC_TIME, RegisterAccess.READ | RegisterAccess.WRITE), @@ -203,14 +221,16 @@ def __init__(self, slave_id: int, client: AsyncAiriosModbusClient) -> None: U16Register(Reg.ADDRESS_NODE_32, RegisterAccess.READ), ] self._add_registers(brdg_registers) + # models are lazy loaded or initiated by caller def __str__(self) -> str: return f"BRDG-02R13@{self.slave_id}" + # node method doesn't work for Bridge module in CLI (contains the path too) async def bind_controller( self, slave_id: int, - product_id: ProductId, + _product_id: int, product_serial: int | None = None, ) -> bool: """Bind a new controller to the bridge.""" @@ -240,7 +260,7 @@ async def bind_controller( mode = BindingMode.OUTGOING_SINGLE_PRODUCT_PLUS_SERIAL ok = await self.client.set_register( - self.regmap[Reg.BINDING_PRODUCT_ID], product_id, self.slave_id + self.regmap[Reg.BINDING_PRODUCT_ID], _product_id, self.slave_id ) if not ok: raise AiriosBindingException("Failed to configure binding product ID") @@ -278,7 +298,7 @@ async def bind_accessory( self, controller_slave_id: int, slave_id: int, - product_id: ProductId, + _product_id: int, ) -> bool: """Bind a new accessory to the bridge.""" if controller_slave_id < 2 or controller_slave_id > 247: @@ -308,7 +328,7 @@ async def bind_accessory( raise AiriosBindingException(f"Bridge not ready for binding: {result.value}") ok = await self.client.set_register( - self.regmap[Reg.BINDING_PRODUCT_ID], product_id, self.slave_id + self.regmap[Reg.BINDING_PRODUCT_ID], _product_id, self.slave_id ) if not ok: raise AiriosBindingException("Failed to configure binding product ID") @@ -324,7 +344,7 @@ async def bind_accessory( ) async def nodes(self) -> List[AiriosBoundNodeInfo]: - """Get the list of bound nodes.""" + """Get the list of bound nodes registered on the bridge.""" reg_descs: List[RegisterBase] = [ self.regmap[Reg.ADDRESS_NODE_1], @@ -360,26 +380,28 @@ async def nodes(self) -> List[AiriosBoundNodeInfo]: self.regmap[Reg.ADDRESS_NODE_31], self.regmap[Reg.ADDRESS_NODE_32], ] - + LOGGER.debug("Starting Bridge.nodes()") nodes: List[AiriosBoundNodeInfo] = [] for item in reg_descs: + LOGGER.debug("Starting item node(%s)", self.slave_id) result = await self.client.get_register(item, self.slave_id) + LOGGER.debug("got result") if result is None or result.value is None: continue slave_id = result.value if slave_id == 0: continue - + LOGGER.debug("Starting self.client.get_register") result = await self.client.get_register(self.regmap[NodeReg.PRODUCT_ID], slave_id) if result is None or result.value is None: continue - try: - product_id = ProductId(result.value) - except ValueError: - _LOGGER.warning("Unknown product ID %s", result.value) + _product_id: int = -1 + for _id in self.prids.values(): + if _id == result.value: + _product_id = _id + if _product_id == -1: + LOGGER.warning("Unknown product ID %s", result.value) continue - else: - product_id = ProductId(result.value) result = await self.client.get_register(self.regmap[NodeReg.RF_ADDRESS], slave_id) if result is None or result.value is None: @@ -387,7 +409,7 @@ async def nodes(self) -> List[AiriosBoundNodeInfo]: rf_address = result.value info = AiriosBoundNodeInfo( - slave_id=slave_id, product_id=product_id, rf_address=rf_address + slave_id=slave_id, product_id=_product_id, rf_address=rf_address ) nodes.append(info) return nodes @@ -396,15 +418,14 @@ async def node(self, slave_id: int) -> AiriosNode: """Get a node instance by its Modbus slave ID.""" if slave_id == self.slave_id: - return self + return self # the bridge as node - for node in await self.nodes(): - if node.slave_id != slave_id: + for _node in await self.nodes(): + if _node.slave_id != slave_id: continue - if node.product_id == ProductId.VMD_02RPS78: - return VMD02RPS78(node.slave_id, self.client) - if node.product_id == ProductId.VMN_05LM02: - return VMN05LM02(node.slave_id, self.client) + key = str(_node.product_id) # compare to cli.py and _init_.py + LOGGER.debug("Fetch matching module for: %s", key) + return self.modules[key].Node(slave_id, self.client) raise AiriosException(f"Node {slave_id} not found") @@ -503,23 +524,40 @@ async def set_oem_code(self, code: int) -> bool: """ return await self.client.set_register(self.regmap[Reg.OEM_CODE], code, self.slave_id) - async def fetch_bridge(self) -> BRDG02R13Data: # pylint: disable=duplicate-code + async def fetch_bridge_data(self) -> BRDG02R13Data: # pylint: disable=duplicate-code """Fetch all bridge data at once.""" return BRDG02R13Data( slave_id=self.slave_id, - rf_address=await self._safe_fetch(self.node_rf_address), - product_id=await self._safe_fetch(self.node_product_id), - sw_version=await self._safe_fetch(self.node_software_version), - product_name=await self._safe_fetch(self.node_product_name), - rf_comm_status=await self._safe_fetch(self.node_rf_comm_status), - battery_status=await self._safe_fetch(self.node_battery_status), - fault_status=await self._safe_fetch(self.node_fault_status), - rf_sent_messages_last_hour=await self._safe_fetch(self.rf_sent_messages_last_hour), - rf_sent_messages_current_hour=await self._safe_fetch( - self.rf_sent_messages_current_hour - ), - rf_load_last_hour=await self._safe_fetch(self.rf_load_last_hour), - rf_load_current_hour=await self._safe_fetch(self.rf_load_current_hour), - power_on_time=await self._safe_fetch(self.power_on_time), + rf_address=await _safe_fetch(self.node_rf_address), + product_id=await _safe_fetch(self.node_product_id), + sw_version=await _safe_fetch(self.node_software_version), + product_name=await _safe_fetch(self.node_product_name), + rf_comm_status=await _safe_fetch(self.node_rf_comm_status), + battery_status=await _safe_fetch(self.node_battery_status), + fault_status=await _safe_fetch(self.node_fault_status), + rf_sent_messages_last_hour=await _safe_fetch(self.rf_sent_messages_last_hour), + rf_sent_messages_current_hour=await _safe_fetch(self.rf_sent_messages_current_hour), + rf_load_last_hour=await _safe_fetch(self.rf_load_last_hour), + rf_load_current_hour=await _safe_fetch(self.rf_load_current_hour), + power_on_time=await _safe_fetch(self.power_on_time), ) + + async def print_data(self) -> None: + """ + Print labels + states for this Bridge in CLI. + + :return: no confirmation, outputs to serial monitor + """ + res = await self.fetch_bridge_data() # customised per model + + super().print_base_data(res) + + print("BRDG-02R13 data") + print("----------------") + print(f" {'RF sent messages last hour': <40}{res['rf_sent_messages_last_hour']}") + print(f" {'RF sent messages current hour:': <40}{res['rf_sent_messages_current_hour']}") + print(f" {'RF load last hour:': <40}{res['rf_load_last_hour']}") + print(f" {'RF load current hour:': <40}{res['rf_load_current_hour']}") + print(f" {'Uptime:': <40}{res['power_on_time']}") + print("") diff --git a/src/pyairios/models/brdg_base.py b/src/pyairios/models/brdg_base.py new file mode 100644 index 0000000..ddef4f8 --- /dev/null +++ b/src/pyairios/models/brdg_base.py @@ -0,0 +1,188 @@ +"""Airios BRDG-BASE controller implementation.""" + +from __future__ import annotations + +import asyncio +import glob +import importlib.util +import logging +import os +import re +from types import ModuleType + +from pyairios.device import AiriosDevice +from pyairios.exceptions import AiriosException +from pyairios.registers import RegisterAddress + +LOGGER = logging.getLogger(__name__) + + +class Reg(RegisterAddress): + """Register set for BRDG-BASE controller node.""" + + +def pr_id() -> int: + """ + Get product_id for model BRDG- models. + Named as is to discern from node.product_id register. + :return: unique int + """ + # base class, should not be called + return 0x0 + + +def product_descr() -> str | tuple[str, ...]: + """ + Get description of product(s) using BRDG_xxxx. + Human-readable text, used in e.g. HomeAssistant Binding UI. + :return: string or tuple of strings, starting with manufacturer + """ + # base class, should not be called + return "-" + + +class BrdgBase(AiriosDevice): + """Base class for BRDG-xxx bridge nodes. + Only contains common Airios support methods, available to all bridge implementations. + """ + + modules: dict[str, ModuleType] = {} + # a dict with imported modules by model + prids: dict[str, int] = {} + # a dict with product_ids by model (replaces ProductId enum in const.py) + descriptions: dict[str, str] = {} + # a dict with label description model, for use in UI + modules_loaded: bool = False + + async def load_models(self) -> int: + """ + Analyse and import all VMx.py files from the models/ folder. + """ + if not self.modules_loaded: + loop = asyncio.get_running_loop() + # must call this async run_in_executor to prevent HA blocking call during file I/O. + modules_list = await loop.run_in_executor( + None, glob.glob, os.path.join(os.path.dirname(__file__), "*.py") + ) + # we are in models/ + check_id = [] + + for file_path in modules_list: + file_name = str(os.path.basename(file_path)) + if ( + file_name == "__init__.py" or file_name.endswith("_base.py") + # or file_name == "brdg_02r13.py" # bridges have sensors that need this info + ): # skip init and any base model definitions + continue + module_name = file_name.removesuffix(".py") + if module_name is None: + raise AiriosException(f"Failed to extract mod_name from filename {file_name}") + model_key: str = str(re.sub(r"_", "-", module_name).upper()) + if model_key is None: + raise AiriosException(f"Failed to create model_key from {module_name}") + + # using importlib, create a spec for each module: + module_spec = importlib.util.spec_from_file_location(module_name, file_path) + if module_spec is None or module_spec.loader is None: + raise AiriosException(f"Failed to load module {module_name}") + # store the spec in a dict by class name: + mod = importlib.util.module_from_spec(module_spec) + # load the module from the spec: + if mod is None: + raise AiriosException(f"Failed to load module_from_spec {module_name}") + module_spec.loader.exec_module(mod) + # store the imported module in dict: + self.modules[model_key] = mod + + # now we can use the module as if it were imported normally + # check correct loading by fetching the product_id + # (the int to check binding against) + _id = self.modules[model_key].pr_id() + # verify no duplicate product_id's + if _id in check_id: # product_id not unique among models + raise AiriosException( + f"Found duplicate product_id while collecting models:id {model_key}" + f"used by {self.modules[model_key].__name__} and by {mod.__name__}" + ) + self.prids[model_key] = _id + check_id.append(_id) # remember all added _id's to check for duplicates + self.descriptions[model_key] = self.modules[model_key].product_descr() + + LOGGER.debug("Loaded modules:") + LOGGER.debug(self.modules) # dict + LOGGER.info("Loaded product_id's:") + LOGGER.info(self.prids) # dict + LOGGER.info("Loaded products:") + LOGGER.info(self.descriptions) # dict + # all loaded up + self.modules_loaded = True + return len(self.modules) + + async def models(self) -> dict[str, ModuleType] | None: + """ + Util to fetch all supported models with their imported module class. + Must call this async run_in_executor to prevent HA blocking call during file I/O. + + :return: dict of all controller and accessory modules by key + """ + if not self.modules_loaded: + task = asyncio.create_task(self.load_models()) + await task + return self.modules + return self.modules + + async def model_descriptions(self) -> dict[str, str] | None: + """ + Util to fetch all supported model labels. + + :return: dict of all controller and accessory module labels by key + """ + if not self.modules_loaded: + task = asyncio.create_task(self.load_models()) + await task + return self.descriptions + return self.descriptions + + async def product_ids(self) -> dict[str, int] | None: + """ + Util to pick up all supported models with their productId. + + :return: dict of all controller and accessory definitions installed + """ + if not self.modules_loaded: + task = asyncio.create_task(self.load_models()) + await task + return self.prids + return self.prids + + def print_base_data(self, res) -> None: + """ + Print shared BRDG labels + states, in CLI. + + :return: no confirmation, outputs to serial monitor + """ + print("Node data") + print("---------") + print(f" {'Product ID:': <25}{res['product_id']} (0x{res['product_id']:08X})") + print(f" {'Product Name:': <25}{res['product_name']}") + print(f" {'Software version:': <25}{res['sw_version']}") + print(f" {'RF address:': <25}{res['rf_address']}") + print("") + + print("Device data") + print("---------") + print(f" {'RF comm status:': <25}{res['rf_comm_status']}") + print(f" {'Battery status:': <25}{res['battery_status']}") + print(f" {'Fault status:': <25}{res['fault_status']}") + print("") + + amount = 0 if self.modules is None else len(self.modules) + print(f"Loaded {amount} model files") + if self.modules is not None: + for key, mod in self.modules.items(): + report1 = f"{key[:3]}{':': <6}{key: <14}name: {mod.__name__: <14} descr.:" + report2 = f"{str(mod.product_descr()): <38} product_id: {mod.pr_id()}" + print(f" {report1}{report2}") + # print(f" {'ProductIDs:': <13}{self.prids}") + + print("----------------") diff --git a/src/pyairios/models/vmd_02rps78.py b/src/pyairios/models/vmd_02rps78.py index f7e6ce1..85f6c8a 100644 --- a/src/pyairios/models/vmd_02rps78.py +++ b/src/pyairios/models/vmd_02rps78.py @@ -1,9 +1,9 @@ -"""Airios VMD-02RPS78 controller implementation.""" +"""Airios VMD-02RPS78 Siber DF Optima 2 controller implementation.""" from __future__ import annotations +import logging import math -from dataclasses import dataclass from typing import List from pyairios.client import AsyncAiriosModbusClient @@ -19,9 +19,10 @@ VMDTemperature, VMDVentilationSpeed, ) -from pyairios.data_model import VMD02RPS78Data -from pyairios.device import AiriosDevice +from pyairios.data_model import AiriosDeviceData from pyairios.exceptions import AiriosInvalidArgumentException +from pyairios.models.vmd_base import VmdBase, VMDPresetFansSpeeds +from pyairios.node import _safe_fetch from pyairios.registers import ( FloatRegister, RegisterAccess, @@ -31,19 +32,11 @@ U16Register, ) +LOGGER = logging.getLogger(__name__) -@dataclass -class VMDPresetFansSpeeds: - """Preset fan speeds.""" - exhaust_fan_speed: Result[int] - """Exhaust fan speed (%)""" - supply_fan_speed: Result[int] - """Supply fan speed (%)""" - - -class Reg(RegisterAddress): - """Register set for VMD-02RPS78 controller node.""" +class Reg(RegisterAddress): # only override or add differences in VMD_BASE? + """Register set for VMD-02RPS78 Siber DF Optima 2 controller node.""" CURRENT_VENTILATION_SPEED = 41000 FAN_SPEED_EXHAUST = 41001 @@ -94,12 +87,69 @@ class Reg(RegisterAddress): FREE_VENTILATION_COOLING_OFFSET = 42015 -class VMD02RPS78(AiriosDevice): +class DeviceData(AiriosDeviceData): + """VMD-02RPS78 node data.""" + + error_code: Result[VMDErrorCode] | None + ventilation_speed: Result[VMDVentilationSpeed] | None + override_remaining_time: Result[int] | None + exhaust_fan_speed: Result[int] | None + supply_fan_speed: Result[int] | None + exhaust_fan_rpm: Result[int] | None + supply_fan_rpm: Result[int] | None + indoor_air_temperature: Result[VMDTemperature] | None + outdoor_air_temperature: Result[VMDTemperature] | None + exhaust_air_temperature: Result[VMDTemperature] | None + supply_air_temperature: Result[VMDTemperature] | None + filter_dirty: Result[int] | None + filter_remaining_percent: Result[int] | None + filter_duration_days: Result[int] | None + bypass_position: Result[VMDBypassPosition] | None + bypass_mode: Result[VMDBypassMode] | None + bypass_status: Result[int] | None + defrost: Result[int] | None + preheater: Result[VMDHeater] | None + postheater: Result[VMDHeater] | None + preheater_setpoint: Result[float] | None + free_ventilation_setpoint: Result[float] | None + free_ventilation_cooling_offset: Result[float] | None + frost_protection_preheater_setpoint: Result[float] | None + preset_high_fan_speed_supply: Result[int] | None + preset_high_fan_speed_exhaust: Result[int] | None + preset_medium_fan_speed_supply: Result[int] | None + preset_medium_fan_speed_exhaust: Result[int] | None + preset_low_fan_speed_supply: Result[int] | None + preset_low_fan_speed_exhaust: Result[int] | None + preset_standby_fan_speed_supply: Result[int] | None + preset_standby_fan_speed_exhaust: Result[int] | None + + +def pr_id() -> int: + """ + Get product_id for model VMD_02RPS78. + Named as is to discern from node.product_id register. + :return: unique int + """ + return 0x0001C892 + + +def product_descr() -> str | tuple[str, ...]: + """ + Get description of product(s) using VMD_02RPS78. + Human-readable text, used in e.g. HomeAssistant Binding UI. + :return: string or tuple of strings, starting with manufacturer + """ + return ("Siber DF Evo", "Siber DF Optima 2") + + +class Node(VmdBase): """Represents a VMD-02RPS78 controller node.""" def __init__(self, slave_id: int, client: AsyncAiriosModbusClient) -> None: """Initialize the VMD-02RPS78 controller node instance.""" super().__init__(slave_id, client) + LOGGER.debug("Starting Siber Node(%s)", slave_id) + vmd_registers: List[RegisterBase] = [ U16Register(Reg.CURRENT_VENTILATION_SPEED, RegisterAccess.READ | RegisterAccess.STATUS), U16Register(Reg.FAN_SPEED_EXHAUST, RegisterAccess.READ | RegisterAccess.STATUS), @@ -196,9 +246,6 @@ def __init__(self, slave_id: int, client: AsyncAiriosModbusClient) -> None: ] self._add_registers(vmd_registers) - def __str__(self) -> str: - return f"VMD-02RPS78@{self.slave_id}" - async def capabilities(self) -> Result[VMDCapabilities]: """Get the ventilation unit capabilities.""" regdesc = self.regmap[Reg.CAPABILITIES] @@ -604,64 +651,132 @@ async def set_preset_standby_fan_speed_exhaust(self, value: int) -> bool: self.regmap[Reg.FAN_SPEED_AWAY_EXHAUST], value, self.slave_id ) - async def fetch_vmd_data(self) -> VMD02RPS78Data: # pylint: disable=duplicate-code + async def fetch_node_data(self) -> DeviceData: # pylint: disable=duplicate-code """Fetch all controller data at once.""" - return VMD02RPS78Data( + return DeviceData( slave_id=self.slave_id, - rf_address=await self._safe_fetch(self.node_rf_address), - product_id=await self._safe_fetch(self.node_product_id), - sw_version=await self._safe_fetch(self.node_software_version), - product_name=await self._safe_fetch(self.node_product_name), - rf_comm_status=await self._safe_fetch(self.node_rf_comm_status), - battery_status=await self._safe_fetch(self.node_battery_status), - fault_status=await self._safe_fetch(self.node_fault_status), - bound_status=await self._safe_fetch(self.device_bound_status), - value_error_status=await self._safe_fetch(self.device_value_error_status), - error_code=await self._safe_fetch(self.error_code), - ventilation_speed=await self._safe_fetch(self.ventilation_speed), - exhaust_fan_speed=await self._safe_fetch(self.exhaust_fan_speed), - supply_fan_speed=await self._safe_fetch(self.supply_fan_speed), - exhaust_fan_rpm=await self._safe_fetch(self.exhaust_fan_rpm), - supply_fan_rpm=await self._safe_fetch(self.supply_fan_rpm), - override_remaining_time=await self._safe_fetch(self.override_remaining_time), - indoor_air_temperature=await self._safe_fetch(self.indoor_air_temperature), - outdoor_air_temperature=await self._safe_fetch(self.outdoor_air_temperature), - exhaust_air_temperature=await self._safe_fetch(self.exhaust_air_temperature), - supply_air_temperature=await self._safe_fetch(self.supply_air_temperature), - filter_dirty=await self._safe_fetch(self.filter_dirty), - filter_remaining_percent=await self._safe_fetch(self.filter_remaining), - filter_duration_days=await self._safe_fetch(self.filter_duration), - defrost=await self._safe_fetch(self.defrost), - bypass_position=await self._safe_fetch(self.bypass_position), - bypass_mode=await self._safe_fetch(self.bypass_mode), - bypass_status=await self._safe_fetch(self.bypass_status), - preheater=await self._safe_fetch(self.preheater), - postheater=await self._safe_fetch(self.postheater), - preheater_setpoint=await self._safe_fetch(self.preheater_setpoint), - free_ventilation_setpoint=await self._safe_fetch(self.free_ventilation_setpoint), - free_ventilation_cooling_offset=await self._safe_fetch( - self.free_ventilation_cooling_offset - ), - frost_protection_preheater_setpoint=await self._safe_fetch( + rf_address=await _safe_fetch(self.node_rf_address), + product_id=await _safe_fetch(self.node_product_id), + sw_version=await _safe_fetch(self.node_software_version), + product_name=await _safe_fetch(self.node_product_name), + rf_comm_status=await _safe_fetch(self.node_rf_comm_status), + battery_status=await _safe_fetch(self.node_battery_status), + fault_status=await _safe_fetch(self.node_fault_status), + bound_status=await _safe_fetch(self.device_bound_status), + value_error_status=await _safe_fetch(self.device_value_error_status), + error_code=await _safe_fetch(self.error_code), + ventilation_speed=await _safe_fetch(self.ventilation_speed), + exhaust_fan_speed=await _safe_fetch(self.exhaust_fan_speed), + supply_fan_speed=await _safe_fetch(self.supply_fan_speed), + exhaust_fan_rpm=await _safe_fetch(self.exhaust_fan_rpm), + supply_fan_rpm=await _safe_fetch(self.supply_fan_rpm), + override_remaining_time=await _safe_fetch(self.override_remaining_time), + indoor_air_temperature=await _safe_fetch(self.indoor_air_temperature), + outdoor_air_temperature=await _safe_fetch(self.outdoor_air_temperature), + exhaust_air_temperature=await _safe_fetch(self.exhaust_air_temperature), + supply_air_temperature=await _safe_fetch(self.supply_air_temperature), + filter_dirty=await _safe_fetch(self.filter_dirty), + filter_remaining_percent=await _safe_fetch(self.filter_remaining), + filter_duration_days=await _safe_fetch(self.filter_duration), + defrost=await _safe_fetch(self.defrost), + bypass_position=await _safe_fetch(self.bypass_position), + bypass_mode=await _safe_fetch(self.bypass_mode), + bypass_status=await _safe_fetch(self.bypass_status), + preheater=await _safe_fetch(self.preheater), + postheater=await _safe_fetch(self.postheater), + preheater_setpoint=await _safe_fetch(self.preheater_setpoint), + free_ventilation_setpoint=await _safe_fetch(self.free_ventilation_setpoint), + free_ventilation_cooling_offset=await _safe_fetch(self.free_ventilation_cooling_offset), + frost_protection_preheater_setpoint=await _safe_fetch( self.frost_protection_preheater_setpoint ), - preset_high_fan_speed_supply=await self._safe_fetch(self.preset_high_fan_speed_supply), - preset_high_fan_speed_exhaust=await self._safe_fetch( - self.preset_high_fan_speed_exhaust - ), - preset_medium_fan_speed_supply=await self._safe_fetch( - self.preset_medium_fan_speed_supply - ), - preset_medium_fan_speed_exhaust=await self._safe_fetch( - self.preset_medium_fan_speed_exhaust - ), - preset_low_fan_speed_supply=await self._safe_fetch(self.preset_low_fan_speed_supply), - preset_low_fan_speed_exhaust=await self._safe_fetch(self.preset_low_fan_speed_exhaust), - preset_standby_fan_speed_supply=await self._safe_fetch( - self.preset_standby_fan_speed_supply - ), - preset_standby_fan_speed_exhaust=await self._safe_fetch( + preset_high_fan_speed_supply=await _safe_fetch(self.preset_high_fan_speed_supply), + preset_high_fan_speed_exhaust=await _safe_fetch(self.preset_high_fan_speed_exhaust), + preset_medium_fan_speed_supply=await _safe_fetch(self.preset_medium_fan_speed_supply), + preset_medium_fan_speed_exhaust=await _safe_fetch(self.preset_medium_fan_speed_exhaust), + preset_low_fan_speed_supply=await _safe_fetch(self.preset_low_fan_speed_supply), + preset_low_fan_speed_exhaust=await _safe_fetch(self.preset_low_fan_speed_exhaust), + preset_standby_fan_speed_supply=await _safe_fetch(self.preset_standby_fan_speed_supply), + preset_standby_fan_speed_exhaust=await _safe_fetch( self.preset_standby_fan_speed_exhaust ), ) + + async def print_data(self) -> None: + """ + Print labels + states for this particular model, including VMD base fields, in CLI. + + :return: no confirmation, outputs to serial monitor + """ + + res = await self.fetch_node_data() # customised per model + + super().print_base_data(res) + + print("VMD-02RPS78 data") + print("----------------") + print(f" {'Error code:': <25}{res['error_code']}") + + print(f" {'Ventilation speed:': <25}{res['ventilation_speed']}") + # print(f" {'Override remaining time:': <25}{res['override_remaining_time']}") + + print( + f" {'Supply fan speed:': <25}{res['supply_fan_speed']}% " + f"({res['supply_fan_rpm']} RPM)" + ) + print( + f" {'Exhaust fan speed:': <25}{res['exhaust_fan_speed']}% " + f"({res['exhaust_fan_rpm']} RPM)" + ) + + print(f" {'Indoor temperature:': <25}{res['indoor_air_temperature']}") + print(f" {'Outdoor temperature:': <25}{res['outdoor_air_temperature']}") + print(f" {'Exhaust temperature:': <25}{res['exhaust_air_temperature']}") + print(f" {'Supply temperature:': <25}{res['supply_air_temperature']}") + + print(f" {'Filter dirty:': <25}{res['filter_dirty']}") + print(f" {'Filter remaining:': <25}{res['filter_remaining_percent']} %") + print(f" {'Filter duration:': <25}{res['filter_duration_days']} days") + + print(f" {'Bypass position:': <25}{res['bypass_position']}") + print(f" {'Bypass status:': <25}{res['bypass_status']}") + print(f" {'Bypass mode:': <25}{res['bypass_mode']}") + + print(f" {'Defrost:': <25}{res['defrost']}") + print(f" {'Preheater:': <25}{res['preheater']}") + print(f" {'Postheater:': <25}{res['postheater']}") + print("") + + print(f" {'Preset speeds':<25}{'Supply':<10}{'Exhaust':<10}") + print(f" {'-------------':<25}") + print( + f" {'High':<25}{str(res['preset_high_fan_speed_supply']) + ' %':<10}" + f"{str(res['preset_high_fan_speed_exhaust']) + ' %':<10}" + ) + print( + f" {'Mid':<25}{str(res['preset_medium_fan_speed_supply']) + ' %':<10}" + f"{str(res['preset_medium_fan_speed_exhaust']) + ' %':<10}" + ) + print( + f" {'Low':<25}{str(res['preset_low_fan_speed_supply']) + ' %':<10}" + f"{str(res['preset_low_fan_speed_exhaust']) + ' %':<10}" + ) + print( + f" {'Standby':<25}{str(res['preset_standby_fan_speed_supply']) + ' %':<10}" + f"{str(res['preset_standby_fan_speed_exhaust']) + ' %':<10}" + ) + print("") + + print(" Setpoints") + print(" ---------") + print( + f" {'Frost protection preheater setpoint:':<40}" + f"{res['frost_protection_preheater_setpoint']} ºC" + ) + print(f" {'Preheater setpoint:': <40}{res['preheater_setpoint']} ºC") + print(f" {'Free ventilation setpoint:':<40}{res['free_ventilation_setpoint']} ºC") + print( + f" {'Free ventilation cooling offset:':<40}" + f"{res['free_ventilation_cooling_offset']} K" + ) diff --git a/src/pyairios/models/vmd_07rps13.py b/src/pyairios/models/vmd_07rps13.py new file mode 100644 index 0000000..6de059c --- /dev/null +++ b/src/pyairios/models/vmd_07rps13.py @@ -0,0 +1,647 @@ +"""Airios VMD-07RPS13 ClimaRad Ventura V1 controller implementation.""" + +from __future__ import annotations + +import datetime +import logging +import math +from typing import List + +from pyairios.client import AsyncAiriosModbusClient +from pyairios.constants import ( + ValueStatusFlags, + ValueStatusSource, + VMDBypassPosition, + VMDCapabilities, + VMDErrorCode, + VMDHeater, + VMDHeaterStatus, + VMDRequestedVentilationSpeed, + VMDSensorStatus, + VMDTemperature, + VMDVentilationMode, + VMDVentilationSpeed, +) +from pyairios.data_model import AiriosDeviceData +from pyairios.exceptions import AiriosInvalidArgumentException +from pyairios.models.vmd_base import VmdBase +from pyairios.node import _safe_fetch +from pyairios.registers import ( + FloatRegister, + RegisterAccess, + RegisterAddress, + RegisterBase, + Result, + ResultStatus, + U8Register, + U16Register, +) + +LOGGER = logging.getLogger(__name__) + + +# Linking the registers: +# Reg: +# model set of RF Bridge register addresses, by named address keyword +# Data: +# model dict of register address + Result type by name (formerly in data_model.py) +# Node.vmd_registers: +# instance list of register type (size) + name key from Reg + access R/W + + +class Reg(RegisterAddress): # only override or add differences in VMD_BASE + """The Register set for VMD-07RPS13 ClimaRad Ventura V1 controller node.""" + + # numbers between "#" are from oem docs + # R + BYPASS_POSITION = 41015 # 28 # 1, RO, uint8, "Bypass Position" + CO2_LEVEL = 41008 # 17 # 1, RO, uint16, "Main Room Exhaust CO2 Level" + ERROR_CODE = 41032 # 23 # 1, RO, uint8, "Error Code" + FAN_SPEED_EXHAUST = 41019 # 9 # 1, RO, uint8, "Main Room Fan speed Exhaust" + FAN_SPEED_SUPPLY = 41020 # 10 # 1, RO, uint8 "Main Room Fan speed Inlet" + FILTER_DIRTY = 41017 # 1, RO, uint8, "Filter Dirty" + FILTER_DURATION = 41029 # 1, RO, uint16, "Air Filter Time Duration" + FILTER_REMAINING_DAYS = 41028 # 1, RO, uint16, "Air Filter Time Remaining" + FILTER_REMAINING_PERCENT = 41030 # 1, RO, uint8, "Air Filter Time Percent" + FLOW_INLET = 41024 # L=2, RO, float, "Inlet Flow level" + FLOW_OUTLET = 41026 # L=2, RO, float, "Outlet Flow level" + HUMIDITY_INDOOR = 41007 # 15 # 1, RO, uint8, "Main Room Exhaust Humidity Level" + HUMIDITY_OUTDOOR = 41002 # 1, RO, uint8, "Outlet Humidity" + INLET_FLOW = 41024 # 2, RO, float, "Inlet Flow level" + OUTLET_FLOW = 41026 # 2, RO, float, "Outlet Flow level" + POST_HEATER_DEMAND = 41023 # 1, RO, uint8, "Postheater Demand" + TEMPERATURE_EXHAUST = 41005 # 12 # L=2, RO, float, "Main Room Exhaust Temperature" + TEMPERATURE_INLET = 41003 # L=2, RO, float, "Inlet Temperature" + TEMPERATURE_OUTLET = 41000 # L=2, RO, float, "Outlet Temperature" + TEMP_VENT_MODE = 41103 # 1,R, uint8, "Main Room Ventilation Mode" + TEMP_VENT_SUB_MODE = 41104 # 1, R, uint8, "Main Room Ventilation Sub Mode" + VENT_MODE = 41100 # 1, R, uint8, "Main Room Ventilation Mode" + VENT_SUB_MODE = 41101 # 1, R, uint8, "Main Room Ventilation Sub Mode" + VENT_SUB_MODE_EXH = 41102 # R, uint8, Main Room Vent. Sub Mode Cont. Exh. + # R/W + BASIC_VENT_ENABLE = 42000 # 1, RW, uint8, "Basic Ventilation Enable" + BASIC_VENT_LEVEL = 42001 # 1, RW, uint8, "Basic Ventilation Level" + CO2_CONTROL_SETPOINT = 42011 # 1, RW, uint16, "CO2 Control Setpoint" + FILTER_RESET = 41151 # 1, RW, uint8, "Reset Air Filter Timer" + OVERRIDE_TIME_MANUAL = 42009 # 115 # RW, uint16 "Temporary Override Duration" + PRODUCT_VARIANT = 42010 # 1,RW, uint8, "Product Variant" = 0? + REQ_TEMP_VENT_MODE = 41123 # 1,RW, uint8, "Main Room Temp.Ventilation Mode" + REQ_TEMP_VENT_SUB_MODE = 41124 # 1, RW, uint8, "Main Room Vent. Sub Mode" + # REQ_TEMP_VENT_SUB_MODE_EXH = (41125 # RW, uint8, Main Room Temp. Vent. Sub Mode Cont. Exh.) + REQ_VENT_MODE = 41120 # 1,RW, uint8, "Main Room Ventilation Mode" + REQ_VENT_SUB_MODE = 41121 # 1, RW, uint8, "Main Room Ventilation Sub Mode" + # REQ_VENT_SUB_MODE_EXH = 41122 # 1, R, uint8, "Main Room Vent. Sub Mode Cont. Exh." + # ROOM_INSTANCE = 41126 # 1, RW, uint8, "Room instance" send before setting REQ_xxx + SET_PERSON_PRESENT = 41150 # 1, RW, uint8, "Set Person Present" + SYSTEM_VENT_CONFIG = 42021 # 1, RW, uint8, "System Ventilation Configuration" + + +class DeviceData(AiriosDeviceData): + """ + VMD-07RPS13 ClimaRad Ventura V1C/V1D/V1X node data. + source: ClimaRad Modbus Registers Specs 2024 + """ + + basic_ventilation_enable: Result[int] | None + bypass_position: Result[VMDBypassPosition] | None + co2_control_setpoint: Result[int] | None + co2_level: Result[int] | None + error_code: Result[VMDErrorCode] | None + exhaust_air_temperature: Result[VMDTemperature] | None + exhaust_fan_speed: Result[int] | None + filter_dirty: Result[int] | None + filter_remaining_days: Result[int] | None + filter_remaining_percent: Result[int] | None + indoor_air_temperature: Result[VMDTemperature] | None + product_variant: Result[int] | None + supply_air_temperature: Result[VMDTemperature] | None + supply_fan_speed: Result[int] | None + system_ventilation_config: Result[int] | None + temp_ventilation_mode: Result[int] | None + temp_ventilation_sub_mode: Result[int] | None + ventilation_mode: Result[int] | None + ventilation_sub_mode: Result[int] | None + ventilation_speed: Result[int] | None # required for HA fan + + +def pr_id() -> int: + """ + Get product_id for model VMD_07RPS13. + Named as is to discern from node.product_id register. + :return: unique int + """ + return 0x0001C883 + + +def product_descr() -> str | tuple[str, ...]: + """ + Get description of product(s) using VMD_07RPS13. + Human-readable text, used in e.g. HomeAssistant Binding UI. + :return: string or tuple of strings, starting with manufacturer + """ + return "ClimaRad Ventura V1" + + +class Node(VmdBase): + """Represents a VMD-07RPS13 Ventura V1 controller node. HACK in client to access WRITE""" + + def __init__(self, slave_id: int, client: AsyncAiriosModbusClient) -> None: + """Initialize the VMD-07RPS13 Ventura controller node instance.""" + super().__init__(slave_id, client) + LOGGER.debug("Starting Ventura Node(%s)", slave_id) + + vmd_registers: List[RegisterBase] = [ + FloatRegister(Reg.TEMPERATURE_EXHAUST, (RegisterAccess.READ | RegisterAccess.STATUS)), + FloatRegister(Reg.TEMPERATURE_INLET, (RegisterAccess.READ | RegisterAccess.STATUS)), + FloatRegister(Reg.TEMPERATURE_OUTLET, (RegisterAccess.READ | RegisterAccess.STATUS)), + # FloatRegister(Reg.TEMPERATURE_SUPPLY, (RegisterAccess.READ | RegisterAccess.STATUS)), + U8Register( + Reg.BASIC_VENT_ENABLE, + (RegisterAccess.READ | RegisterAccess.WRITE | RegisterAccess.STATUS), + ), + U8Register( + Reg.BASIC_VENT_LEVEL, + (RegisterAccess.READ | RegisterAccess.WRITE | RegisterAccess.STATUS), + ), + U8Register(Reg.BYPASS_POSITION, (RegisterAccess.READ | RegisterAccess.STATUS)), + U16Register(Reg.CO2_CONTROL_SETPOINT, (RegisterAccess.READ | RegisterAccess.WRITE)), + U16Register(Reg.CO2_LEVEL, (RegisterAccess.READ | RegisterAccess.STATUS)), + U8Register(Reg.ERROR_CODE, (RegisterAccess.READ | RegisterAccess.STATUS)), + U8Register(Reg.FAN_SPEED_EXHAUST, (RegisterAccess.READ | RegisterAccess.STATUS)), + U8Register(Reg.FAN_SPEED_SUPPLY, (RegisterAccess.READ | RegisterAccess.STATUS)), + U8Register(Reg.FILTER_DIRTY, (RegisterAccess.READ | RegisterAccess.STATUS)), + # U16Register(Reg.FILTER_DURATION, (RegisterAccess.READ | RegisterAccess.STATUS)), + U16Register(Reg.FILTER_REMAINING_DAYS, (RegisterAccess.READ | RegisterAccess.STATUS)), + U8Register(Reg.FILTER_REMAINING_PERCENT, (RegisterAccess.READ | RegisterAccess.STATUS)), + U8Register(Reg.FILTER_RESET, (RegisterAccess.WRITE | RegisterAccess.STATUS)), + U8Register(Reg.HUMIDITY_INDOOR, (RegisterAccess.READ | RegisterAccess.STATUS)), + U8Register(Reg.HUMIDITY_OUTDOOR, (RegisterAccess.READ | RegisterAccess.STATUS)), + U16Register(Reg.OVERRIDE_TIME_MANUAL, (RegisterAccess.READ | RegisterAccess.WRITE)), + U8Register(Reg.POST_HEATER_DEMAND, (RegisterAccess.READ | RegisterAccess.STATUS)), + U8Register( + Reg.PRODUCT_VARIANT, + (RegisterAccess.READ | RegisterAccess.WRITE | RegisterAccess.STATUS), + ), # UINT8? + U8Register( + Reg.REQ_TEMP_VENT_MODE, + (RegisterAccess.READ | RegisterAccess.WRITE | RegisterAccess.STATUS), + ), + U8Register( + Reg.REQ_TEMP_VENT_SUB_MODE, + (RegisterAccess.READ | RegisterAccess.WRITE | RegisterAccess.STATUS), + ), + U8Register( + Reg.REQ_VENT_MODE, + (RegisterAccess.READ | RegisterAccess.WRITE | RegisterAccess.STATUS), + ), + U8Register( + Reg.REQ_VENT_SUB_MODE, + (RegisterAccess.READ | RegisterAccess.WRITE | RegisterAccess.STATUS), + ), + # U8Register(Reg.ROOM_INSTANCE, + # (RegisterAccess.READ | RegisterAccess.WRITE | RegisterAccess.STATUS)), + U8Register( + Reg.SYSTEM_VENT_CONFIG, + (RegisterAccess.READ | RegisterAccess.WRITE | RegisterAccess.STATUS), + ), + U8Register(Reg.TEMP_VENT_MODE, (RegisterAccess.READ | RegisterAccess.STATUS)), + U8Register(Reg.TEMP_VENT_SUB_MODE, (RegisterAccess.READ | RegisterAccess.STATUS)), + U8Register(Reg.VENT_MODE, (RegisterAccess.READ | RegisterAccess.STATUS)), + U8Register(Reg.VENT_SUB_MODE, (RegisterAccess.READ | RegisterAccess.STATUS)), + ] + self._add_registers(vmd_registers) + + # getters and setters + + async def capabilities(self) -> Result[VMDCapabilities] | None: + """Get the ventilation unit capabilities. + Capabilities register not supported on VMD-07RPS13, so must simulate""" + # Ventura capabilities: + _caps = VMDCapabilities.NO_CAPABLE # | VMDCapabilities.AUTO_MODE_CAPABLE + return Result( + _caps, + ResultStatus( + datetime.timedelta(1000), ValueStatusSource.UNKNOWN, ValueStatusFlags.VALID + ), + ) + + async def system_ventilation_configuration(self) -> Result[int]: + """Get the system ventilation configuration status.""" + return await self.client.get_register(self.regmap[Reg.SYSTEM_VENT_CONFIG], self.slave_id) + + async def vent_mode(self) -> Result[int]: + """Get the ventilation mode status. 0=Off, 1=Pause, 2=On, 3=Man1, 5=Man3, 8=Service""" + # seen: 0 (with temp_vent_mode 3) | 1 = Pause | 2 = On (with vent_sub_mode 48) + return await self.client.get_register(self.regmap[Reg.VENT_MODE], self.slave_id) + + async def rq_vent_mode(self) -> Result[int]: + """Get the ventilation mode status. 0=Off, 2=On, 3=Man1, 5=Man3, 8=Service""" + return await self.client.get_register(self.regmap[Reg.REQ_VENT_MODE], self.slave_id) + + async def set_rq_vent_mode(self, mode: int) -> bool: # : VMDVentilationMode) -> bool: + """Set the ventilation mode. 0=Off, 1=Pause, 2=On, 3=Man1, 5=Man3, 8=Service""" + if mode == VMDVentilationMode.UNKNOWN: + raise AiriosInvalidArgumentException(f"Invalid ventilation mode {mode}") + return await self.client.set_register(self.regmap[Reg.REQ_VENT_MODE], mode, self.slave_id) + + async def rq_vent_sub_mode(self) -> Result[int]: + """Get the ventilation mode status. 0=Off/Pause, 2=On, 3=Man1, 5=Man3, 8=Service""" + return await self.client.get_register(self.regmap[Reg.REQ_VENT_SUB_MODE], self.slave_id) + + async def set_rq_vent_sub_mode(self, mode: int) -> bool: # : VMDVentilationMode) -> bool: + """Set the ventilation mode. 0=Off/Pause, 48=Auto""" + if mode == VMDVentilationMode.UNKNOWN: + raise AiriosInvalidArgumentException(f"Invalid ventilation mode {mode}") + return await self.client.set_register( + self.regmap[Reg.REQ_VENT_SUB_MODE], mode, self.slave_id + ) + + async def rq_temp_vent_mode(self) -> Result[int]: + """Get the temp ventilation mode status. 0=Off, 1=Pause, 2=On, 3=Man1, 5=Man3, 8=Service""" + return await self.client.get_register(self.regmap[Reg.REQ_TEMP_VENT_MODE], self.slave_id) + + async def set_rq_temp_vent_mode(self, mode: int) -> bool: # : VMDVentilationMode) -> bool: + """Set the temp ventilation mode. 0=Off, 1=Pause, 2=On, 3=Man1, 5=Man3, 8=Service""" + if mode == VMDVentilationMode.UNKNOWN: + raise AiriosInvalidArgumentException(f"Invalid ventilation mode {mode}") + return await self.client.set_register( + self.regmap[Reg.REQ_TEMP_VENT_MODE], mode, self.slave_id + ) + + async def rq_temp_vent_sub_mode(self) -> Result[int]: + """Get the temp ventilation sub mode status. 0=Off/Pause, 201, ...""" + return await self.client.get_register( + self.regmap[Reg.REQ_TEMP_VENT_SUB_MODE], self.slave_id + ) + + async def set_rq_temp_vent_sub_mode(self, mode: int) -> bool: # : VMDVentilationMode) -> bool: + """Set the temp ventilation sub mode. 0=Off/Pause, 201, ...""" + if mode == VMDVentilationMode.UNKNOWN: + raise AiriosInvalidArgumentException(f"Invalid ventilation mode {mode}") + return await self.client.set_register( + self.regmap[Reg.REQ_TEMP_VENT_SUB_MODE], mode, self.slave_id + ) + + async def vent_sub_mode(self) -> Result[int]: + """Get the ventilation sub mode status.""" + # seen: 0 | 48 + return await self.client.get_register(self.regmap[Reg.VENT_SUB_MODE], self.slave_id) + + async def temp_vent_mode(self) -> Result[int]: + """Get the temporary ventilation mode status.""" + # seen: 0 (with Ventilation mode != 0) | 3 + return await self.client.get_register(self.regmap[Reg.TEMP_VENT_MODE], self.slave_id) + + async def temp_vent_sub_mode(self) -> Result[int]: + """Get the temporary ventilation sub mode status.""" + # seen: 0 (with temp_vent_mode 3) | 201 | 202 | .. | 205 + return await self.client.get_register(self.regmap[Reg.TEMP_VENT_SUB_MODE], self.slave_id) + + # can't access these 2 entries in register dump + # async def room_instance(self) -> Result[int]: + # """Get the room_instance: 1 = Main or 2 = Secondary""" + # return await self.client.get_register(self.regmap[Reg.ROOM_INSTANCE], self.slave_id) + # + # async def set_room_instance(self, mode: int) -> bool: + # """Set the room_instance: 1 = Main or 2 = Secondary""" + # return await self.client.set_register( + # self.regmap[Reg.ROOM_INSTANCE], mode, self.slave_id + # ) + + async def bypass_position(self) -> Result[VMDBypassPosition]: + """Get the bypass position.""" + regdesc = self.regmap[Reg.BYPASS_POSITION] + result = await self.client.get_register(regdesc, self.slave_id) + error = result.value > 120 + return Result(VMDBypassPosition(result.value, error), result.status) + + async def basic_vent_enable(self) -> Result[int]: + """Get base ventilation enabled.""" + return await self.client.get_register(self.regmap[Reg.BASIC_VENT_ENABLE], self.slave_id) + + async def set_basic_vent_enable(self, mode: int) -> bool: + """Set base ventilation enabled=1/disabled=0.""" + return await self.client.set_register( + self.regmap[Reg.BASIC_VENT_ENABLE], mode, self.slave_id + ) + + async def basic_vent_level(self) -> Result[int]: + """Get base ventilation level.""" + return await self.client.get_register(self.regmap[Reg.BASIC_VENT_LEVEL], self.slave_id) + + async def set_basic_vent_level(self, level: int) -> bool: + """Set the base ventilation level.""" + return await self.client.set_register( + self.regmap[Reg.BASIC_VENT_LEVEL], level, self.slave_id + ) + + async def ventilation_speed(self) -> Result[VMDVentilationSpeed]: + """Get the ventilation unit active speed preset.""" + + # Automatic: + # Ventilation mode: 2 + # Ventilation sub mode: 48 + # Temp. Ventil. mode: 0 + # Temp. Ventil. sub mode: 0 + # + # Pause: + # Ventilation mode: 1 + # Ventilation sub mode: 0 + # Temp. Ventil. mode: 1 + # Temp. Ventil. sub mode: 0 + # + # Manual/temp 1: + # Ventilation mode: 0 + # Ventilation sub mode: 0 + # Temp. Ventil. mode: 3 + # Temp. Ventil. sub mode: 201 + # + # Manual/temp 2: + # Ventilation mode: 0 + # Ventilation sub mode: 0 + # Temp. Ventil. mode: 3 + # Temp. Ventil. sub mode: 202 + # + # Ventilation mode: 0 + # Ventilation sub mode: 0 + # Temp. Ventil. mode: 3 + # Temp. Ventil. sub mode: 203 + # + # Stand 5 == Boost >> + # Ventilation mode: 0 + # Ventilation sub mode: 0 + # Temp. Ventil. mode: 3 + # Temp. Ventil. sub mode: 205 + + mode = await self.client.get_register(self.regmap[Reg.VENT_MODE], self.slave_id) + man_step = await self.client.get_register( + self.regmap[Reg.TEMP_VENT_SUB_MODE], self.slave_id + ) + speed = VMDVentilationSpeed.OFF + if mode.value == 2: + speed = VMDVentilationSpeed.AUTO + elif mode.value == 1: + speed = VMDVentilationSpeed.AWAY + elif mode.value == 0: + if man_step.value <= 202: + speed = VMDVentilationSpeed.OVERRIDE_LOW + elif man_step.value <= 203: + speed = VMDVentilationSpeed.OVERRIDE_MID + elif man_step.value <= 205: + speed = VMDVentilationSpeed.OVERRIDE_HIGH + return Result(speed, mode.status) + + async def set_ventilation_speed(self, speed: VMDRequestedVentilationSpeed) -> bool: + """Set the ventilation unit speed (temp 8H) preset.""" + md = 0 # VMDVentilationSpeed.OFF, PAUSE? + if speed == VMDRequestedVentilationSpeed.AUTO: + md = 0 + elif speed == VMDRequestedVentilationSpeed.AWAY: + md = 0 + elif speed == VMDRequestedVentilationSpeed.LOW: + md = 202 + elif speed == VMDRequestedVentilationSpeed.MID: + md = 203 + elif speed == VMDRequestedVentilationSpeed.HIGH: + md = 205 + + return await self.client.set_register( # check why no immediate change + self.regmap[Reg.REQ_VENT_SUB_MODE], md, self.slave_id + ) + + async def product_variant(self) -> Result[int]: + """Get the product variant.""" + regdesc = self.regmap[Reg.PRODUCT_VARIANT] + return await self.client.get_register(regdesc, self.slave_id) + + async def co2_level(self) -> Result[int]: + """Get the CO2 level (in ppm).""" + regdesc = self.regmap[Reg.CO2_LEVEL] + return await self.client.get_register(regdesc, self.slave_id) + + async def co2_setpoint(self) -> Result[int]: + """Get the CO2 control setpoint (in ppm).""" + regdesc = self.regmap[Reg.CO2_CONTROL_SETPOINT] + return await self.client.get_register(regdesc, self.slave_id) + + async def set_co2_setpoint(self, setpnt: int) -> bool: + """Set the CO2 control setpoint (in ppm).""" + return await self.client.set_register( + self.regmap[Reg.CO2_CONTROL_SETPOINT], setpnt, self.slave_id + ) + + async def filter_duration(self) -> Result[int]: + """Get the filter duration (in days).""" + return await self.client.get_register(self.regmap[Reg.FILTER_DURATION], self.slave_id) + + async def filter_remaining_days(self) -> Result[int]: + """Get the filter remaining lifetime (in days).""" + return await self.client.get_register(self.regmap[Reg.FILTER_REMAINING_DAYS], self.slave_id) + + async def filter_remaining_percent(self) -> Result[int]: + """Get the filter remaining lifetime (in %).""" + return await self.client.get_register( + self.regmap[Reg.FILTER_REMAINING_PERCENT], self.slave_id + ) + + async def filter_reset(self) -> bool: + """Reset the filter dirty status.""" + return await self.client.set_register(self.regmap[Reg.FILTER_RESET], 0, self.slave_id) + + async def filter_dirty(self) -> Result[int]: + """Get the filter dirty status.""" + return await self.client.get_register(self.regmap[Reg.FILTER_DIRTY], self.slave_id) + + async def error_code(self) -> Result[VMDErrorCode]: + """Get the ventilation unit error code.""" + regdesc = self.regmap[Reg.ERROR_CODE] + result = await self.client.get_register(regdesc, self.slave_id) + return Result(VMDErrorCode(result.value), result.status) + + async def indoor_humidity(self) -> Result[int]: + """Get the indoor humidity (%)""" + return await self.client.get_register(self.regmap[Reg.HUMIDITY_INDOOR], self.slave_id) + + async def outdoor_humidity(self) -> Result[int]: + """Get the outdoor humidity (%)""" + return await self.client.get_register(self.regmap[Reg.HUMIDITY_OUTDOOR], self.slave_id) + + async def exhaust_fan_speed(self) -> Result[int]: + """Get the exhaust fan speed (%)""" + return await self.client.get_register(self.regmap[Reg.FAN_SPEED_EXHAUST], self.slave_id) + + async def supply_fan_speed(self) -> Result[int]: + """Get the supply fan speed (%)""" + return await self.client.get_register(self.regmap[Reg.FAN_SPEED_SUPPLY], self.slave_id) + + async def indoor_air_temperature(self) -> Result[VMDTemperature]: + """Get the indoor air temperature. + + This is exhaust temperature before the heat exchanger. + """ + regdesc = self.regmap[Reg.TEMPERATURE_EXHAUST] + result = await self.client.get_register(regdesc, self.slave_id) + if math.isnan(result.value): + status = VMDSensorStatus.UNAVAILABLE + elif result.value < -273.0: + status = VMDSensorStatus.ERROR + else: + status = VMDSensorStatus.OK + return Result(VMDTemperature(round(result.value, 2), status), result.status) + + async def exhaust_air_temperature(self) -> Result[VMDTemperature]: + """Get the exhaust air temperature. + + This is the exhaust temperature after the heat exchanger. + """ + regdesc = self.regmap[Reg.TEMPERATURE_OUTLET] + result = await self.client.get_register(regdesc, self.slave_id) + if math.isnan(result.value): + status = VMDSensorStatus.UNAVAILABLE + elif result.value < -273.0: + status = VMDSensorStatus.ERROR + else: + status = VMDSensorStatus.OK + return Result(VMDTemperature(round(result.value, 2), status), result.status) + + async def supply_air_temperature(self) -> Result[VMDTemperature]: + """Get the supply air temperature. + + This is the supply temperature after the heat exchanger. + """ + regdesc = self.regmap[Reg.TEMPERATURE_INLET] + result = await self.client.get_register(regdesc, self.slave_id) + if math.isnan(result.value): + status = VMDSensorStatus.UNAVAILABLE + elif result.value < -273.0: + status = VMDSensorStatus.ERROR + else: + status = VMDSensorStatus.OK + return Result(VMDTemperature(round(result.value, 2), status), result.status) + + async def postheater(self) -> Result[VMDHeater]: + """Get the postheater level.""" + regdesc = self.regmap[Reg.POST_HEATER_DEMAND] + result = await self.client.get_register(regdesc, self.slave_id) + status = VMDHeaterStatus.UNAVAILABLE if result.value == 0xEF else VMDHeaterStatus.OK + return Result(VMDHeater(result.value, status), result.status) + + async def fetch_node_data(self) -> DeviceData: # pylint: disable=duplicate-code + """Fetch all controller data at once.""" + + return DeviceData( + slave_id=self.slave_id, + # node data from pyairios node + rf_address=await _safe_fetch(self.node_rf_address), + product_id=await _safe_fetch(self.node_product_id), + sw_version=await _safe_fetch(self.node_software_version), + product_name=await _safe_fetch(self.node_product_name), + # device data + rf_comm_status=await _safe_fetch(self.node_rf_comm_status), + battery_status=await _safe_fetch(self.node_battery_status), + fault_status=await _safe_fetch(self.node_fault_status), + bound_status=await _safe_fetch(self.device_bound_status), + value_error_status=await _safe_fetch(self.device_value_error_status), + # VMD-07RPS13 data + product_variant=await _safe_fetch(self.product_variant), + error_code=await _safe_fetch(self.error_code), + system_ventilation_config=await _safe_fetch(self.system_ventilation_configuration), + ventilation_mode=await _safe_fetch(self.vent_mode), + ventilation_sub_mode=await _safe_fetch(self.vent_sub_mode), + # ventilation_sub_mode_exh=await _safe_fetch(self.vent_sub_mode_exh), # failed + temp_ventilation_mode=await _safe_fetch(self.temp_vent_mode), + temp_ventilation_sub_mode=await _safe_fetch(self.temp_vent_sub_mode), + # temp_ventilation_sub_mode_exh=await _safe_fetch(self.temp_vent_sub_mode_exh), # failed + exhaust_fan_speed=await _safe_fetch(self.exhaust_fan_speed), + supply_fan_speed=await _safe_fetch(self.supply_fan_speed), + indoor_air_temperature=await _safe_fetch(self.indoor_air_temperature), + exhaust_air_temperature=await _safe_fetch(self.exhaust_air_temperature), + supply_air_temperature=await _safe_fetch(self.supply_air_temperature), + filter_dirty=await _safe_fetch(self.filter_dirty), + filter_remaining_days=await _safe_fetch(self.filter_remaining_days), + filter_remaining_percent=await _safe_fetch(self.filter_remaining_percent), + bypass_position=await _safe_fetch(self.bypass_position), + basic_ventilation_enable=await _safe_fetch(self.basic_vent_enable), + co2_level=await _safe_fetch(self.co2_level), + co2_control_setpoint=await _safe_fetch(self.co2_setpoint), + ventilation_speed=await _safe_fetch(self.ventilation_speed), + ) + + async def print_data(self) -> None: + """ + Print labels + states for this particular model, including VMD base fields, in CLI. + + :return: no confirmation, outputs to serial monitor + """ + + res = await self.fetch_node_data() # customised per model + + super().print_base_data(res) + + print("VMD-07RPS13 data") + print("----------------") + print(f" {'Product Variant:': <25}{res['product_variant']}") + print(f" {'Error code:': <25}{res['error_code']}") + print("") + print(f" {'Ventilation mode:': <25}{res['ventilation_mode']}") + print(f" {'Ventilation sub mode:': <25}{res['ventilation_sub_mode']}") + print(f" {'Temp. Ventil. mode:': <25}{res['temp_ventilation_mode']}") + print(f" {'Temp. Ventil. sub mode:': <25}{res['temp_ventilation_sub_mode']}") + # + print( + f" {'Supply fan speed:': <25}{res['supply_fan_speed']}% " + # f"({res['supply_fan_rpm']} RPM)" + ) + print( + f" {'Exhaust fan speed:': <25}{res['exhaust_fan_speed']}% " + # f"({res['exhaust_fan_rpm']} RPM)" + ) + + print(f" {'Indoor temperature:': <25}{res['indoor_air_temperature']}") + # print(f" {'Outdoor temperature:': <25}{res['outdoor_air_temperature']}") + print(f" {'Exhaust temperature:': <25}{res['exhaust_air_temperature']}") + print(f" {'Supply temperature:': <25}{res['supply_air_temperature']}") + + print(f" {'CO2 level:':<40}{res['co2_level']} ppm") + + print(f" {'Filter dirty:': <25}{res['filter_dirty']}") + print(f" {'Filter remaining days:': <25}{res['filter_remaining_days']} days") + print(f" {'Filter remaining perc.:': <25}{res['filter_remaining_percent']}%") + + print( + f" {'Bypass position:': <25}{'Open ' if res == 1 else 'Closed '}{ + res['bypass_position'] + }" + ) + print(f" {'Base ventil. enabled:': <25}{res['basic_ventilation_enable']}") + + # print(f" {'Postheater:': <25}{res['postheater']}") + # print("") + + # print(f" {'Preset speeds':<25}{'Supply':<10}{'Exhaust':<10}") + # print(f" {'-------------':<25}") + # print( + # f" {'High':<25}{str(res['preset_high_fan_speed_supply']) + ' %':<10}" + # f"{str(res['preset_high_fan_speed_exhaust']) + ' %':<10}" + # ) + # print( + # f" {'Mid':<25}{str(res['preset_medium_fan_speed_supply']) + ' %':<10}" + # f"{str(res['preset_medium_fan_speed_exhaust']) + ' %':<10}" + # ) + # print( + # f" {'Low':<25}{str(res['preset_low_fan_speed_supply']) + ' %':<10}" + # f"{str(res['preset_low_fan_speed_exhaust']) + ' %':<10}" + # ) + # print( + # f" {'Standby':<25}{str(res['preset_standby_fan_speed_supply']) + ' %':<10}" + # f"{str(res['preset_standby_fan_speed_exhaust']) + ' %':<10}" + # ) + print("") + + print(" Setpoints") + print(" ---------") + print(f" {'CO2 control setpoint:':<40}{res['co2_control_setpoint']} ppm") + # print( + # f" {'Free ventilation cooling offset:':<40}" + # f"{res['free_ventilation_cooling_offset']} K" + # ) diff --git a/src/pyairios/models/vmd_base.py b/src/pyairios/models/vmd_base.py new file mode 100644 index 0000000..ccc1068 --- /dev/null +++ b/src/pyairios/models/vmd_base.py @@ -0,0 +1,107 @@ +"""Airios VMD-BASE controller implementation.""" + +from __future__ import annotations + +import datetime +from dataclasses import dataclass, field + +from pyairios.constants import ValueStatusFlags, ValueStatusSource, VMDCapabilities +from pyairios.device import AiriosDevice +from pyairios.registers import ( + RegisterAddress, + Result, + ResultStatus, +) + + +@dataclass +class VMDPresetFansSpeeds: + """Preset fan speeds.""" + + # this must load from vmd_base to prevent None error + exhaust_fan_speed: Result[int] = field() + """Exhaust fan speed (%)""" + supply_fan_speed: Result[int] = field() + """Supply fan speed (%)""" + + def __post_init__(self): + if self.exhaust_fan_speed is None: + self.exhaust_fan_speed = Result(-1) + if self.supply_fan_speed is None: + self.supply_fan_speed = Result(-1) + + +class Reg(RegisterAddress): + """Register set for VMD-BASE controller node.""" + + +def pr_id() -> int: + """ + Get product_id for model VMD- models. + Named as is to discern from node.product_id register. + :return: unique int + """ + # base class, should not be called + return 0x0 + + +def product_descr() -> str | tuple[str, ...]: + """ + Get description of product(s) using VMD_xxxx. + Human-readable text, used in e.g. HomeAssistant Binding UI. + :return: string or tuple of strings, starting with manufacturer + """ + # base class, should not be called + return "-" + + +class VmdBase(AiriosDevice): + """Base class for VMD-xxx controller nodes.""" + + # no VMD-common registers found, leave here as example for new models that do + # def __init__(self, slave_id: int, client: AsyncAiriosModbusClient) -> None: + # """Initialize the VMD-x controller node instance.""" + # super().__init__(slave_id, client) + + # vmd_registers: List[RegisterBase] = [ + # U16Register(Reg.CURRENT_VENTILATION_SPEED, RegisterAccess.READ_STATUS), + # ... + # ] + # self._add_registers(vmd_registers) + + async def capabilities(self) -> Result[VMDCapabilities] | None: + """Get the ventilation unit capabilities. + If Capabilities register not supported on model, must simulate""" + # not all fans support capabilities register call, must return basics + _caps = VMDCapabilities.NO_CAPABLE + return Result( + _caps, + ResultStatus( + datetime.timedelta(1000), ValueStatusSource.UNKNOWN, ValueStatusFlags.VALID + ), + ) + + def print_base_data(self, res) -> None: + """ + Print shared VMD labels + states, in CLI. + + :return: no confirmation, outputs to serial monitor + """ + print("Node data") + print("---------") + print(f" {'Product ID:': <25}{res['product_id']} (0x{res['product_id']:08X})") + print(f" {'Product Name:': <25}{res['product_name']}") + print(f" {'Software version:': <25}{res['sw_version']}") + print(f" {'RF address:': <25}{res['rf_address']}") + print("") + + print("Device data") + print("---------") + print(f" {'RF comm status:': <25}{res['rf_comm_status']}") + print(f" {'Battery status:': <25}{res['battery_status']}") + print(f" {'Fault status:': <25}{res['fault_status']}") + print(f" {'Bound status:': <25}{res['bound_status']}") + print(f" {'Value error status:': <25}{res['value_error_status']}") + print("") + + print("----------------") diff --git a/src/pyairios/models/vmn_05lm02.py b/src/pyairios/models/vmn_05lm02.py index 2fbd00f..5607a82 100644 --- a/src/pyairios/models/vmn_05lm02.py +++ b/src/pyairios/models/vmn_05lm02.py @@ -2,12 +2,15 @@ from __future__ import annotations +import logging +import re from typing import List from pyairios.client import AsyncAiriosModbusClient from pyairios.constants import VMDRequestedVentilationSpeed -from pyairios.data_model import VMN05LM02Data +from pyairios.data_model import AiriosDeviceData from pyairios.device import AiriosDevice +from pyairios.node import _safe_fetch from pyairios.registers import ( RegisterAccess, RegisterAddress, @@ -16,6 +19,8 @@ U16Register, ) +LOGGER = logging.getLogger(__name__) + class Reg(RegisterAddress): """Register set for VMN-05LM02 remote node.""" @@ -23,12 +28,37 @@ class Reg(RegisterAddress): REQUESTED_VENTILATION_SPEED = 41000 -class VMN05LM02(AiriosDevice): - """Represents a VMN-05LM02 remote node.""" +class DeviceData(AiriosDeviceData): + """VMN-05LM02 remote node data.""" + + requested_ventilation_speed: Result[VMDRequestedVentilationSpeed] | None + + +def pr_id() -> int: + """ + Get product_id for model VMN_05LM02. + Named as is to discern from node.product_id register. + :return: unique int + """ + return 0x0001C83E + + +def product_descr() -> str | tuple[str, ...]: + """ + Get description of product(s) using VMN_05LM02. + Human-readable text, used in e.g. HomeAssistant Binding UI. + :return: string or tuple of strings, starting with manufacturer + """ + return "Siber 4 button Remote" + + +class Node(AiriosDevice): + """Represents a VMN-05LM02 Siber 4 button remote node.""" def __init__(self, slave_id: int, client: AsyncAiriosModbusClient) -> None: """Initialize the VMN-05LM02 node instance.""" super().__init__(slave_id, client) + LOGGER.debug("Starting Siber Remote Node(%s)", slave_id) vmn_registers: List[RegisterBase] = [ U16Register( Reg.REQUESTED_VENTILATION_SPEED, RegisterAccess.READ | RegisterAccess.STATUS @@ -37,7 +67,8 @@ def __init__(self, slave_id: int, client: AsyncAiriosModbusClient) -> None: self._add_registers(vmn_registers) def __str__(self) -> str: - return f"VMN-05LM02@{self.slave_id}" + prompt = str(re.sub(r"_", "-", self.__module__.upper())) + return f"{prompt}@{self.slave_id}" async def requested_ventilation_speed(self) -> Result[VMDRequestedVentilationSpeed]: """Get the requested ventilation speed.""" @@ -45,19 +76,49 @@ async def requested_ventilation_speed(self) -> Result[VMDRequestedVentilationSpe result = await self.client.get_register(regdesc, self.slave_id) return Result(VMDRequestedVentilationSpeed(result.value), result.status) - async def fetch_vmn_data(self) -> VMN05LM02Data: # pylint: disable=duplicate-code + async def fetch_node_data(self) -> DeviceData: # pylint: disable=duplicate-code """Get the node device data at once.""" - return VMN05LM02Data( + return DeviceData( slave_id=self.slave_id, - rf_address=await self._safe_fetch(self.node_rf_address), - product_id=await self._safe_fetch(self.node_product_id), - sw_version=await self._safe_fetch(self.node_software_version), - product_name=await self._safe_fetch(self.node_product_name), - rf_comm_status=await self._safe_fetch(self.node_rf_comm_status), - battery_status=await self._safe_fetch(self.node_battery_status), - fault_status=await self._safe_fetch(self.node_fault_status), - bound_status=await self._safe_fetch(self.device_bound_status), - value_error_status=await self._safe_fetch(self.device_value_error_status), - requested_ventilation_speed=await self._safe_fetch(self.requested_ventilation_speed), + rf_address=await _safe_fetch(self.node_rf_address), + product_id=await _safe_fetch(self.node_product_id), + sw_version=await _safe_fetch(self.node_software_version), + product_name=await _safe_fetch(self.node_product_name), + rf_comm_status=await _safe_fetch(self.node_rf_comm_status), + battery_status=await _safe_fetch(self.node_battery_status), + fault_status=await _safe_fetch(self.node_fault_status), + bound_status=await _safe_fetch(self.device_bound_status), + value_error_status=await _safe_fetch(self.device_value_error_status), + requested_ventilation_speed=await _safe_fetch(self.requested_ventilation_speed), ) + + async def print_data(self) -> None: + """ + Print labels + states for this particular model in CLI. + + :return: no confirmation, outputs to serial monitor + """ + res = await self.fetch_node_data() # customised per model + + print("Node data") + print("---------") + print(f" {'Product ID:': <25}{res['product_id']} (0x{res['product_id']:08X})") + print(f" {'Product Name:': <25}{res['product_name']}") + print(f" {'Software version:': <25}{res['sw_version']}") + print(f" {'RF address:': <25}{res['rf_address']}") + print("") + + print("Device data") + print("---------") + print(f" {'RF comm status:': <25}{res['rf_comm_status']}") + print(f" {'Battery status:': <25}{res['battery_status']}") + print(f" {'Fault status:': <25}{res['fault_status']}") + print(f" {'Bound status:': <25}{res['bound_status']}") + print(f" {'Value error status:': <25}{res['value_error_status']}") + print("") + + # super().print_data(res) # no superclass set up for VMN yet + print("VMN-05LM02 data") + print("----------------") + print(f" {'Requested ventilation speed:': <40}{res['requested_ventilation_speed']}") diff --git a/src/pyairios/node.py b/src/pyairios/node.py index 8800da3..c754047 100644 --- a/src/pyairios/node.py +++ b/src/pyairios/node.py @@ -5,7 +5,7 @@ from typing import Callable, Dict, List from .client import AsyncAiriosModbusClient -from .constants import BatteryStatus, FaultStatus, ProductId, RFCommStatus, RFStats +from .constants import BatteryStatus, FaultStatus, RFCommStatus, RFStats from .data_model import AiriosNodeData from .exceptions import AiriosException from .registers import ( @@ -21,7 +21,7 @@ U32Register, ) -_LOGGER = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) class Reg(RegisterAddress): @@ -57,8 +57,16 @@ class Reg(RegisterAddress): FAULT_HISTORY_COMM_STATUS = 40307 +async def _safe_fetch(fetcher: Callable): + try: + result = await fetcher() + except AiriosException: + return None + return result + + class AiriosNode: - """Represents a RF node.""" + """Represents an RF node.""" client: AsyncAiriosModbusClient slave_id: int @@ -67,6 +75,8 @@ class AiriosNode: def __init__(self, slave_id: int, client: AsyncAiriosModbusClient) -> None: """Initialize the node class instance.""" + LOGGER.debug("Init AiriosNode@%s", slave_id) + self.client = client self.slave_id = int(slave_id) node_registers: List[RegisterBase] = [ @@ -99,6 +109,7 @@ def __init__(self, slave_id: int, client: AsyncAiriosModbusClient) -> None: U32Register(Reg.FAULT_HISTORY_STATUS_INFO, RegisterAccess.READ), U16Register(Reg.FAULT_HISTORY_COMM_STATUS, RegisterAccess.READ), ] + LOGGER.debug("Add node registers") self._add_registers(node_registers) def _add_registers(self, reglist: List[RegisterBase]): @@ -111,15 +122,14 @@ async def node_rf_address(self) -> Result[int]: """Get the node RF address, also used as node serial number.""" return await self.client.get_register(self.regmap[Reg.RF_ADDRESS], self.slave_id) - async def node_product_id(self) -> Result[ProductId]: + async def node_product_id(self) -> Result[int]: """Get the node product ID. This is the value assigned to the virtual node instance created by the bridge when - a device is bound. The actual received product ID from the real RF node can is + a device is bound. The actual received product ID from the real RF node is available in the RECEIVED_PRODUCT_ID register. """ - result = await self.client.get_register(self.regmap[Reg.PRODUCT_ID], self.slave_id) - return Result(ProductId(result.value), None) + return await self.client.get_register(self.regmap[Reg.PRODUCT_ID], self.slave_id) async def node_software_version(self) -> Result[int]: """Get the node software version.""" @@ -151,14 +161,13 @@ async def node_product_name(self) -> Result[str]: """Get the node product name.""" return await self.client.get_register(self.regmap[Reg.PRODUCT_NAME], self.slave_id) - async def node_received_product_id(self) -> Result[ProductId]: + async def node_received_product_id(self) -> Result[int]: """Get the received product ID. This is the value received from the bound node. If it does not match register - NODE_PRODUCT_ID a wrong product is bound. + PRODUCT_ID, a wrong product is bound. """ - result = await self.client.get_register(self.regmap[Reg.RECEIVED_PRODUCT_ID], self.slave_id) - return Result(ProductId(result.value), result.status) + return await self.client.get_register(self.regmap[Reg.RECEIVED_PRODUCT_ID], self.slave_id) async def node_rf_comm_status(self) -> Result[RFCommStatus]: """Get the node RF communication status.""" @@ -194,12 +203,12 @@ async def node_rf_stats(self) -> RFStats: for i in range(0, nrecs): ok = await self.client.set_register(self.regmap[Reg.RF_STATS_INDEX], i, self.slave_id) if not ok: - _LOGGER.warning("Failed to write %d to RF stats index register", i) + LOGGER.warning("Failed to write %d to RF stats index register", i) continue r = await self.client.get_register(self.regmap[Reg.RF_STATS_DEVICE], self.slave_id) device_id: int = r.value r = await self.client.get_register(self.regmap[Reg.RF_STATS_AVERAGE], self.slave_id) - averate: int = r.value + average: int = r.value r = await self.client.get_register(self.regmap[Reg.RF_STATS_STDDEV], self.slave_id) stddev: float = r.value r = await self.client.get_register(self.regmap[Reg.RF_STATS_MIN], self.slave_id) @@ -214,7 +223,7 @@ async def node_rf_stats(self) -> RFStats: age = datetime.timedelta(minutes=r.value) rec = RFStats.Record( device_id=device_id, - averate=averate, + average=average, stddev=stddev, minimum=minimum, maximum=maximum, @@ -225,23 +234,16 @@ async def node_rf_stats(self) -> RFStats: recs.append(rec) return RFStats(records=recs) - async def _safe_fetch(self, fetcher: Callable): - try: - result = await fetcher() - except AiriosException: - return None - return result - async def fetch_node(self) -> AiriosNodeData: # pylint: disable=duplicate-code """Fetch relevant node data at once.""" return AiriosNodeData( slave_id=self.slave_id, - rf_address=await self._safe_fetch(self.node_rf_address), - product_id=await self._safe_fetch(self.node_product_id), - sw_version=await self._safe_fetch(self.node_software_version), - product_name=await self._safe_fetch(self.node_product_name), - rf_comm_status=await self._safe_fetch(self.node_rf_comm_status), - battery_status=await self._safe_fetch(self.node_battery_status), - fault_status=await self._safe_fetch(self.node_fault_status), + rf_address=await _safe_fetch(self.node_rf_address), + product_id=await _safe_fetch(self.node_product_id), + sw_version=await _safe_fetch(self.node_software_version), + product_name=await _safe_fetch(self.node_product_name), + rf_comm_status=await _safe_fetch(self.node_rf_comm_status), + battery_status=await _safe_fetch(self.node_battery_status), + fault_status=await _safe_fetch(self.node_fault_status), ) diff --git a/src/pyairios/registers.py b/src/pyairios/registers.py index ded4198..8028f6e 100644 --- a/src/pyairios/registers.py +++ b/src/pyairios/registers.py @@ -1,6 +1,7 @@ """Register definitions.""" import datetime +import logging import struct import typing as t from dataclasses import dataclass @@ -11,6 +12,8 @@ from .constants import ValueStatusFlags, ValueStatusSource from .exceptions import AiriosDecodeError, AiriosInvalidArgumentException +LOGGER = logging.getLogger(__name__) + T = t.TypeVar("T") @@ -73,12 +76,12 @@ def __init__(self, address: RegisterAddress, length: int, access: RegisterAccess def decode(self, registers: list[int]) -> str: """Decode register bytes to value.""" - def registers_to_bytearray(registers: list[int]) -> bytearray: + def registers_to_bytearray(_registers: list[int]) -> bytearray: """Convert registers to bytes.""" - b = bytearray() - for x in registers: - b.extend(x.to_bytes(2, "big")) - return b + _b = bytearray() + for x in _registers: + _b.extend(x.to_bytes(2, "big")) + return _b b = registers_to_bytearray(registers) @@ -102,6 +105,17 @@ def encode(self, value: str) -> list[int]: class NumberRegister(RegisterBase[T]): """Base class for number registers.""" + min: int = 0 + max: int = 2**64 - 1 + + def clamp(self, value: int) -> int: + """Clamp provided value to datatype range.""" + if value < self.min or value > self.max: + raise AiriosInvalidArgumentException( + f"Entered value {value} is out of range [{self.min}..{self.max}]" + ) + return value + def decode(self, registers: list[int]) -> T: """Decode register bytes to value.""" result: T = t.cast( @@ -112,19 +126,37 @@ def decode(self, registers: list[int]) -> T: def encode(self, value: T) -> list[int]: """Encode value to register bytes.""" - if isinstance(value, int): - return ModbusClientMixin.convert_to_registers(value, self.datatype, word_order="little") - if isinstance(value, (bool, float)): - return ModbusClientMixin.convert_to_registers( - int(value), self.datatype, word_order="little" - ) - raise AiriosInvalidArgumentException(f"Unsupported type {type(value)}") + try: + if isinstance(value, (str, int, bool, float)): + reg_value = int(value) + else: + raise AiriosInvalidArgumentException(f"Unsupported type {type(value)}") + reg_value = self.clamp(reg_value) + except ValueError as ex: + msg = f"Invalid value {value}" + raise AiriosInvalidArgumentException(msg) from ex + return ModbusClientMixin.convert_to_registers(reg_value, self.datatype, word_order="little") + + +class U8Register(NumberRegister[int]): + """Unsigned 8-bit entry, sent to modbus as UINT16 register.""" + + datatype = ModbusClientMixin.DATATYPE.UINT16 + min = 0 + max = 2**8 - 1 + + def __init__(self, address: RegisterAddress, access: RegisterAccess) -> None: + """Initialize the U8Register instance.""" + description = RegisterDescription(address, 1, access) + super().__init__(description) class U16Register(NumberRegister[int]): """Unsigned 16-bit register.""" datatype = ModbusClientMixin.DATATYPE.UINT16 + min = 0 + max = 2**16 - 1 def __init__(self, address: RegisterAddress, access: RegisterAccess) -> None: """Initialize the U16Register instance.""" @@ -136,6 +168,8 @@ class I16Register(NumberRegister[int]): """Signed 16-bit register.""" datatype = ModbusClientMixin.DATATYPE.INT16 + min = 2**15 * -1 + max = 2**15 - 1 def __init__(self, address: RegisterAddress, access: RegisterAccess) -> None: """Initialize the I16Register instance.""" @@ -147,6 +181,8 @@ class U32Register(NumberRegister[int]): """Unsigned 32-bit register.""" datatype = ModbusClientMixin.DATATYPE.UINT32 + min = 0 + max = 2**32 - 1 def __init__(self, address: RegisterAddress, access: RegisterAccess) -> None: """Initialize the U32Register instance.""" @@ -158,6 +194,8 @@ class FloatRegister(NumberRegister[float]): """Float register.""" datatype = ModbusClientMixin.DATATYPE.FLOAT32 + min = 0 + max = 2**32 - 1 def __init__(self, address: RegisterAddress, access: RegisterAccess) -> None: """Initialize the FloatRegister instance."""