Source code for modi_plus.util.message_util

import json
import struct
from base64 import b64encode, b64decode
from typing import Optional
from typing import Tuple


def parse_get_property_message(destination_id: int, property_type: int, property_frequency: int):
    """Build the JSON message with command ``0x03`` for a module property.

    :param destination_id: Value placed in the message's destination field.
    :param property_type: First payload entry.
    :param property_frequency: Third payload entry.
    :return: The JSON string produced by ``parse_message``.
    """
    # Payload layout: property type and frequency, each followed by a zero.
    payload = (property_type, 0x00, property_frequency, 0x00)
    return parse_message(0x03, 0, destination_id, payload)
def parse_set_property_message(destination_id: int, property_type: int, property_values: Tuple):
    """Build the JSON message with command ``0x04`` carrying encoded values.

    :param destination_id: Value placed in the message's destination field.
    :param property_type: Value placed in the message's source field.
    :param property_values: Iterable of ``(value_type, value)`` pairs; each
        pair is converted with ``parse_data`` and the results are
        concatenated, in order, to form the payload.
    :return: The JSON string produced by ``parse_message``.
    """
    payload = []
    for value_type, value in property_values:
        encoded = parse_data(value, value_type)
        # parse_data returns None for an unknown type, which makes
        # extend() raise TypeError.
        payload.extend(encoded)
    return parse_message(0x04, property_type, destination_id, payload)
def parse_message(command: int, source: int, destination: int,
                  byte_data: Tuple = (None, None, None, None, None, None, None, None)):
    """Serialize a message to a compact JSON string.

    The resulting object has the keys ``"c"`` (command), ``"s"`` (source),
    ``"d"`` (destination), ``"b"`` (base64 text of the encoded payload) and
    ``"l"`` (number of entries in ``byte_data``).

    :param command: Value stored under ``"c"``.
    :param source: Value stored under ``"s"``.
    :param destination: Value stored under ``"d"``.
    :param byte_data: Payload slots; falsy entries (``None`` or ``0``) encode
        as zero bytes.
    :return: JSON text without whitespace between tokens.
    """
    message = {
        "c": command,
        "s": source,
        "d": destination,
        "b": __encode_bytes(byte_data),
        "l": len(byte_data),
    }
    return json.dumps(message, separators=(",", ":"))


def __extract_length(begin: int, src: Tuple) -> int:
    """Return how many slots the value at ``src[begin]`` may occupy.

    That is one (the slot itself) plus the number of consecutive falsy
    entries that immediately follow it.
    """
    length = 1
    for item in src[begin + 1:]:
        if item:
            break
        length += 1
    return length


def __encode_bytes(byte_data: Tuple):
    """Pack ``byte_data`` into bytes and return them as base64 text.

    Each entry is handled according to its value:

    * falsy (``None``/``0``): the slot stays a zero byte;
    * ``>= 256``: written little-endian across its own slot plus the falsy
      slots that follow it;
    * negative: written as a 4-byte little-endian signed integer;
    * otherwise: written as a single byte.

    :param byte_data: Sequence of payload slots.
    :return: Base64 text (decoded as UTF-8) of the packed bytes.
    """
    total = len(byte_data)
    data = bytearray(total)
    idx = 0
    while idx < total:
        value = byte_data[idx]
        if not value:
            idx += 1
        elif value >= 256:
            # ">=" rather than ">": a value of exactly 256 used to match no
            # branch at all, so idx never advanced and the loop never ended.
            length = __extract_length(idx, byte_data)
            # NOTE: signed=True means a value that needs the top bit of the
            # available width raises OverflowError.
            data[idx: idx + length] = int.to_bytes(
                value, byteorder="little", length=length, signed=True
            )
            idx += length
        elif value < 0:
            # Always 4 bytes; if fewer than 4 slots remain the slice
            # assignment grows the buffer.
            data[idx: idx + 4] = int.to_bytes(
                int(value), byteorder="little", length=4, signed=True
            )
            idx += 4
        else:
            # Catch-all so every iteration advances idx (or raises).
            data[idx] = int(value)
            idx += 1
    return b64encode(bytes(data)).decode("utf8")
def decode_message(message: str):
    """Parse a JSON message and return its five fields.

    :param message: JSON text of an object with the keys ``"c"``, ``"s"``,
        ``"d"``, ``"b"`` and ``"l"``.
    :return: Tuple ``(command, source, destination, data, length)`` taken
        from those keys in that order.
    :raises KeyError: If one of the keys is missing.
    """
    fields = json.loads(message)
    return tuple(fields[key] for key in ("c", "s", "d", "b", "l"))
def unpack_data(data: str, structure: Tuple = (1, 1, 1, 1, 1, 1, 1, 1)):
    """Decode base64 text and split it into little-endian unsigned integers.

    :param data: Base64-encoded payload.
    :param structure: Byte width of each consecutive field.
    :return: List with one integer per entry of ``structure``; a field lying
        beyond the end of the decoded bytes yields ``0``.
    """
    raw = b64decode(data.encode("utf8"))
    fields = []
    offset = 0
    for width in structure:
        end = offset + width
        fields.append(int.from_bytes(raw[offset:end], byteorder="little"))
        offset = end
    return fields
def parse_data(values, data_type: str) -> Optional[Tuple]:
    """Convert a value into its byte representation for the given type tag.

    :param values: The value to convert.
    :param data_type: One of ``"string"``, ``"float"``, ``"bytes"``,
        ``"s8"``/``"s16"``/``"s32"`` or ``"u8"``/``"u16"``/``"u32"``.
    :return: A tuple of byte values; ``values`` itself, unchanged, for
        ``"bytes"``; ``None`` for any other tag.
    """
    signed_types = ("s8", "s16", "s32")
    unsigned_types = ("u8", "u16", "u32")

    if data_type == "string":
        return tuple(str.encode(values))
    if data_type == "float":
        # "f" packs a 4-byte float in the host's native byte order.
        return tuple(struct.pack("f", values))
    if data_type == "bytes":
        return values
    if data_type in signed_types or data_type in unsigned_types:
        # The digits after the s/u prefix are the bit width.
        width = int(data_type[1:]) // 8
        is_signed = data_type in signed_types
        return tuple(
            int.to_bytes(int(values), length=width, byteorder="little", signed=is_signed)
        )
    # Unknown type tag.
    return None
def decode_data(data: str) -> float:
    """Decode a base64 payload into a float rounded to two decimals.

    The first four bytes returned by ``unpack_data`` are interpreted with
    the ``struct`` format ``"f"``.

    :param data: Base64-encoded payload.
    :return: The decoded value rounded to 2 decimal places.
    """
    leading = unpack_data(data)[:4]
    (value,) = struct.unpack("f", bytes(leading))
    return round(value, 2)