Files
gobot/i2c-hub/uart-adapter/src/uart_interface/serial.py
2025-01-07 21:13:38 +01:00

79 lines
2.3 KiB (Stored with Git LFS)
Python

import serial
import typing
import threading
import traceback
import queue
import time
import gobotrpc
import uart_interface
from uart_interface.pares_packages import ci_package_from_bytes, CI_ParserError
from uart_interface.ci_packages import CI_SET_ADDR_PORT_MAP_Package
from gobotrpc import GobotRPC_PackageParseError
# Type alias for a callback that takes a single gobotrpc.GobotRPCPackage
# and returns None.
GOBOT_RPC_CALLBACK_TYPE = typing.Callable[[gobotrpc.GobotRPCPackage], None]
class GobotRPC_CI_Hardware_Serial(uart_interface.GobotRPC_CI_Hardware):
    """CI hardware backend that exchanges CI packages over a serial port.

    A daemon thread continuously reads from the port, splits the byte stream
    into frames and puts every successfully parsed CI package on
    ``rx_queue``; ``rx_ci_package`` hands them out one at a time.
    """

    def __init__(self, port: str, baudrate: int):
        """Open the serial port and start the background reader thread.

        Args:
            port: Port name passed to ``serial.Serial``.
            baudrate: Baud rate passed to ``serial.Serial``.
        """
        # The short read timeout lets the reader loop re-check `self.running`
        # regularly, so `close()` is never blocked for long.
        self.serial = serial.Serial(port, baudrate, timeout=0.1)
        self.rx_queue = queue.Queue()
        self.running = True
        self.read_thread = threading.Thread(target=self.__read_thread)
        self.read_thread.daemon = True
        self.read_thread.start()

    def __read_thread(self):
        """Reader loop: accumulate incoming bytes and extract frames.

        Runs until ``self.running`` becomes false. Bytes of a frame that has
        not fully arrived yet are kept in the buffer so that a frame split
        across several reads is still parsed as a whole.
        """
        pending = bytearray()
        while self.running:
            chunk = self.serial.read(256)
            if chunk:
                pending += chunk
            elif len(pending) > 1:
                # The read timed out while an unfinished frame is buffered.
                # Treat its first byte as garbage and drop it, so a corrupted
                # length byte cannot stall reception until enough unrelated
                # bytes have piled up.
                del pending[0]
            else:
                continue
            self.__drain_frames(pending)

    def __drain_frames(self, buffer: bytearray) -> None:
        """Parse and remove every complete frame at the start of ``buffer``.

        The byte at index 1 is taken as the total frame length (the frame is
        ``buffer[:length]``). Successfully parsed packages are put on
        ``rx_queue``. When parsing fails, a single byte is discarded and
        parsing is retried from the next position to resynchronise.

        Args:
            buffer: Receive buffer; consumed bytes are removed in place.
        """
        while len(buffer) > 1:
            length = buffer[1]
            if length < 2:
                # A frame that contains its own length byte is at least two
                # bytes long; a smaller value cannot be valid and consuming
                # `length` bytes might never make progress.
                del buffer[0]
                continue
            if len(buffer) < length:
                # Frame not completely received yet: wait for more bytes
                # instead of parsing (and then discarding) a truncated frame.
                return
            data_package = bytes(buffer[:length])
            try:
                ci_package = ci_package_from_bytes(data_package)
            except (CI_ParserError, GobotRPC_PackageParseError) as e:
                print(e)
                del buffer[0]
            except Exception:
                # Unexpected parser failure: log the offending bytes and keep
                # the reader thread alive.
                del buffer[0]
                print("Data:", data_package.hex())
                traceback.print_exc()
            else:
                del buffer[:length]
                self.rx_queue.put(ci_package)

    def send_ci_package(self, ci_package: uart_interface.CI_Package):
        """Serialize ``ci_package`` with ``to_bytes()`` and write it to the port."""
        data = ci_package.to_bytes()
        self.__send(data)

    def rx_ci_package(self) -> uart_interface.CI_Package:
        """Return the next received CI package, blocking until one is queued."""
        return self.rx_queue.get()

    def __send(self, data: bytes):
        """Write raw bytes to the serial port, flush, then pause briefly."""
        print("Sending:", data.hex())
        self.serial.write(data)
        self.serial.flush()
        # NOTE(review): presumably this pause gives the receiver time to
        # process the package before the next one is sent - confirm.
        time.sleep(0.05)

    def close(self):
        """Stop the reader thread, wait for it to finish and close the port."""
        self.running = False
        self.read_thread.join()
        self.serial.close()