import serial
import typing
import threading
import traceback
import queue
import gobotrpc
import uart_interface
from uart_interface.pares_packages import ci_package_from_bytes

# Signature of a callback that takes one GobotRPCPackage and returns nothing.
GOBOT_RPC_CALLBACK_TYPE = typing.Callable[[gobotrpc.GobotRPCPackage], None]


class GobotRPC_CI_Hardware_Serial(uart_interface.GobotRPC_CI_Hardware):
    """GobotRPC_CI_Hardware implementation that talks over a serial port.

    A background daemon thread reads raw bytes from the port, parses each
    chunk with ``ci_package_from_bytes`` and puts the result on ``rx_queue``.
    Callers fetch parsed packages with ``rx_ci_package`` and transmit with
    ``send_ci_package``. Call ``close`` to stop the reader and release the
    port.
    """

    def __init__(self, port: str, baudrate: int):
        """Open the serial port and start the background reader thread.

        Args:
            port: Port name handed to ``serial.Serial``.
            baudrate: Baud rate handed to ``serial.Serial``.
        """
        # The short read timeout lets the reader loop wake up regularly and
        # re-check ``self.running`` so that close() can stop it.
        self.serial = serial.Serial(port, baudrate, timeout=0.1)
        self.rx_queue = queue.Queue()
        self.running = True
        self.read_thread = threading.Thread(
            target=self.__read_thread, daemon=True
        )
        self.read_thread.start()

    def __read_thread(self):
        """Reader loop: read, parse and enqueue packages until stopped."""
        while self.running:
            # NOTE(review): every chunk returned by read() is parsed as one
            # package; there is no reassembly across reads here. Confirm
            # that ci_package_from_bytes tolerates partial or merged frames.
            data = self.serial.read(256)
            if not data:
                # Read timed out without any bytes; loop to re-check running.
                continue

            try:
                ci_package = ci_package_from_bytes(data)
            except Exception:
                # Deliberately broad: one unparsable chunk must not kill the
                # reader thread. Dump the bytes and traceback, then move on.
                print("Data:", data.hex())
                traceback.print_exc()
                continue

            self.rx_queue.put(ci_package)

    def send_ci_package(self, ci_package: uart_interface.CI_Package):
        """Serialize ``ci_package`` with ``to_bytes()`` and write it out."""
        data = ci_package.to_bytes()
        self.__send(data)

    def rx_ci_package(self) -> uart_interface.CI_Package:
        """Return the next received package, blocking until one is queued."""
        return self.rx_queue.get()

    def __send(self, data: bytes):
        """Print the outgoing bytes as hex, then write them to the port."""
        print("Sending:", data.hex())
        self.serial.write(data)

    def close(self):
        """Stop the reader thread and close the serial port."""
        self.running = False
        try:
            self.read_thread.join()
        finally:
            # Release the port even if join() is interrupted.
            self.serial.close()