Files
gobot/i2c-hub/backend/src/gobotRPC_ctrl_interface.py
2025-01-01 16:47:53 +01:00

148 lines
4.4 KiB (Stored with Git LFS)
Python

import serial
import threading
import janus
import asyncio
import enum
def isAscii(data: int) -> bool:
    """Return True when ``data`` is no greater than 0x7F (the 7-bit range).

    NOTE: this name is bound again further down in this module by a second,
    stricter ``isAscii`` definition; once the module has finished loading,
    callers get that later definition instead of this one.
    """
    seven_bit_max = 0x7F
    return seven_bit_max >= data
class GoBotRPC_Numbers(enum.IntEnum):
    """RPC numbers used on the serial link.

    The parser treats a received byte as the start of a package when its
    value matches one of these members. Every value here is above 0x7F.
    """

    # 0x81 - 0x85: consecutive package / scan related numbers.
    PUSH_PACKAGE = 0x81
    PERFORM_SCAN = 0x82
    SCAN_RESULT = 0x83
    PACKAGE_SLOT_UPDATE = 0x84
    REQ_PACKAGE_SLOT_INFO = 0x85

    # Numbers at the top of the byte range.
    HEADER_BEAT = 0xFE
    GET_INFO = 0xFF
class GoBotRPC_Package_SlotStatus():
    """Decoded status of one package slot.

    The fields are filled in by :meth:`parse_from_bytes` from a 15-byte
    frame with the following layout (all multi-byte fields big-endian)::

        byte 0       not read here (presumably the RPC number - confirm)
        byte 1       flags: bit 1 -> compleated, bit 0 -> in_use
        byte 2       slot number
        bytes 3-6    device address
        bytes 7-10   timestamp
        bytes 11-14  bit mask, kept as a 32-character binary string
    """

    # Number of bytes parse_from_bytes() needs to decode every field.
    FRAME_LENGTH = 15

    # Class-level defaults; parse_from_bytes() overrides them per instance.
    compleated: bool = False
    in_use: bool = False
    slot_num: int = 0
    device_addr: int = 0
    timestamp: int = 0
    bits_used: str = ""

    def parse_from_bytes(self, data: bytes):
        """Populate this instance from a raw slot-status frame.

        Args:
            data: The frame bytes; at least ``FRAME_LENGTH`` (15) bytes.

        Raises:
            ValueError: If ``data`` is shorter than ``FRAME_LENGTH``. Without
                this check the slices below would silently decode a
                truncated frame into wrong values.
        """
        if len(data) < self.FRAME_LENGTH:
            raise ValueError(
                f"slot status frame needs {self.FRAME_LENGTH} bytes, got {len(data)}"
            )
        flags = data[1]
        self.compleated = bool(flags & 0b10)
        self.in_use = bool(flags & 0b01)
        self.slot_num = data[2]
        self.device_addr = int.from_bytes(data[3:7], "big")
        self.timestamp = int.from_bytes(data[7:11], "big")
        # The byte order is given explicitly: it only became optional in
        # Python 3.11, and the other fields of this frame are big-endian too.
        self.bits_used = "{:032b}".format(int.from_bytes(data[11:15], "big"))

    def __str__(self):
        """Return a one-line summary: slot, the two flags, address, timestamp, bits."""
        completed_flag = "1" if self.compleated else "0"
        # The second flag character reports in_use (not the bits_used string,
        # whose truthiness is always True once a frame has been parsed).
        in_use_flag = "1" if self.in_use else "0"
        return (
            f"[{self.slot_num:02d}] {completed_flag}{in_use_flag} "
            f"{self.device_addr:08x} {self.timestamp:010d} {self.bits_used}"
        )
# Plain integer values of every GoBotRPC_Numbers member, for fast membership
# tests on raw bytes coming off the wire.
RPC_NUMBERS_SET = {member.value for member in GoBotRPC_Numbers}
def isAscii(data: int) -> bool:
    """Return True when ``data`` is a byte value this module treats as text.

    Accepted values are 0x20-0x7E and 0x09-0x0C. Note that 0x0D and 0x7F are
    outside both ranges and are therefore rejected.
    """
    printable = range(0x20, 0x7F)
    control_whitespace = range(0x09, 0x0D)
    if data in printable:
        return True
    return data in control_whitespace
# Package length in bytes for each RPC number. The parser has already consumed
# the RPC number byte when it looks a length up here, so it reads
# ``length - 1`` further bytes for the package.
PACKAGE_LENGTHS = {
    # For PUSH_PACKAGE the parser additionally reads ``data[0] - 1`` more
    # bytes after these, where ``data[0]`` is the byte following the RPC number.
    GoBotRPC_Numbers.PUSH_PACKAGE: 10,
    GoBotRPC_Numbers.PERFORM_SCAN: 2,
    GoBotRPC_Numbers.SCAN_RESULT: 7,
    GoBotRPC_Numbers.PACKAGE_SLOT_UPDATE: 15,
    GoBotRPC_Numbers.REQ_PACKAGE_SLOT_INFO: 2,
    # A length of 1 means the RPC number byte is the whole package.
    GoBotRPC_Numbers.HEADER_BEAT: 1,
    GoBotRPC_Numbers.GET_INFO: 2,
}
class gobotRPC_parser():
    """Incremental parser for the byte stream received from the serial port.

    Raw chunks arrive on ``in_queue``; they are accumulated in ``buffer`` and
    consumed from its front by :meth:`pull_bytes` / :meth:`pull_string`.
    ``out_queue`` is stored but not written to by this class.
    """

    def __init__(self, in_queue: "janus.AsyncQueue[bytes]", out_queue: "janus.AsyncQueue[bytes]"):
        """Store the queues and start with an empty receive buffer.

        Args:
            in_queue: Async queue delivering raw chunks of received bytes.
            out_queue: Async queue kept for results (currently unused here).
        """
        self.in_queue = in_queue
        self.out_queue = out_queue
        # parse_loop() keeps running while this flag is true.
        self.running = True
        # Bytes received but not yet consumed by the parser.
        self.buffer = bytearray()

    async def parse_loop(self):
        """Read packages from the stream and print each one, until ``running`` is false.

        A byte that is not a known RPC number is discarded. For a known
        number the remaining ``length - 1`` bytes (per ``PACKAGE_LENGTHS``)
        are read; a PUSH_PACKAGE is then extended by ``data[0] - 1`` bytes.
        """
        while self.running:
            first_byte = await self.pull_bytes(1)
            rpc_num = first_byte[0]
            if rpc_num not in RPC_NUMBERS_SET:
                continue
            length = PACKAGE_LENGTHS.get(GoBotRPC_Numbers(rpc_num), 0)
            # Clamp to zero: a negative count must never reach pull_bytes().
            data = await self.pull_bytes(max(length - 1, 0))
            if rpc_num == GoBotRPC_Numbers.PUSH_PACKAGE:
                print("L", data[0])
                # A length byte of 0 would otherwise request -1 bytes.
                data = data + await self.pull_bytes(max(data[0] - 1, 0))
            print(f"RPC: {rpc_num:02x} {data.hex()}")

    async def pull_bytes(self, n=1) -> bytes:
        """Remove and return exactly ``n`` bytes from the front of the stream.

        Waits on ``in_queue`` until enough bytes have been buffered.

        Args:
            n: Number of bytes to take; ``0`` returns ``b""`` immediately.

        Returns:
            The ``n`` consumed bytes as an immutable ``bytes`` object.

        Raises:
            ValueError: If ``n`` is negative (slicing with a negative count
                would silently consume the wrong part of the buffer).
        """
        if n < 0:
            raise ValueError(f"cannot pull a negative number of bytes: {n}")
        while len(self.buffer) < n:
            self.buffer += await self.in_queue.get()
        chunk = bytes(self.buffer[:n])
        del self.buffer[:n]
        return chunk

    async def pull_string(self):
        """Consume and return the leading run of text bytes (per ``isAscii``).

        Reading stops at the first byte rejected by ``isAscii``; that byte and
        everything after it stay in ``buffer`` for the next read. If the
        buffered data runs out first, more is awaited from ``in_queue``.

        Returns:
            A ``bytearray`` with the text bytes (possibly empty).
        """
        res = bytearray()
        while True:
            for n, value in enumerate(self.buffer):
                if not isAscii(value):
                    # Drop only what was consumed; keep the delimiter and the
                    # unread remainder in the buffer.
                    del self.buffer[:n]
                    return res
                res.append(value)
            # Everything buffered was text and is now in ``res``: clear it so
            # it is not read twice, then wait for the next chunk.
            self.buffer.clear()
            self.buffer += await self.in_queue.get()
class gobotRPC_Interface():
    """Serial-port front end feeding received bytes into a ``gobotRPC_parser``.

    Construction opens the port and starts two daemon threads: one runs the
    parser's asyncio loop, the other polls the serial port and forwards every
    received chunk to the parser's input queue.
    """

    # Seconds the reader thread waits before polling again when the port had
    # no data. The port is opened non-blocking (timeout=0), so without this
    # pause the read loop would spin at full CPU while the line is idle.
    IDLE_POLL_INTERVAL = 0.005

    def __init__(self, serial_port: str, baudrate: int = 9600):
        """Open ``serial_port`` and start the parser and reader threads.

        Args:
            serial_port: Device name passed to ``serial.Serial``.
            baudrate: Line speed; the port uses odd parity and one stop bit.
        """
        self.ser = serial.Serial(serial_port, baudrate, parity=serial.PARITY_ODD, stopbits=serial.STOPBITS_ONE, timeout=0)
        self.rxBuffer = bytearray()
        # Set by close() to make the reader thread leave its loop.
        self._stop_event = threading.Event()
        self.parser_loop = asyncio.new_event_loop()
        self.parse_in_queue = janus.Queue()
        self.parse_out_queue = janus.Queue()
        self.parser = gobotRPC_parser(self.parse_in_queue.async_q, self.parse_out_queue.async_q)
        self.parserThread = threading.Thread(target=self.parserThreadFn, args=(self.parser_loop,))
        self.parserThread.daemon = True
        self.parserThread.start()
        self.readThread = threading.Thread(target=self.readThreadFn)
        self.readThread.daemon = True
        self.readThread.start()

    def write(self, data: bytes):
        """Write ``data`` to the serial port."""
        self.ser.write(data)

    def close(self):
        """Stop the reader thread and close the serial port.

        The parser is asked to stop via its ``running`` flag; its daemon
        thread may stay blocked waiting for input until the process exits.
        """
        self._stop_event.set()
        self.parser.running = False
        if self.readThread is not threading.current_thread():
            self.readThread.join(timeout=1.0)
        self.ser.close()

    def parserThreadFn(self, loop: asyncio.AbstractEventLoop):
        """Thread target: run the parser's ``parse_loop`` on ``loop``."""
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self.parser.parse_loop())

    def readThreadFn(self):
        """Thread target: forward serial data to the parser until stopped.

        Each non-empty chunk is echoed to stdout as space-separated hex bytes
        and then put on the parser's input queue.
        """
        while not self._stop_event.is_set():
            data = self.ser.read(256)
            if data:
                # One print (and one flush) per chunk; the output is the same
                # "xx xx xx " text a per-byte print would produce.
                print(" ".join(f"{i:02x}" for i in data), end=" ", flush=True)
                self.parse_in_queue.sync_q.put(data)
            else:
                # Nothing available: pause briefly instead of busy-polling.
                # Event.wait() returns early when close() sets the event.
                self._stop_event.wait(self.IDLE_POLL_INTERVAL)
# Script entry point: open the interface on a fixed device path and keep the
# process alive while the worker threads run.
if __name__ == "__main__":
    interface = gobotRPC_Interface("/dev/ttyACM0")
    # The reader and parser threads are daemon threads, so the process ends as
    # soon as the main thread does; block here until Enter is pressed.
    input()