198 lines
6.7 KiB
Python
198 lines
6.7 KiB
Python
import sys
|
|
sys.path.append('src/')
|
|
sys.path.append('test/')
|
|
sys.path.append('test/backend_output/python')
|
|
|
|
|
|
def test_HCP_encdec_MessageMotionUpdate():
    """Round-trip a fully populated MotionUpdate message through bytes.

    Encodes a message with ``to_bytes()``, decodes it again with
    ``from_bytes()`` and checks that every field comes back with the
    expected Python type and the same value.
    """
    # Imported locally; presumably these modules are generated into
    # test/backend_output before the test run -- TODO confirm.
    from backend_output.python.HCP_protocol_packets import HcpMessageMotionUpdate
    from backend_output.python.HCP_protocol_bitfield import HcpBitfieldEnables
    from backend_output.python.HCP_protocol_enum import (
        HcpEnumInlineMotionUpdateDangerLvl,
    )

    src_msg = HcpMessageMotionUpdate(
        speed=[2, 3, 4],
        stearing=0.3,
        name="Hello HCP!",
        enable=[True, False, True, False],
        heading="N",
        enables=HcpBitfieldEnables(a=True, b=False, c=True),
        danger_lvl=HcpEnumInlineMotionUpdateDangerLvl.ERROR,
    )

    data = src_msg.to_bytes()
    # The original evaluated isinstance() without asserting the result.
    assert isinstance(data, bytes)

    dest_msg = HcpMessageMotionUpdate().from_bytes(data)

    # Integer array field.
    assert isinstance(dest_msg.speed, list)
    assert isinstance(dest_msg.speed[0], int)
    assert src_msg.speed == dest_msg.speed

    # Float field: compared after scaling by 1000 and rounding, so small
    # representation differences after the round trip are tolerated.
    assert isinstance(dest_msg.stearing, float)
    assert round(src_msg.stearing * 1000) == round(dest_msg.stearing * 1000)

    # String field.
    assert isinstance(dest_msg.name, str)
    assert src_msg.name == dest_msg.name

    # Boolean array field.
    assert isinstance(dest_msg.enable, list)
    assert isinstance(dest_msg.enable[0], bool)
    assert src_msg.enable == dest_msg.enable

    # Single-character field.
    assert isinstance(dest_msg.heading, str)
    assert src_msg.heading == dest_msg.heading

    # Bitfield members.
    assert isinstance(dest_msg.enables.a, bool)
    assert isinstance(dest_msg.enables.b, bool)
    assert isinstance(dest_msg.enables.c, bool)
    assert src_msg.enables.a == dest_msg.enables.a
    assert src_msg.enables.b == dest_msg.enables.b
    assert src_msg.enables.c == dest_msg.enables.c

    # Enum field: check the *decoded* message, like every other type check
    # in this test (the original inspected src_msg, which is trivially true).
    assert isinstance(dest_msg.danger_lvl, HcpEnumInlineMotionUpdateDangerLvl)
    assert src_msg.danger_lvl.value == dest_msg.danger_lvl.value
|
|
|
|
|
|
def test_HCP_encdec_MessageMotionUpdate_failure():
    """Check that malformed MotionUpdate packets are rejected when decoding.

    Two corruptions of a valid encoded message are fed to ``from_bytes()``:
    one with the first byte replaced, one truncated to 10 bytes. Each must
    raise the matching bproto error; the test fails if no exception (or a
    different one) is raised.
    """
    # Imported locally; presumably these modules are generated into
    # test/backend_output before the test run -- TODO confirm.
    from backend_output.python.HCP_protocol_packets import HcpMessageMotionUpdate
    from backend_output.python.HCP_protocol_bitfield import HcpBitfieldEnables
    from backend_output.python.HCP_protocol_enum import (
        HcpEnumInlineMotionUpdateDangerLvl,
    )
    from backend_output.python.bproto_error import (
        bproto_PackageErrorInvalidMessageID,
        bproto_PackageErrorInvalidSize,
    )

    src_msg = HcpMessageMotionUpdate(
        speed=[2, 3, 4],
        stearing=0.3,
        name="Hello HCP!",
        enable=[True, False, True, False],
        heading="N",
        enables=HcpBitfieldEnables(a=True, b=False, c=True),
        danger_lvl=HcpEnumInlineMotionUpdateDangerLvl.ERROR,
    )

    data = src_msg.to_bytes()

    # Replace the first byte (presumably the message ID) with 0xAA.
    # The original swallowed every outcome: a missing exception passed
    # silently and the isinstance() result was never asserted.
    try:
        HcpMessageMotionUpdate().from_bytes(b"\xaa" + data[1:])
    except bproto_PackageErrorInvalidMessageID:
        pass
    else:
        raise AssertionError(
            "from_bytes() accepted a packet with a corrupted first byte"
        )

    data = src_msg.to_bytes()

    # Truncated packet: only the first 10 bytes are supplied.
    try:
        HcpMessageMotionUpdate().from_bytes(data[:10])
    except bproto_PackageErrorInvalidSize:
        pass
    else:
        raise AssertionError("from_bytes() accepted a truncated packet")
|
|
|
|
|
|
def test_HCP_encdec_HcpMessageError():
    """Round-trip an Error message through ``to_bytes()`` / ``from_bytes()``.

    Verifies that the enum value and every bitfield member of the decoded
    message match the message that was encoded.
    """
    # Imported locally; presumably these modules are generated into
    # test/backend_output before the test run -- TODO confirm.
    from backend_output.python.HCP_protocol_packets import HcpMessageError
    from backend_output.python.HCP_protocol_bitfield import (
        HcpBitfieldInlineErrorEnables,
    )
    from backend_output.python.HCP_protocol_enum import (
        HcpEnumInlineErrorRecoveryStatus,
    )

    src_msg = HcpMessageError(
        recovery_status=HcpEnumInlineErrorRecoveryStatus.YES,
        enables=HcpBitfieldInlineErrorEnables(
            test=True, asd=False, b=True, c=False, aa=True
        ),
    )

    data = src_msg.to_bytes()
    # The original evaluated isinstance() without asserting the result.
    assert isinstance(data, bytes)

    dest_msg = HcpMessageError().from_bytes(data)

    assert src_msg.recovery_status.value == dest_msg.recovery_status.value

    # Bitfield members must survive the round trip one by one.
    assert src_msg.enables.test == dest_msg.enables.test
    assert src_msg.enables.asd == dest_msg.enables.asd
    assert src_msg.enables.b == dest_msg.enables.b
    assert src_msg.enables.c == dest_msg.enables.c
    assert src_msg.enables.aa == dest_msg.enables.aa
|
|
|
|
|
|
def test_HCP_encdec_HcpMessageError_failure():
    """Check that malformed Error packets are rejected when decoding.

    Two corruptions of a valid encoded message are fed to ``from_bytes()``:
    one with the first byte replaced, one truncated to 3 bytes. Each must
    raise the matching bproto error; the test fails if no exception (or a
    different one) is raised.
    """
    # Imported locally; presumably these modules are generated into
    # test/backend_output before the test run -- TODO confirm.
    from backend_output.python.HCP_protocol_packets import HcpMessageError
    from backend_output.python.HCP_protocol_bitfield import (
        HcpBitfieldInlineErrorEnables,
    )
    from backend_output.python.HCP_protocol_enum import (
        HcpEnumInlineErrorRecoveryStatus,
    )
    from backend_output.python.bproto_error import (
        bproto_PackageErrorInvalidMessageID,
        bproto_PackageErrorInvalidSize,
    )

    src_msg = HcpMessageError(
        recovery_status=HcpEnumInlineErrorRecoveryStatus.YES,
        enables=HcpBitfieldInlineErrorEnables(
            test=True, asd=False, b=True, c=False, aa=True
        ),
    )

    data = src_msg.to_bytes()

    # Replace the first byte (presumably the message ID) with 0xAA.
    # The original swallowed every outcome: a missing exception passed
    # silently and the isinstance() result was never asserted.
    try:
        HcpMessageError().from_bytes(b"\xaa" + data[1:])
    except bproto_PackageErrorInvalidMessageID:
        pass
    else:
        raise AssertionError(
            "from_bytes() accepted a packet with a corrupted first byte"
        )

    data = src_msg.to_bytes()

    # Truncated packet: only the first 3 bytes are supplied.
    try:
        HcpMessageError().from_bytes(data[:3])
    except bproto_PackageErrorInvalidSize:
        pass
    else:
        raise AssertionError("from_bytes() accepted a truncated packet")
|
|
|
|
|
|
def test_generic_parse_package():
    """Exercise the generic ``parse_package()`` dispatcher.

    ``parse_package()`` is called with raw bytes and returns a
    ``(message, parsed_size)`` pair. The test checks that it picks the right
    message class and reports the expected size for two valid packets, and
    that it raises the matching bproto error for a corrupted first byte and
    for truncated input.
    """
    # Imported locally; presumably these modules are generated into
    # test/backend_output before the test run -- TODO confirm.
    from backend_output.python.HCP_protocol_packets import (
        HcpMessageError,
        HcpMessageMotionUpdate,
        parse_package,
    )
    from backend_output.python.HCP_protocol_bitfield import (
        HcpBitfieldEnables,
        HcpBitfieldInlineErrorEnables,
    )
    from backend_output.python.HCP_protocol_enum import (
        HcpEnumInlineErrorRecoveryStatus,
        HcpEnumInlineMotionUpdateDangerLvl,
    )
    from backend_output.python.bproto_error import (
        bproto_PackageErrorInvalidMessageID,
        bproto_PackageErrorInvalidSize,
    )

    src_msg1 = HcpMessageError(
        recovery_status=HcpEnumInlineErrorRecoveryStatus.YES,
        enables=HcpBitfieldInlineErrorEnables(
            test=True, asd=False, b=True, c=False, aa=True
        ),
    )

    src_msg2 = HcpMessageMotionUpdate(
        speed=[2, 3, 4],
        stearing=0.3,
        name="Hello HCP!",
        enable=[True, False, True, False],
        heading="N",
        enables=HcpBitfieldEnables(a=True, b=False, c=True),
        danger_lvl=HcpEnumInlineMotionUpdateDangerLvl.ERROR,
    )

    # Valid packets: correct class and expected consumed size.
    dest_msg1, parsed_size = parse_package(src_msg1.to_bytes())
    assert parsed_size == 5
    assert isinstance(dest_msg1, HcpMessageError)

    dest_msg2, parsed_size = parse_package(src_msg2.to_bytes())
    assert parsed_size == 58
    assert isinstance(dest_msg2, HcpMessageMotionUpdate)

    # Negative cases. The original swallowed every outcome: a missing
    # exception passed silently and the isinstance() result was never
    # asserted, so these checks could not fail.

    # First byte (presumably the message ID) replaced with 0xAA.
    try:
        parse_package(b"\xaa" + src_msg2.to_bytes()[1:])
    except bproto_PackageErrorInvalidMessageID:
        pass
    else:
        raise AssertionError(
            "parse_package() accepted a packet with a corrupted first byte"
        )

    # Error message truncated to its first 2 bytes.
    try:
        parse_package(src_msg1.to_bytes()[:2])
    except bproto_PackageErrorInvalidSize:
        pass
    else:
        raise AssertionError(
            "parse_package() accepted a truncated Error packet"
        )

    # MotionUpdate message truncated to its first 2 bytes.
    try:
        parse_package(src_msg2.to_bytes()[:2])
    except bproto_PackageErrorInvalidSize:
        pass
    else:
        raise AssertionError(
            "parse_package() accepted a truncated MotionUpdate packet"
        )
|