diff --git a/han/common.py b/han/common.py index 08c1b25..b8c0c8d 100644 --- a/han/common.py +++ b/han/common.py @@ -11,6 +11,7 @@ class MeterMessageType(Enum): UNKNOWN = auto() HDLC_DLMS = auto() + WIRED_MB_HDLC = auto() DLMS = auto() P1 = auto() diff --git a/han/wiredmb.py b/han/wiredmb.py new file mode 100644 index 0000000..e940d58 --- /dev/null +++ b/han/wiredmb.py @@ -0,0 +1,163 @@ +"""Use this module to Wired M-Bus profile HDLC frames.""" +from __future__ import annotations + +import logging + +from han.common import MeterMessageBase, MeterMessageType, MeterReaderBase + +_LOGGER = logging.getLogger(__name__) + +FRAME_START: int = 0x68 +FRAME_END: int = 0x16 + + +class LongFrame(MeterMessageBase): + """Wired M-Bus profile HDLC frame.""" + + # Long Frame + # With the long frame, after the start character 68h, the length field + # (L field) is first transmitted twice, followed by the start character once + # again. After this, there follow the function field (C field), the address + # field (A field) and the control information field (CI field). The L field + # gives the quantity of the user data inputs plus 3 (for C,A,CI). After the + # user data inputs, the check sum is transmitted, which is built up over the + # same area as the length field, and in conclusion the stop character 16h + # is transmitted. + + def __init__(self, buffer: bytearray) -> None: + """Construct Wired M-Bus HdlcFrame.""" + self._frame_data = buffer + if len(buffer) <= 6: + raise ValueError("To short to be a Long Frame.") + if buffer[0] != FRAME_START: + raise ValueError(f"Frame must start with {hex(FRAME_START)}") + if buffer[3] != FRAME_START: + raise ValueError(f"Frame have {hex(FRAME_START)} in 4th position.") + if buffer[len(buffer) - 1] != FRAME_END: + raise ValueError(f"Frame must end with {hex(FRAME_END)}") + + @property + def message_type(self) -> MeterMessageType: + """Return MeterMessageType of message.""" + return MeterMessageType.WIRED_MB_HDLC + + @property + def is_valid(self) -> bool: + """Return True when valitation (checksum etc.) is successfull.""" + return self._calculate_cheksum() == self.cheksum_field + + @property + def as_bytes(self) -> bytes: + """Return frame data bytes.""" + return bytes(self._frame_data) + + @property + def payload(self) -> bytes: + """Frame payload as bytes.""" + return bytes(self._frame_data[7:-2]) + + @property + def len_field(self) -> int: + """Frame payload as bytes.""" + return self._frame_data[1] + + @property + def cheksum_field(self) -> int: + """Frame payload as bytes.""" + return self._frame_data[len(self._frame_data) - 2] + + @property + def function_field(self) -> int: + """Get function field (C).""" + return self._frame_data[4] + + @property + def address_field(self) -> int: + """Get address field (A).""" + return self._frame_data[5] + + @property + def controll_info_field(self) -> int: + """Get control information field (CI).""" + return self._frame_data[6] + + def _calculate_cheksum(self) -> int: + total = 0 + for byte in self._frame_data[4:-2]: + total = total + byte + return total % 256 + + +class FrameReader(MeterReaderBase[LongFrame]): + """Use this class to read Wired M-Bus profile HDLC-frames as stream of bytes.""" + + def __init__(self) -> None: + """Initialize FrameReader.""" + self._raw_data = bytearray() + self._buffer = bytearray() + self._is_int_hunt_mode = True + + @property + def is_in_hunt_mode(self) -> bool: + """Return True when reader is hunting for start of frame.""" + return True + + @staticmethod + def find_frame(buffer: bytes, start: int = 0) -> tuple[int, int]: + """Search for complete frame in buffer.""" + while start < len(buffer) - 9: + start_pos = buffer.find(FRAME_START, start) + if start_pos < 0: + break + start = start_pos + 1 + + if ( + len(buffer) > start_pos + 3 + and buffer[start_pos + 3] == FRAME_START + and buffer[start_pos + 1] == buffer[start_pos + 2] + ): + pos = start_pos + 4 + 2 + data_sum = 0 + while pos < len(buffer): + data_sum = data_sum + buffer[pos - 2] + if buffer[pos] == FRAME_END: + calculated_cheksum = data_sum % 256 + cheksum = buffer[pos - 1] + if calculated_cheksum == cheksum: + return (start_pos, pos) + pos = pos + 1 + + return (-1, -1) + + def read(self, data_chunk: bytes) -> list[LongFrame]: + """ + Call this function to read chunks of bytes. + + :param data_chunk: next bytes to parsed. + :return: frame when a frame is complete (both with correct and incorrect checksum). + """ + frames_received: list[LongFrame] = [] + + # if self.is_in_hunt_mode: + # flag_pos = data_chunk.find(FRAME_START) + # if flag_pos >= 0: + # # trim data before flag sequence + # self._buffer.extend(data_chunk[flag_pos:]) + # else: + self._buffer.extend(data_chunk) + + max_end = 0 + start = 0 + while start >= 0: + start, end = FrameReader.find_frame(self._buffer, start) + if end > 0: + max_end = end + frame = LongFrame(self._buffer[start : end + 1]) + frames_received.append(frame) + start = end + + if max_end > 0: + # trim buffer start + self._buffer = self._buffer[max_end + 1 :] + + return frames_received diff --git a/tests/test_wiredmb.py b/tests/test_wiredmb.py new file mode 100644 index 0000000..07b02f1 --- /dev/null +++ b/tests/test_wiredmb.py @@ -0,0 +1,150 @@ +"""Wired M-Bus profile tests.""" +# pylint: disable = no-self-use +from __future__ import annotations + +from han.wiredmb import FrameReader + +# From https://www.netz-noe.at/Download-(1)/Smart-Meter/218_9_SmartMeter_Kundenschnittstelle_lektoriert_14.aspx +TEST_FRAME_NETZNOE_HEX = ( + "68FAFA68" # M-Bus Start + "53FF000167DB08" + "4B464D6750000009" # System Title + "81F82000000023" + "88D5AB4F97515AAFC6B88D2F85DAA7A0E3C0C40D004535C397C9D037AB7DBDA3" # Data + "29107615444894A1A0DD7E85F02D496CECD3FF46AF5FB3C9229CFE8F3EE4606A" # Data + "B2E1F409F36AAD2E50900A4396FC6C2E083F373233A69616950758BFC7D63A9E" # Data + "9B6E99E21B2CBC2B934772CA51FD4D69830711CAB1F8CFF25F0A329337CBA519" # Data + "04F0CAED88D61968743C8454BA922EB00038182C22FE316D16F2A9F544D6F75D" # Data + "51A4E92A1C4EF8AB19A2B7FEAA32D0726C0ED80229AE6C0F7621A4209251ACE2" # Data + "B2BC66FF0327A653BB686C756BE033C7A281F1D2A7E1FA31C3983E15F8FD16CC" # Data + "5787E6F517166814146853FF110167419A3CFDA44BE438C96F0E38BF83D9" # Data + "83" # Checksum + "16" # M-Bus Stop +) +TEST_FRAMES_TEST_FRAME_NETZNOE = bytes.fromhex(TEST_FRAME_NETZNOE_HEX.replace(" ", "")) +TEST_KEY_NETZNOE_HEX = "36C66639E48A8CA4D6BC8B282A793BBB" +TEST_KEY_NETZNOE = bytes.fromhex(TEST_KEY_NETZNOE_HEX) + +TEST_FRAMES_SAGEMCOM_HEX = ( + "68 0d0d 68 53ff110167cd6bcb691353ff98 34 16" + "68 0101 68 53ff000167db085341475905e6d9" + "fd81f8200069d15055282ce997468261193e23788ae6e242d1d644ba2c3c" + "550e594702dc8dd41091676b769bf02f42bfd9d2fea2b3aa11b1bf7b8bb3" + "36fe7eb022d76010481b77aac2dc998dc2c45d78835392e86644cc3243a9" + "e822b20edfd839b3215be6a8f1835e855aa35d2b92ed59d7242ccc26aba6" + "0afe78b0e9d37c6db8320f36c0a09ba256730856ee9bad7cccf36bec1363" + "552a280e7a9bd92a6208d59cade8436d7aca8bddbfdb3fe1883f9db97c19" + "d3688c57ab82464b75b8f39e2c22062a9378565676516511c71278abf297" + "97512a16705630c9120008bc80556e4451a193cdcfba9adaca481974e470" + "1ead639916" + "68 0d0d 68 53ff110167212b325274004041 90 16" + "6801016853ff" + "000167db085341475905e6d9fd81f8200069d151a80c896b6823fe94573b" + "ab39569f26e5d9a7101cf3e40e2ec68d3f0afe8b54edacae84368672b6aa" + "0db39488c0374c7509536e3f44e1a928f8287ad4e0650afa46a908a63aee" + "c620b57ce8f8c1924084542ef499a90486429e2fe3d528929980ee10a496" + "7fbc726333324e1cff711c4b66ce489b46ffa536f2e6fe84e83856652e59" + "794b2aa184b4635325ea02f19b50a2cafbde22bbe824a57052f464f593d7" + "169da1906cf304c626956e603ec64af6babbac01fe2374263f7ef105bd76" + "2a7c34fafcef1f40466ef16fda367500e61abfc2b57f34d7374ca06ccca7" + "33ef9c4ab0e750670afb8518253167dd16" + "68 0d0d 68 53ff110167cd91da347ace84ad b0 16" + "68 0101 68 53ff000167db085341475905e6d9fd81f8200069" + "d1527f86d1dd40fdf72c67324f5d6956b08ffb04a9e603c6a76b7f86e7bf" + "2de6440942bfc8b392d1ebcaefb2785614f9326b8fa48e44db86b8d18382" + "67bcdf3d85de429ffa8869f406c8cb4ca8ffe67a44d3299790ca1b22f7d8" + "8d6ef769341c7f6ab6aa9596d04895020e76c0be3194a1726630e272d982" + "301e9a81dc23d7aa10e59119aa5b97f451211797e62e25d27c8de0d81019" + "edb62ab729ea4bb135f989651a4d45bec53fe9862414a35b20ad9f20a557" + "9fde82ad584d65354d4e74d20feade264f48ab3de0910d9c1de6c82b4fc2" + "3b53212a2682b27d9f9383919f4c0b473a48607ba3126d0b9a40778816" + "68 0d0d 68 53ff110167c791cbb97b23ac8d 7e 16" + "68 0101 68 53ff000167db085341475905e6d9fd81f8200069d1534f0c" + "66ddde9731c08be52bcec3995117" + "defeab9e2f1b7606718d2e9dbf546ee5b37bce053458226d538f956a0a6f" + "ea888718cd29b4b1998e0305eb1b619b360999e942d4d5bce757111e95fe" + "9b76cfc21a52ee702b1f6a52ca9085ffabd1f0e52090823e5e2e2560d0fb" + "74a17ca201c2ac404ac60f828c0cce18b8181def94cc5416d8641f60b334" + "b80e0c10561189d61e26911f85c4be556996dad0d69e694a8f10bf379768" + "553e92b1f17621bf340354dbf12c239fb77902e837ddaf157970c495c928" + "904e6fbcfa52e7fe27b5f36ec4c3c137cfc97ce8b1107f6b4d4988cd2b60" + "2251d75c5fe6aa0be12b16" + "68 0d0d 68 53ff110167c5b163a02439f557ed16" + "68 0101 68 53ff000167db085341475905e6d9fd81f8200069d1545bd24f76" + "372343d5d1c102a8e891416d6cf7e5101ed56a1b738c54b18732fb85a6f1" + "80ce40b9485f547f1f6a18f2549eedd91833dc52e214e1bf65fe7b589b37" + "89938d98af63fb90faa1028b8032bf7d9d614256f02e7ad11bb08809dc21" + "f66e0e5dd3d9b86669962d60cc552cf1e3a15b3056ab57fe5d6a4fe440cf" + "a16422c74e60ddda8e08172f49f2c80df5a0ed0627e0a5f91cb27a975e59" + "0accc8a925ab1f6ab4adde8bffaf9a4f7f44d300183457e315a2db4ed02a" + "205461604750b96c8ed0d47c27360c890b82dd142dbea19cd9df31f9bf46" + "a414264d8604285444f3499c12cf0215f94db7ab4ab2 16" + "68 0d0d 68 53ff1101677074a730844741be 50 16" + "68 0101 68 53ff000167db085341475905e6d9" + "fd81f8200069d15559041dd6a49628335da5a58f222f263c77f2943cd7ed" + "6524aa5d965e9571e31c99834031a13f2ebd1f32a7ce5f32590555acc9c0" + "9d2a59af093a8161bcdb4d60cf8d65bf6e06e57359fe1d1cd31c08ec5c8e" + "57aa3ee6066d7145ceabdc5c25ac1509bca2bde812c292c99ed138a40259" + "389838633b45b41a204d3405744650bea587d17a5f98919adae9fa1eaa72" + "10583c0a5d46814e57b298df8b228920af894269d80ea06ff9463395a19c" + "65ac22e2dde89b29c5fcf0fb1d6e4a0ca5088793df569ae51d8d8667f2c2" + "199a4862760bed310286cac53fd18503aba1bb05e7bfbf8f1dc3d740ca62" + "81e120 56 16" + "68 0d0d 68 53ff110167161191d77c8164a5 60 16" + "68 0101 6853ff" + "000167db085341475905e6d9fd81f8200069d1562c98ee9a75bce5969ebe" + "84a21800e19de6596ff11a7123dc4a8ea80aec3cf6cdb194a3967514bd63" + "0ab6df53643e8aefb5eff79dc6b9dc25889299bfc2eb163c50f96bbe4431" + "01f101c2543435e8335c46d52e0a37bc413b342f9c1a964473b2c02808c7" + "3b7aedf41d21bac431e27a7a3018f1d8980dca65d7c2bc1ea847efd15b44" + "3377260c3b6dbd806150414856ee7121fd144a62bfd1131a02e37a5db7a8" + "bfe6cd07ac7ec378feba4ef676e5240ed19792212cf3eb29beb5e3c07398" + "83e5741fbe9093237efbbe466c106e5ea938716b9d78991eff8fd38331ae" + "fc371bf09d57e5157d5b7bc606e600 a3 16" + "68 0d0d 68 53ff110167efc7499ad6b0e5e2 b1 16" + "68 0101 68 53ff000167db085341475905e6d9fd81f8200069" + "d1573adc24aa69f04330bd9560a4f8c76c276d91b6167e3285955c122770" + "1df8cb086ed88193c3c680d891971fb2b1fbbce88571908f5ca54bfcc94f" + "a3dac33e4fe1206cfaa3faf75dec6d8099582c81d32728aae4c05bddd0ae" + "1c1e975d91e7da1d7508d1865835a4fb150af54f4c7bfecdaec251a176a8" + "9b3d75bcde32e77b498dabaeb434ab2fa4b245f2a7705ef667bc5cbc00c9" + "d520c4737ae1942af91d2fd8ceb8e62b38a7e111958c975b840a0be6c84f" + "d3ee81c9da2a23514238eed87b203da3c9f19815cd2205954a4b668bae52" + "9de01fd34167468c71209efd9c8fceeaa02759221bbf810b6965af0e16" + "68 0d0d 68 53ff11016758fd6d08ed2c3d20 0b 16" + "68 0101 68 53ff000167db0853" + "41475905e6d9fd81f8200069d158293d242b13d20611e6ac5872ab4a19f0" + "9c750abb8c6a518f823b24b980eba4b4e4000b5860636c130beb89e16d3b" + "d92504a2cafb158bc9962111cdde48111f97306473e320ad5177a9c35d42" + "be3bffba7afb63baa0dce996005a48b9ecb0e3d3a6c4e6c802069254c634" + "8cfaa985ca894349ff16c6966c6e82cfcf8225deb016404a54fa09e6a282" + "cad0eff9310de0111b020dc11dccbf68f6ed38f2df58d9e5e027d1aa448b" + "e70fd06a9ed1632bca61a8160b90775b7ff5d9594ea2099ad450810b6d87" + "b2c88ef9d67b4b9ba714901cd07c73ed894364d109bfd0ef9b776beff716" + "17ef334fa275d9c46e b5 16" + "68 0d0d 68 53ff110167a334fb003563fe275a16" +) + +TEST_FRAMES_SAGEMCOM = bytes.fromhex(TEST_FRAMES_SAGEMCOM_HEX.replace(" ", "")) + + +class TestFrameReader: + """Test FrameReader.""" + + def test_sagemcom_frames(self): + """Test read Sagemcom frames.""" + frame_reader = FrameReader() + frames = frame_reader.read(TEST_FRAMES_SAGEMCOM) + assert frames is not None + for frame in frames: + assert frame is not None + assert frame.is_valid + + def test_nets_noe_frames(self): + """Test read Sagemcom frames.""" + frame_reader = FrameReader() + frames = frame_reader.read(TEST_KEY_NETZNOE) + assert frames is not None + for frame in frames: + assert frame is not None + assert frame.is_valid