135 lines
5.2 KiB
Python
135 lines
5.2 KiB
Python
from bproto_base import bproto_Package, calc_crc_sum
|
|
from bproto_error import bproto_Error, bproto_PackageErrorInvalidSize, bproto_PackageErrorInvalidMessageID, bproto_PackageErrorInvalidCRC
|
|
|
|
from HCP_protocol_enum import HcpEnumErrorCodes, HcpEnumInlineMotionUpdateDangerLvl, HcpEnumInlineErrorRecoveryStatus
|
|
from HCP_protocol_bitfield import HcpBitfieldEnables, HcpBitfieldInlineErrorEnables
|
|
from HCP_protocol_message_ids import MessageIds_HCP, MESSAGE_SIZE_MAP_HCP
|
|
|
|
from typing import Annotated, List
|
|
import struct
|
|
|
|
|
|
class HcpMessageMotionUpdate(bproto_Package):
    """HCP "motion update" message.

    Wire layout written by :meth:`to_bytes` (struct fields are big-endian):

    ====================  =========================================
    field                 encoding
    ====================  =========================================
    message id            1 unsigned byte (``ID.value``)
    ``speed``             3 x signed 32-bit int
    ``stearing``          32-bit float
    ``name``              32 bytes ASCII, truncated / NUL padded
    ``enable``            4 x unsigned byte (one per flag)
    ``heading``           1 byte ASCII, truncated / NUL padded
    ``enables``           output of ``HcpBitfieldEnables.to_bytes()``
                          (``from_bytes`` reads it back as 1 byte)
    ``danger_lvl``        1 unsigned byte (enum value)
    CRC                   output of ``calc_crc_sum`` over all
                          preceding bytes
    ====================  =========================================
    """

    ID = MessageIds_HCP.MSG_ID_HCP_MESSAGE_MOTION_UPDATE
    # Total encoded length in bytes, including the id byte and the CRC.
    SIZE = 58

    speed: Annotated[List[int], 3]
    stearing: float
    name: str
    enable: Annotated[List[bool], 4]
    heading: str
    enables: HcpBitfieldEnables
    danger_lvl: HcpEnumInlineMotionUpdateDangerLvl

    def __init__(self,
                 speed: Annotated[List[int], 3] | None = None,
                 stearing: float = 0.0,
                 name: str = '',
                 enable: Annotated[List[bool], 4] | None = None,
                 heading: str = '',
                 enables: HcpBitfieldEnables | None = None,
                 danger_lvl: HcpEnumInlineMotionUpdateDangerLvl = HcpEnumInlineMotionUpdateDangerLvl.WARNING,
                 ):
        """Create a motion update message.

        Args:
            speed: Three integer speed components; defaults to ``[0, 0, 0]``.
            stearing: Steering value, encoded as a 32-bit float.
            name: ASCII name; only the first 32 bytes are transmitted.
            enable: Four boolean flags; defaults to four ``False`` values.
            heading: ASCII heading; only the first byte is transmitted.
            enables: Bitfield of enables; a fresh ``HcpBitfieldEnables()``
                is created when omitted.
            danger_lvl: Danger level enum member.
        """
        # The list/bitfield defaults are built per instance: a default that is
        # created in the signature would be shared (and mutated) across all
        # messages, and a scalar default could not be serialised by to_bytes().
        self.speed = [0, 0, 0] if speed is None else speed
        self.stearing = stearing
        self.name = name
        self.enable = [False] * 4 if enable is None else enable
        self.heading = heading
        self.enables = HcpBitfieldEnables() if enables is None else enables
        self.danger_lvl = danger_lvl

    def to_bytes(self) -> bytes:
        """Serialise the message, including the id byte and trailing CRC.

        Returns:
            The encoded message as ``bytes``.

        Raises:
            UnicodeEncodeError: If ``name`` or ``heading`` is not pure ASCII.
            struct.error: If a numeric field does not fit its wire type.
        """
        res = struct.pack(">B", self.ID.value)
        res += struct.pack('>iii', self.speed[0], self.speed[1], self.speed[2])
        res += struct.pack('>f', self.stearing)
        # Fixed-width text fields: cut to the field size, then pad with NULs.
        res += self.name.encode('ascii')[:32].ljust(32, b'\x00')
        res += struct.pack('>BBBB', self.enable[0], self.enable[1], self.enable[2], self.enable[3])
        res += self.heading.encode('ascii')[:1].ljust(1, b'\x00')
        res += self.enables.to_bytes()
        res += struct.pack('>B', self.danger_lvl.value)
        # The checksum covers every byte written so far (id byte included).
        res += calc_crc_sum(res)
        return res

    def from_bytes(self, data: bytes):
        """Populate this instance from an encoded message and return ``self``.

        The leading message id byte is skipped and the trailing CRC bytes are
        not checked here; ``parse_package`` performs both checks before it
        calls this method.

        Args:
            data: Encoded message starting at the message id byte.

        Returns:
            This instance, with every field overwritten from ``data``.
        """
        # Skip the message id byte.
        data = data[1:]

        self.speed = list(struct.unpack('>iii', data[:12]))
        data = data[12:]

        self.stearing = struct.unpack('>f', data[:4])[0]
        data = data[4:]

        # Remove the NUL padding added by to_bytes().
        self.name = data[:32].decode('ascii').strip('\x00')
        data = data[32:]

        self.enable = [bool(i) for i in struct.unpack('>BBBB', data[:4])]
        data = data[4:]

        self.heading = data[:1].decode('ascii').strip('\x00')
        data = data[1:]

        self.enables = HcpBitfieldEnables().from_bytes(data[:1])
        data = data[1:]

        self.danger_lvl = HcpEnumInlineMotionUpdateDangerLvl(struct.unpack('>B', data[:1])[0])

        # Whatever remains is the CRC, which is intentionally left unread.
        return self
|
|
|
|
|
|
class HcpMessageError(bproto_Package):
    """HCP "error" message.

    Wire layout written by :meth:`to_bytes`: one unsigned message id byte,
    one unsigned byte holding the ``recovery_status`` enum value, the output
    of ``enables.to_bytes()`` (read back as 1 byte by :meth:`from_bytes`) and
    finally the output of ``calc_crc_sum`` over all preceding bytes.
    """

    ID = MessageIds_HCP.MSG_ID_HCP_MESSAGE_ERROR
    # Total encoded length in bytes, including the id byte and the CRC.
    SIZE = 5

    recovery_status: HcpEnumInlineErrorRecoveryStatus
    enables: HcpBitfieldInlineErrorEnables

    def __init__(self,
                 recovery_status: HcpEnumInlineErrorRecoveryStatus = HcpEnumInlineErrorRecoveryStatus.YES,
                 enables: HcpBitfieldInlineErrorEnables | None = None,
                 ):
        """Create an error message.

        Args:
            recovery_status: Recovery status enum member.
            enables: Bitfield of enables; a fresh
                ``HcpBitfieldInlineErrorEnables()`` is created when omitted.
        """
        self.recovery_status = recovery_status
        # Built per instance so that messages never share (and mutate) one
        # default bitfield object.
        self.enables = HcpBitfieldInlineErrorEnables() if enables is None else enables

    def to_bytes(self) -> bytes:
        """Serialise the message, including the id byte and trailing CRC.

        Returns:
            The encoded message as ``bytes``.
        """
        res = struct.pack(">B", self.ID.value)
        res += struct.pack('>B', self.recovery_status.value)
        res += self.enables.to_bytes()
        # The checksum covers every byte written so far (id byte included).
        res += calc_crc_sum(res)
        return res

    def from_bytes(self, data: bytes):
        """Populate this instance from an encoded message and return ``self``.

        The leading message id byte is skipped and the trailing CRC bytes are
        not checked here; ``parse_package`` performs both checks before it
        calls this method.

        Args:
            data: Encoded message starting at the message id byte.

        Returns:
            This instance, with every field overwritten from ``data``.
        """
        # Skip the message id byte.
        data = data[1:]

        self.recovery_status = HcpEnumInlineErrorRecoveryStatus(struct.unpack('>B', data[:1])[0])
        data = data[1:]

        self.enables = HcpBitfieldInlineErrorEnables().from_bytes(data[:1])

        # Whatever remains is the CRC, which is intentionally left unread.
        return self
|
|
|
|
|
|
def parse_package(data: bytes) -> tuple[bproto_Package, int]:
|
|
if len(data) < 1:
|
|
raise bproto_PackageErrorInvalidSize("Package has no data")
|
|
|
|
msg_id = struct.unpack('>B', data[:1])
|
|
|
|
if msg_id not in MessageIds_HCP:
|
|
raise bproto_PackageErrorInvalidMessageID(f"Message ID '{msg_id}' is invaild")
|
|
|
|
msg_id = MessageIds_HCP(msg_id)
|
|
expected_size = MESSAGE_SIZE_MAP_HCP.get(msg_id) or 0
|
|
|
|
if expected_size > len(data):
|
|
raise bproto_PackageErrorInvalidSize(f"Package is to short for '{msg_id}' ({expected_size} > {len(data)})")
|
|
|
|
pkg_data = data[:expected_size]
|
|
|
|
crc_data = pkg_data[-2:]
|
|
|
|
if calc_crc_sum(pkg_data[:-2]) != crc_data:
|
|
raise bproto_PackageErrorInvalidCRC(f"CRC {crc_data.hex()} did not match calculated")
|
|
|
|
match(msg_id):
|
|
case MessageIds_HCP.MSG_ID_HCP_MESSAGE_MOTION_UPDATE:
|
|
return HcpMessageMotionUpdate().from_bytes(pkg_data), expected_size
|
|
case MessageIds_HCP.MSG_ID_HCP_MESSAGE_ERROR:
|
|
return HcpMessageError().from_bytes(pkg_data), expected_size
|
|
case _:
|
|
raise bproto_Error("Message ID could not be interpreted; this should not have happen; its a developer error! Create a issue")
|