Files
gobot/i2c-hub/uart-adapter/src/gobot.py
2025-01-09 03:10:03 +01:00

179 lines
5.7 KiB (Stored with Git LFS)
Python

import logging
import threading
import queue
import time
import enum
from gobotrpc import GobotRPC_PackageParseError
from gobotrpc.rpc_packages import GobotRPCGetInfoRequest, GobotRPCPackage
from gobotrpc.rpc_packages_head import (
GobotRPCDropStoneRequest,
GobotRPCDropStoneResponse,
GobotRPCMoveZAxisRequest,
GobotHeadZAxisPosition,
GobotHeadStoneState
)
from gobotrpc.rpc_packages_corexy import (
GobotRPCHomeRequest,
GobotRPCHomeResponse,
GobotRPCReleaseMotorsRequest,
GobotRPCReleaseMotorsResponse,
GobotRPCSetPaddingRequest,
GobotRPCSetPaddingResponse,
GobotRPCGotoRequest,
GobotRPCGotoResponse,
)
from gobotrpc.rpc_packages import GobotRPCVacumRequest, GobotRPCVacumResponse
from uart_interface.serial import GobotRPC_CI_Hardware_Serial
from uart_interface.ci_highLevel import GobotRPC_CI
from uart_interface.ci_packages import CMDS, CI_ParserError, CI_RX_Package, CI_ERROR_TRANMISSION_Package
class GobotAddresses:
COREXY = 0x23
HEAD = 0x22
VACUUM = 0x21
class GobotHardware:
def __init__(self, ci: GobotRPC_CI, logger: logging.Logger):
self.ci = ci
self.running = True
self.logger = logger
self.transmission_lock = threading.Lock()
self.rpcQueues = {
GobotAddresses.COREXY: queue.Queue(),
GobotAddresses.HEAD: queue.Queue(),
GobotAddresses.VACUUM: queue.Queue()
}
self.rpcLocks = {
GobotAddresses.COREXY: threading.Lock(),
GobotAddresses.HEAD: threading.Lock(),
GobotAddresses.VACUUM: threading.Lock()
}
self.last_heartbeat = time.time()
self.rxThread = threading.Thread(target=self.__rxThread)
self.rxThread.daemon = True
self.rxThread.start()
def resetAdapter(self):
self.ci.resetAdapter()
def setPortMap(self, addr: int, port: int):
self.ci.setAddrPortMap(addr, port)
def send(self, pkg: GobotRPCPackage, addr: int) -> CI_RX_Package | None:
self.transmission_lock.acquire()
l = self.rpcLocks.get(addr)
if l is not None:
l.acquire()
self.ci.send_gobotrpc_package(pkg, addr)
q = self.rpcQueues.get(addr)
if q is not None:
return q.get()
def get_info(self, addr: int) -> CI_RX_Package | None:
pkg = self.send(GobotRPCGetInfoRequest(), addr)
return pkg if pkg.get_cmd() == CMDS.RX_PACKAGE else None
# Specific Commands
def corexy_home(self) -> tuple[int, int]:
pkg: CI_RX_Package = self.send(GobotRPCHomeRequest(), GobotAddresses.COREXY)
gorpc_pkg: GobotRPCHomeResponse = pkg.package
print(pkg)
return (gorpc_pkg.x, gorpc_pkg.y)
def corexy_goto(self, x: int, y: int, offset: bool):
pkg: CI_RX_Package = self.send(GobotRPCGotoRequest(x, y, offset), GobotAddresses.COREXY)
def corexy_set_padding(self, p0: tuple[int, int], p1: tuple[int, int], p2: tuple[int, int], dimensions: tuple[int, int]):
pkg: CI_RX_Package = self.send(GobotRPCSetPaddingRequest(p0, p1, p2, dimensions[0], dimensions[1]), GobotAddresses.COREXY)
def corexy_release_motors(self, enable: bool):
pkg: CI_RX_Package = self.send(GobotRPCReleaseMotorsRequest(enable), GobotAddresses.COREXY)
def head_drop_stone(self) -> GobotHeadStoneState:
pkg: CI_RX_Package = self.send(GobotRPCDropStoneRequest(), GobotAddresses.HEAD)
gorpc_pkg: GobotRPCDropStoneResponse = pkg.package
return gorpc_pkg.state
def head_move_z_axis(self, position: GobotHeadZAxisPosition):
pkg: CI_RX_Package = self.send(GobotRPCMoveZAxisRequest(position), GobotAddresses.HEAD)
def vacum_enable(self, enable: bool):
pkg: CI_RX_Package = self.send(GobotRPCVacumRequest(enable), GobotAddresses.VACUUM)
# Thread for receiving packages
def __update_heartbeat(self):
self.last_heartbeat = time.time()
def __rxThread(self):
while True:
try:
ci_package = self.ci.rx_ci_package()
except GobotRPC_PackageParseError as e:
self.logger.error(e)
continue
except CI_ParserError as e:
self.logger.error(e)
continue
cmd = ci_package.get_cmd()
match cmd:
case CMDS.HEARTBEAT:
#self.logger.debug(ci_package)
self.__update_heartbeat()
case CMDS.SUCESS_TRANMISSION:
if self.transmission_lock.locked():
self.transmission_lock.release()
self.logger.info(ci_package)
case CMDS.ERROR_TRANMISSION:
if self.transmission_lock.locked():
self.transmission_lock.release()
pkg: CI_ERROR_TRANMISSION_Package = ci_package
l = self.rpcLocks.get(pkg.addr)
q = self.rpcQueues.get(pkg.addr)
if q is not None and l is not None:
q.put(pkg)
if l is not None and l.locked():
l.release()
self.logger.error(ci_package)
case CMDS.RX_PACKAGE:
pkg: CI_RX_Package = ci_package
l = self.rpcLocks.get(pkg.addr)
q = self.rpcQueues.get(pkg.addr)
if q is not None and l is not None:
q.put(pkg)
if l is not None and l.locked():
l.release()
self.logger.info(ci_package)
case _:
self.logger.info(ci_package)