mirror of
https://github.com/deepmodeling/Uni-Lab-OS
synced 2026-03-27 14:33:05 +00:00
添加AGV工作站设备驱动、注册表定义、批量转运编译器和消息定义。 包含跨工作站批量转运协议、AGV路径规划、容量分批等功能。 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
128 lines
4.4 KiB
Python
128 lines
4.4 KiB
Python
"""
|
||
AGV 通用转运工站 Driver
|
||
|
||
继承 WorkstationBase,通过 WorkstationNodeCreator 自动获得 ROS2WorkstationNode 能力。
|
||
Warehouse 作为 children 中的资源节点,由 attach_resource() 自动注册到 resource_tracker。
|
||
deck=None,不使用 PLR Deck 抽象。
|
||
"""
|
||
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
from pylabrobot.resources import Deck
|
||
|
||
from unilabos.devices.workstation.workstation_base import WorkstationBase
|
||
from unilabos.resources.warehouse import WareHouse
|
||
from unilabos.utils import logger
|
||
|
||
|
||
class AGVTransportStation(WorkstationBase):
    """Generic AGV transport workstation driver.

    Holds a route table and a role -> device-id mapping, and exposes the AGV's
    on-board carrier (a ``WareHouse`` resource) together with its capacity and
    slot occupancy.

    Initialization chain as described by the original author (the framework
    pieces named here live outside this file):

        ROS2DeviceNode.__init__():
            issubclass(AGVTransportStation, WorkstationBase) -> True
            -> WorkstationNodeCreator.create_instance(data):
                data["deck"] = None
                -> DeviceClassCreator.create_instance(data)
                   -> AGVTransportStation(deck=None, ...)
                -> attach_resource(): children with type="warehouse"
                   -> resource_tracker.add_resource(wh)
            -> ROS2WorkstationNode(protocol_type=[...], children=[nav, arm], ...)
            -> driver.post_init(ros_node):
                self.carrier resolves the WareHouse from resource_tracker
    """

    def __init__(
        self,
        deck: Optional[Deck] = None,
        children: Optional[List[Any]] = None,
        route_table: Optional[Dict[str, Dict[str, str]]] = None,
        device_roles: Optional[Dict[str, str]] = None,
        **kwargs,
    ):
        """Create the station.

        Args:
            deck: Accepted for signature compatibility; not used. The base
                class is always initialised with ``deck=None``.
            children: Accepted for signature compatibility; not used or
                forwarded by this constructor.
            route_table: Mapping of ``"<from>-><to>"`` route keys to a dict of
                command strings. Defaults to an empty dict.
            device_roles: Mapping of role name to device ID. Defaults to an
                empty dict.
            **kwargs: Forwarded unchanged to ``WorkstationBase.__init__``.
        """
        # This station deliberately does not use a PLR Deck, so the incoming
        # ``deck`` argument is ignored and None is passed to the base class.
        super().__init__(deck=None, **kwargs)
        self.route_table: Dict[str, Dict[str, str]] = route_table or {}
        self.device_roles: Dict[str, str] = device_roles or {}

    # ============ Carrier (Warehouse) ============

    @property
    def carrier(self) -> Optional[WareHouse]:
        """Return the first ``WareHouse`` in the node's resource tracker.

        Returns:
            The first tracked resource that is a ``WareHouse``, or ``None``
            when no ROS node is attached yet or no such resource is tracked.
        """
        # getattr with a default covers both "attribute never set" and
        # "attribute set to None", so the property honours its Optional
        # return type instead of raising AttributeError on a None node.
        ros_node = getattr(self, "_ros_node", None)
        if ros_node is None:
            return None
        for res in ros_node.resource_tracker.resources:
            if isinstance(res, WareHouse):
                return res
        return None

    @property
    def capacity(self) -> int:
        """Return the total slot count of the carrier.

        Returns:
            ``num_items_x * num_items_y * num_items_z`` of the carrier, or
            ``0`` when there is no carrier.
        """
        wh = self.carrier
        if wh is None:
            return 0
        return wh.num_items_x * wh.num_items_y * wh.num_items_z

    @property
    def free_slots(self) -> List[str]:
        """Return the names of the currently empty slots.

        Returns:
            Names from the carrier's ``_ordering`` mapping whose site has
            ``resource is None``; an empty list when there is no carrier.
        """
        wh = self.carrier
        if wh is None:
            return []
        # NOTE(review): assumes every value in ``_ordering`` exposes a
        # ``.resource`` attribute - confirm against the WareHouse class.
        ordering = getattr(wh, "_ordering", {})
        return [name for name, site in ordering.items() if site.resource is None]

    @property
    def occupied_slots(self) -> Dict[str, Any]:
        """Return a mapping of occupied slot name to the resource it holds.

        Returns:
            ``{slot_name: resource}`` for every site in the carrier's
            ``_ordering`` mapping whose ``resource`` is not ``None``; an empty
            dict when there is no carrier.
        """
        wh = self.carrier
        if wh is None:
            return {}
        # NOTE(review): same assumption as free_slots about ``.resource``.
        ordering = getattr(wh, "_ordering", {})
        return {name: site.resource for name, site in ordering.items() if site.resource is not None}

    # ============ Route lookup ============

    def resolve_route(self, from_station: str, to_station: str) -> Dict[str, str]:
        """Look up the route table entry for a station pair.

        Args:
            from_station: Source station ID.
            to_station: Destination station ID.

        Returns:
            The route table entry stored under ``"<from_station>-><to_station>"``,
            e.g. ``{"nav_command": "...", "arm_pick": "...", "arm_place": "..."}``.
            The stored dict itself is returned, not a copy.

        Raises:
            KeyError: The route table has no entry for this station pair.
        """
        route_key = f"{from_station}->{to_station}"
        if route_key not in self.route_table:
            raise KeyError(f"路由表中未找到路线: {route_key}")
        return self.route_table[route_key]

    def get_device_id(self, role: str) -> str:
        """Return the device ID configured for a role.

        Args:
            role: Device role name, e.g. ``"navigator"`` or ``"arm"``.

        Returns:
            The device ID string mapped to ``role``.

        Raises:
            KeyError: No device is configured for ``role``; the message lists
                the roles that are configured.
        """
        if role not in self.device_roles:
            raise KeyError(f"未配置设备角色: {role},当前已配置: {list(self.device_roles.keys())}")
        return self.device_roles[role]

    # ============ Lifecycle ============

    def post_init(self, ros_node) -> None:
        """Run base-class post-initialisation, then log the carrier status.

        Args:
            ros_node: The ROS node passed on to ``WorkstationBase.post_init``;
                its ``device_id`` is used in the log message.
        """
        super().post_init(ros_node)
        wh = self.carrier
        if wh is None:
            logger.warning(f"AGV {ros_node.device_id} 未发现 Warehouse 载具资源")
            return
        # Compute the capacity from the carrier already in hand so the log
        # line describes exactly the object that was found.
        total_slots = wh.num_items_x * wh.num_items_y * wh.num_items_z
        logger.info(f"AGV {ros_node.device_id} 载具已就绪: {wh.name}, 容量={total_slots}")
|