From 267bf18e2b7f33ee62ca4e51c7cf09f725479791 Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Tue, 3 Feb 2026 19:37:40 +0100 Subject: [PATCH 1/4] Remove the parameter queue and use the write_pending flag instead With these changes, the data to be written is stored directly in the field object, so that the values can also be set directly in the field via `value`. --- luxtronik/cfi/interface.py | 37 ++++---- luxtronik/cfi/parameters.py | 1 - luxtronik/shi/interface.py | 2 + tests/cfi/test_cfi_interface.py | 20 +++- tests/cfi/test_cfi_parameters.py | 23 ++--- tests/shi/test_shi_interface.py | 151 ++++++++++++++++++++----------- tests/test_socket_interaction.py | 44 +++++---- 7 files changed, 180 insertions(+), 98 deletions(-) diff --git a/luxtronik/cfi/interface.py b/luxtronik/cfi/interface.py index 0d030f3b..27bd7616 100644 --- a/luxtronik/cfi/interface.py +++ b/luxtronik/cfi/interface.py @@ -162,23 +162,24 @@ def _write_and_read(self, parameters, data): return self._read(data) def _write(self, parameters): - for index, value in parameters.queue.items(): - if not isinstance(index, int) or not isinstance(value, int): - LOGGER.warning( - "%s: Parameter id '%s' or value '%s' invalid!", - self._host, - index, - value, - ) - continue - LOGGER.info("%s: Parameter '%d' set to '%s'", self._host, index, value) - self._send_ints(LUXTRONIK_PARAMETERS_WRITE, index, value) - cmd = self._read_int() - LOGGER.debug("%s: Command %s", self._host, cmd) - val = self._read_int() - LOGGER.debug("%s: Value %s", self._host, val) - # Flush queue after writing all values - parameters.queue = {} + for index, field in parameters.data.items(): + if field.write_pending: + field.write_pending = False + value = field.raw + if not isinstance(index, int) or not isinstance(value, int): + LOGGER.warning( + "%s: Parameter id '%s' or value '%s' invalid!", + self._host, + index, + value, + ) + continue + LOGGER.info("%s: Parameter '%d' set to '%s'", self._host, index, value) + self._send_ints(LUXTRONIK_PARAMETERS_WRITE, index, value) + cmd = self._read_int() + LOGGER.debug("%s: Command %s", self._host, cmd) + val = self._read_int() + LOGGER.debug("%s: Value %s", self._host, val) # Give the heatpump a short time to handle the value changes/calculations: time.sleep(WAIT_TIME_AFTER_PARAMETER_WRITE) @@ -285,6 +286,8 @@ def _parse(self, data_vector, raw_data): # remove all used indices from the list of undefined indices for index in range(definition.index, next_idx): undefined.discard(index) + # integrate_data() also resets the write_pending flag, + # intentionally only for read fields pair.integrate_data(raw_data, LUXTRONIK_CFI_REGISTER_BIT_SIZE) # create an unknown field for additional data diff --git a/luxtronik/cfi/parameters.py b/luxtronik/cfi/parameters.py index 345b7d1b..f75c00c1 100644 --- a/luxtronik/cfi/parameters.py +++ b/luxtronik/cfi/parameters.py @@ -33,7 +33,6 @@ def __init__(self, safe=True): """Initialize parameters class.""" super().__init__() self.safe = safe - self.queue = {} for d in PARAMETERS_DEFINITIONS: self._data.add(d, d.create_field()) diff --git a/luxtronik/shi/interface.py b/luxtronik/shi/interface.py index 025bbc0c..53326943 100644 --- a/luxtronik/shi/interface.py +++ b/luxtronik/shi/interface.py @@ -321,6 +321,8 @@ def _integrate_data(self, telegrams_data): success = True for block, telegram, read_not_write in telegrams_data: if (read_not_write == READ): + # integrate_data() also resets the write_pending flag, + # intentionally only for read fields valid = block.integrate_data(telegram.data) if not valid: LOGGER.debug(f"Failed to integrate read data into {block}") diff --git a/tests/cfi/test_cfi_interface.py b/tests/cfi/test_cfi_interface.py index 755b9e59..6a839468 100644 --- a/tests/cfi/test_cfi_interface.py +++ b/tests/cfi/test_cfi_interface.py @@ -18,35 +18,53 @@ def test_parse(self): n = 2000 t = list(range(0, n + 1)) + parameters[0].write_pending = True lux._parse(parameters, t) p = parameters.get(n) assert p.name == f"unknown_parameter_{n}" assert p.raw == n + assert not p.write_pending + calculations[0].write_pending = True lux._parse(calculations, t) c = calculations.get(n) assert c.name == f"unknown_calculation_{n}" assert c.raw == n + assert not c.write_pending + visibilities[0].write_pending = True lux._parse(visibilities, t) v = visibilities.get(n) assert v.name == f"unknown_visibility_{n}" assert v.raw == n + assert not v.write_pending n = 10 t = list(range(0, n + 1)) + parameters[0].write_pending = True + parameters[20].write_pending = True + parameters[40].write_pending = True lux._parse(parameters, t) for definition, field in parameters.data.pairs(): if definition.index > n: assert field.raw is None + assert not field.write_pending + calculations[0].write_pending = True + calculations[20].write_pending = True + calculations[40].write_pending = True lux._parse(calculations, t) for definition, field in calculations.data.pairs(): if definition.index > n: assert field.raw is None + assert not field.write_pending + visibilities[0].write_pending = True + visibilities[20].write_pending = True + visibilities[40].write_pending = True lux._parse(visibilities, t) for definition, field in visibilities.data.pairs(): if definition.index > n: - assert field.raw is None \ No newline at end of file + assert field.raw is None + assert not field.write_pending \ No newline at end of file diff --git a/tests/cfi/test_cfi_parameters.py b/tests/cfi/test_cfi_parameters.py index 417e146c..e9930fab 100644 --- a/tests/cfi/test_cfi_parameters.py +++ b/tests/cfi/test_cfi_parameters.py @@ -15,11 +15,9 @@ def test_init(self): assert parameters.name == "parameter" assert parameters.parameters == parameters._data assert parameters.safe - assert len(parameters.queue) == 0 parameters = Parameters(False) assert not parameters.safe - assert len(parameters.queue) == 0 def test_data(self): """Test cases for the data dictionary""" @@ -84,18 +82,21 @@ def test_set(self): # Set something which does not exist parameters.set("BarFoo", 0) - assert len(parameters.queue) == 0 + assert parameters["BarFoo"] is None - # Set something which is not allowed to be set - parameters.set("ID_Transfert_LuxNet", 0) - assert len(parameters.queue) == 0 + # Set something which was previously (v0.3.14) not allowed to be set + parameters.set("ID_Transfert_LuxNet", 1) + assert parameters["ID_Transfert_LuxNet"].raw == 1 + assert parameters["ID_Transfert_LuxNet"].write_pending # Set something which is allowed to be set - parameters.set("ID_Einst_WK_akt", 0) - assert len(parameters.queue) == 1 + parameters.set("ID_Einst_WK_akt", 2) + assert parameters["ID_Einst_WK_akt"].raw == 20 + assert parameters["ID_Einst_WK_akt"].write_pending parameters = Parameters(safe=False) - # Set something which is not allowed to be set, but we are brave. - parameters.set("ID_Transfert_LuxNet", 0) - assert len(parameters.queue) == 1 + # Set something which was previously (v0.3.14) not allowed to be set, but we are brave. + parameters.set("ID_Transfert_LuxNet", 4) + assert parameters["ID_Transfert_LuxNet"].raw == 4 + assert parameters["ID_Transfert_LuxNet"].write_pending diff --git a/tests/shi/test_shi_interface.py b/tests/shi/test_shi_interface.py index dd1596a6..fa5ad2be 100644 --- a/tests/shi/test_shi_interface.py +++ b/tests/shi/test_shi_interface.py @@ -28,6 +28,10 @@ ) from tests.fake import FakeModbus +IDX_BLK = 0 +IDX_TLG = 1 +IDX_RNW = 2 + class TestLuxtronikSmartHomeData: @@ -244,8 +248,8 @@ def test_create_telegram(self): telegram = self.interface._create_telegram(block, "input", False) assert telegram is None - def test_create_telegrams(self): - blocks_list = [] + def create_contiguous_block_list(self): + block_list = [] blocks = ContiguousDataBlockList("holding", True) # block 1 @@ -260,86 +264,129 @@ def test_create_telegrams(self): # block 3 blocks.append_single(HOLDINGS_DEFINITIONS[10], HOLDINGS_DEFINITIONS[10].create_field()) - blocks_list.append(blocks) + block_list.append(blocks) blocks = ContiguousDataBlockList("holding", False) - # invalid block + # invalid block because of invalid data blocks.append_single(HOLDINGS_DEFINITIONS[12], HOLDINGS_DEFINITIONS[12].create_field()) + # block 4 field3 = HOLDINGS_DEFINITIONS[17].create_field() blocks.append_single(HOLDINGS_DEFINITIONS[17], field3) - blocks_list.append(blocks) + block_list.append(blocks) field3.raw = 17 - telegram_data = self.interface._create_telegrams(blocks_list) + assert len(block_list) == 2 + assert len(block_list[0]) == 3 + assert len(block_list[0][0]) == 2 + assert len(block_list[0][1]) == 1 + assert len(block_list[0][2]) == 1 + assert len(block_list[1]) == 2 + assert len(block_list[1][0]) == 1 + assert len(block_list[1][1]) == 1 + + return block_list + + def test_create_telegrams(self): + block_list = self.create_contiguous_block_list() + + telegram_data = self.interface._create_telegrams(block_list) assert len(telegram_data) == 4 + + # Note: telegram_data[block index][tuple index] + # Note: telegram_data[block index][IDX_BLK][part index] + # blocks - assert len(telegram_data[0][0]) == 2 - assert telegram_data[0][0].first_index == 10 - assert telegram_data[0][0].overall_count == 2 - assert len(telegram_data[1][0]) == 1 - assert telegram_data[1][0].first_index == 17 - assert telegram_data[1][0].overall_count == 1 - assert len(telegram_data[2][0]) == 1 - assert telegram_data[2][0].first_index == 10 - assert telegram_data[2][0].overall_count == 1 - assert len(telegram_data[3][0]) == 1 - assert telegram_data[3][0].first_index == 17 - assert telegram_data[3][0].overall_count == 1 + assert len(telegram_data[0][IDX_BLK]) == 2 + assert telegram_data[0][IDX_BLK].first_index == 10 + assert telegram_data[0][IDX_BLK].overall_count == 2 + assert len(telegram_data[1][IDX_BLK]) == 1 + assert telegram_data[1][IDX_BLK].first_index == 17 + assert telegram_data[1][IDX_BLK].overall_count == 1 + assert len(telegram_data[2][IDX_BLK]) == 1 + assert telegram_data[2][IDX_BLK].first_index == 10 + assert telegram_data[2][IDX_BLK].overall_count == 1 + assert len(telegram_data[3][IDX_BLK]) == 1 + assert telegram_data[3][IDX_BLK].first_index == 17 + assert telegram_data[3][IDX_BLK].overall_count == 1 # telegrams - assert telegram_data[0][1].count == 2 - assert telegram_data[1][1].count == 1 - assert telegram_data[2][1].count == 1 - assert telegram_data[3][1].count == 1 + assert telegram_data[0][IDX_TLG].count == 2 + assert telegram_data[1][IDX_TLG].count == 1 + assert telegram_data[2][IDX_TLG].count == 1 + assert telegram_data[3][IDX_TLG].count == 1 # read not write - assert telegram_data[0][2] - assert telegram_data[1][2] - assert telegram_data[2][2] - assert not telegram_data[3][2] + assert telegram_data[0][IDX_RNW] + assert telegram_data[1][IDX_RNW] + assert telegram_data[2][IDX_RNW] + assert not telegram_data[3][IDX_RNW] + + def test_integrate_data(self): + block_list = self.create_contiguous_block_list() + + telegram_data = self.interface._create_telegrams(block_list) # integrate - telegram_data[0][1].data = [18, 4] - telegram_data[1][1].data = [9] - telegram_data[2][1].data = [27] - telegram_data[3][0][0].field.write_pending = True + telegram_data[0][IDX_TLG].data = [18, 4] + telegram_data[0][IDX_BLK][0].field.write_pending = True + telegram_data[0][IDX_BLK][1].field.write_pending = False + telegram_data[1][IDX_TLG].data = [9] + telegram_data[2][IDX_TLG].data = [27] + telegram_data[3][IDX_BLK][0].field.write_pending = True valid = self.interface._integrate_data(telegram_data) assert valid # [index data, index for blocks, index for part] - assert telegram_data[0][0][0].field.raw == 18 - assert telegram_data[0][0][1].field.raw == 4 - assert telegram_data[1][0][0].field.raw == 9 - assert telegram_data[2][0][0].field.raw == 27 - assert not telegram_data[3][0][0].field.write_pending - assert telegram_data[3][0][0].field.raw == 17 # no update + assert telegram_data[0][IDX_BLK][0].field.raw == 18 + assert not telegram_data[0][IDX_BLK][0].field.write_pending + assert telegram_data[0][IDX_BLK][1].field.raw == 4 + assert not telegram_data[0][IDX_BLK][1].field.write_pending + assert telegram_data[1][IDX_BLK][0].field.raw == 9 + assert telegram_data[2][IDX_BLK][0].field.raw == 27 + assert telegram_data[3][IDX_BLK][0].field.raw == 17 # no update + assert not telegram_data[3][IDX_BLK][0].field.write_pending # integrate not available / None -> no error - telegram_data[0][1].data = [18, 4] - telegram_data[1][1].data = [LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE] - telegram_data[2][1].data = [None] + telegram_data[0][IDX_TLG].data = [19, 5] + telegram_data[0][IDX_BLK][0].field.write_pending = True + telegram_data[1][IDX_TLG].data = [LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE] + telegram_data[1][IDX_BLK][0].field.write_pending = True + telegram_data[2][IDX_TLG].data = [None] + telegram_data[2][IDX_BLK][0].field.write_pending = True + telegram_data[3][IDX_BLK][0].field.write_pending = True valid = self.interface._integrate_data(telegram_data) assert valid # [index data, index for blocks, index for part] - assert telegram_data[0][0][0].field.raw == 18 - assert telegram_data[0][0][1].field.raw == 4 - assert telegram_data[1][0][0].field.raw is None - assert telegram_data[2][0][0].field.raw is None - assert telegram_data[3][0][0].field.raw == 17 # no update + assert telegram_data[0][IDX_BLK][0].field.raw == 19 + assert not telegram_data[0][IDX_BLK][0].field.write_pending + assert telegram_data[0][IDX_BLK][1].field.raw == 5 + assert not telegram_data[0][IDX_BLK][1].field.write_pending + assert telegram_data[1][IDX_BLK][0].field.raw is None + assert not telegram_data[1][IDX_BLK][0].field.write_pending # update with none -> reset flag + assert telegram_data[2][IDX_BLK][0].field.raw is None + assert not telegram_data[2][IDX_BLK][0].field.write_pending # update with none -> reset flag + assert telegram_data[3][IDX_BLK][0].field.raw == 17 # no update + assert not telegram_data[3][IDX_BLK][0].field.write_pending # integrate too less -> error - telegram_data[0][1].data = [18] - telegram_data[1][1].data = [1] - telegram_data[2][1].data = [None] + telegram_data[0][IDX_TLG].data = [18] + telegram_data[0][IDX_BLK][0].field.write_pending = True + telegram_data[0][IDX_BLK][1].field.write_pending = True + telegram_data[1][IDX_TLG].data = [2] + telegram_data[1][IDX_BLK][0].field.write_pending = True + telegram_data[2][IDX_TLG].data = [None] valid = self.interface._integrate_data(telegram_data) assert not valid # [index data, index for blocks, index for part] - assert telegram_data[0][0][0].field.raw == 18 - assert telegram_data[0][0][1].field.raw == 4 # no update - assert telegram_data[1][0][0].field.raw == 1 - assert telegram_data[2][0][0].field.raw is None - assert telegram_data[3][0][0].field.raw == 17 # no update + assert telegram_data[0][IDX_BLK][0].field.raw == 19 # no update + assert telegram_data[0][IDX_BLK][0].field.write_pending # no update + assert telegram_data[0][IDX_BLK][1].field.raw == 5 # no update + assert telegram_data[0][IDX_BLK][1].field.write_pending # no update + assert telegram_data[1][IDX_BLK][0].field.raw == 2 + assert not telegram_data[1][IDX_BLK][0].field.write_pending + assert telegram_data[2][IDX_BLK][0].field.raw is None + assert telegram_data[3][IDX_BLK][0].field.raw == 17 # no update def test_prepare(self): definition = HOLDINGS_DEFINITIONS[2] diff --git a/tests/test_socket_interaction.py b/tests/test_socket_interaction.py index bf0a1577..9d001f21 100644 --- a/tests/test_socket_interaction.py +++ b/tests/test_socket_interaction.py @@ -96,21 +96,29 @@ def test_luxtronik_socket_interface(self): # Finally, writing p = Parameters() - p.queue = {0: 100, 1: 200} + p[1].raw = 100 + p[1].write_pending = True + p[2].raw = 200 + p[2].write_pending = True lux.write(p) s = FakeSocket.last_instance - assert s.written_values[0] == 100 - assert s.written_values[1] == 200 - assert len(p.queue) == 0 + assert s.written_values[1] == 100 + assert s.written_values[2] == 200 + assert not p[1].write_pending + assert not p[2].write_pending p = Parameters() - p.queue = {2: 300, 3: "test"} + p[3].raw = 300 + p[3].write_pending = True + p[4].raw = "test" + p[4].write_pending = True d = lux.write_and_read(p) s = FakeSocket.last_instance - assert s.written_values[2] == 300 + assert s.written_values[3] == 300 # Make sure that the non-int value is not written: - assert 3 not in s.written_values - assert len(p.queue) == 0 + assert 4 not in s.written_values + assert not p[3].write_pending + assert not p[4].write_pending assert self.check_luxtronik_data(d) def test_luxtronik(self): @@ -141,16 +149,18 @@ def test_luxtronik(self): ########################## # Test the write routine # ########################## - lux.parameters.queue = {0: 500} + lux.parameters[1].raw = 500 + lux.parameters[1].write_pending = True lux.write() s = FakeSocket.last_instance - assert s.written_values[0] == 500 + assert s.written_values[1] == 500 p = Parameters() - p.queue = {1: 501} + p[2].raw = 501 + p[2].write_pending = True lux.write(p) s = FakeSocket.last_instance - assert s.written_values[1] == 501 + assert s.written_values[2] == 501 # lux.write() and lux.write(p) should not read: assert self.check_luxtronik_data(lux, False) @@ -158,22 +168,24 @@ def test_luxtronik(self): ################################### # Test the write_and_read routine # ################################### - lux.parameters.queue = {2: 502} + lux.parameters[3].raw = 502 + lux.parameters[3].write_pending = True lux.write_and_read() # Currently write_and_read triggers two separate connections/operations s = FakeSocket.prev_instance - assert s.written_values[2] == 502 + assert s.written_values[3] == 502 # Now, the values should be read assert self.check_luxtronik_data(lux) self.clear_luxtronik_data(lux) - p.queue = {3: 503} + p[4].raw = 503 + p[4].write_pending = True lux.write_and_read(p) # Currently write_and_read triggers two separate connections/operations s = FakeSocket.prev_instance - assert s.written_values[3] == 503 + assert s.written_values[4] == 503 # Now, the values should be read assert self.check_luxtronik_data(lux) From 3e47f1ee3749ecf01d7e063f139bb17b47ba10bc Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Thu, 5 Feb 2026 23:15:42 +0100 Subject: [PATCH 2/4] Move the set method into the data_vector base class --- luxtronik/cfi/parameters.py | 15 --------------- luxtronik/data_vector.py | 14 ++++++++++++-- luxtronik/shi/vector.py | 21 +-------------------- 3 files changed, 13 insertions(+), 37 deletions(-) diff --git a/luxtronik/cfi/parameters.py b/luxtronik/cfi/parameters.py index f75c00c1..8f984ce7 100644 --- a/luxtronik/cfi/parameters.py +++ b/luxtronik/cfi/parameters.py @@ -39,18 +39,3 @@ def __init__(self, safe=True): @property def parameters(self): return self._data - - def set(self, target, value): - """Set parameter to new value.""" - index, parameter = self._lookup(target, with_index=True) - if index is not None: - if parameter.writeable or not self.safe: - raw = parameter.to_heatpump(value) - if isinstance(raw, int): - self.queue[index] = raw - else: - LOGGER.error("Value '%s' for Parameter '%s' not valid!", value, parameter.name) - else: - LOGGER.warning("Parameter '%s' not safe for writing!", parameter.name) - else: - LOGGER.warning("Parameter '%s' not found", target) diff --git a/luxtronik/data_vector.py b/luxtronik/data_vector.py index 97512c9b..9569eca8 100644 --- a/luxtronik/data_vector.py +++ b/luxtronik/data_vector.py @@ -8,6 +8,7 @@ ) from luxtronik.collections import LuxtronikFieldsDictionary +from luxtronik.datatypes import Base LOGGER = logging.getLogger(__name__) @@ -121,5 +122,14 @@ def get(self, target): return entry def set(self, target, value): - "TODO: Placeholder for future changes" - pass + """ + Set the value of a field to the given value. + + The value is set, even if the field marked as non-writeable. + No data validation is performed either. + """ + field = target + if not isinstance(field, Base): + field = self.get(target) + if field is not None: + field.value = value \ No newline at end of file diff --git a/luxtronik/shi/vector.py b/luxtronik/shi/vector.py index 36e7d39b..45eda78c 100644 --- a/luxtronik/shi/vector.py +++ b/luxtronik/shi/vector.py @@ -287,23 +287,4 @@ def get(self, def_name_or_idx, default=None): If multiple fields added for the same index/name, the last added takes precedence. """ - return self._data.get(def_name_or_idx, default) - - def set(self, def_field_name_or_idx, value): - """ - Set field to new value. - - The value is set, even if the field marked as non-writeable. - No data validation is performed either. - - Args: - def_field_name_or_idx (LuxtronikDefinition | Base | int | str): - Definition, name, or register index to be used to search for the field. - It is also possible to pass the field itself. - value (int | List[int]): Value to set - """ - field = def_field_name_or_idx - if not isinstance(field, Base): - field = self.get(def_field_name_or_idx) - if field is not None: - field.value = value \ No newline at end of file + return self._data.get(def_name_or_idx, default) \ No newline at end of file From 21c50234c0d71db8e68dda018e2fed4b918948b4 Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Thu, 5 Feb 2026 23:23:08 +0100 Subject: [PATCH 3/4] Add a check_for_write() method as replacement for the previously removed checks within set() --- luxtronik/cfi/interface.py | 2 +- luxtronik/datatypes.py | 32 +++++++++++++++++++++++++++++++- luxtronik/shi/interface.py | 10 ++++------ 3 files changed, 36 insertions(+), 8 deletions(-) diff --git a/luxtronik/cfi/interface.py b/luxtronik/cfi/interface.py index 27bd7616..5a536d51 100644 --- a/luxtronik/cfi/interface.py +++ b/luxtronik/cfi/interface.py @@ -166,7 +166,7 @@ def _write(self, parameters): if field.write_pending: field.write_pending = False value = field.raw - if not isinstance(index, int) or not isinstance(value, int): + if not isinstance(index, int) or not field.check_for_write(parameters.safe): LOGGER.warning( "%s: Parameter id '%s' or value '%s' invalid!", self._host, diff --git a/luxtronik/datatypes.py b/luxtronik/datatypes.py index f8431c4b..f4ee08f6 100755 --- a/luxtronik/datatypes.py +++ b/luxtronik/datatypes.py @@ -1,6 +1,7 @@ """datatype conversions.""" import datetime +import logging import socket import struct @@ -14,6 +15,9 @@ from functools import total_ordering +LOGGER = logging.getLogger(__name__) + + @total_ordering class Base: """Base datatype, no conversions.""" @@ -83,6 +87,8 @@ def value(self): def value(self, value): """Converts the value into heatpump units and store it.""" self._raw = self.to_heatpump(value) + if self._raw is None: + LOGGER.warning(f"Value '{value}' not valid for field '{self.name}'") self.write_pending = True @property @@ -92,7 +98,7 @@ def raw(self): @raw.setter def raw(self, raw): - """Store the raw data.""" + """Store the raw data. For internal use only""" self._raw = raw self.write_pending = False @@ -141,6 +147,30 @@ def __lt__(self, other): and self.datatype_unit == other.datatype_unit ) + def check_for_write(self, safe=True): + """ + Returns true if the field is writable and the field data is valid. + + Args: + safe (bool, Default: True): Flag for blocking write operations + if the field is not marked as writable + + Returns: + bool: True if the data is writable, otherwise False. + """ + if self.writeable or not safe: + # We support integers + if isinstance(self._raw, int): + return True + # and list of integers + elif isinstance(self._raw, list) and all(isinstance(value, int) for value in self._raw): + return True + else: + LOGGER.error(f"Value of '{self.name}' invalid!") + else: + LOGGER.warning(f"'{self.name}' not safe for writing!") + return False + class SelectionBase(Base): """Selection base datatype, converts from and to list of codes.""" diff --git a/luxtronik/shi/interface.py b/luxtronik/shi/interface.py index 53326943..238d41ab 100644 --- a/luxtronik/shi/interface.py +++ b/luxtronik/shi/interface.py @@ -381,16 +381,14 @@ def _prepare_write_field(self, definition, field, safe, data): if not field.write_pending and data is None: return False - # Abort if field is not writeable - if safe and not (definition.writeable and field.writeable): - LOGGER.warning("Field marked as non-writeable: " \ - + f"name={definition.name}, data={field.raw}") - return False - # Override the field's data with the provided data if data is not None: field.value = data + # Abort if field is not writeable or the value is invalid + if not field.check_for_write(safe): + return False + # Abort if insufficient data is provided if not get_data_arr(definition, field, LUXTRONIK_SHI_REGISTER_BIT_SIZE): LOGGER.warning("Data error / insufficient data provided: " \ From b73590ebc1f34eb8a45d187a4bdfa2d6d4b1929f Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Thu, 5 Feb 2026 23:37:58 +0100 Subject: [PATCH 4/4] Update README.md --- README.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index f40a9c36..d56ff92f 100755 --- a/README.md +++ b/README.md @@ -238,16 +238,17 @@ from luxtronik import Luxtronik, Parameters l = Luxtronik('192.168.1.23', 8889) -# Queue a parameter change -# In this example, the domestic hot water temperature is set to 45 degrees. +# Set the value of a field +# In this example, the domestic hot water temperature +# is set (for the time being only in this field) to 45 degrees l.parameters.set("ID_Soll_BWS_akt", 45.0) -# Write all queued changes to the heat pump +# Then write the data of all changed fields to the to the heat pump l.write() # Another possibility to write parameters parameters = Parameters() -parameters.set("ID_Ba_Hz_akt", "Party") +parameters["ID_Ba_Hz_akt"] = "Party" l.write(parameters) # If you're not sure what values to write, you can get all available options: @@ -261,7 +262,7 @@ l.holdings["heating_mode"] = "Offset" # Set the value to activate the offset mod l.write() # Write down the values to the heatpump ``` -**NOTE:** Writing values to the heat pump is particulary dangerous as this is +**NOTE:** Writing values to the heat pump is particularly dangerous as this is an undocumented API. By default a safe guard is in place, which will prevent writing parameters that are not (yet) understood.