Files
gobot/i2c-hub/uart-adapter/src/gobotrpc/util.py
2025-01-03 12:33:51 +01:00

59 lines
1.6 KiB (Stored with Git LFS)
Python

from jsonschema import validate
import enum
def check_gobotrpc_packge_json(data: dict):
    """Validate the JSON form of a GobotRPC package against a fixed schema.

    The package must be an object carrying the keys ``type``, ``data`` and
    ``rpc``:

    * ``type`` -- one of the strings ``"Req"``, ``"Res"`` or ``"Error"``.
    * ``rpc``  -- the string ``"Vacum"``.
    * ``data`` -- a nested object.

    Nothing is returned; a non-conforming ``data`` makes
    ``jsonschema.validate`` raise its validation error.
    """
    message_kind = {
        "type": "string",
        "enum": ["Req", "Res", "Error"],
    }
    rpc_name = {
        "type": "string",
        "enum": ["Vacum"],
    }
    payload = {"type": "object"}

    package_schema = {
        "type": "object",
        "properties": {
            "type": message_kind,
            "rpc": rpc_name,
            "data": payload,
        },
        "required": ["type", "data", "rpc"],
    }
    validate(instance=data, schema=package_schema)
class GobotRPCNumber(enum.IntEnum):
    """Numeric identifiers of the available RPCs."""

    # Only the low four bits are kept when this value is packed into a header
    # by gen_gobotrpc_header.
    VACUM = 8
class GobotRPCType(enum.IntEnum):
    """Kind of a GobotRPC message: request, response or error."""

    # Each value fits in the two-bit type field of the header byte.
    REQ = 0
    RES = 1
    ERROR = 2
def gen_gobotrpc_header(rpc: GobotRPCNumber, type: GobotRPCType, size: int) -> bytes:
    """Encode a two-byte GobotRPC header.

    Layout of the first byte:

    * bits 7-4 -- low four bits of ``rpc.value``
    * bits 3-2 -- low two bits of ``type.value``
    * bits 1-0 -- always zero

    The second byte is ``size``.

    Raises:
        OverflowError: if ``size`` does not fit into one unsigned byte.
    """
    rpc_field = (rpc.value & 0b1111) << 4
    type_field = (type.value & 0b11) << 2

    first_byte = int.to_bytes(rpc_field | type_field, 1, "big")
    size_byte = int.to_bytes(size, 1, "big")
    return first_byte + size_byte
def extract_gobotrpc_header(data: bytes) -> tuple[GobotRPCNumber, GobotRPCType, int]:
    """Decode a two-byte GobotRPC header.

    The first byte carries the RPC number in bits 7-4 and the message type in
    bits 3-2; the second byte is returned unchanged as the size field.

    Args:
        data: Buffer whose first two bytes are the header. Any further bytes
            are ignored.

    Returns:
        A ``(rpc, type, size)`` triple.

    Raises:
        ValueError: if the RPC or type bits do not map to a known enum member
            (this includes an empty ``data``, which decodes as RPC number 0).
        IndexError: if the first byte is valid but ``data`` has no second byte.
    """
    first_byte = int.from_bytes(data[:1], "big")
    # The enum constructors already return members, so no second conversion
    # is needed before returning them.
    rpc = GobotRPCNumber((first_byte >> 4) & 0b1111)
    msg_type = GobotRPCType((first_byte >> 2) & 0b11)
    size = data[1]
    return rpc, msg_type, size
def crc16(data: bytes, poly=0x8408):
    """Compute a CRC-16-CCITT checksum of ``data``.

    The register starts at 0xFFFF and the input is processed bit by bit,
    least-significant bit first, XOR-ing ``poly`` into the register whenever
    the bit shifted out is set. The final register is complemented and its
    two bytes are swapped before being returned.

    Args:
        data: Input accepted by ``bytearray(...)``.
        poly: Polynomial applied on each set bit (default 0x8408).

    Returns:
        The 16-bit checksum as an int in the range 0..0xFFFF.
    """
    register = 0xFFFF
    for octet in bytearray(data):
        # Folding the whole byte into the low bits up front yields the same
        # per-bit decisions as comparing one input bit per step.
        register ^= octet & 0xFF
        for _ in range(8):
            shifted_out = register & 0x0001
            register >>= 1
            if shifted_out:
                register ^= poly

    register = ~register & 0xFFFF
    # Swap the two bytes of the 16-bit result.
    low_byte = register & 0xFF
    high_byte = register >> 8
    return (low_byte << 8) | high_byte