Compare commits

...

15 Commits

Author SHA1 Message Date
Xuwznln
0f6264503a new registry sys
exp. support with add device
2026-03-21 19:26:24 +08:00
Junhan Chang
2c554182d3 add ai conventions 2026-03-19 14:14:40 +08:00
Xuwznln
6d319d91ff correct raise create resource error 2026-03-10 16:26:37 +08:00
Xuwznln
3155b2f97e ret info fix revert 2026-03-10 16:04:27 +08:00
Xuwznln
e5e30a1c7d ret info fix 2026-03-10 16:00:24 +08:00
Xuwznln
4e82f62327 fix prcxi check 2026-03-10 15:57:27 +08:00
Xuwznln
95d3456214 add create_resource schema 2026-03-10 15:27:39 +08:00
Xuwznln
38bf95b13c re signal host ready event 2026-03-10 14:13:06 +08:00
Xuwznln
f2c0bec02c add websocket connection timeout and improve reconnection logic
add open_timeout parameter to websocket connection
add TimeoutError and InvalidStatus exception handling
implement exponential backoff for reconnection attempts
simplify reconnection logic flow
2026-03-07 04:40:56 +08:00
Xuwznln
e0394bf414 Merge remote-tracking branch 'origin/dev' into dev 2026-03-04 19:18:55 +08:00
Xuwznln
975a56415a import gzip 2026-03-04 19:18:36 +08:00
Xuwznln
cadbe87e3f add gzip 2026-03-04 19:18:19 +08:00
Xuwznln
b993c1f590 add gzip 2026-03-04 19:18:09 +08:00
Xuwznln
e0fae94c10 change pose extra to any 2026-03-04 19:06:58 +08:00
Xuwznln
b5cd181ac1 add isFlapY 2026-03-04 18:59:45 +08:00
35 changed files with 5649 additions and 1235 deletions

View File

@@ -49,7 +49,7 @@ jobs:
uv pip uninstall enum34 || echo enum34 not installed, skipping
uv pip install .
- name: Run check mode (complete_registry)
- name: Run check mode (AST registry validation)
run: |
call conda activate check-env
echo Running check mode...

1
.gitignore vendored
View File

@@ -5,6 +5,7 @@ output/
unilabos_data/
pyrightconfig.json
.cursorignore
device_package*/
## Python
# Byte-compiled / optimized / DLL files

87
AGENTS.md Normal file
View File

@@ -0,0 +1,87 @@
# AGENTS.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
Also follow the monorepo-level rules in `../AGENTS.md`.
## Build & Development
```bash
# Install in editable mode (requires mamba env with python 3.11)
pip install -e .
uv pip install -r unilabos/utils/requirements.txt
# Run with a device graph
unilab --graph <graph.json> --config <config.py> --backend ros
unilab --graph <graph.json> --config <config.py> --backend simple # no ROS2 needed
# Common CLI flags
unilab --app_bridges websocket fastapi # communication bridges
unilab --test_mode # simulate hardware, no real execution
unilab --check_mode # CI validation of registry imports
unilab --skip_env_check # skip auto-install of dependencies
unilab --visual rviz|web|disable # visualization mode
unilab --is_slave # run as slave node
# Workflow upload subcommand
unilab workflow_upload -f <workflow.json> -n <name> --tags tag1 tag2
# Tests
pytest tests/ # all tests
pytest tests/resources/test_resourcetreeset.py # single test file
pytest tests/resources/test_resourcetreeset.py::TestClassName::test_method # single test
```
## Architecture
### Startup Flow
`unilab` CLI → `unilabos/app/main.py:main()` → loads config → builds registry → reads device graph (JSON/GraphML) → starts backend thread (ROS2/simple) → starts FastAPI web server + WebSocket client.
### Core Layers
**Registry** (`unilabos/registry/`): Singleton `Registry` class discovers and catalogs all device types, resource types, and communication devices from YAML definitions. Device types live in `registry/devices/*.yaml`, resources in `registry/resources/`, comms in `registry/device_comms/`. The registry resolves class paths to actual Python classes via `utils/import_manager.py`.
**Resource Tracking** (`unilabos/resources/resource_tracker.py`): Pydantic-based `ResourceDict``ResourceDictInstance``ResourceTreeSet` hierarchy. `ResourceTreeSet` is the canonical in-memory representation of all devices and resources, used throughout the system. Graph I/O is in `resources/graphio.py` (reads JSON/GraphML device topology files into `nx.Graph` + `ResourceTreeSet`).
**Device Drivers** (`unilabos/devices/`): 30+ hardware drivers organized by device type (liquid_handling, hplc, balance, arm, etc.). Each driver is a Python class that gets wrapped by `ros/device_node_wrapper.py:ros2_device_node()` to become a ROS2 node with publishers, subscribers, and action servers.
**ROS2 Layer** (`unilabos/ros/`): `device_node_wrapper.py` dynamically wraps any device class into `ROS2DeviceNode` (defined in `ros/nodes/base_device_node.py`). Preset node types in `ros/nodes/presets/` include `host_node`, `controller_node`, `workstation`, `serial_node`, `camera`. Messages use custom `unilabos_msgs` (pre-built, distributed via releases).
**Protocol Compilation** (`unilabos/compile/`): 20+ protocol compilers (add, centrifuge, dissolve, filter, heatchill, stir, pump, etc.) that transform YAML protocol definitions into executable sequences.
**Communication** (`unilabos/device_comms/`): Hardware communication adapters — OPC-UA client, Modbus PLC, RPC, and a universal driver. `app/communication.py` provides a factory pattern for WebSocket client connections to the cloud.
**Web/API** (`unilabos/app/web/`): FastAPI server with REST API (`api.py`), Jinja2 template pages (`pages.py`), and HTTP client for cloud communication (`client.py`). Runs on port 8002 by default.
### Configuration System
- **Config classes** in `unilabos/config/config.py`: `BasicConfig`, `WSConfig`, `HTTPConfig`, `ROSConfig` — all class-level attributes, loaded from Python config files
- Config files are `.py` files with matching class names (see `config/example_config.py`)
- Environment variables override with prefix `UNILABOS_` (e.g., `UNILABOS_BASICCONFIG_PORT=9000`)
- Device topology defined in graph files (JSON with node-link format, or GraphML)
### Key Data Flow
1. Graph file → `graphio.read_node_link_json()``(nx.Graph, ResourceTreeSet, resource_links)`
2. `ResourceTreeSet` + `Registry``initialize_device.initialize_device_from_dict()``ROS2DeviceNode` instances
3. Device nodes communicate via ROS2 topics/actions or direct Python calls (simple backend)
4. Cloud sync via WebSocket (`app/ws_client.py`) and HTTP (`app/web/client.py`)
### Test Data
Example device graphs and experiment configs are in `unilabos/test/experiments/` (not `tests/`). Registry test fixtures in `unilabos/test/registry/`.
## Code Conventions
- Code comments and log messages in simplified Chinese
- Python 3.11+, type hints expected
- Pydantic models for data validation (`resource_tracker.py`)
- Singleton pattern via `@singleton` decorator (`utils/decorator.py`)
- Dynamic class loading via `utils/import_manager.py` — device classes resolved at runtime from registry YAML paths
- CLI argument dashes auto-converted to underscores for consistency
## Licensing
- Framework code: GPL-3.0
- Device drivers (`unilabos/devices/`): DP Technology Proprietary License — do not redistribute

4
CLAUDE.md Normal file
View File

@@ -0,0 +1,4 @@
Please follow the rules defined in:
@AGENTS.md

View File

@@ -15,6 +15,9 @@ Python 类设备驱动在完成注册表后可以直接在 Uni-Lab 中使用,
**示例:**
```python
from unilabos.registry.decorators import device, topic_config
@device(id="mock_gripper", category=["gripper"], description="Mock Gripper")
class MockGripper:
def __init__(self):
self._position: float = 0.0
@@ -23,19 +26,23 @@ class MockGripper:
self._status = "Idle"
@property
@topic_config() # 添加 @topic_config 才会定时广播
def position(self) -> float:
return self._position
@property
@topic_config()
def velocity(self) -> float:
return self._velocity
@property
@topic_config()
def torque(self) -> float:
return self._torque
# 会被自动识别的设备属性,接入 Uni-Lab 时会定时对外广播
# 使用 @topic_config 装饰的属性,接入 Uni-Lab 时会定时对外广播
@property
@topic_config(period=2.0) # 可自定义发布周期
def status(self) -> str:
return self._status
@@ -149,7 +156,7 @@ my_device: # 设备唯一标识符
系统会自动分析您的 Python 驱动类并生成:
- `status_types`:从 `@property` 装饰的方法自动识别状态属性
- `status_types`:从 `@topic_config` 装饰的 `@property` 方法自动识别状态属性
- `action_value_mappings`:从类方法自动生成动作映射
- `init_param_schema`:从 `__init__` 方法分析初始化参数
- `schema`:前端显示用的属性类型定义
@@ -179,7 +186,9 @@ Uni-Lab 设备驱动是一个 Python 类,需要遵循以下结构:
```python
from typing import Dict, Any
from unilabos.registry.decorators import device, topic_config
@device(id="my_device", category=["general"], description="My Device")
class MyDevice:
"""设备类文档字符串
@@ -198,8 +207,9 @@ class MyDevice:
# 初始化硬件连接
@property
@topic_config() # 必须添加 @topic_config 才会广播
def status(self) -> str:
"""设备状态(会自动广播)"""
"""设备状态(通过 @topic_config 广播)"""
return self._status
def my_action(self, param: float) -> Dict[str, Any]:
@@ -217,34 +227,61 @@ class MyDevice:
## 状态属性 vs 动作方法
### 状态属性(@property
### 状态属性(@property + @topic_config
状态属性会被自动识别并定期广播:
状态属性需要同时使用 `@property``@topic_config` 装饰器才会被识别并定期广播:
```python
from unilabos.registry.decorators import topic_config
@property
@topic_config() # 必须添加,否则不会广播
def temperature(self) -> float:
"""当前温度"""
return self._read_temperature()
@property
@topic_config(period=2.0) # 可自定义发布周期(秒)
def status(self) -> str:
"""设备状态: idle, running, error"""
return self._status
@property
@topic_config(name="ready") # 可自定义发布名称
def is_ready(self) -> bool:
"""设备是否就绪"""
return self._status == "idle"
```
也可以使用普通方法(非 @property)配合 `@topic_config`
```python
@topic_config(period=10.0)
def get_sensor_data(self) -> Dict[str, float]:
"""获取传感器数据get_ 前缀会自动去除,发布名为 sensor_data"""
return {"temp": self._temp, "humidity": self._humidity}
```
**`@topic_config` 参数**:
| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `period` | float | 5.0 | 发布周期(秒) |
| `print_publish` | bool | 节点默认 | 是否打印发布日志 |
| `qos` | int | 10 | QoS 深度 |
| `name` | str | None | 自定义发布名称 |
**发布名称优先级**`@topic_config(name=...)` > `get_` 前缀去除 > 方法名
**特点**:
- 使用`@property`装饰器
- 只读,不能有参数
- 自动添加到注册表的`status_types`
- 必须使用 `@topic_config` 装饰器
- 支持 `@property` 和普通方法
- 添加到注册表的 `status_types`
- 定期发布到 ROS2 topic
> **⚠️ 重要:** 仅有 `@property` 装饰器而没有 `@topic_config` 的属性**不会**被广播。这是一个 Breaking Change。
### 动作方法
动作方法是设备可以执行的操作:
@@ -497,6 +534,7 @@ class LiquidHandler:
self._status = "idle"
@property
@topic_config()
def status(self) -> str:
return self._status
@@ -886,7 +924,52 @@ class MyDevice:
## 最佳实践
### 1. 类型注解
### 1. 使用 `@device` 装饰器标识设备
```python
from unilabos.registry.decorators import device
@device(id="my_device", category=["heating"], description="My Heating Device", icon="heater.webp")
class MyDevice:
...
```
- `id`:设备唯一标识符,用于注册表匹配
- `category`:分类列表,前端用于分组显示
- `description`:设备描述
- `icon`:图标文件名(可选)
### 2. 使用 `@topic_config` 声明需要广播的状态
```python
from unilabos.registry.decorators import topic_config
# ✓ @property + @topic_config → 会广播
@property
@topic_config(period=2.0)
def temperature(self) -> float:
return self._temp
# ✓ 普通方法 + @topic_config → 会广播get_ 前缀自动去除)
@topic_config(period=10.0)
def get_sensor_data(self) -> Dict[str, float]:
return {"temp": self._temp}
# ✓ 使用 name 参数自定义发布名称
@property
@topic_config(name="ready")
def is_ready(self) -> bool:
return self._status == "idle"
# ✗ 仅有 @property没有 @topic_config → 不会广播
@property
def internal_state(self) -> str:
return self._state
```
> **注意:** 与 `@property` 连用时,`@topic_config` 必须放在 `@property` 下面。
### 3. 类型注解
```python
from typing import Dict, Any, Optional, List
@@ -901,7 +984,7 @@ def method(
pass
```
### 2. 文档字符串
### 4. 文档字符串
```python
def method(self, param: float) -> Dict[str, Any]:
@@ -923,7 +1006,7 @@ def method(self, param: float) -> Dict[str, Any]:
pass
```
### 3. 配置验证
### 5. 配置验证
```python
def __init__(self, config: Dict[str, Any]):
@@ -937,7 +1020,7 @@ def __init__(self, config: Dict[str, Any]):
self.baudrate = config['baudrate']
```
### 4. 资源清理
### 6. 资源清理
```python
def __del__(self):
@@ -946,7 +1029,7 @@ def __del__(self):
self.connection.close()
```
### 5. 设计前端友好的返回值
### 7. 设计前端友好的返回值
**记住:返回值会直接显示在 Web 界面**

View File

@@ -422,18 +422,20 @@ placeholder_keys:
### status_types
系统会扫描你的 Python 类,从状态方法property 或 get\_方法自动生成这部分:
系统会扫描你的 Python 类,从带有 `@topic_config` 装饰器的 `@property`方法自动生成这部分:
```yaml
status_types:
current_temperature: float # 从 get_current_temperature() 或 @property current_temperature
is_heating: bool # 从 get_is_heating() 或 @property is_heating
status: str # 从 get_status() 或 @property status
current_temperature: float # 从 @topic_config 装饰的 @property 或方法
is_heating: bool
status: str
```
**注意事项**
- 系统会查找所有 `get_` 开头的方法和 `@property` 装饰的属性
- 仅有带 `@topic_config` 装饰器的 `@property` 或方法才会被识别为状态属性
- 没有 `@topic_config``@property` 不会生成 status_types也不会广播
- `get_` 前缀的方法名会自动去除前缀(如 `get_temperature``temperature`
- 类型会自动转成相应的类型(如 `str``float``bool`
- 如果类型是 `Any``None` 或未知的,默认使用 `String`
@@ -537,11 +539,13 @@ class AdvancedLiquidHandler:
self._temperature = 25.0
@property
@topic_config()
def status(self) -> str:
"""设备状态"""
return self._status
@property
@topic_config()
def temperature(self) -> float:
"""当前温度"""
return self._temperature
@@ -809,21 +813,23 @@ my_temperature_controller:
你的设备类需要符合以下要求:
```python
from unilabos.common.device_base import DeviceBase
from unilabos.registry.decorators import device, topic_config
class MyDevice(DeviceBase):
@device(id="my_device", category=["temperature"], description="My Device")
class MyDevice:
def __init__(self, config):
"""初始化,参数会自动分析到 init_param_schema.config"""
super().__init__(config)
self.port = config.get('port', '/dev/ttyUSB0')
# 状态方法(会自动生成到 status_types
# 状态方法(必须添加 @topic_config 才会生成到 status_types 并广播
@property
@topic_config()
def status(self):
"""返回设备状态"""
return "idle"
@property
@topic_config()
def temperature(self):
"""返回当前温度"""
return 25.0
@@ -1039,7 +1045,34 @@ resource.type # "resource"
### 代码规范
1. **始终使用类型注解**
1. **使用 `@device` 装饰器标识设备类**
```python
from unilabos.registry.decorators import device
@device(id="my_device", category=["heating"], description="My Device")
class MyDevice:
...
```
2. **使用 `@topic_config` 声明广播属性**
```python
from unilabos.registry.decorators import topic_config
# ✓ 需要广播的状态属性
@property
@topic_config(period=2.0)
def temperature(self) -> float:
return self._temp
# ✗ 仅有 @property 不会广播
@property
def internal_counter(self) -> int:
return self._counter
```
3. **始终使用类型注解**
```python
# ✓ 好
@@ -1051,7 +1084,7 @@ def method(self, resource, device):
pass
```
2. **提供有意义的参数名**
4. **提供有意义的参数名**
```python
# ✓ 好 - 清晰的参数名
@@ -1063,7 +1096,7 @@ def transfer(self, r1: ResourceSlot, r2: ResourceSlot):
pass
```
3. **使用 Optional 表示可选参数**
5. **使用 Optional 表示可选参数**
```python
from typing import Optional
@@ -1076,7 +1109,7 @@ def method(
pass
```
4. **添加详细的文档字符串**
6. **添加详细的文档字符串**
```python
def method(
@@ -1096,13 +1129,13 @@ def method(
pass
```
5. **方法命名规范**
7. **方法命名规范**
- 状态方法使用 `@property` 装饰器或 `get_` 前缀
- 状态方法使用 `@property` + `@topic_config` 装饰器,或普通方法 + `@topic_config`
- 动作方法使用动词开头
- 保持命名清晰、一致
6. **完善的错误处理**
8. **完善的错误处理**
- 实现完善的错误处理
- 添加日志记录
- 提供有意义的错误信息

View File

@@ -221,10 +221,10 @@ Laboratory A Laboratory B
```bash
# 实验室A
unilab --ak your_ak --sk your_sk --upload_registry --use_remote_resource
unilab --ak your_ak --sk your_sk --upload_registry
# 实验室B
unilab --ak your_ak --sk your_sk --upload_registry --use_remote_resource
unilab --ak your_ak --sk your_sk --upload_registry
```
---

View File

@@ -22,7 +22,6 @@ options:
--is_slave Run the backend as slave node (without host privileges).
--slave_no_host Skip waiting for host service in slave mode
--upload_registry Upload registry information when starting unilab
--use_remote_resource Use remote resources when starting unilab
--config CONFIG Configuration file path, supports .py format Python config files
--port PORT Port for web service information page
--disable_browser Disable opening information page on startup
@@ -85,7 +84,7 @@ Uni-Lab 的启动过程分为以下几个阶段:
支持两种方式:
- **本地文件**:使用 `-g` 指定图谱文件(支持 JSON 和 GraphML 格式)
- **远程资源**使用 `--use_remote_resource` 从云端获取
- **远程资源**不指定本地文件即可
### 7. 注册表构建
@@ -196,7 +195,7 @@ unilab --config path/to/your/config.py
unilab --ak your_ak --sk your_sk -g path/to/graph.json --upload_registry
# 使用远程资源启动
unilab --ak your_ak --sk your_sk --use_remote_resource
unilab --ak your_ak --sk your_sk
# 更新注册表
unilab --ak your_ak --sk your_sk --complete_registry

View File

@@ -4,6 +4,7 @@ import os
import platform
import shutil
import signal
import subprocess
import sys
import threading
import time
@@ -25,6 +26,84 @@ from unilabos.config.config import load_config, BasicConfig, HTTPConfig
_restart_requested: bool = False
_restart_reason: str = ""
RESTART_EXIT_CODE = 42
def _build_child_argv():
"""Build sys.argv for child process, stripping supervisor-only arguments."""
result = []
skip_next = False
for arg in sys.argv:
if skip_next:
skip_next = False
continue
if arg in ("--restart_mode", "--restart-mode"):
continue
if arg in ("--auto_restart_count", "--auto-restart-count"):
skip_next = True
continue
if arg.startswith("--auto_restart_count=") or arg.startswith("--auto-restart-count="):
continue
result.append(arg)
return result
def _run_as_supervisor(max_restarts: int):
"""
Supervisor process that spawns and monitors child processes.
Similar to Uvicorn's --reload: the supervisor itself does no heavy work,
it only launches the real process as a child and restarts it when the child
exits with RESTART_EXIT_CODE.
"""
child_argv = [sys.executable] + _build_child_argv()
restart_count = 0
print_status(
f"[Supervisor] Restart mode enabled (max restarts: {max_restarts}), "
f"child command: {' '.join(child_argv)}",
"info",
)
while True:
print_status(
f"[Supervisor] Launching process (restart {restart_count}/{max_restarts})...",
"info",
)
try:
process = subprocess.Popen(child_argv)
exit_code = process.wait()
except KeyboardInterrupt:
print_status("[Supervisor] Interrupted, terminating child process...", "info")
process.terminate()
try:
process.wait(timeout=10)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
sys.exit(1)
if exit_code == RESTART_EXIT_CODE:
restart_count += 1
if restart_count > max_restarts:
print_status(
f"[Supervisor] Maximum restart count ({max_restarts}) reached, exiting",
"warning",
)
sys.exit(1)
print_status(
f"[Supervisor] Child requested restart ({restart_count}/{max_restarts}), restarting in 2s...",
"info",
)
time.sleep(2)
else:
if exit_code != 0:
print_status(f"[Supervisor] Child exited with code {exit_code}", "warning")
else:
print_status("[Supervisor] Child exited normally", "info")
sys.exit(exit_code)
def load_config_from_file(config_path):
if config_path is None:
@@ -66,6 +145,13 @@ def parse_args():
action="append",
help="Path to the registry directory",
)
parser.add_argument(
"--devices",
type=str,
default=None,
action="append",
help="Path to Python code directory for AST-based device/resource scanning",
)
parser.add_argument(
"--working_dir",
type=str,
@@ -155,12 +241,6 @@ def parse_args():
action="store_true",
help="Skip environment dependency check on startup",
)
parser.add_argument(
"--complete_registry",
action="store_true",
default=False,
help="Complete registry information",
)
parser.add_argument(
"--check_mode",
action="store_true",
@@ -178,6 +258,24 @@ def parse_args():
default=False,
help="Test mode: all actions simulate execution and return mock results without running real hardware",
)
parser.add_argument(
"--extra_resource",
action="store_true",
default=False,
help="Load extra lab_ prefixed labware resources (529 auto-generated definitions from lab_resources.py)",
)
parser.add_argument(
"--restart_mode",
action="store_true",
default=False,
help="Enable supervisor mode: automatically restart the process when triggered via WebSocket",
)
parser.add_argument(
"--auto_restart_count",
type=int,
default=500,
help="Maximum number of automatic restarts in restart mode (default: 500)",
)
# workflow upload subcommand
workflow_parser = subparsers.add_parser(
"workflow_upload",
@@ -228,6 +326,11 @@ def main():
args = parser.parse_args()
args_dict = vars(args)
# Supervisor mode: spawn child processes and monitor for restart
if args_dict.get("restart_mode", False):
_run_as_supervisor(args_dict.get("auto_restart_count", 5))
return
# 环境检查 - 检查并自动安装必需的包 (可选)
skip_env_check = args_dict.get("skip_env_check", False)
check_mode = args_dict.get("check_mode", False)
@@ -358,6 +461,9 @@ def main():
BasicConfig.test_mode = args_dict.get("test_mode", False)
if BasicConfig.test_mode:
print_status("启用测试模式:所有动作将模拟执行,不调用真实硬件", "warning")
BasicConfig.extra_resource = args_dict.get("extra_resource", False)
if BasicConfig.extra_resource:
print_status("启用额外资源加载将加载lab_开头的labware资源定义", "info")
BasicConfig.communication_protocol = "websocket"
machine_name = platform.node()
machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name])
@@ -382,22 +488,30 @@ def main():
# 显示启动横幅
print_unilab_banner(args_dict)
# 注册表 - check_mode 时强制启用 complete_registry
complete_registry = args_dict.get("complete_registry", False) or check_mode
lab_registry = build_registry(args_dict["registry_path"], complete_registry, BasicConfig.upload_registry)
# Step 0: AST 分析优先 + YAML 注册表加载
# check_mode 和 upload_registry 都会执行实际 import 验证
devices_dirs = args_dict.get("devices", None)
lab_registry = build_registry(
registry_paths=args_dict["registry_path"],
devices_dirs=devices_dirs,
upload_registry=BasicConfig.upload_registry,
check_mode=check_mode,
)
# Check mode: complete_registry 完成后直接退出git diff 检测由 CI workflow 执行
# Check mode: 注册表验证完成后直接退出
if check_mode:
print_status("Check mode: complete_registry 完成,退出", "info")
device_count = len(lab_registry.device_type_registry)
resource_count = len(lab_registry.resource_type_registry)
print_status(f"Check mode: 注册表验证完成 ({device_count} 设备, {resource_count} 资源),退出", "info")
os._exit(0)
# Step 1: 上传全部注册表到服务端,同步保存到 unilabos_data
if BasicConfig.upload_registry:
# 设备注册到服务端 - 需要 ak 和 sk
if BasicConfig.ak and BasicConfig.sk:
print_status("开始注册设备到服务端...", "info")
# print_status("开始注册设备到服务端...", "info")
try:
register_devices_and_resources(lab_registry)
print_status("设备注册完成", "info")
# print_status("设备注册完成", "info")
except Exception as e:
print_status(f"设备注册失败: {e}", "error")
else:
@@ -482,7 +596,7 @@ def main():
continue
# 如果从远端获取了物料信息,则与本地物料进行同步
if request_startup_json and "nodes" in request_startup_json:
if file_path is not None and request_startup_json and "nodes" in request_startup_json:
print_status("开始同步远端物料到本地...", "info")
remote_tree_set = ResourceTreeSet.from_raw_dict_list(request_startup_json["nodes"])
resource_tree_set.merge_remote_resources(remote_tree_set)
@@ -579,6 +693,10 @@ def main():
open_browser=not args_dict["disable_browser"],
port=BasicConfig.port,
)
if restart_requested:
print_status("[Main] Restart requested, cleaning up...", "info")
cleanup_for_restart()
os._exit(RESTART_EXIT_CODE)
if __name__ == "__main__":

View File

@@ -1,60 +1,83 @@
import json
import time
from typing import Optional, Tuple, Dict, Any
from typing import Any, Dict, Optional, Tuple
from unilabos.utils.log import logger
from unilabos.utils.type_check import TypeEncoder
try:
import orjson
def _normalize_device(info: dict) -> dict:
"""Serialize via orjson to strip non-JSON types (type objects etc.)."""
return orjson.loads(orjson.dumps(info, default=str))
except ImportError:
def _normalize_device(info: dict) -> dict:
return json.loads(json.dumps(info, ensure_ascii=False, cls=TypeEncoder))
def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]:
"""
注册设备和资源到服务器仅支持HTTP
"""
# 注册资源信息 - 使用HTTP方式
from unilabos.app.web.client import http_client
logger.info("[UniLab Register] 开始注册设备和资源...")
# 注册设备信息
devices_to_register = {}
for device_info in lab_registry.obtain_registry_device_info():
devices_to_register[device_info["id"]] = json.loads(
json.dumps(device_info, ensure_ascii=False, cls=TypeEncoder)
)
logger.debug(f"[UniLab Register] 收集设备: {device_info['id']}")
devices_to_register[device_info["id"]] = _normalize_device(device_info)
logger.trace(f"[UniLab Register] 收集设备: {device_info['id']}")
resources_to_register = {}
for resource_info in lab_registry.obtain_registry_resource_info():
resources_to_register[resource_info["id"]] = resource_info
logger.debug(f"[UniLab Register] 收集资源: {resource_info['id']}")
logger.trace(f"[UniLab Register] 收集资源: {resource_info['id']}")
if gather_only:
return devices_to_register, resources_to_register
# 注册设备
if devices_to_register:
try:
start_time = time.time()
response = http_client.resource_registry({"resources": list(devices_to_register.values())})
response = http_client.resource_registry(
{"resources": list(devices_to_register.values())},
tag="device_registry",
)
cost_time = time.time() - start_time
if response.status_code in [200, 201]:
logger.info(f"[UniLab Register] 成功注册 {len(devices_to_register)} 个设备 {cost_time}s")
res_data = response.json() if response.status_code == 200 else {}
skipped = res_data.get("data", {}).get("skipped", False)
if skipped:
logger.info(
f"[UniLab Register] 设备注册跳过(内容未变化)"
f" {len(devices_to_register)}{cost_time:.3f}s"
)
elif response.status_code in [200, 201]:
logger.info(f"[UniLab Register] 成功注册 {len(devices_to_register)} 个设备 {cost_time:.3f}s")
else:
logger.error(f"[UniLab Register] 设备注册失败: {response.status_code}, {response.text} {cost_time}s")
logger.error(f"[UniLab Register] 设备注册失败: {response.status_code}, {response.text} {cost_time:.3f}s")
except Exception as e:
logger.error(f"[UniLab Register] 设备注册异常: {e}")
# 注册资源
if resources_to_register:
try:
start_time = time.time()
response = http_client.resource_registry({"resources": list(resources_to_register.values())})
response = http_client.resource_registry(
{"resources": list(resources_to_register.values())},
tag="resource_registry",
)
cost_time = time.time() - start_time
if response.status_code in [200, 201]:
logger.info(f"[UniLab Register] 成功注册 {len(resources_to_register)} 个资源 {cost_time}s")
res_data = response.json() if response.status_code == 200 else {}
skipped = res_data.get("data", {}).get("skipped", False)
if skipped:
logger.info(
f"[UniLab Register] 资源注册跳过(内容未变化)"
f" {len(resources_to_register)}{cost_time:.3f}s"
)
elif response.status_code in [200, 201]:
logger.info(f"[UniLab Register] 成功注册 {len(resources_to_register)} 个资源 {cost_time:.3f}s")
else:
logger.error(f"[UniLab Register] 资源注册失败: {response.status_code}, {response.text} {cost_time}s")
logger.error(f"[UniLab Register] 资源注册失败: {response.status_code}, {response.text} {cost_time:.3f}s")
except Exception as e:
logger.error(f"[UniLab Register] 资源注册异常: {e}")
logger.info("[UniLab Register] 设备和资源注册完成.")

View File

@@ -1052,7 +1052,7 @@ async def handle_file_import(websocket: WebSocket, request_data: dict):
"result": {},
"schema": lab_registry._generate_unilab_json_command_schema(v["args"], k),
"goal_default": {i["name"]: i["default"] for i in v["args"]},
"handles": [],
"handles": {},
}
# 不生成已配置action的动作
for k, v in enhanced_info["action_methods"].items()
@@ -1340,5 +1340,5 @@ def setup_api_routes(app):
# 启动广播任务
@app.on_event("startup")
async def startup_event():
asyncio.create_task(broadcast_device_status())
asyncio.create_task(broadcast_status_page_data())
asyncio.create_task(broadcast_device_status(), name="web-api-startup-device")
asyncio.create_task(broadcast_status_page_data(), name="web-api-startup-status")

View File

@@ -3,11 +3,30 @@ HTTP客户端模块
提供与远程服务器通信的客户端功能只有host需要用
"""
import gzip
import json
import os
from typing import List, Dict, Any, Optional
try:
import orjson as _json_fast
def _fast_dumps(obj, **kwargs) -> bytes:
return _json_fast.dumps(obj, option=_json_fast.OPT_NON_STR_KEYS, default=str)
def _fast_dumps_pretty(obj, **kwargs) -> bytes:
return _json_fast.dumps(
obj, option=_json_fast.OPT_NON_STR_KEYS | _json_fast.OPT_INDENT_2, default=str,
)
except ImportError:
_json_fast = None # type: ignore[assignment]
def _fast_dumps(obj, **kwargs) -> bytes:
return json.dumps(obj, ensure_ascii=False, default=str).encode("utf-8")
def _fast_dumps_pretty(obj, **kwargs) -> bytes:
return json.dumps(obj, indent=2, ensure_ascii=False, default=str).encode("utf-8")
import requests
from unilabos.resources.resource_tracker import ResourceTreeSet
from unilabos.utils.log import info
@@ -280,22 +299,54 @@ class HTTPClient:
)
return response
def resource_registry(self, registry_data: Dict[str, Any] | List[Dict[str, Any]]) -> requests.Response:
def resource_registry(
self, registry_data: Dict[str, Any] | List[Dict[str, Any]], tag: str = "registry",
) -> requests.Response:
"""
注册资源到服务器
注册资源到服务器,同步保存请求/响应到 unilabos_data
Args:
registry_data: 注册表数据,格式为 {resource_id: resource_info} / [{resource_info}]
tag: 保存文件的标签后缀 (如 "device_registry" / "resource_registry")
Returns:
Response: API响应对象
"""
# 序列化一次,同时用于保存和发送
json_bytes = _fast_dumps(registry_data)
# 保存请求数据到 unilabos_data
req_path = os.path.join(BasicConfig.working_dir, f"req_{tag}_upload.json")
try:
os.makedirs(BasicConfig.working_dir, exist_ok=True)
with open(req_path, "wb") as f:
f.write(_fast_dumps_pretty(registry_data))
logger.trace(f"注册表请求数据已保存: {req_path}")
except Exception as e:
logger.warning(f"保存注册表请求数据失败: {e}")
compressed_body = gzip.compress(json_bytes)
headers = {
"Authorization": f"Lab {self.auth}",
"Content-Type": "application/json",
"Content-Encoding": "gzip",
}
response = requests.post(
f"{self.remote_addr}/lab/resource",
json=registry_data,
headers={"Authorization": f"Lab {self.auth}"},
data=compressed_body,
headers=headers,
timeout=30,
)
# 保存响应数据到 unilabos_data
res_path = os.path.join(BasicConfig.working_dir, f"res_{tag}_upload.json")
try:
with open(res_path, "w", encoding="utf-8") as f:
f.write(f"{response.status_code}\n{response.text}")
logger.trace(f"注册表响应数据已保存: {res_path}")
except Exception as e:
logger.warning(f"保存注册表响应数据失败: {e}")
if response.status_code not in [200, 201]:
logger.error(f"注册资源失败: {response.status_code}, {response.text}")
if response.status_code == 200:

View File

@@ -86,7 +86,7 @@ def setup_server() -> FastAPI:
# 设置页面路由
try:
setup_web_pages(pages)
info("[Web] 已加载Web UI模块")
# info("[Web] 已加载Web UI模块")
except ImportError as e:
info(f"[Web] 未找到Web页面模块: {str(e)}")
except Exception as e:
@@ -138,7 +138,7 @@ def start_server(host: str = "0.0.0.0", port: int = 8002, open_browser: bool = T
server_thread = threading.Thread(target=server.run, daemon=True, name="uvicorn_server")
server_thread.start()
info("[Web] Server started, monitoring for restart requests...")
# info("[Web] Server started, monitoring for restart requests...")
# 监控重启标志
import unilabos.app.main as main_module

View File

@@ -26,6 +26,7 @@ from enum import Enum
from typing_extensions import TypedDict
from unilabos.app.model import JobAddReq
from unilabos.resources.resource_tracker import ResourceDictType
from unilabos.ros.nodes.presets.host_node import HostNode
from unilabos.utils.type_check import serialize_result_info
from unilabos.app.communication import BaseCommunicationClient
@@ -408,6 +409,7 @@ class MessageProcessor:
# 线程控制
self.is_running = False
self.thread = None
self._loop = None # asyncio event loop引用用于外部关闭websocket
self.reconnect_count = 0
logger.info(f"[MessageProcessor] Initialized for URL: {websocket_url}")
@@ -434,22 +436,31 @@ class MessageProcessor:
def stop(self) -> None:
"""停止消息处理线程"""
self.is_running = False
# 主动关闭websocket以快速中断消息接收循环
ws = self.websocket
loop = self._loop
if ws and loop and loop.is_running():
try:
asyncio.run_coroutine_threadsafe(ws.close(), loop)
except Exception:
pass
if self.thread and self.thread.is_alive():
self.thread.join(timeout=2)
logger.info("[MessageProcessor] Stopped")
def _run(self):
"""运行消息处理主循环"""
loop = asyncio.new_event_loop()
self._loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
loop.run_until_complete(self._connection_handler())
asyncio.set_event_loop(self._loop)
self._loop.run_until_complete(self._connection_handler())
except Exception as e:
logger.error(f"[MessageProcessor] Thread error: {str(e)}")
logger.error(traceback.format_exc())
finally:
if loop:
loop.close()
if self._loop:
self._loop.close()
self._loop = None
async def _connection_handler(self):
"""处理WebSocket连接和重连逻辑"""
@@ -466,8 +477,10 @@ class MessageProcessor:
async with websockets.connect(
self.websocket_url,
ssl=ssl_context,
open_timeout=20,
ping_interval=WSConfig.ping_interval,
ping_timeout=10,
close_timeout=5,
additional_headers={
"Authorization": f"Lab {BasicConfig.auth_secret()}",
"EdgeSession": f"{self.session_id}",
@@ -478,81 +491,94 @@ class MessageProcessor:
self.connected = True
self.reconnect_count = 0
logger.trace(f"[MessageProcessor] Connected to {self.websocket_url}")
logger.info(f"[MessageProcessor] 已连接到 {self.websocket_url}")
# 启动发送协程
send_task = asyncio.create_task(self._send_handler())
send_task = asyncio.create_task(self._send_handler(), name="websocket-send_task")
# 每次连接(含重连)后重新向服务端注册,
# 否则服务端不知道客户端已上线,不会推送消息。
if self.websocket_client:
self.websocket_client.publish_host_ready()
try:
# 接收消息循环
await self._message_handler()
finally:
# 必须在 async with __aexit__ 之前停止 send_task
# 否则 send_task 会在关闭握手期间继续发送数据,
# 干扰 websockets 库的内部清理,导致 task 泄漏。
self.connected = False
send_task.cancel()
try:
await send_task
except asyncio.CancelledError:
pass
self.connected = False
except websockets.exceptions.ConnectionClosed:
logger.warning("[MessageProcessor] Connection closed")
self.connected = False
logger.warning("[MessageProcessor] 与服务端连接中断")
except TimeoutError:
logger.warning(
f"[MessageProcessor] 与服务端连接通信超时 (已尝试 {self.reconnect_count + 1} 次),请检查您的网络状况"
)
except websockets.exceptions.InvalidStatus as e:
logger.warning(
f"[MessageProcessor] 收到服务端注册码 {e.response.status_code}, 上一进程可能还未退出"
)
except Exception as e:
logger.error(f"[MessageProcessor] Connection error: {str(e)}")
logger.error(traceback.format_exc())
self.connected = False
logger.error(f"[MessageProcessor] 尝试重连时出错 {str(e)}")
finally:
self.connected = False
self.websocket = None
# 重连逻辑
if self.is_running and self.reconnect_count < WSConfig.max_reconnect_attempts:
if not self.is_running:
break
if self.reconnect_count < WSConfig.max_reconnect_attempts:
self.reconnect_count += 1
backoff = WSConfig.reconnect_interval
logger.info(
f"[MessageProcessor] Reconnecting in {WSConfig.reconnect_interval}s "
f"(attempt {self.reconnect_count}/{WSConfig.max_reconnect_attempts})"
f"[MessageProcessor] 即将在 {backoff} 秒后重连 (已尝试 {self.reconnect_count}/{WSConfig.max_reconnect_attempts})"
)
await asyncio.sleep(WSConfig.reconnect_interval)
elif self.reconnect_count >= WSConfig.max_reconnect_attempts:
await asyncio.sleep(backoff)
else:
logger.error("[MessageProcessor] Max reconnection attempts reached")
break
else:
self.reconnect_count -= 1
async def _message_handler(self):
"""处理接收到的消息"""
"""处理接收到的消息
ConnectionClosed 不在此处捕获,让其向上传播到 _connection_handler
以便 async with websockets.connect() 的 __aexit__ 能感知连接已断,
正确清理内部 task避免 task 泄漏。
"""
if not self.websocket:
logger.error("[MessageProcessor] WebSocket connection is None")
return
try:
async for message in self.websocket:
try:
data = json.loads(message)
message_type = data.get("action", "")
message_data = data.get("data")
if self.session_id and self.session_id == data.get("edge_session"):
await self._process_message(message_type, message_data)
async for message in self.websocket:
try:
data = json.loads(message)
message_type = data.get("action", "")
message_data = data.get("data")
if self.session_id and self.session_id == data.get("edge_session"):
await self._process_message(message_type, message_data)
else:
if message_type.endswith("_material"):
logger.trace(
f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}"
)
logger.debug(
f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}"
)
else:
if message_type.endswith("_material"):
logger.trace(
f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}"
)
logger.debug(
f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}"
)
else:
await self._process_message(message_type, message_data)
except json.JSONDecodeError:
logger.error(f"[MessageProcessor] Invalid JSON received: {message}")
except Exception as e:
logger.error(f"[MessageProcessor] Error processing message: {str(e)}")
logger.error(traceback.format_exc())
except websockets.exceptions.ConnectionClosed:
logger.info("[MessageProcessor] Message handler stopped - connection closed")
except Exception as e:
logger.error(f"[MessageProcessor] Message handler error: {str(e)}")
logger.error(traceback.format_exc())
await self._process_message(message_type, message_data)
except json.JSONDecodeError:
logger.error(f"[MessageProcessor] Invalid JSON received: {message}")
except Exception as e:
logger.error(f"[MessageProcessor] Error processing message: {str(e)}")
logger.error(traceback.format_exc())
async def _send_handler(self):
"""处理发送队列中的消息"""
@@ -601,6 +627,7 @@ class MessageProcessor:
except asyncio.CancelledError:
logger.debug("[MessageProcessor] Send handler cancelled")
raise
except Exception as e:
logger.error(f"[MessageProcessor] Fatal error in send handler: {str(e)}")
logger.error(traceback.format_exc())
@@ -632,6 +659,10 @@ class MessageProcessor:
# elif message_type == "session_id":
# self.session_id = message_data.get("session_id")
# logger.info(f"[MessageProcessor] Session ID: {self.session_id}")
elif message_type == "add_device":
await self._handle_device_manage(message_data, "add")
elif message_type == "remove_device":
await self._handle_device_manage(message_data, "remove")
elif message_type == "request_restart":
await self._handle_request_restart(message_data)
else:
@@ -968,6 +999,37 @@ class MessageProcessor:
)
thread.start()
async def _handle_device_manage(self, device_list: list[ResourceDictType], action: str):
"""Handle add_device / remove_device from LabGo server."""
if not device_list:
return
for item in device_list:
target_node_id = item.get("target_node_id", "host_node")
def _notify(target_id: str, act: str, cfg: ResourceDictType):
try:
host_node = HostNode.get_instance(timeout=5)
if not host_node:
logger.error(f"[DeviceManage] HostNode not available for {act}_device")
return
success = host_node.notify_device_manage(target_id, act, cfg)
if success:
logger.info(f"[DeviceManage] {act}_device completed on {target_id}")
else:
logger.warning(f"[DeviceManage] {act}_device failed on {target_id}")
except Exception as e:
logger.error(f"[DeviceManage] Error in {act}_device: {e}")
logger.error(traceback.format_exc())
thread = threading.Thread(
target=_notify,
args=(target_node_id, action, item),
daemon=True,
name=f"DeviceManage-{action}-{item.get('id', '')}",
)
thread.start()
async def _handle_request_restart(self, data: Dict[str, Any]):
"""
处理重启请求
@@ -979,10 +1041,9 @@ class MessageProcessor:
logger.info(f"[MessageProcessor] Received restart request, reason: {reason}, delay: {delay}s")
# 发送确认消息
if self.websocket_client:
await self.websocket_client.send_message(
{"action": "restart_acknowledged", "data": {"reason": reason, "delay": delay}}
)
self.send_message(
{"action": "restart_acknowledged", "data": {"reason": reason, "delay": delay}}
)
# 设置全局重启标志
import unilabos.app.main as main_module
@@ -1084,6 +1145,7 @@ class QueueProcessor:
def stop(self) -> None:
"""停止队列处理线程"""
self.is_running = False
self.queue_update_event.set() # 立即唤醒等待中的线程
if self.thread and self.thread.is_alive():
self.thread.join(timeout=2)
logger.info("[QueueProcessor] Stopped")
@@ -1337,8 +1399,8 @@ class WebSocketClient(BaseCommunicationClient):
message = {"action": "normal_exit", "data": {"session_id": session_id}}
self.message_processor.send_message(message)
logger.info(f"[WebSocketClient] Sent normal_exit message with session_id: {session_id}")
# 给一点时间让消息发送出去
time.sleep(1)
# send_handler 每100ms检查一次队列等300ms足以让消息发
time.sleep(0.3)
except Exception as e:
logger.warning(f"[WebSocketClient] Failed to send normal_exit message: {str(e)}")

View File

@@ -24,6 +24,7 @@ class BasicConfig:
port = 8002 # 本地HTTP服务
check_mode = False # CI 检查模式,用于验证 registry 导入和文件一致性
test_mode = False # 测试模式,所有动作不实际执行,返回模拟结果
extra_resource = False # 是否加载lab_开头的额外资源
# 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
log_level: Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG"
@@ -40,7 +41,7 @@ class BasicConfig:
class WSConfig:
reconnect_interval = 5 # 重连间隔(秒)
max_reconnect_attempts = 999 # 最大重连次数
ping_interval = 30 # ping间隔
ping_interval = 20 # ping间隔
# HTTP配置

View File

@@ -1,4 +1,3 @@
from abc import abstractmethod
from functools import wraps
import inspect

View File

@@ -634,7 +634,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
def __init__(
self,
deck: Deck,
deck: PRCXI9300Deck,
host: str,
port: int,
timeout: float,
@@ -648,11 +648,11 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
is_9320=False,
):
tablets_info = []
count = 0
for child in deck.children:
for site_id in range(len(deck.sites)):
child = deck._get_site_resource(site_id)
# 如果放其他类型的物料,是不可以的
if hasattr(child, "_unilabos_state") and "Material" in child._unilabos_state:
number = int(child.name.replace("T", ""))
number = site_id + 1
tablets_info.append(
WorkTablets(
Number=number, Code=f"T{number}", Material=child._unilabos_state["Material"]

View File

@@ -1,15 +1,15 @@
"""
Virtual Workbench Device - 模拟工作台设备
包含
包含:
- 1个机械臂 (每次操作3s, 独占锁)
- 3个加热台 (每次加热10s, 可并行)
工作流程
1. A1-A5 物料同时启动竞争机械臂
工作流程:
1. A1-A5 物料同时启动, 竞争机械臂
2. 机械臂将物料移动到空闲加热台
3. 加热完成后机械臂将物料移动到C1-C5
3. 加热完成后, 机械臂将物料移动到C1-C5
注意调用来自线程池使用 threading.Lock 进行同步
注意: 调用来自线程池, 使用 threading.Lock 进行同步
"""
import logging
@@ -21,9 +21,11 @@ from threading import Lock, RLock
from typing_extensions import TypedDict
from unilabos.registry.decorators import (
device, action, ActionInputHandle, ActionOutputHandle, DataSource, topic_config, not_action
)
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
from unilabos.utils.decorator import not_action, always_free
from unilabos.resources.resource_tracker import SampleUUIDsType, LabSample, RETURN_UNILABOS_SAMPLES
from unilabos.resources.resource_tracker import SampleUUIDsType, LabSample
# ============ TypedDict 返回类型定义 ============
@@ -57,6 +59,8 @@ class MoveToOutputResult(TypedDict):
success: bool
station_id: int
material_id: str
output_position: str
message: str
unilabos_samples: List[LabSample]
@@ -81,9 +85,9 @@ class HeatingStationState(Enum):
"""加热台状态枚举"""
IDLE = "idle" # 空闲
OCCUPIED = "occupied" # 已放置物料等待加热
OCCUPIED = "occupied" # 已放置物料, 等待加热
HEATING = "heating" # 加热中
COMPLETED = "completed" # 加热完成等待取走
COMPLETED = "completed" # 加热完成, 等待取走
class ArmState(Enum):
@@ -105,19 +109,24 @@ class HeatingStation:
heating_progress: float = 0.0
@device(
id="virtual_workbench",
category=["virtual_device"],
description="Virtual Workbench with 1 robotic arm and 3 heating stations for concurrent material processing",
)
class VirtualWorkbench:
"""
Virtual Workbench Device - 虚拟工作台设备
模拟一个包含1个机械臂和3个加热台的工作站
- 机械臂操作耗时3秒同一时间只能执行一个操作
- 加热台加热耗时10秒3个加热台可并行工作
- 机械臂操作耗时3秒, 同一时间只能执行一个操作
- 加热台加热耗时10秒, 3个加热台可并行工作
工作流:
1. 物料A1-A5并发启动线程池竞争机械臂使用权
2. 获取机械臂后查找空闲加热台
3. 机械臂将物料放入加热台开始加热
4. 加热完成后机械臂将物料移动到目标位置Cn
1. 物料A1-A5并发启动(线程池), 竞争机械臂使用权
2. 获取机械臂后, 查找空闲加热台
3. 机械臂将物料放入加热台, 开始加热
4. 加热完成后, 机械臂将物料移动到目标位置Cn
"""
_ros_node: BaseROS2DeviceNode
@@ -145,19 +154,19 @@ class VirtualWorkbench:
self.HEATING_TIME = float(self.config.get("heating_time", self.HEATING_TIME))
self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", self.NUM_HEATING_STATIONS))
# 机械臂状态和锁 (使用threading.Lock)
# 机械臂状态和锁
self._arm_lock = Lock()
self._arm_state = ArmState.IDLE
self._arm_current_task: Optional[str] = None
# 加热台状态 (station_id -> HeatingStation) - 立即初始化不依赖initialize()
# 加热台状态
self._heating_stations: Dict[int, HeatingStation] = {
i: HeatingStation(station_id=i) for i in range(1, self.NUM_HEATING_STATIONS + 1)
}
self._stations_lock = RLock() # 可重入锁,保护加热台状态
self._stations_lock = RLock()
# 任务追踪
self._active_tasks: Dict[str, Dict[str, Any]] = {} # material_id -> task_info
self._active_tasks: Dict[str, Dict[str, Any]] = {}
self._tasks_lock = Lock()
# 处理其他kwargs参数
@@ -183,7 +192,6 @@ class VirtualWorkbench:
"""初始化虚拟工作台"""
self.logger.info(f"初始化虚拟工作台 {self.device_id}")
# 重置加热台状态 (已在__init__中创建这里重置为初始状态)
with self._stations_lock:
for station in self._heating_stations.values():
station.state = HeatingStationState.IDLE
@@ -191,7 +199,6 @@ class VirtualWorkbench:
station.material_number = None
station.heating_progress = 0.0
# 初始化状态
self.data.update(
{
"status": "Ready",
@@ -257,11 +264,7 @@ class VirtualWorkbench:
self.data["message"] = message
def _find_available_heating_station(self) -> Optional[int]:
"""查找空闲的加热台
Returns:
空闲加热台ID如果没有则返回None
"""
"""查找空闲的加热台"""
with self._stations_lock:
for station_id, station in self._heating_stations.items():
if station.state == HeatingStationState.IDLE:
@@ -269,23 +272,12 @@ class VirtualWorkbench:
return None
def _acquire_arm(self, task_description: str) -> bool:
"""获取机械臂使用权阻塞直到获取
Args:
task_description: 任务描述,用于日志
Returns:
是否成功获取
"""
"""获取机械臂使用权(阻塞直到获取)"""
self.logger.info(f"[{task_description}] 等待获取机械臂...")
# 阻塞等待获取锁
self._arm_lock.acquire()
self._arm_state = ArmState.BUSY
self._arm_current_task = task_description
self._update_data_status(f"机械臂执行: {task_description}")
self.logger.info(f"[{task_description}] 成功获取机械臂使用权")
return True
@@ -298,6 +290,22 @@ class VirtualWorkbench:
self._update_data_status(f"机械臂已释放 (完成: {task})")
self.logger.info(f"机械臂已释放 (完成: {task})")
@action(
auto_prefix=True,
description="批量准备物料 - 虚拟起始节点, 生成A1-A5物料, 输出5个handle供后续节点使用",
handles=[
ActionOutputHandle(key="channel_1", data_type="workbench_material",
label="实验1", data_key="material_1", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="channel_2", data_type="workbench_material",
label="实验2", data_key="material_2", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="channel_3", data_type="workbench_material",
label="实验3", data_key="material_3", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="channel_4", data_type="workbench_material",
label="实验4", data_key="material_4", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="channel_5", data_type="workbench_material",
label="实验5", data_key="material_5", data_source=DataSource.EXECUTOR),
],
)
def prepare_materials(
self,
sample_uuids: SampleUUIDsType,
@@ -306,19 +314,14 @@ class VirtualWorkbench:
"""
批量准备物料 - 虚拟起始节点
作为工作流的起始节点生成指定数量的物料编号供后续节点使用。
输出5个handle (material_1 ~ material_5)分别对应实验1~5。
Args:
count: 待生成的物料数量默认5 (生成 A1-A5)
Returns:
PrepareMaterialsResult: 包含 material_1 ~ material_5 用于传递给 move_to_heating_station
作为工作流的起始节点, 生成指定数量的物料编号供后续节点使用。
输出5个handle (material_1 ~ material_5), 分别对应实验1~5。
"""
# 生成物料列表 A1 - A{count}
materials = [i for i in range(1, count + 1)]
self.logger.info(f"[准备物料] 生成 {count} 个物料: " f"A1-A{count} -> material_1~material_{count}")
self.logger.info(
f"[准备物料] 生成 {count} 个物料: A1-A{count} -> material_1~material_{count}"
)
return {
"success": True,
@@ -329,9 +332,28 @@ class VirtualWorkbench:
"material_4": materials[3] if len(materials) > 3 else 0,
"material_5": materials[4] if len(materials) > 4 else 0,
"message": f"已准备 {count} 个物料: A1-A{count}",
"unilabos_samples": [LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for sample_uuid, content in sample_uuids.items()]
"unilabos_samples": [
LabSample(
sample_uuid=sample_uuid,
oss_path="",
extra={"material_uuid": content} if isinstance(content, str) else (content.serialize() if content else {}),
)
for sample_uuid, content in sample_uuids.items()
],
}
@action(
auto_prefix=True,
description="将物料从An位置移动到空闲加热台, 返回分配的加热台ID",
handles=[
ActionInputHandle(key="material_input", data_type="workbench_material",
label="物料编号", data_key="material_number", data_source=DataSource.HANDLE),
ActionOutputHandle(key="heating_station_output", data_type="workbench_station",
label="加热台ID", data_key="station_id", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="material_number_output", data_type="workbench_material",
label="物料编号", data_key="material_number", data_source=DataSource.EXECUTOR),
],
)
def move_to_heating_station(
self,
sample_uuids: SampleUUIDsType,
@@ -340,20 +362,12 @@ class VirtualWorkbench:
"""
将物料从An位置移动到加热台
多线程并发调用时会竞争机械臂使用权并自动查找空闲加热台
Args:
material_number: 物料编号 (1-5)
Returns:
MoveToHeatingStationResult: 包含 station_id, material_number 等用于传递给下一个节点
多线程并发调用时, 会竞争机械臂使用权, 并自动查找空闲加热台
"""
# 根据物料编号生成物料ID
material_id = f"A{material_number}"
task_desc = f"移动{material_id}到加热台"
self.logger.info(f"[任务] {task_desc} - 开始执行")
# 记录任务
with self._tasks_lock:
self._active_tasks[material_id] = {
"status": "waiting_for_arm",
@@ -361,33 +375,27 @@ class VirtualWorkbench:
}
try:
# 步骤1: 等待获取机械臂使用权(竞争)
with self._tasks_lock:
self._active_tasks[material_id]["status"] = "waiting_for_arm"
self._acquire_arm(task_desc)
# 步骤2: 查找空闲加热台
with self._tasks_lock:
self._active_tasks[material_id]["status"] = "finding_station"
station_id = None
# 循环等待直到找到空闲加热台
while station_id is None:
station_id = self._find_available_heating_station()
if station_id is None:
self.logger.info(f"[{material_id}] 没有空闲加热台等待中...")
# 释放机械臂,等待后重试
self.logger.info(f"[{material_id}] 没有空闲加热台, 等待中...")
self._release_arm()
time.sleep(0.5)
self._acquire_arm(task_desc)
# 步骤3: 占用加热台 - 立即标记为OCCUPIED防止其他任务选择同一加热台
with self._stations_lock:
self._heating_stations[station_id].state = HeatingStationState.OCCUPIED
self._heating_stations[station_id].current_material = material_id
self._heating_stations[station_id].material_number = material_number
# 步骤4: 模拟机械臂移动操作 (3秒)
with self._tasks_lock:
self._active_tasks[material_id]["status"] = "arm_moving"
self._active_tasks[material_id]["assigned_station"] = station_id
@@ -395,11 +403,11 @@ class VirtualWorkbench:
time.sleep(self.ARM_OPERATION_TIME)
# 步骤5: 放入加热台完成
self._update_data_status(f"{material_id}已放入加热台{station_id}")
self.logger.info(f"[{material_id}] 已放入加热台{station_id} (用时{self.ARM_OPERATION_TIME}s)")
self.logger.info(
f"[{material_id}] 已放入加热台{station_id} (用时{self.ARM_OPERATION_TIME}s)"
)
# 释放机械臂
self._release_arm()
with self._tasks_lock:
@@ -412,8 +420,16 @@ class VirtualWorkbench:
"material_number": material_number,
"message": f"{material_id}已成功移动到加热台{station_id}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
LabSample(
sample_uuid=sample_uuid,
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
],
}
except Exception as e:
@@ -427,11 +443,33 @@ class VirtualWorkbench:
"material_number": material_number,
"message": f"移动失败: {str(e)}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
LabSample(
sample_uuid=sample_uuid,
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
],
}
@always_free
@action(
auto_prefix=True,
always_free=True,
description="启动指定加热台的加热程序",
handles=[
ActionInputHandle(key="station_id_input", data_type="workbench_station",
label="加热台ID", data_key="station_id", data_source=DataSource.HANDLE),
ActionInputHandle(key="material_number_input", data_type="workbench_material",
label="物料编号", data_key="material_number", data_source=DataSource.HANDLE),
ActionOutputHandle(key="heating_done_station", data_type="workbench_station",
label="加热完成-加热台ID", data_key="station_id", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="heating_done_material", data_type="workbench_material",
label="加热完成-物料编号", data_key="material_number", data_source=DataSource.EXECUTOR),
],
)
def start_heating(
self,
sample_uuids: SampleUUIDsType,
@@ -440,13 +478,6 @@ class VirtualWorkbench:
) -> StartHeatingResult:
"""
启动指定加热台的加热程序
Args:
station_id: 加热台ID (1-3),从 move_to_heating_station 的 handle 传入
material_number: 物料编号,从 move_to_heating_station 的 handle 传入
Returns:
StartHeatingResult: 包含 station_id, material_number 等用于传递给下一个节点
"""
self.logger.info(f"[加热台{station_id}] 开始加热")
@@ -458,8 +489,16 @@ class VirtualWorkbench:
"material_number": material_number,
"message": f"无效的加热台ID: {station_id}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
LabSample(
sample_uuid=sample_uuid,
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
],
}
with self._stations_lock:
@@ -473,8 +512,16 @@ class VirtualWorkbench:
"material_number": material_number,
"message": f"加热台{station_id}上没有物料",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
LabSample(
sample_uuid=sample_uuid,
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
],
}
if station.state == HeatingStationState.HEATING:
@@ -485,13 +532,20 @@ class VirtualWorkbench:
"material_number": material_number,
"message": f"加热台{station_id}已经在加热中",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
LabSample(
sample_uuid=sample_uuid,
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
],
}
material_id = station.current_material
# 开始加热
station.state = HeatingStationState.HEATING
station.heating_start_time = time.time()
station.heating_progress = 0.0
@@ -502,7 +556,6 @@ class VirtualWorkbench:
self._update_data_status(f"加热台{station_id}开始加热{material_id}")
# 打印当前所有正在加热的台位
with self._stations_lock:
heating_list = [
f"加热台{sid}:{s.current_material}"
@@ -511,7 +564,6 @@ class VirtualWorkbench:
]
self.logger.info(f"[并行加热] 当前同时加热中: {', '.join(heating_list)}")
# 模拟加热过程
start_time = time.time()
last_countdown_log = start_time
while True:
@@ -524,7 +576,6 @@ class VirtualWorkbench:
self._update_data_status(f"加热台{station_id}加热中: {progress:.1f}%")
# 每5秒打印一次倒计时
if time.time() - last_countdown_log >= 5.0:
self.logger.info(f"[加热台{station_id}] {material_id} 剩余 {remaining:.1f}s")
last_countdown_log = time.time()
@@ -534,7 +585,6 @@ class VirtualWorkbench:
time.sleep(1.0)
# 加热完成
with self._stations_lock:
self._heating_stations[station_id].state = HeatingStationState.COMPLETED
self._heating_stations[station_id].heating_progress = 100.0
@@ -553,10 +603,28 @@ class VirtualWorkbench:
"material_number": material_number,
"message": f"加热台{station_id}加热完成",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
LabSample(
sample_uuid=sample_uuid,
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
],
}
@action(
auto_prefix=True,
description="将物料从加热台移动到输出位置Cn",
handles=[
ActionInputHandle(key="output_station_input", data_type="workbench_station",
label="加热台ID", data_key="station_id", data_source=DataSource.HANDLE),
ActionInputHandle(key="output_material_input", data_type="workbench_material",
label="物料编号", data_key="material_number", data_source=DataSource.HANDLE),
],
)
def move_to_output(
self,
sample_uuids: SampleUUIDsType,
@@ -565,15 +633,8 @@ class VirtualWorkbench:
) -> MoveToOutputResult:
"""
将物料从加热台移动到输出位置Cn
Args:
station_id: 加热台ID (1-3),从 start_heating 的 handle 传入
material_number: 物料编号,从 start_heating 的 handle 传入,用于确定输出位置 Cn
Returns:
MoveToOutputResult: 包含执行结果
"""
output_number = material_number # 物料编号决定输出位置
output_number = material_number
if station_id not in self._heating_stations:
return {
@@ -583,8 +644,16 @@ class VirtualWorkbench:
"output_position": f"C{output_number}",
"message": f"无效的加热台ID: {station_id}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
LabSample(
sample_uuid=sample_uuid,
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
],
}
with self._stations_lock:
@@ -599,8 +668,16 @@ class VirtualWorkbench:
"output_position": f"C{output_number}",
"message": f"加热台{station_id}上没有物料",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
LabSample(
sample_uuid=sample_uuid,
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
],
}
if station.state != HeatingStationState.COMPLETED:
@@ -611,8 +688,16 @@ class VirtualWorkbench:
"output_position": f"C{output_number}",
"message": f"加热台{station_id}尚未完成加热 (当前状态: {station.state.value})",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
LabSample(
sample_uuid=sample_uuid,
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
],
}
output_position = f"C{output_number}"
@@ -624,18 +709,17 @@ class VirtualWorkbench:
if material_id in self._active_tasks:
self._active_tasks[material_id]["status"] = "waiting_for_arm_output"
# 获取机械臂
self._acquire_arm(task_desc)
with self._tasks_lock:
if material_id in self._active_tasks:
self._active_tasks[material_id]["status"] = "arm_moving_to_output"
# 模拟机械臂操作 (3秒)
self.logger.info(f"[{material_id}] 机械臂正在从加热台{station_id}取出并移动到{output_position}...")
self.logger.info(
f"[{material_id}] 机械臂正在从加热台{station_id}取出并移动到{output_position}..."
)
time.sleep(self.ARM_OPERATION_TIME)
# 清空加热台
with self._stations_lock:
self._heating_stations[station_id].state = HeatingStationState.IDLE
self._heating_stations[station_id].current_material = None
@@ -643,17 +727,17 @@ class VirtualWorkbench:
self._heating_stations[station_id].heating_progress = 0.0
self._heating_stations[station_id].heating_start_time = None
# 释放机械臂
self._release_arm()
# 任务完成
with self._tasks_lock:
if material_id in self._active_tasks:
self._active_tasks[material_id]["status"] = "completed"
self._active_tasks[material_id]["end_time"] = time.time()
self._update_data_status(f"{material_id}已移动到{output_position}")
self.logger.info(f"[{material_id}] 已成功移动到{output_position} (用时{self.ARM_OPERATION_TIME}s)")
self.logger.info(
f"[{material_id}] 已成功移动到{output_position} (用时{self.ARM_OPERATION_TIME}s)"
)
return {
"success": True,
@@ -662,8 +746,17 @@ class VirtualWorkbench:
"output_position": output_position,
"message": f"{material_id}已成功移动到{output_position}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
LabSample(
sample_uuid=sample_uuid,
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str)
else (content.serialize() if content is not None else {})
),
)
for sample_uuid, content in sample_uuids.items()
],
}
except Exception as e:
@@ -677,83 +770,105 @@ class VirtualWorkbench:
"output_position": output_position,
"message": f"移动失败: {str(e)}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
LabSample(
sample_uuid=sample_uuid,
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
],
}
# ============ 状态属性 ============
@property
@topic_config()
def status(self) -> str:
return self.data.get("status", "Unknown")
@property
@topic_config()
def arm_state(self) -> str:
return self._arm_state.value
@property
@topic_config()
def arm_current_task(self) -> str:
return self._arm_current_task or ""
@property
@topic_config()
def heating_station_1_state(self) -> str:
with self._stations_lock:
station = self._heating_stations.get(1)
return station.state.value if station else "unknown"
@property
@topic_config()
def heating_station_1_material(self) -> str:
with self._stations_lock:
station = self._heating_stations.get(1)
return station.current_material or "" if station else ""
@property
@topic_config()
def heating_station_1_progress(self) -> float:
with self._stations_lock:
station = self._heating_stations.get(1)
return station.heating_progress if station else 0.0
@property
@topic_config()
def heating_station_2_state(self) -> str:
with self._stations_lock:
station = self._heating_stations.get(2)
return station.state.value if station else "unknown"
@property
@topic_config()
def heating_station_2_material(self) -> str:
with self._stations_lock:
station = self._heating_stations.get(2)
return station.current_material or "" if station else ""
@property
@topic_config()
def heating_station_2_progress(self) -> float:
with self._stations_lock:
station = self._heating_stations.get(2)
return station.heating_progress if station else 0.0
@property
@topic_config()
def heating_station_3_state(self) -> str:
with self._stations_lock:
station = self._heating_stations.get(3)
return station.state.value if station else "unknown"
@property
@topic_config()
def heating_station_3_material(self) -> str:
with self._stations_lock:
station = self._heating_stations.get(3)
return station.current_material or "" if station else ""
@property
@topic_config()
def heating_station_3_progress(self) -> float:
with self._stations_lock:
station = self._heating_stations.get(3)
return station.heating_progress if station else 0.0
@property
@topic_config()
def active_tasks_count(self) -> int:
with self._tasks_lock:
return len(self._active_tasks)
@property
@topic_config()
def message(self) -> str:
return self.data.get("message", "")

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,614 @@
"""
装饰器注册表系统
通过 @device, @action, @resource 装饰器替代 YAML 配置文件来定义设备/动作/资源注册表信息。
Usage:
from unilabos.registry.decorators import (
device, action, resource,
InputHandle, OutputHandle,
ActionInputHandle, ActionOutputHandle,
HardwareInterface, Side, DataSource,
)
@device(
id="solenoid_valve.mock",
category=["pump_and_valve"],
description="模拟电磁阀设备",
handles=[
InputHandle(key="in", data_type="fluid", label="in", side=Side.NORTH),
OutputHandle(key="out", data_type="fluid", label="out", side=Side.SOUTH),
],
hardware_interface=HardwareInterface(
name="hardware_interface",
read="send_command",
write="send_command",
),
)
class SolenoidValveMock:
@action(action_type=EmptyIn)
def close(self):
...
@action(
handles=[
ActionInputHandle(key="in", data_type="fluid", label="in"),
ActionOutputHandle(key="out", data_type="fluid", label="out"),
],
)
def set_valve_position(self, position):
...
# 无 @action 装饰器 => auto- 前缀动作
def is_open(self):
...
"""
from enum import Enum
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, TypeVar
from pydantic import BaseModel, ConfigDict, Field
F = TypeVar("F", bound=Callable[..., Any])
# ---------------------------------------------------------------------------
# 枚举
# ---------------------------------------------------------------------------
class Side(str, Enum):
"""UI 上 Handle 的显示位置"""
NORTH = "NORTH"
SOUTH = "SOUTH"
EAST = "EAST"
WEST = "WEST"
class DataSource(str, Enum):
"""Handle 的数据来源"""
HANDLE = "handle" # 从上游 handle 获取数据 (用于 InputHandle)
EXECUTOR = "executor" # 从执行器输出数据 (用于 OutputHandle)
# ---------------------------------------------------------------------------
# Device / Resource Handle (设备/资源级别端口, 序列化时包含 io_type)
# ---------------------------------------------------------------------------
class _DeviceHandleBase(BaseModel):
"""设备/资源端口基类 (内部使用)"""
model_config = ConfigDict(populate_by_name=True)
key: str = Field(serialization_alias="handler_key")
data_type: str
label: str
side: Optional[Side] = None
data_key: Optional[str] = None
data_source: Optional[str] = None
description: Optional[str] = None
# 子类覆盖
io_type: str = ""
def to_registry_dict(self) -> Dict[str, Any]:
return self.model_dump(by_alias=True, exclude_none=True)
class InputHandle(_DeviceHandleBase):
"""
输入端口 (io_type="target"), 用于 @device / @resource handles
Example:
InputHandle(key="in", data_type="fluid", label="in", side=Side.NORTH)
"""
io_type: str = "target"
class OutputHandle(_DeviceHandleBase):
"""
输出端口 (io_type="source"), 用于 @device / @resource handles
Example:
OutputHandle(key="out", data_type="fluid", label="out", side=Side.SOUTH)
"""
io_type: str = "source"
# ---------------------------------------------------------------------------
# Action Handle (动作级别端口, 序列化时不含 io_type, 按类型自动分组)
# ---------------------------------------------------------------------------
class _ActionHandleBase(BaseModel):
"""动作端口基类 (内部使用)"""
model_config = ConfigDict(populate_by_name=True)
key: str = Field(serialization_alias="handler_key")
data_type: str
label: str
side: Optional[Side] = None
data_key: Optional[str] = None
data_source: Optional[str] = None
description: Optional[str] = None
io_type: Optional[str] = None # source/sink (dataflow) or target/source (device-style)
def to_registry_dict(self) -> Dict[str, Any]:
return self.model_dump(by_alias=True, exclude_none=True)
class ActionInputHandle(_ActionHandleBase):
"""
动作输入端口, 用于 @action handles, 序列化后归入 "input"
Example:
ActionInputHandle(
key="material_input", data_type="workbench_material",
label="物料编号", data_key="material_number", data_source="handle",
)
"""
pass
class ActionOutputHandle(_ActionHandleBase):
"""
动作输出端口, 用于 @action handles, 序列化后归入 "output"
Example:
ActionOutputHandle(
key="station_output", data_type="workbench_station",
label="加热台ID", data_key="station_id", data_source="executor",
)
"""
pass
# ---------------------------------------------------------------------------
# HardwareInterface
# ---------------------------------------------------------------------------
class HardwareInterface(BaseModel):
"""
硬件通信接口定义
描述设备与底层硬件通信的方式 (串口、Modbus 等)。
Example:
HardwareInterface(name="hardware_interface", read="send_command", write="send_command")
"""
name: str
read: Optional[str] = None
write: Optional[str] = None
extra_info: Optional[List[str]] = None
# ---------------------------------------------------------------------------
# 全局注册表 -- 记录所有被装饰器标记的类/函数
# ---------------------------------------------------------------------------
_registered_devices: Dict[str, type] = {} # device_id -> class
_registered_resources: Dict[str, Any] = {} # resource_id -> class or function
def _device_handles_to_list(
handles: Optional[List[_DeviceHandleBase]],
) -> List[Dict[str, Any]]:
"""将设备/资源 Handle 列表序列化为字典列表 (含 io_type)"""
if handles is None:
return []
return [h.to_registry_dict() for h in handles]
def _action_handles_to_dict(
handles: Optional[List[_ActionHandleBase]],
) -> Dict[str, Any]:
"""
将动作 Handle 列表序列化为 {"input": [...], "output": [...]} 格式。
ActionInputHandle => "input", ActionOutputHandle => "output"
"""
if handles is None:
return {}
input_list = [h.to_registry_dict() for h in handles if isinstance(h, ActionInputHandle)]
output_list = [h.to_registry_dict() for h in handles if isinstance(h, ActionOutputHandle)]
result: Dict[str, Any] = {}
if input_list:
result["input"] = input_list
if output_list:
result["output"] = output_list
return result
# ---------------------------------------------------------------------------
# @device 类装饰器
# ---------------------------------------------------------------------------
# noinspection PyShadowingBuiltins
def device(
id: Optional[str] = None,
ids: Optional[List[str]] = None,
id_meta: Optional[Dict[str, Dict[str, Any]]] = None,
category: Optional[List[str]] = None,
description: str = "",
display_name: str = "",
icon: str = "",
version: str = "1.0.0",
handles: Optional[List[_DeviceHandleBase]] = None,
model: Optional[Dict[str, Any]] = None,
device_type: str = "python",
hardware_interface: Optional[HardwareInterface] = None,
):
"""
设备类装饰器
将类标记为一个 UniLab-OS 设备,并附加注册表元数据。
支持两种模式:
1. 单设备: id="xxx", category=[...]
2. 多设备: ids=["id1","id2"], id_meta={"id1":{handles:[...]}, "id2":{...}}
Args:
id: 单设备时的注册表唯一标识
ids: 多设备时的 id 列表,与 id_meta 配合使用
id_meta: 每个 device_id 的覆盖元数据 (handles/description/icon/model)
category: 设备分类标签列表 (必填)
description: 设备描述
display_name: 人类可读的设备显示名称,缺失时默认使用 id
icon: 图标路径
version: 版本号
handles: 设备端口列表 (单设备或 id_meta 未覆盖时使用)
model: 可选的 3D 模型配置
device_type: 设备实现类型 ("python" / "ros2")
hardware_interface: 硬件通信接口 (HardwareInterface)
"""
# Resolve device ids
if ids is not None:
device_ids = list(ids)
if not device_ids:
raise ValueError("@device ids 不能为空")
id_meta = id_meta or {}
elif id is not None:
device_ids = [id]
id_meta = {}
else:
raise ValueError("@device 必须提供 id 或 ids")
if category is None:
raise ValueError("@device category 必填")
base_meta = {
"category": category,
"description": description,
"display_name": display_name,
"icon": icon,
"version": version,
"handles": _device_handles_to_list(handles),
"model": model,
"device_type": device_type,
"hardware_interface": (hardware_interface.model_dump(exclude_none=True) if hardware_interface else None),
}
def decorator(cls):
cls._device_registry_meta = base_meta
cls._device_registry_id_meta = id_meta
cls._device_registry_ids = device_ids
for did in device_ids:
if did in _registered_devices:
raise ValueError(f"@device id 重复: '{did}' 已被 {_registered_devices[did]} 注册")
_registered_devices[did] = cls
return cls
return decorator
# ---------------------------------------------------------------------------
# @action 方法装饰器
# ---------------------------------------------------------------------------
# 区分 "用户没传 action_type" 和 "用户传了 None"
_ACTION_TYPE_UNSET = object()
# noinspection PyShadowingNames
def action(
action_type: Any = _ACTION_TYPE_UNSET,
goal: Optional[Dict[str, str]] = None,
feedback: Optional[Dict[str, str]] = None,
result: Optional[Dict[str, str]] = None,
handles: Optional[List[_ActionHandleBase]] = None,
goal_default: Optional[Dict[str, Any]] = None,
placeholder_keys: Optional[Dict[str, str]] = None,
always_free: bool = False,
is_protocol: bool = False,
description: str = "",
auto_prefix: bool = False,
parent: bool = False,
):
"""
动作方法装饰器
标记方法为注册表动作。有三种用法:
1. @action(action_type=EmptyIn, ...) -- 非 auto, 使用指定 ROS Action 类型
2. @action() -- 非 auto, UniLabJsonCommand (从方法签名生成 schema)
3. 不加 @action -- auto- 前缀, UniLabJsonCommand
Protocol 用法:
@action(action_type=Add, is_protocol=True)
def AddProtocol(self): ...
标记该动作为高级协议 (protocol),运行时通过 ROS Action 路由到
protocol generator 执行。action_type 指向 unilabos_msgs 的 Action 类型。
Args:
action_type: ROS Action 消息类型 (如 EmptyIn, SendCmd, HeatChill).
不传/默认 = UniLabJsonCommand (非 auto).
goal: Goal 字段映射 (ROS字段名 -> 设备参数名).
protocol 模式下可留空,系统自动生成 identity 映射.
feedback: Feedback 字段映射
result: Result 字段映射
handles: 动作端口列表 (ActionInputHandle / ActionOutputHandle)
goal_default: Goal 字段默认值映射 (字段名 -> 默认值), 与自动生成的 goal_default 合并
placeholder_keys: 参数占位符配置
always_free: 是否为永久闲置动作 (不受排队限制)
is_protocol: 是否为工作站协议 (protocol)。True 时运行时走 protocol generator 路径。
description: 动作描述
auto_prefix: 若为 True动作名使用 auto-{method_name} 形式(与无 @action 时一致)
parent: 若为 True当方法参数为空 (*args, **kwargs) 时,通过 MRO 从父类获取真实方法参数
"""
def decorator(func: F) -> F:
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
# action_type 为哨兵值 => 用户没传, 视为 None (UniLabJsonCommand)
resolved_type = None if action_type is _ACTION_TYPE_UNSET else action_type
meta = {
"action_type": resolved_type,
"goal": goal or {},
"feedback": feedback or {},
"result": result or {},
"handles": _action_handles_to_dict(handles),
"goal_default": goal_default or {},
"placeholder_keys": placeholder_keys or {},
"always_free": always_free,
"is_protocol": is_protocol,
"description": description,
"auto_prefix": auto_prefix,
"parent": parent,
}
wrapper._action_registry_meta = meta # type: ignore[attr-defined]
# 设置 _is_always_free 保持与旧 @always_free 装饰器兼容
if always_free:
wrapper._is_always_free = True # type: ignore[attr-defined]
return wrapper # type: ignore[return-value]
return decorator
def get_action_meta(func) -> Optional[Dict[str, Any]]:
"""获取方法上的 @action 装饰器元数据"""
return getattr(func, "_action_registry_meta", None)
def has_action_decorator(func) -> bool:
"""检查函数是否带有 @action 装饰器"""
return hasattr(func, "_action_registry_meta")
# ---------------------------------------------------------------------------
# @resource 类/函数装饰器
# ---------------------------------------------------------------------------
def resource(
id: str,
category: List[str],
description: str = "",
icon: str = "",
version: str = "1.0.0",
handles: Optional[List[_DeviceHandleBase]] = None,
model: Optional[Dict[str, Any]] = None,
class_type: str = "pylabrobot",
):
"""
资源类/函数装饰器
将类或工厂函数标记为一个 UniLab-OS 资源,附加注册表元数据。
Args:
id: 注册表唯一标识 (必填, 不可重复)
category: 资源分类标签列表 (必填)
description: 资源描述
icon: 图标路径
version: 版本号
handles: 端口列表 (InputHandle / OutputHandle)
model: 可选的 3D 模型配置
class_type: 资源实现类型 ("python" / "pylabrobot" / "unilabos")
"""
def decorator(obj):
meta = {
"resource_id": id,
"category": category,
"description": description,
"icon": icon,
"version": version,
"handles": _device_handles_to_list(handles),
"model": model,
"class_type": class_type,
}
obj._resource_registry_meta = meta
if id in _registered_resources:
raise ValueError(f"@resource id 重复: '{id}' 已被 {_registered_resources[id]} 注册")
_registered_resources[id] = obj
return obj
return decorator
def get_device_meta(cls, device_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""
获取类上的 @device 装饰器元数据。
当 device_id 存在且类使用 ids+id_meta 时,返回合并后的 meta
(base_meta 与 id_meta[device_id] 深度合并)。
"""
base = getattr(cls, "_device_registry_meta", None)
if base is None:
return None
id_meta = getattr(cls, "_device_registry_id_meta", None) or {}
if device_id is None or device_id not in id_meta:
result = dict(base)
ids = getattr(cls, "_device_registry_ids", None)
result["device_id"] = device_id if device_id is not None else (ids[0] if ids else None)
return result
overrides = id_meta[device_id]
result = dict(base)
result["device_id"] = device_id
for key in ["handles", "description", "icon", "model"]:
if key in overrides:
val = overrides[key]
if key == "handles" and isinstance(val, list):
# handles 必须是 Handle 对象列表
result[key] = [h.to_registry_dict() for h in val]
else:
result[key] = val
return result
def get_resource_meta(obj) -> Optional[Dict[str, Any]]:
"""获取对象上的 @resource 装饰器元数据"""
return getattr(obj, "_resource_registry_meta", None)
def get_all_registered_devices() -> Dict[str, type]:
"""获取所有已注册的设备类"""
return _registered_devices.copy()
def get_all_registered_resources() -> Dict[str, Any]:
"""获取所有已注册的资源"""
return _registered_resources.copy()
def clear_registry():
"""清空全局注册表 (用于测试)"""
_registered_devices.clear()
_registered_resources.clear()
# ---------------------------------------------------------------------------
# topic_config / not_action / always_free 装饰器
# ---------------------------------------------------------------------------
def topic_config(
period: Optional[float] = None,
print_publish: Optional[bool] = None,
qos: Optional[int] = None,
name: Optional[str] = None,
) -> Callable[[F], F]:
"""
Topic发布配置装饰器
用于装饰 get_{attr_name} 方法或 @property控制对应属性的ROS topic发布行为。
Args:
period: 发布周期。None 表示使用默认值 5.0
print_publish: 是否打印发布日志。None 表示使用节点默认配置
qos: QoS深度配置。None 表示使用默认值 10
name: 自定义发布名称。None 表示使用方法名(去掉 get_ 前缀)
Note:
与 @property 连用时,@topic_config 必须放在 @property 下面,
这样装饰器执行顺序为:先 topic_config 添加配置,再 property 包装。
"""
def decorator(func: F) -> F:
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper._topic_period = period # type: ignore[attr-defined]
wrapper._topic_print_publish = print_publish # type: ignore[attr-defined]
wrapper._topic_qos = qos # type: ignore[attr-defined]
wrapper._topic_name = name # type: ignore[attr-defined]
wrapper._has_topic_config = True # type: ignore[attr-defined]
return wrapper # type: ignore[return-value]
return decorator
def get_topic_config(func) -> dict:
"""获取函数上的 topic 配置 (period, print_publish, qos, name)"""
if hasattr(func, "_has_topic_config") and getattr(func, "_has_topic_config", False):
return {
"period": getattr(func, "_topic_period", None),
"print_publish": getattr(func, "_topic_print_publish", None),
"qos": getattr(func, "_topic_qos", None),
"name": getattr(func, "_topic_name", None),
}
return {}
def always_free(func: F) -> F:
"""
标记动作为永久闲置(不受busy队列限制)的装饰器
被此装饰器标记的 action 方法,在执行时不会受到设备级别的排队限制,
任何时候请求都可以立即执行。适用于查询类、状态读取类等轻量级操作。
"""
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper._is_always_free = True # type: ignore[attr-defined]
return wrapper # type: ignore[return-value]
def is_always_free(func) -> bool:
"""检查函数是否被标记为永久闲置"""
return getattr(func, "_is_always_free", False)
def not_action(func: F) -> F:
"""
标记方法为非动作的装饰器
用于装饰 driver 类中的方法,使其在注册表扫描时不被识别为动作。
适用于辅助方法、内部工具方法等不应暴露为设备动作的公共方法。
"""
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper._is_not_action = True # type: ignore[attr-defined]
return wrapper # type: ignore[return-value]
def is_not_action(func) -> bool:
"""检查函数是否被标记为非动作"""
return getattr(func, "_is_not_action", False)

File diff suppressed because it is too large Load Diff

699
unilabos/registry/utils.py Normal file
View File

@@ -0,0 +1,699 @@
"""
注册表工具函数
从 registry.py 中提取的纯工具函数,包括:
- docstring 解析
- 类型字符串 → JSON Schema 转换
- AST 类型节点解析
- TypedDict / Slot / Handle 等辅助检测
"""
import inspect
import logging
import re
import typing
from typing import Any, Dict, List, Optional, Tuple, Union
from msgcenterpy.instances.typed_dict_instance import TypedDictMessageInstance
from unilabos.utils.cls_creator import import_class
_logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# 异常
# ---------------------------------------------------------------------------
class ROSMsgNotFound(Exception):
pass
# ---------------------------------------------------------------------------
# Docstring 解析 (Google-style)
# ---------------------------------------------------------------------------
_SECTION_RE = re.compile(r"^(\w[\w\s]*):\s*$")
def parse_docstring(docstring: Optional[str]) -> Dict[str, Any]:
"""
解析 Google-style docstring提取描述和参数说明。
Returns:
{"description": "短描述", "params": {"param1": "参数1描述", ...}}
"""
result: Dict[str, Any] = {"description": "", "params": {}}
if not docstring:
return result
lines = docstring.strip().splitlines()
if not lines:
return result
result["description"] = lines[0].strip()
in_args = False
current_param: Optional[str] = None
current_desc_parts: list = []
for line in lines[1:]:
stripped = line.strip()
section_match = _SECTION_RE.match(stripped)
if section_match:
if current_param is not None:
result["params"][current_param] = "\n".join(current_desc_parts).strip()
current_param = None
current_desc_parts = []
section_name = section_match.group(1).lower()
in_args = section_name in ("args", "arguments", "parameters", "params")
continue
if not in_args:
continue
if ":" in stripped and not stripped.startswith(" "):
if current_param is not None:
result["params"][current_param] = "\n".join(current_desc_parts).strip()
param_part, _, desc_part = stripped.partition(":")
param_name = param_part.strip().split("(")[0].strip()
current_param = param_name
current_desc_parts = [desc_part.strip()]
elif current_param is not None:
aline = line
if aline.startswith(" "):
aline = aline[4:]
elif aline.startswith("\t"):
aline = aline[1:]
current_desc_parts.append(aline.strip())
if current_param is not None:
result["params"][current_param] = "\n".join(current_desc_parts).strip()
return result
# ---------------------------------------------------------------------------
# 类型常量
# ---------------------------------------------------------------------------
SIMPLE_TYPE_MAP = {
"str": "string",
"string": "string",
"int": "integer",
"integer": "integer",
"float": "number",
"number": "number",
"bool": "boolean",
"boolean": "boolean",
"list": "array",
"array": "array",
"dict": "object",
"object": "object",
}
ARRAY_TYPES = {"list", "List", "tuple", "Tuple", "set", "Set", "Sequence", "Iterable"}
OBJECT_TYPES = {"dict", "Dict", "Mapping"}
WRAPPER_TYPES = {"Optional"}
SLOT_TYPES = {"ResourceSlot", "DeviceSlot"}
# ---------------------------------------------------------------------------
# 简单类型映射
# ---------------------------------------------------------------------------
def get_json_schema_type(type_str: str) -> str:
"""简单类型名 -> JSON Schema type"""
return SIMPLE_TYPE_MAP.get(type_str.lower(), "string")
# ---------------------------------------------------------------------------
# AST 类型解析
# ---------------------------------------------------------------------------
def parse_type_node(type_str: str):
"""将类型注解字符串解析为 AST 节点,失败返回 None。"""
import ast as _ast
try:
return _ast.parse(type_str.strip(), mode="eval").body
except Exception:
return None
def _collect_bitor(node, out: list):
"""递归收集 X | Y | Z 的所有分支。"""
import ast as _ast
if isinstance(node, _ast.BinOp) and isinstance(node.op, _ast.BitOr):
_collect_bitor(node.left, out)
_collect_bitor(node.right, out)
else:
out.append(node)
def type_node_to_schema(
node,
import_map: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
"""将 AST 类型注解节点递归转换为 JSON Schema dict。
当提供 import_map 时,对于未知类名会尝试通过 import_map 解析模块路径,
然后 import 真实类型对象来生成 schema (支持 TypedDict 等)。
映射规则:
- Optional[X] → X 的 schema (剥掉 Optional)
- Union[X, Y] → {"anyOf": [X_schema, Y_schema]}
- List[X] / Tuple[X] / Set[X] → {"type": "array", "items": X_schema}
- Dict[K, V] → {"type": "object", "additionalProperties": V_schema}
- Literal["a", "b"] → {"type": "string", "enum": ["a", "b"]}
- TypedDict (via import_map) → {"type": "object", "properties": {...}}
- 基本类型 str/int/... → {"type": "string"/"integer"/...}
"""
import ast as _ast
# --- Name 节点: str / int / dict / ResourceSlot / 自定义类 ---
if isinstance(node, _ast.Name):
name = node.id
if name in SLOT_TYPES:
return {"$slot": name}
json_type = SIMPLE_TYPE_MAP.get(name.lower())
if json_type:
return {"type": json_type}
# 尝试通过 import_map 解析并 import 真实类型
if import_map and name in import_map:
type_obj = resolve_type_object(import_map[name])
if type_obj is not None:
return type_to_schema(type_obj)
# 未知类名 → 无法转 schema 的自定义类型默认当 object
return {"type": "object"}
if isinstance(node, _ast.Constant):
if isinstance(node.value, str):
return {"type": SIMPLE_TYPE_MAP.get(node.value.lower(), "string")}
return {"type": "string"}
# --- Subscript 节点: List[X], Dict[K,V], Optional[X], Literal[...] 等 ---
if isinstance(node, _ast.Subscript):
base_name = node.value.id if isinstance(node.value, _ast.Name) else ""
# Optional[X] → 剥掉
if base_name in WRAPPER_TYPES:
return type_node_to_schema(node.slice, import_map)
# Union[X, None] → 剥掉 None; Union[X, Y] → anyOf
if base_name == "Union":
elts = node.slice.elts if isinstance(node.slice, _ast.Tuple) else [node.slice]
non_none = [
e
for e in elts
if not (isinstance(e, _ast.Constant) and e.value is None)
and not (isinstance(e, _ast.Name) and e.id == "None")
]
if len(non_none) == 1:
return type_node_to_schema(non_none[0], import_map)
if len(non_none) > 1:
return {"anyOf": [type_node_to_schema(e, import_map) for e in non_none]}
return {"type": "string"}
# Literal["a", "b", 1] → enum
if base_name == "Literal":
elts = node.slice.elts if isinstance(node.slice, _ast.Tuple) else [node.slice]
values = []
for e in elts:
if isinstance(e, _ast.Constant):
values.append(e.value)
elif isinstance(e, _ast.Name):
values.append(e.id)
if values:
return {"type": "string", "enum": values}
return {"type": "string"}
# List / Tuple / Set → array
if base_name in ARRAY_TYPES:
if isinstance(node.slice, _ast.Tuple) and node.slice.elts:
inner_node = node.slice.elts[0]
else:
inner_node = node.slice
return {"type": "array", "items": type_node_to_schema(inner_node, import_map)}
# Dict → object
if base_name in OBJECT_TYPES:
schema: Dict[str, Any] = {"type": "object"}
if isinstance(node.slice, _ast.Tuple) and len(node.slice.elts) >= 2:
val_node = node.slice.elts[1]
# Dict[str, Any] → 不加 additionalProperties (Any 等同于无约束)
is_any = (isinstance(val_node, _ast.Name) and val_node.id == "Any") or (
isinstance(val_node, _ast.Constant) and val_node.value is None
)
if not is_any:
val_schema = type_node_to_schema(val_node, import_map)
schema["additionalProperties"] = val_schema
return schema
# --- BinOp: X | Y (Python 3.10+) → 当 Union 处理 ---
if isinstance(node, _ast.BinOp) and isinstance(node.op, _ast.BitOr):
parts: list = []
_collect_bitor(node, parts)
non_none = [
p
for p in parts
if not (isinstance(p, _ast.Constant) and p.value is None)
and not (isinstance(p, _ast.Name) and p.id == "None")
]
if len(non_none) == 1:
return type_node_to_schema(non_none[0], import_map)
if len(non_none) > 1:
return {"anyOf": [type_node_to_schema(p, import_map) for p in non_none]}
return {"type": "string"}
return {"type": "string"}
# ---------------------------------------------------------------------------
# 真实类型对象解析 (import-based)
# ---------------------------------------------------------------------------
def resolve_type_object(type_ref: str) -> Optional[Any]:
"""通过 'module.path:ClassName' 格式的引用 import 并返回真实类型对象。
对于 typing 内置名 (str, int, List 等) 直接返回 None (由 AST 路径处理)。
import 失败时静默返回 None。
"""
if ":" not in type_ref:
return None
try:
return import_class(type_ref)
except Exception:
return None
def is_typed_dict_class(obj: Any) -> bool:
"""检查对象是否是 TypedDict 类。"""
if obj is None:
return False
try:
from typing_extensions import is_typeddict
return is_typeddict(obj)
except ImportError:
if isinstance(obj, type):
return hasattr(obj, "__required_keys__") and hasattr(obj, "__optional_keys__")
return False
def type_to_schema(tp: Any) -> Dict[str, Any]:
"""将真实 typing 对象递归转换为 JSON Schema dict。
支持:
- 基本类型: str, int, float, bool → {"type": "string"/"integer"/...}
- typing 泛型: List[X], Dict[K,V], Optional[X], Union[X,Y], Literal[...]
- TypedDict → {"type": "object", "properties": {...}, "required": [...]}
- 自定义类 (ResourceSlot 等) → {"$slot": "..."} 或 {"type": "string"}
"""
origin = getattr(tp, "__origin__", None)
args = getattr(tp, "__args__", None)
# --- None / NoneType ---
if tp is type(None):
return {"type": "null"}
# --- 基本类型 ---
if tp is str:
return {"type": "string"}
if tp is int:
return {"type": "integer"}
if tp is float:
return {"type": "number"}
if tp is bool:
return {"type": "boolean"}
# --- TypedDict ---
if is_typed_dict_class(tp):
try:
return TypedDictMessageInstance.get_json_schema_from_typed_dict(tp)
except Exception:
return {"type": "object"}
# --- Literal ---
if origin is typing.Literal:
values = list(args) if args else []
return {"type": "string", "enum": values}
# --- Optional / Union ---
if origin is typing.Union:
non_none = [a for a in (args or ()) if a is not type(None)]
if len(non_none) == 1:
return type_to_schema(non_none[0])
if len(non_none) > 1:
return {"anyOf": [type_to_schema(a) for a in non_none]}
return {"type": "string"}
# --- List / Sequence / Set / Tuple / Iterable ---
if origin in (list, tuple, set, frozenset) or (
origin is not None
and getattr(origin, "__name__", "") in ("Sequence", "Iterable", "Iterator", "MutableSequence")
):
if args:
return {"type": "array", "items": type_to_schema(args[0])}
return {"type": "array"}
# --- Dict / Mapping ---
if origin in (dict,) or (origin is not None and getattr(origin, "__name__", "") in ("Mapping", "MutableMapping")):
schema: Dict[str, Any] = {"type": "object"}
if args and len(args) >= 2:
schema["additionalProperties"] = type_to_schema(args[1])
return schema
# --- Slot 类型 ---
if isinstance(tp, type):
name = tp.__name__
if name in SLOT_TYPES:
return {"$slot": name}
# --- 其他未知类型 fallback ---
if isinstance(tp, type):
return {"type": "object"}
return {"type": "string"}
# ---------------------------------------------------------------------------
# Slot / Placeholder 检测
# ---------------------------------------------------------------------------
def detect_slot_type(ptype) -> Tuple[Optional[str], bool]:
"""检测参数类型是否为 ResourceSlot / DeviceSlot。
兼容多种格式:
- runtime: "unilabos.registry.placeholder_type:ResourceSlot"
- runtime tuple: ("list", "unilabos.registry.placeholder_type:ResourceSlot")
- AST 裸名: "ResourceSlot", "List[ResourceSlot]", "Optional[ResourceSlot]"
Returns: (slot_name | None, is_list)
"""
ptype_str = str(ptype)
# 快速路径: 字符串里根本没有 Slot
if "ResourceSlot" not in ptype_str and "DeviceSlot" not in ptype_str:
return (None, False)
# runtime 格式: 完整模块路径
if isinstance(ptype, str):
if ptype.endswith(":ResourceSlot") or ptype == "ResourceSlot":
return ("ResourceSlot", False)
if ptype.endswith(":DeviceSlot") or ptype == "DeviceSlot":
return ("DeviceSlot", False)
# AST 复杂格式: List[ResourceSlot], Optional[ResourceSlot] 等
if "[" in ptype:
node = parse_type_node(ptype)
if node is not None:
schema = type_node_to_schema(node)
# 直接是 slot
if "$slot" in schema:
return (schema["$slot"], False)
# array 包裹 slot: {"type": "array", "items": {"$slot": "..."}}
items = schema.get("items", {})
if isinstance(items, dict) and "$slot" in items:
return (items["$slot"], True)
return (None, False)
# runtime tuple 格式
if isinstance(ptype, tuple) and len(ptype) == 2:
inner_str = str(ptype[1])
if "ResourceSlot" in inner_str:
return ("ResourceSlot", True)
if "DeviceSlot" in inner_str:
return ("DeviceSlot", True)
return (None, False)
def detect_placeholder_keys(params: list) -> Dict[str, str]:
"""Detect parameters that reference ResourceSlot or DeviceSlot."""
result: Dict[str, str] = {}
for p in params:
ptype = p.get("type", "")
if "ResourceSlot" in str(ptype):
result[p["name"]] = "unilabos_resources"
elif "DeviceSlot" in str(ptype):
result[p["name"]] = "unilabos_devices"
return result
# ---------------------------------------------------------------------------
# Handle 规范化
# ---------------------------------------------------------------------------
def normalize_ast_handles(handles_raw: Any) -> List[Dict[str, Any]]:
"""Convert AST-parsed handle structures to the standard registry format."""
if not handles_raw:
return []
# handle_type → io_type 映射 (AST 内部类名 → YAML 标准字段值)
_HANDLE_TYPE_TO_IO_TYPE = {
"input": "target",
"output": "source",
"action_input": "action_target",
"action_output": "action_source",
}
result: List[Dict[str, Any]] = []
for h in handles_raw:
if isinstance(h, dict):
call = h.get("_call", "")
if "InputHandle" in call:
handle_type = "input"
elif "OutputHandle" in call:
handle_type = "output"
elif "ActionInputHandle" in call:
handle_type = "action_input"
elif "ActionOutputHandle" in call:
handle_type = "action_output"
else:
handle_type = h.get("handle_type", "unknown")
io_type = _HANDLE_TYPE_TO_IO_TYPE.get(handle_type, handle_type)
entry: Dict[str, Any] = {
"handler_key": h.get("key", ""),
"data_type": h.get("data_type", ""),
"io_type": io_type,
}
side = h.get("side")
if side:
if isinstance(side, str) and "." in side:
val = side.rsplit(".", 1)[-1]
side = val.lower() if val in ("LEFT", "RIGHT", "TOP", "BOTTOM") else val
entry["side"] = side
label = h.get("label")
if label:
entry["label"] = label
data_key = h.get("data_key")
if data_key:
entry["data_key"] = data_key
data_source = h.get("data_source")
if data_source:
if isinstance(data_source, str) and "." in data_source:
val = data_source.rsplit(".", 1)[-1]
data_source = val.lower() if val in ("HANDLE", "EXECUTOR") else val
entry["data_source"] = data_source
description = h.get("description")
if description:
entry["description"] = description
result.append(entry)
return result
def normalize_ast_action_handles(handles_raw: Any) -> Dict[str, Any]:
"""Convert AST-parsed action handle list to {"input": [...], "output": [...]}.
Mirrors the runtime behavior of decorators._action_handles_to_dict:
- ActionInputHandle => grouped under "input"
- ActionOutputHandle => grouped under "output"
Field mapping: key -> handler_key (matches Pydantic serialization_alias).
"""
if not handles_raw or not isinstance(handles_raw, list):
return {}
input_list: List[Dict[str, Any]] = []
output_list: List[Dict[str, Any]] = []
for h in handles_raw:
if not isinstance(h, dict):
continue
call = h.get("_call", "")
is_input = "ActionInputHandle" in call or "InputHandle" in call
is_output = "ActionOutputHandle" in call or "OutputHandle" in call
entry: Dict[str, Any] = {
"handler_key": h.get("key", ""),
"data_type": h.get("data_type", ""),
"label": h.get("label", ""),
}
for opt_key in ("side", "data_key", "data_source", "description", "io_type"):
val = h.get(opt_key)
if val is not None:
# Only resolve enum-style refs (e.g. DataSource.HANDLE -> handle) for data_source/side
# data_key values like "wells.@flatten", "@this.0@@@plate" must be preserved as-is
if (
isinstance(val, str)
and "." in val
and opt_key not in ("io_type", "data_key")
):
val = val.rsplit(".", 1)[-1].lower()
entry[opt_key] = val
# io_type: only add when explicitly set; do not default output to "sink" (YAML convention omits it)
if "io_type" not in entry and is_input:
entry["io_type"] = "source"
if is_input:
input_list.append(entry)
elif is_output:
output_list.append(entry)
result: Dict[str, Any] = {}
if input_list:
result["input"] = input_list
# Always include output (empty list when no outputs) to match YAML
result["output"] = output_list
return result
# ---------------------------------------------------------------------------
# Schema 辅助
# ---------------------------------------------------------------------------
def wrap_action_schema(
goal_schema: Dict[str, Any],
action_name: str,
description: str = "",
result_schema: Optional[Dict[str, Any]] = None,
feedback_schema: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""
将 goal 参数 schema 包装为标准的 action schema 格式:
{ "properties": { "goal": ..., "feedback": ..., "result": ... }, ... }
"""
# 去掉 auto- 前缀用于 title/description与 YAML 路径保持一致
display_name = action_name.removeprefix("auto-")
return {
"title": f"{display_name}参数",
"description": description or f"{display_name}的参数schema",
"type": "object",
"properties": {
"goal": goal_schema,
"feedback": feedback_schema or {},
"result": result_schema or {},
},
"required": ["goal"],
}
def preserve_field_descriptions(new_schema: Dict[str, Any], prev_schema: Dict[str, Any]):
"""保留之前 schema 中的 field descriptions"""
if not prev_schema or not new_schema:
return
prev_props = prev_schema.get("properties", {})
new_props = new_schema.get("properties", {})
for field_name, prev_field in prev_props.items():
if field_name in new_props and "title" in prev_field:
new_props[field_name].setdefault("title", prev_field["title"])
# ---------------------------------------------------------------------------
# 深度对比
# ---------------------------------------------------------------------------
def _short(val, limit=120):
"""截断过长的值用于日志显示。"""
s = repr(val)
return s if len(s) <= limit else s[:limit] + "..."
def deep_diff(old, new, path="", max_depth=10) -> list:
"""递归对比两个对象,返回所有差异的描述列表。"""
diffs = []
if max_depth <= 0:
if old != new:
diffs.append(f"{path}: (达到最大深度) OLD≠NEW")
return diffs
if type(old) != type(new):
diffs.append(f"{path}: 类型不同 OLD={type(old).__name__}({_short(old)}) NEW={type(new).__name__}({_short(new)})")
return diffs
if isinstance(old, dict):
old_keys = set(old.keys())
new_keys = set(new.keys())
for k in sorted(new_keys - old_keys):
diffs.append(f"{path}.{k}: 新增字段 (AST有, YAML无) = {_short(new[k])}")
for k in sorted(old_keys - new_keys):
diffs.append(f"{path}.{k}: 缺失字段 (YAML有, AST无) = {_short(old[k])}")
for k in sorted(old_keys & new_keys):
diffs.extend(deep_diff(old[k], new[k], f"{path}.{k}", max_depth - 1))
elif isinstance(old, (list, tuple)):
if len(old) != len(new):
diffs.append(f"{path}: 列表长度不同 OLD={len(old)} NEW={len(new)}")
for i in range(min(len(old), len(new))):
diffs.extend(deep_diff(old[i], new[i], f"{path}[{i}]", max_depth - 1))
if len(new) > len(old):
for i in range(len(old), len(new)):
diffs.append(f"{path}[{i}]: 新增元素 = {_short(new[i])}")
elif len(old) > len(new):
for i in range(len(new), len(old)):
diffs.append(f"{path}[{i}]: 缺失元素 = {_short(old[i])}")
else:
if old != new:
diffs.append(f"{path}: OLD={_short(old)} NEW={_short(new)}")
return diffs
# ---------------------------------------------------------------------------
# MRO 方法参数解析
# ---------------------------------------------------------------------------
def resolve_method_params_via_import(module_str: str, method_name: str) -> Dict[str, str]:
"""当 AST 方法参数为空 (如 *args, **kwargs) 时, import class 并通过 MRO 获取真实方法参数.
返回 identity mapping {param_name: param_name}.
"""
if not module_str or ":" not in module_str:
return {}
try:
cls = import_class(module_str)
except Exception as e:
_logger.debug(f"[AST] resolve_method_params_via_import: import_class('{module_str}') failed: {e}")
return {}
try:
for base_cls in cls.__mro__:
if method_name not in base_cls.__dict__:
continue
method = base_cls.__dict__[method_name]
actual = getattr(method, "__wrapped__", method)
if isinstance(actual, (staticmethod, classmethod)):
actual = actual.__func__
if not callable(actual):
continue
sig = inspect.signature(actual, follow_wrapped=True)
params = [
p.name for p in sig.parameters.values()
if p.name not in ("self", "cls")
and p.kind not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
]
if params:
return {p: p for p in params}
except Exception as e:
_logger.debug(f"[AST] resolve_method_params_via_import: MRO walk for '{method_name}' failed: {e}")
return {}

View File

@@ -12,9 +12,11 @@ class RegularContainer(Container):
kwargs["size_y"] = 0
if "size_z" not in kwargs:
kwargs["size_z"] = 0
if "category" not in kwargs:
kwargs["category"] = "container"
self.kwargs = kwargs
super().__init__(*args, category="container", **kwargs)
super().__init__(*args, **kwargs)
def load_state(self, state: Dict[str, Any]):
super().load_state(state)

View File

@@ -76,7 +76,7 @@ def canonicalize_nodes_data(
if sample_id:
logger.error(f"{node}的sample_id参数已弃用sample_id: {sample_id}")
for k in list(node.keys()):
if k not in ["id", "uuid", "name", "description", "schema", "model", "icon", "parent_uuid", "parent", "type", "class", "position", "config", "data", "children", "pose", "extra"]:
if k not in ["id", "uuid", "name", "description", "schema", "model", "icon", "parent_uuid", "parent", "type", "class", "position", "config", "data", "children", "pose", "extra", "machine_name"]:
v = node.pop(k)
node["config"][k] = v
if outer_host_node_id is not None:
@@ -288,6 +288,15 @@ def read_node_link_json(
physical_setup_graph = nx.node_link_graph(graph_data, edges="links", multigraph=False)
handle_communications(physical_setup_graph)
# Stamp machine_name on device trees only (resources are cloud-managed)
local_machine = BasicConfig.machine_name or "本地"
for tree in resource_tree_set.trees:
if tree.root_node.res_content.type != "device":
continue
for node in tree.get_all_nodes():
if not node.res_content.machine_name:
node.res_content.machine_name = local_machine
return physical_setup_graph, resource_tree_set, standardized_links
@@ -372,6 +381,15 @@ def read_graphml(graphml_file: str) -> tuple[nx.Graph, ResourceTreeSet, List[Dic
physical_setup_graph = nx.node_link_graph(graph_data, link="links", multigraph=False)
handle_communications(physical_setup_graph)
# Stamp machine_name on device trees only (resources are cloud-managed)
local_machine = BasicConfig.machine_name or "本地"
for tree in resource_tree_set.trees:
if tree.root_node.res_content.type != "device":
continue
for node in tree.get_all_nodes():
if not node.res_content.machine_name:
node.res_content.machine_name = local_machine
return physical_setup_graph, resource_tree_set, standardized_links

View File

@@ -75,14 +75,6 @@ class ResourceDictPositionObject(BaseModel):
z: float = Field(description="Z coordinate", default=0.0)
class ResourceDictPoseExtraObjectType(BaseModel):
z_index: int
class ResourceDictPoseExtraObject(BaseModel):
z_index: Optional[int] = Field(alias="zIndex", default=None)
class ResourceDictPositionType(TypedDict):
size: ResourceDictPositionSizeType
scale: ResourceDictPositionScaleType
@@ -109,7 +101,7 @@ class ResourceDictPosition(BaseModel):
cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"] = Field(
description="Cross section type", default="rectangle"
)
extra: Optional[ResourceDictPoseExtraObject] = Field(description="Extra data", default=None)
extra: Optional[Dict[str, Any]] = Field(description="Extra data", default=None)
class ResourceDictType(TypedDict):
@@ -128,6 +120,7 @@ class ResourceDictType(TypedDict):
config: Dict[str, Any]
data: Dict[str, Any]
extra: Dict[str, Any]
machine_name: str
# 统一的资源字典模型parent 自动序列化为 parent_uuidchildren 不序列化
@@ -149,6 +142,7 @@ class ResourceDict(BaseModel):
config: Dict[str, Any] = Field(description="Resource configuration")
data: Dict[str, Any] = Field(description="Resource data, eg: container liquid data")
extra: Dict[str, Any] = Field(description="Extra data, eg: slot index")
machine_name: str = Field(description="Machine this resource belongs to", default="")
@field_serializer("parent_uuid")
def _serialize_parent(self, parent_uuid: Optional["ResourceDict"]):
@@ -204,22 +198,30 @@ class ResourceDictInstance(object):
self.typ = "dict"
@classmethod
def get_resource_instance_from_dict(cls, content: Dict[str, Any]) -> "ResourceDictInstance":
def get_resource_instance_from_dict(cls, content: ResourceDictType) -> "ResourceDictInstance":
"""从字典创建资源实例"""
if "id" not in content:
content["id"] = content["name"]
if "uuid" not in content:
content["uuid"] = str(uuid.uuid4())
if "description" in content and content["description"] is None:
# noinspection PyTypedDict
del content["description"]
if "model" in content and content["model"] is None:
# noinspection PyTypedDict
del content["model"]
# noinspection PyTypedDict
if "schema" in content and content["schema"] is None:
# noinspection PyTypedDict
del content["schema"]
# noinspection PyTypedDict
if "x" in content.get("position", {}):
# 说明是老版本的position格式转换成新的
# noinspection PyTypedDict
content["position"] = {"position": content["position"]}
# noinspection PyTypedDict
if not content.get("class"):
# noinspection PyTypedDict
content["class"] = ""
if not content.get("config"): # todo: 后续从后端保证字段非空
content["config"] = {}
@@ -230,16 +232,18 @@ class ResourceDictInstance(object):
if "position" in content:
pose = content.get("pose", {})
if "position" not in pose:
# noinspection PyTypedDict
if "position" in content["position"]:
# noinspection PyTypedDict
pose["position"] = content["position"]["position"]
else:
pose["position"] = {"x": 0, "y": 0, "z": 0}
pose["position"] = ResourceDictPositionObjectType(x=0, y=0, z=0)
if "size" not in pose:
pose["size"] = {
"width": content["config"].get("size_x", 0),
"height": content["config"].get("size_y", 0),
"depth": content["config"].get("size_z", 0),
}
pose["size"] = ResourceDictPositionSizeType(
width= content["config"].get("size_x", 0),
height= content["config"].get("size_y", 0),
depth= content["config"].get("size_z", 0),
)
content["pose"] = pose
try:
res_dict = ResourceDict.model_validate(content)
@@ -407,7 +411,7 @@ class ResourceTreeSet(object):
)
@classmethod
def from_plr_resources(cls, resources: List["PLRResource"], known_newly_created=False) -> "ResourceTreeSet":
def from_plr_resources(cls, resources: List["PLRResource"], known_newly_created=False, old_size=False) -> "ResourceTreeSet":
"""
从plr资源创建ResourceTreeSet
"""
@@ -430,13 +434,20 @@ class ResourceTreeSet(object):
"resource_group": "resource_group",
"trash": "trash",
"plate_adapter": "plate_adapter",
"consumable": "consumable",
"tool": "tool",
"condenser": "condenser",
"crucible": "crucible",
"reagent_bottle": "reagent_bottle",
"flask": "flask",
"beaker": "beaker",
}
if source in replace_info:
return replace_info[source]
elif source is None:
return ""
else:
print("转换pylabrobot的时候出现未知类型", source)
logger.trace(f"转换pylabrobot的时候出现未知类型 {source}")
return source
def build_uuid_mapping(res: "PLRResource", uuid_list: list, parent_uuid: Optional[str] = None):
@@ -491,7 +502,7 @@ class ResourceTreeSet(object):
k: v
for k, v in d.items()
if k
not in [
not in ([
"name",
"children",
"parent_name",
@@ -502,7 +513,15 @@ class ResourceTreeSet(object):
"size_z",
"cross_section_type",
"bottom_type",
]
] if not old_size else [
"name",
"children",
"parent_name",
"location",
"rotation",
"cross_section_type",
"bottom_type",
])
},
"data": states[d["name"]],
"extra": extra,
@@ -801,7 +820,8 @@ class ResourceTreeSet(object):
if remote_root_type == "device":
# 情况1: 一级是 device
if remote_root_id not in local_device_map:
logger.warning(f"Device '{remote_root_id}' 在本地不存在,跳过该 device 下的物料同步")
if remote_root_id != "host_node":
logger.warning(f"Device '{remote_root_id}' 在本地不存在,跳过该 device 下的物料同步")
continue
local_device = local_device_map[remote_root_id]
@@ -848,14 +868,27 @@ class ResourceTreeSet(object):
f"从远端同步了 {added_count} 个物料子树"
)
else:
# 情况2: 二级物料(不是 device
if remote_child_name not in local_children_map:
# 引入整个子树
remote_child.res_content.parent = local_device.res_content
local_device.children.append(remote_child)
logger.info(f"Device '{remote_root_id}': 从远端同步物料子树 '{remote_child_name}'")
else:
logger.info(f"物料 '{remote_root_id}/{remote_child_name}' 已存在,跳过")
# 二级物料已存在,比较三级子节点是否缺失
local_material = local_children_map[remote_child_name]
local_material_children_map = {child.res_content.name: child for child in
local_material.children}
added_count = 0
for remote_sub in remote_child.children:
remote_sub_name = remote_sub.res_content.name
if remote_sub_name not in local_material_children_map:
remote_sub.res_content.parent = local_material.res_content
local_material.children.append(remote_sub)
added_count += 1
else:
logger.info(
f"物料 '{remote_root_id}/{remote_child_name}/{remote_sub_name}' "
f"已存在,跳过"
)
if added_count > 0:
logger.info(
f"物料 '{remote_root_id}/{remote_child_name}': "
f"从远端同步了 {added_count} 个子物料"
)
else:
# 情况1: 一级节点是物料(不是 device
# 检查是否已存在
@@ -878,7 +911,7 @@ class ResourceTreeSet(object):
return self
def dump(self) -> List[List[Dict[str, Any]]]:
def dump(self, old_position=False) -> List[List[Dict[str, Any]]]:
"""
将 ResourceTreeSet 序列化为嵌套列表格式
@@ -894,6 +927,10 @@ class ResourceTreeSet(object):
# 获取树的所有节点并序列化
tree_nodes = [node.res_content.model_dump(by_alias=True) for node in tree.get_all_nodes()]
result.append(tree_nodes)
if old_position:
for r in result:
for rr in r:
rr["position"] = rr["pose"]["position"]
return result
@classmethod

View File

@@ -11,6 +11,7 @@ from io import StringIO
from typing import Iterable, Any, Dict, Type, TypeVar, Union
import yaml
from msgcenterpy.instances.ros2_instance import ROS2MessageInstance
from pydantic import BaseModel
from dataclasses import asdict, is_dataclass
@@ -727,46 +728,9 @@ def ros_message_to_json_schema(msg_class: Any, field_name: str) -> Dict[str, Any
Returns:
对应的 JSON Schema 定义
"""
schema = {"type": "object", "properties": {}, "required": []}
# 优先使用字段名作为标题,否则使用类名
schema = ROS2MessageInstance(msg_class()).get_json_schema()
schema["title"] = field_name
# 获取消息的字段和字段类型
try:
for ind, slot_info in enumerate(msg_class._fields_and_field_types.items()):
slot_name, slot_type = slot_info
type_info = msg_class.SLOT_TYPES[ind]
field_schema = ros_field_type_to_json_schema(type_info, slot_name)
schema["properties"][slot_name] = field_schema
schema["required"].append(slot_name)
# if hasattr(msg_class, 'get_fields_and_field_types'):
# fields_and_types = msg_class.get_fields_and_field_types()
#
# for field_name, field_type in fields_and_types.items():
# # 将 ROS 字段类型转换为 JSON Schema
# field_schema = ros_field_type_to_json_schema(field_type)
#
# schema['properties'][field_name] = field_schema
# schema['required'].append(field_name)
# elif hasattr(msg_class, '__slots__') and hasattr(msg_class, '_fields_and_field_types'):
# # 直接从实例属性获取
# for field_name in msg_class.__slots__:
# # 移除前导下划线(如果有)
# clean_name = field_name[1:] if field_name.startswith('_') else field_name
#
# # 从 _fields_and_field_types 获取类型
# if clean_name in msg_class._fields_and_field_types:
# field_type = msg_class._fields_and_field_types[clean_name]
# field_schema = ros_field_type_to_json_schema(field_type)
#
# schema['properties'][clean_name] = field_schema
# schema['required'].append(clean_name)
except Exception as e:
# 如果获取字段类型失败,添加错误信息
schema["description"] = f"解析消息字段时出错: {str(e)}"
logger.error(f"解析 {msg_class.__name__} 消息字段失败: {str(e)}")
schema.pop("description")
return schema

View File

@@ -34,7 +34,8 @@ from unilabos_msgs.action import SendCmd
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unilabos.config.config import BasicConfig
from unilabos.utils.decorator import get_topic_config, get_all_subscriptions
from unilabos.registry.decorators import get_topic_config
from unilabos.utils.decorator import get_all_subscriptions
from unilabos.resources.container import RegularContainer
from unilabos.resources.graphio import (
@@ -57,6 +58,7 @@ from unilabos_msgs.msg import Resource # type: ignore
from unilabos.resources.resource_tracker import (
DeviceNodeResourceTracker,
ResourceDictType,
ResourceTreeSet,
ResourceTreeInstance,
ResourceDictInstance,
@@ -194,9 +196,9 @@ class PropertyPublisher:
self._value = None
try:
self.publisher_ = node.create_publisher(msg_type, f"{name}", qos)
except AttributeError as ex:
except Exception as e:
self.node.lab_logger().error(
f"创建发布者 {name} 失败,可能由于注册表有误,类型: {msg_type},错误: {ex}\n{traceback.format_exc()}"
f"StatusError, DeviceId: {self.node.device_id} 创建发布者 {name} 失败,可能由于注册表有误,类型: {msg_type},错误: {e}"
)
self.timer = node.create_timer(self.timer_period, self.publish_property)
self.__loop = ROS2DeviceNode.get_asyncio_loop()
@@ -569,9 +571,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
future.add_done_callback(done_cb)
except ImportError:
self.lab_logger().error("Host请求添加物料时本环境并不存在pylabrobot")
res.response = get_result_info_str(traceback.format_exc(), False, {})
except Exception as e:
self.lab_logger().error("Host请求添加物料时出错")
self.lab_logger().error(traceback.format_exc())
res.response = get_result_info_str(traceback.format_exc(), False, {})
return res
# noinspection PyTypeChecker
@@ -594,6 +598,12 @@ class BaseROS2DeviceNode(Node, Generic[T]):
self.s2c_resource_tree, # type: ignore
callback_group=self.callback_group,
),
"s2c_device_manage": self.create_service(
SerialCommand,
f"/srv{self.namespace}/s2c_device_manage",
self.s2c_device_manage, # type: ignore
callback_group=self.callback_group,
),
}
# 向全局在线设备注册表添加设备信息
@@ -1062,6 +1072,48 @@ class BaseROS2DeviceNode(Node, Generic[T]):
return res
async def s2c_device_manage(self, req: SerialCommand_Request, res: SerialCommand_Response):
"""Handle add/remove device requests from HostNode via SerialCommand."""
try:
cmd = json.loads(req.command)
action = cmd.get("action", "")
data = cmd.get("data", {})
device_id = data.get("device_id", "")
if not device_id:
res.response = json.dumps({"success": False, "error": "device_id required"})
return res
if action == "add":
result = self.create_device(device_id, data)
elif action == "remove":
result = self.destroy_device(device_id)
else:
result = {"success": False, "error": f"Unknown action: {action}"}
res.response = json.dumps(result, ensure_ascii=False)
except NotImplementedError as e:
self.lab_logger().warning(f"[DeviceManage] {e}")
res.response = json.dumps({"success": False, "error": str(e)})
except Exception as e:
self.lab_logger().error(f"[DeviceManage] Error: {e}")
res.response = json.dumps({"success": False, "error": str(e)})
return res
def create_device(self, device_id: str, config: "ResourceDictType") -> dict:
"""Create a sub-device dynamically. Override in HostNode / WorkstationNode."""
raise NotImplementedError(
f"{self.__class__.__name__} does not support dynamic device creation"
)
def destroy_device(self, device_id: str) -> dict:
"""Destroy a sub-device dynamically. Override in HostNode / WorkstationNode."""
raise NotImplementedError(
f"{self.__class__.__name__} does not support dynamic device removal"
)
async def transfer_resource_to_another(
self,
plr_resources: List["ResourcePLR"],
@@ -1204,22 +1256,40 @@ class BaseROS2DeviceNode(Node, Generic[T]):
return self._lab_logger
def create_ros_publisher(self, attr_name, msg_type, initial_period=5.0):
"""创建ROS发布者"""
# 检测装饰器配置(支持 get_{attr_name} 方法和 @property
"""创建ROS发布者,仅当方法/属性有 @topic_config 装饰器时才创建。"""
# 检测 @topic_config 装饰器配置
topic_config = {}
driver_class = type(self.driver_instance)
# 优先检测 get_{attr_name} 方法
if hasattr(self.driver_instance, f"get_{attr_name}"):
getter_method = getattr(self.driver_instance, f"get_{attr_name}")
topic_config = get_topic_config(getter_method)
# 区分 @property 和普通方法两种情况
is_prop = hasattr(driver_class, attr_name) and isinstance(
getattr(driver_class, attr_name), property
)
# 如果没有配置,检测 @property 装饰的属性
if is_prop:
# @property: 检测 fget 上的 @topic_config
class_attr = getattr(driver_class, attr_name)
if class_attr.fget is not None:
topic_config = get_topic_config(class_attr.fget)
else:
# 普通方法: 直接检测 attr_name 方法上的 @topic_config
if hasattr(self.driver_instance, attr_name):
method = getattr(self.driver_instance, attr_name)
if callable(method):
topic_config = get_topic_config(method)
# 没有 @topic_config 装饰器则跳过发布
if not topic_config:
driver_class = type(self.driver_instance)
if hasattr(driver_class, attr_name):
class_attr = getattr(driver_class, attr_name)
if isinstance(class_attr, property) and class_attr.fget is not None:
topic_config = get_topic_config(class_attr.fget)
return
# 发布名称优先级: @topic_config(name=...) > get_ 前缀去除 > attr_name
cfg_name = topic_config.get("name")
if cfg_name:
publish_name = cfg_name
elif attr_name.startswith("get_"):
publish_name = attr_name[4:]
else:
publish_name = attr_name
# 使用装饰器配置或默认值
cfg_period = topic_config.get("period")
@@ -1232,10 +1302,10 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 获取属性值的方法
def get_device_attr():
try:
if hasattr(self.driver_instance, f"get_{attr_name}"):
return getattr(self.driver_instance, f"get_{attr_name}")()
else:
if is_prop:
return getattr(self.driver_instance, attr_name)
else:
return getattr(self.driver_instance, attr_name)()
except AttributeError as ex:
if ex.args[0].startswith(f"AttributeError: '{self.driver_instance.__class__.__name__}' object"):
self.lab_logger().error(
@@ -1247,8 +1317,8 @@ class BaseROS2DeviceNode(Node, Generic[T]):
)
self.lab_logger().error(traceback.format_exc())
self._property_publishers[attr_name] = PropertyPublisher(
self, attr_name, get_device_attr, msg_type, period, print_publish, qos
self._property_publishers[publish_name] = PropertyPublisher(
self, publish_name, get_device_attr, msg_type, period, print_publish, qos
)
def create_ros_action_server(self, action_name, action_value_mapping):
@@ -1256,14 +1326,17 @@ class BaseROS2DeviceNode(Node, Generic[T]):
action_type = action_value_mapping["type"]
str_action_type = str(action_type)[8:-2]
self._action_servers[action_name] = ActionServer(
self,
action_type,
action_name,
execute_callback=self._create_execute_callback(action_name, action_value_mapping),
callback_group=self.callback_group,
)
try:
self._action_servers[action_name] = ActionServer(
self,
action_type,
action_name,
execute_callback=self._create_execute_callback(action_name, action_value_mapping),
callback_group=self.callback_group,
)
except Exception as e:
self.lab_logger().error(f"创建ActionServer失败Device: {self.device_id}, Action Name: {action_name}, Action Type: {action_type}, Error: {e}")
return
self.lab_logger().trace(f"发布动作: {action_name}, 类型: {str_action_type}")
def _setup_decorated_subscribers(self):

View File

@@ -4,7 +4,14 @@ import cv2
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker
from unilabos.registry.decorators import device
@device(
id="camera",
category=["camera"],
description="""VideoPublisher摄像头设备节点用于实时视频采集和流媒体发布。该设备通过OpenCV连接本地摄像头如USB摄像头、内置摄像头等定时采集视频帧并将其转换为ROS2的sensor_msgs/Image消息格式发布到视频话题。主要用于实验室自动化系统中的视觉监控、图像分析、实时观察等应用场景。支持可配置的摄像头索引、发布频率等参数。""",
)
class VideoPublisher(BaseROS2DeviceNode):
def __init__(self, device_id='video_publisher', registry_name="", device_uuid='', camera_index=0, period: float = 0.1, resource_tracker: DeviceNodeResourceTracker = None):
# 初始化BaseROS2DeviceNode使用自身作为driver_instance

View File

@@ -12,6 +12,7 @@ from geometry_msgs.msg import Point
from rclpy.action import ActionClient, get_action_server_names_and_types_by_node
from rclpy.service import Service
from typing_extensions import TypedDict
from unilabos_msgs.action import EmptyIn, StrSingleInput, ResourceCreateFromOuterEasy, ResourceCreateFromOuter
from unilabos_msgs.msg import Resource # type: ignore
from unilabos_msgs.srv import (
ResourceAdd,
@@ -23,6 +24,7 @@ from unilabos_msgs.srv import (
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unique_identifier_msgs.msg import UUID
from unilabos.registry.decorators import device
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from unilabos.registry.registry import lab_registry
from unilabos.resources.container import RegularContainer
@@ -30,6 +32,7 @@ from unilabos.resources.graphio import initialize_resource
from unilabos.resources.registry import add_schema
from unilabos.resources.resource_tracker import (
ResourceDict,
ResourceDictType,
ResourceDictInstance,
ResourceTreeSet,
ResourceTreeInstance,
@@ -65,7 +68,13 @@ class DeviceActionStatus:
class TestResourceReturn(TypedDict):
resources: List[List[ResourceDict]]
devices: List[Dict[str, Any]]
unilabos_samples: List[LabSample]
# unilabos_samples: List[LabSample]
class CreateResourceReturn(TypedDict):
created_resource_tree: List[List[ResourceDict]]
liquid_input_resource_tree: List[Dict[str, Any]]
# unilabos_samples: List[LabSample]
class TestLatencyReturn(TypedDict):
@@ -80,6 +89,7 @@ class TestLatencyReturn(TypedDict):
status: str
@device(id="host_node", category=[], description="Host Node", icon="icon_device.webp")
class HostNode(BaseROS2DeviceNode):
"""
主机节点类,负责管理设备、资源和控制器
@@ -268,44 +278,42 @@ class HostNode(BaseROS2DeviceNode):
self._action_clients: Dict[str, ActionClient] = { # 为了方便了解实际的数据类型host的默认写好
"/devices/host_node/create_resource": ActionClient(
self,
lab_registry.ResourceCreateFromOuterEasy,
ResourceCreateFromOuterEasy,
"/devices/host_node/create_resource",
callback_group=self.callback_group,
),
"/devices/host_node/create_resource_detailed": ActionClient(
self,
lab_registry.ResourceCreateFromOuter,
ResourceCreateFromOuter,
"/devices/host_node/create_resource_detailed",
callback_group=self.callback_group,
),
"/devices/host_node/test_latency": ActionClient(
self,
lab_registry.EmptyIn,
EmptyIn,
"/devices/host_node/test_latency",
callback_group=self.callback_group,
),
"/devices/host_node/test_resource": ActionClient(
self,
lab_registry.EmptyIn,
EmptyIn,
"/devices/host_node/test_resource",
callback_group=self.callback_group,
),
"/devices/host_node/_execute_driver_command": ActionClient(
self,
lab_registry.StrSingleInput,
StrSingleInput,
"/devices/host_node/_execute_driver_command",
callback_group=self.callback_group,
),
"/devices/host_node/_execute_driver_command_async": ActionClient(
self,
lab_registry.StrSingleInput,
StrSingleInput,
"/devices/host_node/_execute_driver_command_async",
callback_group=self.callback_group,
),
} # 用来存储多个ActionClient实例
self._action_value_mappings: Dict[str, Dict] = (
{}
) # device_id -> action_value_mappings(本地+远程设备统一存储)
self._action_value_mappings: Dict[str, Dict] = {} # device_id -> action_value_mappings(本地+远程设备统一存储)
self._slave_registry_configs: Dict[str, Dict] = {} # registry_name -> registry_config(含action_value_mappings)
self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态
self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备
@@ -323,10 +331,18 @@ class HostNode(BaseROS2DeviceNode):
self._discover_devices()
# 初始化所有本机设备节点,多一次过滤,防止重复初始化
local_machine = BasicConfig.machine_name
for device_config in devices_config.root_nodes:
device_id = device_config.res_content.id
if device_config.res_content.type != "device":
continue
dev_machine = device_config.res_content.machine_name
if dev_machine and local_machine and dev_machine != local_machine:
self.lab_logger().info(
f"[Host Node] Device {device_id} belongs to machine '{dev_machine}', "
f"local is '{local_machine}', skipping initialization."
)
continue
if device_id not in self.devices_names:
self.initialize_device(device_id, device_config)
else:
@@ -556,7 +572,7 @@ class HostNode(BaseROS2DeviceNode):
liquid_type: list[str] = [],
liquid_volume: list[int] = [],
slot_on_deck: str = "",
):
) -> CreateResourceReturn:
# 暂不支持多对同名父子同时存在
res_creation_input = {
"id": res_id.split("/")[-1],
@@ -609,6 +625,8 @@ class HostNode(BaseROS2DeviceNode):
assert len(response) == 1, "Create Resource应当只返回一个结果"
for i in response:
res = json.loads(i)
if "suc" in res:
raise ValueError(res.get("error"))
return res
except Exception as ex:
pass
@@ -650,7 +668,12 @@ class HostNode(BaseROS2DeviceNode):
action_id = f"/devices/{device_id}/{action_name}"
if action_id not in self._action_clients:
action_type = action_value_mapping["type"]
self._action_clients[action_id] = ActionClient(self, action_type, action_id)
try:
self._action_clients[action_id] = ActionClient(self, action_type, action_id)
except Exception as e:
self.lab_logger().error(
f"创建ActionClient失败Device: {device_id}, Action Name: {action_name}, Action Type: {action_type}, Error: {e}")
continue
self.lab_logger().trace(
f"[Host Node] Created ActionClient (Local): {action_id}"
) # 子设备再创建用的是Discover发现的
@@ -1250,9 +1273,9 @@ class HostNode(BaseROS2DeviceNode):
# 用 registry_name 索引已存储的 registry_config,获取 action_value_mappings
if registry_name and registry_name in self._slave_registry_configs:
action_mappings = self._slave_registry_configs[registry_name].get(
"class", {}
).get("action_value_mappings", {})
action_mappings = (
self._slave_registry_configs[registry_name].get("class", {}).get("action_value_mappings", {})
)
if action_mappings:
self._action_value_mappings[edge_device_id] = action_mappings
self.lab_logger().info(
@@ -1272,14 +1295,19 @@ class HostNode(BaseROS2DeviceNode):
# 解析 devices_config,建立 device_id -> action_value_mappings 映射
if devices_config:
machine_name = info["machine_name"]
# Stamp machine_name on each device dict before parsing
for device_tree in devices_config:
for device_dict in device_tree:
device_dict["machine_name"] = machine_name
device_id = device_dict.get("id", "")
class_name = device_dict.get("class", "")
if device_id and class_name and class_name in self._slave_registry_configs:
action_mappings = self._slave_registry_configs[class_name].get(
"class", {}
).get("action_value_mappings", {})
action_mappings = (
self._slave_registry_configs[class_name]
.get("class", {})
.get("action_value_mappings", {})
)
if action_mappings:
self._action_value_mappings[device_id] = action_mappings
self.lab_logger().info(
@@ -1287,6 +1315,18 @@ class HostNode(BaseROS2DeviceNode):
f"for remote device {device_id} (class: {class_name})"
)
# Merge slave devices_config into self.devices_config tree
try:
slave_tree_set = ResourceTreeSet.load(devices_config) # slave一定是根节点的tree
for tree in slave_tree_set.trees:
self.devices_config.trees.append(tree)
self.lab_logger().info(
f"[Host Node] Merged {len(slave_tree_set.trees)} slave device trees "
f"(machine: {machine_name}) into devices_config"
)
except Exception as e:
self.lab_logger().error(f"[Host Node] Failed to merge slave devices_config: {e}")
self.lab_logger().debug(f"[Host Node] Node info update: {info}")
response.response = "OK"
except Exception as e:
@@ -1695,3 +1735,177 @@ class HostNode(BaseROS2DeviceNode):
self.lab_logger().error(f"[Host Node-Resource] Error notifying resource tree update: {str(e)}")
self.lab_logger().error(traceback.format_exc())
return False
# ------------------------------------------------------------------
# Device lifecycle (add / remove) — pure forwarder
# ------------------------------------------------------------------
def notify_device_manage(self, target_node_id: str, action: str, config: ResourceDictType) -> bool:
"""Forward an add/remove device command to the target node via ROS2 SerialCommand.
The HostNode does NOT interpret the command; it simply resolves the
target namespace and forwards the request to ``s2c_device_manage``.
If *target_node_id* equals the HostNode's own device_id (i.e. the
command targets the host itself), we call our local ``create_device``
/ ``destroy_device`` directly instead of going through ROS2.
"""
try:
# If the target is the host itself, handle locally
device_id = config["id"]
if target_node_id == self.device_id:
if action == "add":
return self.create_device(device_id, config).get("success", False)
elif action == "remove":
return self.destroy_device(device_id).get("success", False)
if target_node_id not in self.devices_names:
self.lab_logger().error(
f"[Host Node-DeviceMgr] Target {target_node_id} not found in devices_names"
)
return False
namespace = self.devices_names[target_node_id]
device_key = f"{namespace}/{target_node_id}"
if device_key not in self._online_devices:
self.lab_logger().error(f"[Host Node-DeviceMgr] Target {device_key} is offline")
return False
srv_address = f"/srv{namespace}/s2c_device_manage"
self.lab_logger().info(
f"[Host Node-DeviceMgr] Forwarding {action}_device to {target_node_id} ({srv_address})"
)
sclient = self.create_client(SerialCommand, srv_address)
if not sclient.wait_for_service(timeout_sec=5.0):
self.lab_logger().error(f"[Host Node-DeviceMgr] Service {srv_address} not available")
return False
request = SerialCommand.Request()
request.command = json.dumps({"action": action, "data": config}, ensure_ascii=False)
future = sclient.call_async(request)
timeout = 30.0
start_time = time.time()
while not future.done():
if time.time() - start_time > timeout:
self.lab_logger().error(
f"[Host Node-DeviceMgr] Timeout waiting for {action}_device on {target_node_id}"
)
return False
time.sleep(0.05)
response = future.result()
self.lab_logger().info(
f"[Host Node-DeviceMgr] {action}_device on {target_node_id} completed"
)
return True
except Exception as e:
self.lab_logger().error(f"[Host Node-DeviceMgr] Error: {e}")
self.lab_logger().error(traceback.format_exc())
return False
def create_device(self, device_id: str, config: ResourceDictType) -> dict:
"""Dynamically create a root-level device on the host."""
if not device_id:
return {"success": False, "error": "device_id required"}
if device_id in self.devices_names:
return {"success": False, "error": f"Device {device_id} already exists"}
try:
config.setdefault("id", device_id)
config.setdefault("type", "device")
config.setdefault("machine_name", BasicConfig.machine_name or "本地")
res_dict = ResourceDictInstance.get_resource_instance_from_dict(config)
self.initialize_device(device_id, res_dict)
if device_id not in self.devices_names:
return {"success": False, "error": f"initialize_device failed for {device_id}"}
# Add to config tree (devices_config)
tree = ResourceTreeInstance(res_dict)
self.devices_config.trees.append(tree)
# Add to resource tracker so s2c_resource_tree can find it
try:
for plr_resource in ResourceTreeSet([tree]).to_plr_resources():
self._resource_tracker.add_resource(plr_resource)
except Exception as ex:
self.lab_logger().warning(f"[Host Node-DeviceMgr] PLR resource registration skipped: {ex}")
self.lab_logger().info(f"[Host Node-DeviceMgr] Device {device_id} created successfully")
return {"success": True, "device_id": device_id}
except Exception as e:
self.lab_logger().error(f"[Host Node-DeviceMgr] Failed to create {device_id}: {e}")
self.lab_logger().error(traceback.format_exc())
return {"success": False, "error": str(e)}
def destroy_device(self, device_id: str) -> dict:
"""Remove a root-level device from the host."""
if not device_id:
return {"success": False, "error": "device_id required"}
if device_id not in self.devices_names:
return {"success": False, "error": f"Device {device_id} not found"}
if device_id == self.device_id:
return {"success": False, "error": "Cannot destroy host_node itself"}
try:
namespace = self.devices_names[device_id]
device_key = f"{namespace}/{device_id}"
# Remove action clients
action_prefix = f"/devices/{device_id}/"
to_remove = [k for k in self._action_clients if k.startswith(action_prefix)]
for k in to_remove:
try:
self._action_clients[k].destroy()
except Exception:
pass
del self._action_clients[k]
# Remove from config tree (devices_config)
self.devices_config.trees = [
t for t in self.devices_config.trees
if t.root_node.res_content.id != device_id
]
# Remove from resource tracker
try:
tracked = self._resource_tracker.uuid_to_resources.copy()
for uid, res in tracked.items():
res_id = res.get("id") if isinstance(res, dict) else getattr(res, "name", None)
if res_id == device_id:
self._resource_tracker.remove_resource(res)
except Exception as ex:
self.lab_logger().warning(f"[Host Node-DeviceMgr] Resource tracker cleanup: {ex}")
# Clean internal state
self._online_devices.discard(device_key)
self.devices_names.pop(device_id, None)
self.device_machine_names.pop(device_id, None)
self._action_value_mappings.pop(device_id, None)
# Destroy the ROS2 node of the device
instance = self.devices_instances.pop(device_id, None)
if instance is not None:
try:
# noinspection PyProtectedMember
ros_node = getattr(instance, "_ros_node", None)
if ros_node is not None:
ros_node.destroy_node()
except Exception as e:
self.lab_logger().warning(f"[Host Node-DeviceMgr] Error destroying ROS node for {device_id}: {e}")
self.lab_logger().info(f"[Host Node-DeviceMgr] Device {device_id} destroyed")
return {"success": True, "device_id": device_id}
except Exception as e:
self.lab_logger().error(f"[Host Node-DeviceMgr] Failed to destroy {device_id}: {e}")
self.lab_logger().error(traceback.format_exc())
return {"success": False, "error": str(e)}

View File

@@ -20,7 +20,7 @@ from unilabos.ros.msgs.message_converter import (
convert_from_ros_msg_with_mapping,
)
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker, ROS2DeviceNode
from unilabos.resources.resource_tracker import ResourceTreeSet, ResourceDictInstance
from unilabos.resources.resource_tracker import ResourceDictType, ResourceTreeSet, ResourceDictInstance
from unilabos.utils.type_check import get_result_info_str
if TYPE_CHECKING:
@@ -177,6 +177,103 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
self.lab_logger().trace(f"为子设备 {device_id} 创建动作客户端: {action_name}")
return d
def create_device(self, device_id: str, config: ResourceDictType) -> dict:
"""Dynamically add a sub-device to this workstation."""
if not device_id:
return {"success": False, "error": "device_id required"}
if device_id in self.sub_devices:
return {"success": False, "error": f"Sub-device {device_id} already exists"}
try:
from unilabos.config.config import BasicConfig
config.setdefault("id", device_id)
config.setdefault("type", "device")
config.setdefault("machine_name", BasicConfig.machine_name or "本地")
res_dict = ResourceDictInstance.get_resource_instance_from_dict(config)
d = self.initialize_device(device_id, res_dict)
if d is None:
return {"success": False, "error": f"initialize_device returned None for {device_id}"}
# Add to children config list
self.children.append(res_dict)
# Add to resource tracker
try:
from unilabos.resources.resource_tracker import ResourceTreeInstance
tree = ResourceTreeInstance(res_dict)
for plr_resource in ResourceTreeSet([tree]).to_plr_resources():
self.resource_tracker.add_resource(plr_resource)
except Exception as ex:
self.lab_logger().warning(f"[Workstation-DeviceMgr] PLR resource registration skipped: {ex}")
self.lab_logger().info(f"[Workstation-DeviceMgr] Sub-device {device_id} created")
return {"success": True, "device_id": device_id}
except Exception as e:
self.lab_logger().error(f"[Workstation-DeviceMgr] Failed to create {device_id}: {e}")
self.lab_logger().error(traceback.format_exc())
return {"success": False, "error": str(e)}
def destroy_device(self, device_id: str) -> dict:
"""Dynamically remove a sub-device from this workstation."""
if not device_id:
return {"success": False, "error": "device_id required"}
if device_id not in self.sub_devices:
return {"success": False, "error": f"Sub-device {device_id} not found"}
try:
# Remove from children config list
self.children = [
c for c in self.children
if c.res_content.id != device_id
]
# Remove from resource tracker
try:
tracked = self.resource_tracker.uuid_to_resources.copy()
for uid, res in tracked.items():
res_id = res.get("id") if isinstance(res, dict) else getattr(res, "name", None)
if res_id == device_id:
self.resource_tracker.remove_resource(res)
except Exception as ex:
self.lab_logger().warning(f"[Workstation-DeviceMgr] Resource tracker cleanup: {ex}")
# Remove action clients for this sub-device
action_prefix = f"/devices/{device_id}/"
to_remove = [k for k in self._action_clients if k.startswith(action_prefix)]
for k in to_remove:
try:
self._action_clients[k].destroy()
except Exception:
pass
del self._action_clients[k]
# Destroy the ROS2 node
instance = self.sub_devices.pop(device_id, None)
if instance is not None:
ros_node = getattr(instance, "ros_node_instance", None)
if ros_node is not None:
try:
ros_node.destroy_node()
except Exception as e:
self.lab_logger().warning(
f"[Workstation-DeviceMgr] Error destroying ROS node for {device_id}: {e}"
)
# Remove from communication map if present
self.communication_node_id_to_instance.pop(device_id, None)
self.lab_logger().info(f"[Workstation-DeviceMgr] Sub-device {device_id} destroyed")
return {"success": True, "device_id": device_id}
except Exception as e:
self.lab_logger().error(f"[Workstation-DeviceMgr] Failed to destroy {device_id}: {e}")
self.lab_logger().error(traceback.format_exc())
return {"success": False, "error": str(e)}
def create_ros_action_server(self, action_name, action_value_mapping):
"""创建ROS动作服务器"""
if action_name not in self.protocol_names:

View File

@@ -19,74 +19,6 @@ def singleton(cls):
return get_instance
def topic_config(
period: Optional[float] = None,
print_publish: Optional[bool] = None,
qos: Optional[int] = None,
) -> Callable[[F], F]:
"""
Topic发布配置装饰器
用于装饰 get_{attr_name} 方法或 @property控制对应属性的ROS topic发布行为。
Args:
period: 发布周期。None 表示使用默认值 5.0
print_publish: 是否打印发布日志。None 表示使用节点默认配置
qos: QoS深度配置。None 表示使用默认值 10
Example:
class MyDriver:
# 方式1: 装饰 get_{attr_name} 方法
@topic_config(period=1.0, print_publish=False, qos=5)
def get_temperature(self):
return self._temperature
# 方式2: 与 @property 连用topic_config 放在下面)
@property
@topic_config(period=0.1)
def position(self):
return self._position
Note:
与 @property 连用时,@topic_config 必须放在 @property 下面,
这样装饰器执行顺序为:先 topic_config 添加配置,再 property 包装。
"""
def decorator(func: F) -> F:
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
# 在函数上附加配置属性 (type: ignore 用于动态属性)
wrapper._topic_period = period # type: ignore[attr-defined]
wrapper._topic_print_publish = print_publish # type: ignore[attr-defined]
wrapper._topic_qos = qos # type: ignore[attr-defined]
wrapper._has_topic_config = True # type: ignore[attr-defined]
return wrapper # type: ignore[return-value]
return decorator
def get_topic_config(func) -> dict:
"""
获取函数上的topic配置
Args:
func: 被装饰的函数
Returns:
包含 period, print_publish, qos 的配置字典
"""
if hasattr(func, "_has_topic_config") and getattr(func, "_has_topic_config", False):
return {
"period": getattr(func, "_topic_period", None),
"print_publish": getattr(func, "_topic_print_publish", None),
"qos": getattr(func, "_topic_qos", None),
}
return {}
def subscribe(
topic: str,
msg_type: Optional[type] = None,
@@ -104,24 +36,6 @@ def subscribe(
- {namespace}: 完整命名空间 (如 "/devices/pump_1")
msg_type: ROS 消息类型。如果为 None需要在回调函数的类型注解中指定
qos: QoS 深度配置,默认为 10
Example:
from std_msgs.msg import String, Float64
class MyDriver:
@subscribe(topic="/devices/{device_id}/set_speed", msg_type=Float64)
def on_speed_update(self, msg: Float64):
self._speed = msg.data
print(f"Speed updated to: {self._speed}")
@subscribe(topic="{namespace}/command")
def on_command(self, msg: String):
# msg_type 可从类型注解推断
self.execute_command(msg.data)
Note:
- 回调方法的第一个参数是 self第二个参数是收到的 ROS 消息
- topic 中的占位符会在创建订阅时被实际值替换
"""
def decorator(func: F) -> F:
@@ -129,7 +43,6 @@ def subscribe(
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
# 在函数上附加订阅配置
wrapper._subscribe_topic = topic # type: ignore[attr-defined]
wrapper._subscribe_msg_type = msg_type # type: ignore[attr-defined]
wrapper._subscribe_qos = qos # type: ignore[attr-defined]
@@ -141,15 +54,7 @@ def subscribe(
def get_subscribe_config(func) -> dict:
"""
获取函数上的订阅配置
Args:
func: 被装饰的函数
Returns:
包含 topic, msg_type, qos 的配置字典
"""
"""获取函数上的订阅配置 (topic, msg_type, qos)"""
if hasattr(func, "_has_subscribe") and getattr(func, "_has_subscribe", False):
return {
"topic": getattr(func, "_subscribe_topic", None),
@@ -163,9 +68,6 @@ def get_all_subscriptions(instance) -> list:
"""
扫描实例的所有方法,获取带有 @subscribe 装饰器的方法及其配置
Args:
instance: 要扫描的实例
Returns:
包含 (method_name, method, config) 元组的列表
"""
@@ -184,92 +86,14 @@ def get_all_subscriptions(instance) -> list:
return subscriptions
def always_free(func: F) -> F:
"""
标记动作为永久闲置(不受busy队列限制)的装饰器
被此装饰器标记的 action 方法,在执行时不会受到设备级别的排队限制,
任何时候请求都可以立即执行。适用于查询类、状态读取类等轻量级操作。
Example:
class MyDriver:
@always_free
def query_status(self, param: str):
# 这个动作可以随时执行,不需要排队
return self._status
def transfer(self, volume: float):
# 这个动作会按正常排队逻辑执行
pass
Note:
- 可以与其他装饰器组合使用,@always_free 应放在最外层
- 仅影响 WebSocket 调度层的 busy/free 判断,不影响 ROS2 层
"""
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper._is_always_free = True # type: ignore[attr-defined]
return wrapper # type: ignore[return-value]
def is_always_free(func) -> bool:
"""
检查函数是否被标记为永久闲置
Args:
func: 被检查的函数
Returns:
如果函数被 @always_free 装饰则返回 True否则返回 False
"""
return getattr(func, "_is_always_free", False)
def not_action(func: F) -> F:
"""
标记方法为非动作的装饰器
用于装饰 driver 类中的方法,使其在 complete_registry 时不被识别为动作。
适用于辅助方法、内部工具方法等不应暴露为设备动作的公共方法。
Example:
class MyDriver:
@not_action
def helper_method(self):
# 这个方法不会被注册为动作
pass
def actual_action(self, param: str):
# 这个方法会被注册为动作
self.helper_method()
Note:
- 可以与其他装饰器组合使用,@not_action 应放在最外层
- 仅影响 complete_registry 的动作识别,不影响方法的正常调用
"""
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
# 在函数上附加标记
wrapper._is_not_action = True # type: ignore[attr-defined]
return wrapper # type: ignore[return-value]
def is_not_action(func) -> bool:
"""
检查函数是否被标记为非动作
Args:
func: 被检查的函数
Returns:
如果函数被 @not_action 装饰则返回 True否则返回 False
"""
return getattr(func, "_is_not_action", False)
# ---------------------------------------------------------------------------
# 向后兼容重导出 -- 已迁移到 unilabos.registry.decorators
# ---------------------------------------------------------------------------
from unilabos.registry.decorators import ( # noqa: E402, F401
topic_config,
get_topic_config,
always_free,
is_always_free,
not_action,
is_not_action,
)

View File

@@ -22,6 +22,7 @@ class EnvironmentChecker:
# "pymodbus.framer.FramerType": "pymodbus==3.9.2",
"websockets": "websockets",
"msgcenterpy": "msgcenterpy",
"orjson": "orjson",
"opentrons_shared_data": "opentrons_shared_data",
"typing_extensions": "typing_extensions",
"crcmod": "crcmod-plus",
@@ -32,7 +33,7 @@ class EnvironmentChecker:
# 包版本要求(包名: 最低版本)
self.version_requirements = {
"msgcenterpy": "0.1.5", # msgcenterpy 最低版本要求
"msgcenterpy": "0.1.7", # msgcenterpy 最低版本要求
}
self.missing_packages = []

View File

@@ -29,7 +29,7 @@ from ast import Constant
from unilabos.resources.resource_tracker import PARAM_SAMPLE_UUIDS
from unilabos.utils import logger
from unilabos.utils.decorator import is_not_action, is_always_free
from unilabos.registry.decorators import is_not_action, is_always_free
class ImportManager:
@@ -481,10 +481,16 @@ class ImportManager:
return False
def _is_always_free_method(self, node: ast.FunctionDef) -> bool:
"""检查是否是@always_free装饰的方法"""
"""检查是否是@always_free装饰的方法,或 @action(always_free=True) 装饰的方法"""
for decorator in node.decorator_list:
if isinstance(decorator, ast.Name) and decorator.id == "always_free":
return True
# 检查 @action(always_free=True)
if isinstance(decorator, ast.Call):
func = decorator.func
if isinstance(func, ast.Name) and func.id == "action":
for keyword in decorator.keywords:
if keyword.arg == "always_free":
if isinstance(keyword.value, Constant) and keyword.value.value is True:
return True
return False
def _get_property_name_from_setter(self, node: ast.FunctionDef) -> str:

View File

@@ -217,7 +217,6 @@ def configure_logger(loglevel=None, working_dir=None):
return log_filepath
# 配置日志系统
configure_logger()

View File

@@ -1,7 +1,8 @@
networkx
typing_extensions
websockets
msgcenterpy>=0.1.5
msgcenterpy>=0.1.7
orjson>=3.11
opentrons_shared_data
pint
fastapi