diff --git a/.cursor/skills/add-workstation/SKILL.md b/.cursor/skills/add-workstation/SKILL.md new file mode 100644 index 00000000..534e5ba6 --- /dev/null +++ b/.cursor/skills/add-workstation/SKILL.md @@ -0,0 +1,626 @@ +--- +name: add-workstation +description: Guide for adding new workstations to Uni-Lab-OS (接入新工作站). Uses @device decorator + AST auto-scanning. Walks through workstation type, sub-device composition, driver creation, deck setup, and graph file. Use when the user wants to add a workstation, create a workstation driver, configure a station with sub-devices, or mentions 工作站/工站/station/workstation. +--- + +# Uni-Lab-OS 工作站接入指南 + +工作站(workstation)是组合多个子设备的大型设备,拥有独立的物料管理系统和工作流引擎。使用 `@device` 装饰器注册,AST 自动扫描生成注册表。 + +--- + +## 工作站类型 + +| 类型 | 基类 | 适用场景 | +| ------------------- | ----------------- | ---------------------------------- | +| **Protocol 工作站** | `ProtocolNode` | 标准化学操作协议(泵转移、过滤等) | +| **外部系统工作站** | `WorkstationBase` | 与外部 LIMS/MES 对接 | +| **硬件控制工作站** | `WorkstationBase` | 直接控制 PLC/硬件 | + +--- + +## @device 装饰器(工作站) + +工作站也使用 `@device` 装饰器注册,参数与普通设备一致: + +```python +@device( + id="my_workstation", # 注册表唯一标识(必填) + category=["workstation"], # 分类标签 + description="我的工作站", +) +``` + +如果一个工作站类支持多个具体变体,可使用 `ids` / `id_meta`,与设备的用法相同(参见 add-device SKILL)。 + +--- + +## 工作站驱动模板 + +### 模板 A:基于外部系统的工作站 + +```python +import logging +from typing import Dict, Any, Optional +from pylabrobot.resources import Deck + +from unilabos.registry.decorators import device, topic_config, not_action +from unilabos.devices.workstation.workstation_base import WorkstationBase + +try: + from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode +except ImportError: + ROS2WorkstationNode = None + + +@device(id="my_workstation", category=["workstation"], description="我的工作站") +class MyWorkstation(WorkstationBase): + _ros_node: "ROS2WorkstationNode" + + def __init__(self, config=None, deck=None, protocol_type=None, **kwargs): + super().__init__(deck=deck, **kwargs) + self.config = config or {} + self.logger = logging.getLogger("MyWorkstation") + self.api_host = self.config.get("api_host", "") + self._status = "Idle" + + @not_action + def post_init(self, ros_node: "ROS2WorkstationNode"): + super().post_init(ros_node) + self._ros_node = ros_node + + async def scheduler_start(self, **kwargs) -> Dict[str, Any]: + """注册为工作站动作""" + return {"success": True} + + async def create_order(self, json_str: str, **kwargs) -> Dict[str, Any]: + """注册为工作站动作""" + return {"success": True} + + @property + @topic_config() + def workflow_sequence(self) -> str: + return "[]" + + @property + @topic_config() + def material_info(self) -> str: + return "{}" +``` + +### 模板 B:Protocol 工作站 + +直接使用 `ProtocolNode`,通常不需要自定义驱动类: + +```python +from unilabos.devices.workstation.workstation_base import ProtocolNode +``` + +在图文件中配置 `protocol_type` 即可。 + +--- + +## 子设备访问(sub_devices) + +工站初始化子设备后,所有子设备实例存储在 `self._ros_node.sub_devices` 字典中(key 为设备 id,value 为 `ROS2DeviceNode` 实例)。工站的驱动类可以直接获取子设备实例来调用其方法: + +```python +# 在工站驱动类的方法中访问子设备 +sub = self._ros_node.sub_devices["pump_1"] + +# .driver_instance — 子设备的驱动实例(即设备 Python 类的实例) +sub.driver_instance.some_method(arg1, arg2) + +# .ros_node_instance — 子设备的 ROS2 节点实例 +sub.ros_node_instance._action_value_mappings # 查看子设备支持的 action +``` + +**常见用法**: + +```python +class MyWorkstation(WorkstationBase): + def my_protocol(self, **kwargs): + # 获取子设备驱动实例 + pump = self._ros_node.sub_devices["pump_1"].driver_instance + heater = self._ros_node.sub_devices["heater_1"].driver_instance + + # 直接调用子设备方法 + pump.aspirate(volume=100) + heater.set_temperature(80) +``` + +> 参考实现:`unilabos/devices/workstation/bioyond_studio/reaction_station/reaction_station.py` 中通过 `self._ros_node.sub_devices.get(reactor_id)` 获取子反应器实例并更新数据。 + +--- + +## 硬件通信接口(hardware_interface) + +硬件控制型工作站通常需要通过串口(Serial)、Modbus 等通信协议控制多个子设备。Uni-Lab-OS 通过 **通信设备代理** 机制实现端口共享:一个串口只创建一个 `serial` 节点,多个子设备共享这个通信实例。 + +### 工作原理 + +`ROS2WorkstationNode` 初始化时分两轮遍历子设备(`workstation.py`): + +**第一轮 — 初始化所有子设备**:按 `children` 顺序调用 `initialize_device()`,通信设备(`serial_` / `io_` 开头的 id)优先完成初始化,创建 `serial.Serial()` 实例。其他子设备此时 `self.hardware_interface = "serial_pump"`(字符串)。 + +**第二轮 — 代理替换**:遍历所有已初始化的子设备,读取子设备的 `_hardware_interface` 配置: + +``` +hardware_interface = d.ros_node_instance._hardware_interface +# → {"name": "hardware_interface", "read": "send_command", "write": "send_command"} +``` + +1. 取 `name` 字段对应的属性值:`name_value = getattr(driver, hardware_interface["name"])` + - 如果 `name_value` 是字符串且该字符串是某个子设备的 id → 触发代理替换 +2. 从通信设备获取真正的 `read`/`write` 方法 +3. 用 `setattr(driver, read_method, _read)` 将通信设备的方法绑定到子设备上 + +因此: + +- **通信设备 id 必须与子设备 config 中填的字符串完全一致**(如 `"serial_pump"`) +- **通信设备 id 必须以 `serial_` 或 `io_` 开头**(否则第一轮不会被识别为通信设备) +- **通信设备必须在 `children` 列表中排在最前面**,确保先初始化 + +### HardwareInterface 参数说明 + +```python +from unilabos.registry.decorators import HardwareInterface + +HardwareInterface( + name="hardware_interface", # __init__ 中接收通信实例的属性名 + read="send_command", # 通信设备上暴露的读方法名 + write="send_command", # 通信设备上暴露的写方法名 + extra_info=["list_ports"], # 可选:额外暴露的方法 +) +``` + +**`name` 字段的含义**:对应设备类 `__init__` 中,用于保存通信实例的**属性名**。系统据此知道要替换哪个属性。大部分设备直接用 `"hardware_interface"`,也可以自定义(如 `"io_device_port"`)。 + +### 示例 1:泵(name="hardware_interface") + +```python +from unilabos.registry.decorators import device, HardwareInterface + +@device( + id="my_pump", + category=["pump_and_valve"], + hardware_interface=HardwareInterface( + name="hardware_interface", + read="send_command", + write="send_command", + ), +) +class MyPump: + def __init__(self, port=None, address="1", **kwargs): + # name="hardware_interface" → 系统替换 self.hardware_interface + self.hardware_interface = port # 初始为字符串 "serial_pump",启动后被替换为 Serial 实例 + self.address = address + + def send_command(self, command: str): + full_command = f"/{self.address}{command}\r\n" + self.hardware_interface.write(bytearray(full_command, "ascii")) + return self.hardware_interface.read_until(b"\n") +``` + +### 示例 2:电磁阀(name="io_device_port",自定义属性名) + +```python +@device( + id="solenoid_valve", + category=["pump_and_valve"], + hardware_interface=HardwareInterface( + name="io_device_port", # 自定义属性名 → 系统替换 self.io_device_port + read="read_io_coil", + write="write_io_coil", + ), +) +class SolenoidValve: + def __init__(self, io_device_port: str = None, **kwargs): + # name="io_device_port" → 图文件 config 中用 "io_device_port": "io_board_1" + self.io_device_port = io_device_port # 初始为字符串,系统替换为 Modbus 实例 +``` + +### Serial 通信设备(class="serial") + +`serial` 是 Uni-Lab-OS 内置的通信代理设备,代码位于 `unilabos/ros/nodes/presets/serial_node.py`: + +```python +from serial import Serial, SerialException +from threading import Lock + +class ROS2SerialNode(BaseROS2DeviceNode): + def __init__(self, device_id, registry_name, port: str, baudrate: int = 9600, **kwargs): + self.port = port + self.baudrate = baudrate + self._hardware_interface = { + "name": "hardware_interface", + "write": "send_command", + "read": "read_data", + } + self._query_lock = Lock() + + self.hardware_interface = Serial(baudrate=baudrate, port=port) + + BaseROS2DeviceNode.__init__( + self, driver_instance=self, registry_name=registry_name, + device_id=device_id, status_types={}, action_value_mappings={}, + hardware_interface=self._hardware_interface, print_publish=False, + ) + self.create_service(SerialCommand, "serialwrite", self.handle_serial_request) + + def send_command(self, command: str): + with self._query_lock: + self.hardware_interface.write(bytearray(f"{command}\n", "ascii")) + return self.hardware_interface.read_until(b"\n").decode() + + def read_data(self): + with self._query_lock: + return self.hardware_interface.read_until(b"\n").decode() +``` + +在图文件中使用 `"class": "serial"` 即可创建串口代理: + +```json +{ + "id": "serial_pump", + "class": "serial", + "parent": "my_station", + "config": { "port": "COM7", "baudrate": 9600 } +} +``` + +### 图文件配置 + +**通信设备必须在 `children` 列表中排在最前面**,确保先于其他子设备初始化: + +```json +{ + "nodes": [ + { + "id": "my_station", + "class": "workstation", + "children": ["serial_pump", "pump_1", "pump_2"], + "config": { "protocol_type": ["PumpTransferProtocol"] } + }, + { + "id": "serial_pump", + "class": "serial", + "parent": "my_station", + "config": { "port": "COM7", "baudrate": 9600 } + }, + { + "id": "pump_1", + "class": "syringe_pump_with_valve.runze.SY03B-T08", + "parent": "my_station", + "config": { "port": "serial_pump", "address": "1", "max_volume": 25.0 } + }, + { + "id": "pump_2", + "class": "syringe_pump_with_valve.runze.SY03B-T08", + "parent": "my_station", + "config": { "port": "serial_pump", "address": "2", "max_volume": 25.0 } + } + ], + "links": [ + { + "source": "pump_1", + "target": "serial_pump", + "type": "communication", + "port": { "pump_1": "port", "serial_pump": "port" } + }, + { + "source": "pump_2", + "target": "serial_pump", + "type": "communication", + "port": { "pump_2": "port", "serial_pump": "port" } + } + ] +} +``` + +### 通信协议速查 + +| 协议 | config 参数 | 依赖包 | 通信设备 class | +| -------------------- | ------------------------------ | ---------- | -------------------------- | +| Serial (RS232/RS485) | `port`, `baudrate` | `pyserial` | `serial` | +| Modbus RTU | `port`, `baudrate`, `slave_id` | `pymodbus` | `device_comms/modbus_plc/` | +| Modbus TCP | `host`, `port`, `slave_id` | `pymodbus` | `device_comms/modbus_plc/` | +| TCP Socket | `host`, `port` | stdlib | 自定义 | +| HTTP API | `url`, `token` | `requests` | `device_comms/rpc.py` | + +参考实现:`unilabos/test/experiments/Grignard_flow_batchreact_single_pumpvalve.json` + +--- + +## Deck 与物料生命周期 + +### 1. Deck 入参与两种初始化模式 + +系统根据设备节点 `config.deck` 的写法,自动反序列化 Deck 实例后传入 `__init__` 的 `deck` 参数。目前 `deck` 是固定字段名,只支持一个主 Deck。建议一个设备拥有一个台面,台面上抽象二级、三级子物料。 + +有两种初始化模式: + +#### init 初始化(推荐) + +`config.deck` 直接包含 `_resource_type` + `_resource_child_name`,系统先用 Deck 节点的 `config` 调用 Deck 类的 `__init__` 反序列化,再将实例传入设备的 `deck` 参数。子物料随 Deck 的 `children` 一起反序列化。 + +```json +"config": { + "deck": { + "_resource_type": "unilabos.devices.liquid_handling.prcxi.prcxi:PRCXI9300Deck", + "_resource_child_name": "PRCXI_Deck" + } +} +``` + +#### deserialize 初始化 + +`config.deck` 用 `data` 包裹一层,系统走 `deserialize` 路径,可传入更多参数(如 `allow_marshal` 等): + +```json +"config": { + "deck": { + "data": { + "_resource_child_name": "YB_Bioyond_Deck", + "_resource_type": "unilabos.resources.bioyond.decks:BIOYOND_YB_Deck" + } + } +} +``` + +没有特殊需求时推荐 init 初始化。 + +#### config.deck 字段说明 + +| 字段 | 说明 | +|------|------| +| `_resource_type` | Deck 类的完整模块路径(`module:ClassName`) | +| `_resource_child_name` | 对应图文件中 Deck 节点的 `id`,建立父子关联 | + +#### 设备 __init__ 接收 + +```python +def __init__(self, config=None, deck=None, protocol_type=None, **kwargs): + super().__init__(deck=deck, **kwargs) + # deck 已经是反序列化后的 Deck 实例 + # → PRCXI9300Deck / BIOYOND_YB_Deck 等 +``` + +#### Deck 节点(图文件中) + +Deck 节点作为设备的 `children` 之一,`parent` 指向设备 id: + +```json +{ + "id": "PRCXI_Deck", + "parent": "PRCXI", + "type": "deck", + "class": "", + "children": [], + "config": { + "type": "PRCXI9300Deck", + "size_x": 542, "size_y": 374, "size_z": 0, + "category": "deck", + "sites": [...] + }, + "data": {} +} +``` + +- `config` 中的字段会传入 Deck 类的 `__init__`(因此 `__init__` 必须能接受所有 `serialize()` 输出的字段) +- `children` 初始为空时,由同步器或手动初始化填充 +- `config.type` 填 Deck 类名 + +### 2. Deck 为空时自行初始化 + +如果 Deck 节点的 `children` 为空,工作站需在 `post_init` 或首次同步时自行初始化内容: + +```python +@not_action +def post_init(self, ros_node): + super().post_init(ros_node) + if self.deck and not self.deck.children: + self._initialize_default_deck() + +def _initialize_default_deck(self): + from my_labware import My_TipRack, My_Plate + self.deck.assign_child_resource(My_TipRack("T1"), spot=0) + self.deck.assign_child_resource(My_Plate("T2"), spot=1) +``` + +### 3. 物料双向同步 + +当工作站对接外部系统(LIMS/MES)时,需要实现 `ResourceSynchronizer` 处理双向物料同步: + +```python +from unilabos.devices.workstation.workstation_base import ResourceSynchronizer + +class MyResourceSynchronizer(ResourceSynchronizer): + def sync_from_external(self) -> bool: + """从外部系统同步到 self.workstation.deck""" + external_data = self._query_external_materials() + # 以外部工站为准:根据外部数据反向创建 PLR 资源实例 + for item in external_data: + cls = self._resolve_resource_class(item["type"]) + resource = cls(name=item["name"], **item["params"]) + self.workstation.deck.assign_child_resource(resource, spot=item["slot"]) + return True + + def sync_to_external(self, resource) -> bool: + """将 UniLab 侧物料变更同步到外部系统""" + # 以 UniLab 为准:将 PLR 资源转为外部格式并推送 + external_format = self._convert_to_external(resource) + return self._push_to_external(external_format) + + def handle_external_change(self, change_info) -> bool: + """处理外部系统主动推送的变更""" + return True +``` + +同步策略取决于业务场景: + +- **以外部工站为准**:从外部 API 查询物料数据,反向创建对应的 PLR 资源实例放到 Deck 上 +- **以 UniLab 为准**:UniLab 侧的物料变更通过 `sync_to_external` 推送到外部系统 + +在工作站 `post_init` 中初始化同步器: + +```python +@not_action +def post_init(self, ros_node): + super().post_init(ros_node) + self.resource_synchronizer = MyResourceSynchronizer(self) + self.resource_synchronizer.sync_from_external() +``` + +### 4. 序列化与持久化(serialize / serialize_state) + +资源类需正确实现序列化,系统据此完成持久化和前端同步。 + +**`serialize()`** — 输出资源的结构信息(`config` 层),反序列化时作为 `__init__` 的入参回传。因此 **`__init__` 必须通过 `**kwargs`接受`serialize()` 输出的所有字段\*\*,即使当前不使用: + +```python +class MyDeck(Deck): + def __init__(self, name, size_x, size_y, size_z, + sites=None, # serialize() 输出的字段 + rotation=None, # serialize() 输出的字段 + barcode=None, # serialize() 输出的字段 + **kwargs): # 兜底:接受所有未知的 serialize 字段 + super().__init__(size_x, size_y, size_z, name) + # ... + + def serialize(self) -> dict: + data = super().serialize() + data["sites"] = [...] # 自定义字段 + return data +``` + +**`serialize_state()`** — 输出资源的运行时状态(`data` 层),用于持久化可变信息。`data` 中的内容会被正确保存和恢复: + +```python +class MyPlate(Plate): + def __init__(self, name, size_x, size_y, size_z, + material_info=None, **kwargs): + super().__init__(name, size_x, size_y, size_z, **kwargs) + self._unilabos_state = {} + if material_info: + self._unilabos_state["Material"] = material_info + + def serialize_state(self) -> Dict[str, Any]: + data = super().serialize_state() + data.update(self._unilabos_state) + return data +``` + +关键要点: + +- `serialize()` 输出的所有字段都会作为 `config` 回传到 `__init__`,所以 `__init__` 必须能接受它们(显式声明或 `**kwargs`) +- `serialize_state()` 输出的 `data` 用于持久化运行时状态(如物料信息、液体量等) +- `_unilabos_state` 中只存可 JSON 序列化的基本类型(str, int, float, bool, list, dict, None) + +### 5. 子物料自动同步 + +子物料(Bottle、Plate、TipRack 等)放到 Deck 上后,系统会自动将其同步到前端的 Deck 视图。只需保证资源类正确实现了 `serialize()` / `serialize_state()` 和反序列化即可。 + +### 6. 图文件配置(参考 prcxi_9320_slim.json) + +```json +{ + "nodes": [ + { + "id": "my_station", + "type": "device", + "class": "my_workstation", + "config": { + "deck": { + "_resource_type": "unilabos.resources.my_module:MyDeck", + "_resource_child_name": "my_deck" + }, + "host": "10.20.30.1", + "port": 9999 + } + }, + { + "id": "my_deck", + "parent": "my_station", + "type": "deck", + "class": "", + "children": [], + "config": { + "type": "MyLabDeck", + "size_x": 542, + "size_y": 374, + "size_z": 0, + "category": "deck", + "sites": [ + { + "label": "T1", + "visible": true, + "occupied_by": null, + "position": { "x": 0, "y": 0, "z": 0 }, + "size": { "width": 128.0, "height": 86, "depth": 0 }, + "content_type": ["plate", "tip_rack", "tube_rack", "adaptor"] + } + ] + }, + "data": {} + } + ], + "edges": [] +} +``` + +Deck 节点要点: + +- `config.type` 填 Deck 类名(如 `"PRCXI9300Deck"`) +- `config.sites` 完整列出所有 site(从 Deck 类的 `serialize()` 输出获取) +- `children` 初始为空(由同步器或手动初始化填充) +- 设备节点 `config.deck._resource_type` 指向 Deck 类的完整模块路径 + +--- + +## 子设备 + +子设备按标准设备接入流程创建(参见 add-device SKILL),使用 `@device` 装饰器。 + +子设备约束: + +- 图文件中 `parent` 指向工作站 ID +- 在工作站 `children` 数组中列出 + +--- + +## 关键规则 + +1. **`__init__` 必须接受 `deck` 和 `**kwargs`** — `WorkstationBase.**init**`需要`deck` 参数 +2. **Deck 通过 `config.deck._resource_type` 反序列化传入** — 不要在 `__init__` 中手动创建 Deck +3. **Deck 为空时自行初始化内容** — 在 `post_init` 中检查并填充默认物料 +4. **外部同步实现 `ResourceSynchronizer`** — `sync_from_external` / `sync_to_external` +5. **通过 `self._children` 访问子设备** — 不要自行维护子设备引用 +6. **`post_init` 中启动后台服务** — 不要在 `__init__` 中启动网络连接 +7. **异步方法使用 `await self._ros_node.sleep()`** — 禁止 `time.sleep()` 和 `asyncio.sleep()` +8. **使用 `@not_action` 标记非动作方法** — `post_init`, `initialize`, `cleanup` +9. **子物料保证正确 serialize/deserialize** — 系统自动同步到前端 Deck 视图 + +--- + +## 验证 + +```bash +# 模块可导入 +python -c "from unilabos.devices.workstation.. import " + +# 启动测试(AST 自动扫描) +unilab -g .json +``` + +--- + +## 现有工作站参考 + +| 工作站 | 驱动类 | 类型 | +| -------------- | ----------------------------- | -------- | +| Protocol 通用 | `ProtocolNode` | Protocol | +| Bioyond 反应站 | `BioyondReactionStation` | 外部系统 | +| 纽扣电池组装 | `CoinCellAssemblyWorkstation` | 硬件控制 | + +参考路径:`unilabos/devices/workstation/` 目录下各工作站实现。 diff --git a/.cursor/skills/add-workstation/reference.md b/.cursor/skills/add-workstation/reference.md new file mode 100644 index 00000000..0c1b9f0d --- /dev/null +++ b/.cursor/skills/add-workstation/reference.md @@ -0,0 +1,371 @@ +# 工作站高级模式参考 + +本文件是 SKILL.md 的补充,包含外部系统集成、物料同步、配置结构等高级模式。 +Agent 在需要实现这些功能时按需阅读。 + +--- + +## 1. 外部系统集成模式 + +### 1.1 RPC 客户端 + +与外部 LIMS/MES 系统通信的标准模式。继承 `BaseRequest`,所有接口统一用 POST。 + +```python +from unilabos.device_comms.rpc import BaseRequest + + +class MySystemRPC(BaseRequest): + """外部系统 RPC 客户端""" + + def __init__(self, host: str, api_key: str): + super().__init__(host) + self.api_key = api_key + + def _request(self, endpoint: str, data: dict = None) -> dict: + return self.post( + url=f"{self.host}/api/{endpoint}", + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": data or {}, + }, + ) + + def query_status(self) -> dict: + return self._request("status/query") + + def create_order(self, order_data: dict) -> dict: + return self._request("order/create", order_data) +``` + +参考:`unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py`(`BioyondV1RPC`) + +### 1.2 HTTP 回调服务 + +接收外部系统报送的标准模式。使用 `WorkstationHTTPService`,在 `post_init` 中启动。 + +```python +from unilabos.devices.workstation.workstation_http_service import WorkstationHTTPService + + +class MyWorkstation(WorkstationBase): + def __init__(self, config=None, deck=None, **kwargs): + super().__init__(deck=deck, **kwargs) + self.config = config or {} + http_cfg = self.config.get("http_service_config", {}) + self._http_service_config = { + "host": http_cfg.get("http_service_host", "127.0.0.1"), + "port": http_cfg.get("http_service_port", 8080), + } + self.http_service = None + + def post_init(self, ros_node): + super().post_init(ros_node) + self.http_service = WorkstationHTTPService( + workstation_instance=self, + host=self._http_service_config["host"], + port=self._http_service_config["port"], + ) + self.http_service.start() +``` + +**HTTP 服务路由**(固定端点,由 `WorkstationHTTPHandler` 自动分发): + +| 端点 | 调用的工作站方法 | +|------|-----------------| +| `/report/step_finish` | `process_step_finish_report(report_request)` | +| `/report/sample_finish` | `process_sample_finish_report(report_request)` | +| `/report/order_finish` | `process_order_finish_report(report_request, used_materials)` | +| `/report/material_change` | `process_material_change_report(report_data)` | +| `/report/error_handling` | `handle_external_error(error_data)` | + +实现对应方法即可接收回调: + +```python +def process_step_finish_report(self, report_request) -> Dict[str, Any]: + """处理步骤完成报告""" + step_name = report_request.data.get("stepName") + return {"success": True, "message": f"步骤 {step_name} 已处理"} + +def process_order_finish_report(self, report_request, used_materials) -> Dict[str, Any]: + """处理订单完成报告""" + order_code = report_request.data.get("orderCode") + return {"success": True} +``` + +参考:`unilabos/devices/workstation/workstation_http_service.py` + +### 1.3 连接监控 + +独立线程周期性检测外部系统连接状态,状态变化时发布 ROS 事件。 + +```python +class ConnectionMonitor: + def __init__(self, workstation, check_interval=30): + self.workstation = workstation + self.check_interval = check_interval + self._running = False + self._thread = None + + def start(self): + self._running = True + self._thread = threading.Thread(target=self._monitor_loop, daemon=True) + self._thread.start() + + def _monitor_loop(self): + while self._running: + try: + # 调用外部系统接口检测连接 + self.workstation.hardware_interface.ping() + status = "online" + except Exception: + status = "offline" + time.sleep(self.check_interval) +``` + +参考:`unilabos/devices/workstation/bioyond_studio/station.py`(`ConnectionMonitor`) + +--- + +## 2. Config 结构模式 + +工作站的 `config` 在图文件中定义,传入 `__init__`。以下是常见字段模式: + +### 2.1 外部系统连接 + +```json +{ + "api_host": "http://192.168.1.100:8080", + "api_key": "YOUR_API_KEY" +} +``` + +### 2.2 HTTP 回调服务 + +```json +{ + "http_service_config": { + "http_service_host": "127.0.0.1", + "http_service_port": 8080 + } +} +``` + +### 2.3 物料类型映射 + +将 PLR 资源类名映射到外部系统的物料类型(名称 + UUID)。用于双向物料转换。 + +```json +{ + "material_type_mappings": { + "PLR_ResourceClassName": ["外部系统显示名", "external-type-uuid"], + "BIOYOND_PolymerStation_Reactor": ["反应器", "3a14233b-902d-0d7b-..."] + } +} +``` + +### 2.4 仓库映射 + +将仓库名映射到外部系统的仓库 UUID 和库位 UUID。用于入库/出库操作。 + +```json +{ + "warehouse_mapping": { + "仓库名": { + "uuid": "warehouse-uuid", + "site_uuids": { + "A01": "site-uuid-A01", + "A02": "site-uuid-A02" + } + } + } +} +``` + +### 2.5 工作流映射 + +将内部工作流名映射到外部系统的工作流 ID。 + +```json +{ + "workflow_mappings": { + "internal_workflow_name": "external-workflow-uuid" + } +} +``` + +### 2.6 物料默认参数 + +```json +{ + "material_default_parameters": { + "NMP": { + "unit": "毫升", + "density": "1.03", + "densityUnit": "g/mL", + "description": "N-甲基吡咯烷酮" + } + } +} +``` + +--- + +## 3. 资源同步机制 + +### 3.1 ResourceSynchronizer + +抽象基类,用于与外部物料系统双向同步。定义在 `workstation_base.py`。 + +```python +from unilabos.devices.workstation.workstation_base import ResourceSynchronizer + + +class MyResourceSynchronizer(ResourceSynchronizer): + def __init__(self, workstation, api_client): + super().__init__(workstation) + self.api_client = api_client + + def sync_from_external(self) -> bool: + """从外部系统拉取物料到 deck""" + external_materials = self.api_client.list_materials() + for material in external_materials: + plr_resource = self._convert_to_plr(material) + self.workstation.deck.assign_child_resource(plr_resource, coordinate) + return True + + def sync_to_external(self, plr_resource) -> bool: + """将 deck 中的物料变更推送到外部系统""" + external_data = self._convert_from_plr(plr_resource) + self.api_client.update_material(external_data) + return True + + def handle_external_change(self, change_info) -> bool: + """处理外部系统推送的物料变更""" + return True +``` + +### 3.2 update_resource — 上传资源树到云端 + +将 PLR Deck 序列化后通过 ROS 服务上传。典型使用场景: + +```python +# 在 post_init 中上传初始 deck +from unilabos.ros.nodes.base_device_node import ROS2DeviceNode + +ROS2DeviceNode.run_async_func( + self._ros_node.update_resource, True, + **{"resources": [self.deck]} +) + +# 在动作方法中更新特定资源 +ROS2DeviceNode.run_async_func( + self._ros_node.update_resource, True, + **{"resources": [updated_plate]} +) +``` + +--- + +## 4. 工作流序列管理 + +工作站通过 `workflow_sequence` 属性管理任务队列(JSON 字符串形式)。 + +```python +class MyWorkstation(WorkstationBase): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._workflow_sequence = [] + + @property + def workflow_sequence(self) -> str: + """返回 JSON 字符串,ROS 自动发布""" + import json + return json.dumps(self._workflow_sequence) + + async def append_to_workflow_sequence(self, workflow_name: str) -> Dict[str, Any]: + """添加工作流到队列""" + self._workflow_sequence.append({ + "name": workflow_name, + "status": "pending", + "created_at": time.time(), + }) + return {"success": True} + + async def clear_workflows(self) -> Dict[str, Any]: + """清空工作流队列""" + self._workflow_sequence = [] + return {"success": True} +``` + +--- + +## 5. 站间物料转移 + +工作站之间转移物料的模式。通过 ROS ActionClient 调用目标站的动作。 + +```python +async def transfer_materials_to_another_station( + self, + target_device_id: str, + transfer_groups: list, + **kwargs, +) -> Dict[str, Any]: + """将物料转移到另一个工作站""" + target_node = self._children.get(target_device_id) + if not target_node: + # 通过 ROS 节点查找非子设备的目标站 + pass + + for group in transfer_groups: + resource = self.find_resource_by_name(group["resource_name"]) + # 从本站 deck 移除 + resource.unassign() + # 调用目标站的接收方法 + # ... + + return {"success": True, "transferred": len(transfer_groups)} +``` + +参考:`BioyondDispensingStation.transfer_materials_to_reaction_station` + +--- + +## 6. post_init 完整模式 + +`post_init` 是工作站初始化的关键阶段,此时 ROS 节点和子设备已就绪。 + +```python +def post_init(self, ros_node): + super().post_init(ros_node) + + # 1. 初始化外部系统客户端(此时 config 已可用) + self.rpc_client = MySystemRPC( + host=self.config.get("api_host"), + api_key=self.config.get("api_key"), + ) + self.hardware_interface = self.rpc_client + + # 2. 启动连接监控 + self.connection_monitor = ConnectionMonitor(self) + self.connection_monitor.start() + + # 3. 启动 HTTP 回调服务 + if hasattr(self, '_http_service_config'): + self.http_service = WorkstationHTTPService( + workstation_instance=self, + host=self._http_service_config["host"], + port=self._http_service_config["port"], + ) + self.http_service.start() + + # 4. 上传 deck 到云端 + ROS2DeviceNode.run_async_func( + self._ros_node.update_resource, True, + **{"resources": [self.deck]} + ) + + # 5. 初始化资源同步器(可选) + self.resource_synchronizer = MyResourceSynchronizer(self, self.rpc_client) +```