Compare commits

..

3 Commits

Author SHA1 Message Date
Junhan Chang
48e13a7b4d P1 Edge: 关节桥接重构 — 直接订阅 /joint_states + 资源跟随 + 吞吐优化
- HostNode 直接订阅 /joint_states (JointStateMsg),绕过 JointRepublisher 中间人
- 新增 resource_pose 订阅,实现资源夹取跟随 (gripper attach/detach)
- 吞吐优化:死区过滤 (1e-4 rad)、抑频 (~20Hz)、增量 resource_poses
- JointRepublisher 修复 str→json.dumps (E1)
- communication.py 新增 publish_joint_state 抽象方法
- ws_client.py 实现 push_joint_state action 发送
- 57 项测试覆盖:关节分组、资源跟随、同类型多设备、优化行为

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-24 06:32:30 +08:00
Junhan Chang
d776550a4b add virtual_sample_demo 样品追踪测试设备 2026-03-23 16:43:20 +08:00
Xuwznln
3d8123849a add external devices param
fix registry upload missing type
2026-03-23 15:01:16 +08:00
15 changed files with 2595 additions and 175 deletions

View File

@@ -0,0 +1,160 @@
---
name: add-device
description: Guide for adding new devices to Uni-Lab-OS (接入新设备). Uses @device decorator + AST auto-scanning instead of manual YAML. Walks through device category, communication protocol, driver creation with decorators, and graph file setup. Use when the user wants to add/integrate a new device, create a device driver, write a device class, or mentions 接入设备/添加设备/设备驱动/物模型.
---
# 添加新设备到 Uni-Lab-OS
**第一步:** 使用 Read 工具读取 `docs/ai_guides/add_device.md`,获取完整的设备接入指南。
该指南包含设备类别(物模型)列表、通信协议模板、常见错误检查清单等。搜索 `unilabos/devices/` 获取已有设备的实现参考。
---
## 装饰器参考
### @device — 设备类装饰器
```python
from unilabos.registry.decorators import device
# 单设备
@device(
id="my_device.vendor", # 注册表唯一标识(必填)
category=["temperature"], # 分类标签列表(必填)
description="设备描述", # 设备描述
display_name="显示名称", # UI 显示名称(默认用 id
icon="DeviceIcon.webp", # 图标文件名
version="1.0.0", # 版本号
device_type="python", # "python" 或 "ros2"
handles=[...], # 端口列表InputHandle / OutputHandle
model={...}, # 3D 模型配置
hardware_interface=HardwareInterface(...), # 硬件通信接口
)
# 多设备(同一个类注册多个设备 ID各自有不同的 handles 等配置)
@device(
ids=["pump.vendor.model_A", "pump.vendor.model_B"],
id_meta={
"pump.vendor.model_A": {"handles": [...], "description": "型号 A"},
"pump.vendor.model_B": {"handles": [...], "description": "型号 B"},
},
category=["pump_and_valve"],
)
```
### @action — 动作方法装饰器
```python
from unilabos.registry.decorators import action
@action # 无参:注册为 UniLabJsonCommand 动作
@action() # 同上
@action(description="执行操作") # 带描述
@action(
action_type=HeatChill, # 指定 ROS Action 消息类型
goal={"temperature": "temp"}, # Goal 字段映射
feedback={}, # Feedback 字段映射
result={}, # Result 字段映射
handles=[...], # 动作级别端口
goal_default={"temp": 25.0}, # Goal 默认值
placeholder_keys={...}, # 参数占位符
always_free=True, # 不受排队限制
auto_prefix=True, # 强制使用 auto- 前缀
parent=True, # 从父类 MRO 获取参数签名
)
```
**自动识别规则:**
-`@action` 的公开方法 → 注册为动作(方法名即动作名)
- **不带 `@action` 的公开方法** → 自动注册为 `auto-{方法名}` 动作
- `_` 开头的方法 → 不扫描
- `@not_action` 标记的方法 → 排除
### @topic_config — 状态属性配置
```python
from unilabos.registry.decorators import topic_config
@property
@topic_config(
period=5.0, # 发布周期(秒),默认 5.0
print_publish=False, # 是否打印发布日志
qos=10, # QoS 深度,默认 10
name="custom_name", # 自定义发布名称(默认用属性名)
)
def temperature(self) -> float:
return self.data.get("temperature", 0.0)
```
### 辅助装饰器
```python
from unilabos.registry.decorators import not_action, always_free
@not_action # 标记为非动作post_init、辅助方法等
@always_free # 标记为不受排队限制(查询类操作)
```
---
## 设备模板
```python
import logging
from typing import Any, Dict, Optional
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
from unilabos.registry.decorators import device, action, topic_config, not_action
@device(id="my_device", category=["my_category"], description="设备描述")
class MyDevice:
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
self.device_id = device_id or "my_device"
self.config = config or {}
self.logger = logging.getLogger(f"MyDevice.{self.device_id}")
self.data: Dict[str, Any] = {"status": "Idle"}
@not_action
def post_init(self, ros_node: BaseROS2DeviceNode) -> None:
self._ros_node = ros_node
@action
async def initialize(self) -> bool:
self.data["status"] = "Ready"
return True
@action
async def cleanup(self) -> bool:
self.data["status"] = "Offline"
return True
@action(description="执行操作")
def my_action(self, param: float = 0.0, name: str = "") -> Dict[str, Any]:
"""带 @action 装饰器 → 注册为 'my_action' 动作"""
return {"success": True}
def get_info(self) -> Dict[str, Any]:
"""无 @action → 自动注册为 'auto-get_info' 动作"""
return {"device_id": self.device_id}
@property
@topic_config()
def status(self) -> str:
return self.data.get("status", "Idle")
@property
@topic_config(period=2.0)
def temperature(self) -> float:
return self.data.get("temperature", 0.0)
```
### 要点
- `_ros_node: BaseROS2DeviceNode` 类型标注放在类体顶部
- `__init__` 签名固定为 `(self, device_id=None, config=None, **kwargs)`
- `post_init``@not_action` 标记,参数类型标注为 `BaseROS2DeviceNode`
- 运行时状态存储在 `self.data` 字典中
- 设备文件放在 `unilabos/devices/<category>/` 目录下

View File

@@ -0,0 +1,351 @@
---
name: add-resource
description: Guide for adding new resources (materials, bottles, carriers, decks, warehouses) to Uni-Lab-OS (添加新物料/资源). Uses @resource decorator for AST auto-scanning. Covers Bottle, Carrier, Deck, WareHouse definitions. Use when the user wants to add resources, define materials, create a deck layout, add bottles/carriers/plates, or mentions 物料/资源/resource/bottle/carrier/deck/plate/warehouse.
---
# 添加新物料资源
Uni-Lab-OS 的资源体系基于 PyLabRobot通过扩展实现 Bottle、Carrier、WareHouse、Deck 等实验室物料管理。使用 `@resource` 装饰器注册AST 自动扫描生成注册表条目。
---
## 资源类型
| 类型 | 基类 | 用途 | 示例 |
|------|------|------|------|
| **Bottle** | `Well` (PyLabRobot) | 单个容器(瓶、小瓶、烧杯、反应器) | 试剂瓶、粉末瓶 |
| **BottleCarrier** | `ItemizedCarrier` | 多槽位载架(放多个 Bottle | 6 位试剂架、枪头盒 |
| **WareHouse** | `ItemizedCarrier` | 堆栈/仓库(放多个 Carrier | 4x4 堆栈 |
| **Deck** | `Deck` (PyLabRobot) | 工作站台面(放多个 WareHouse | 反应站 Deck |
**层级关系:** `Deck``WareHouse``BottleCarrier``Bottle`
WareHouse 本质上和 Site 是同一概念 — 都是定义一组固定的放置位slot只不过 WareHouse 多嵌套了一层 Deck。两者都需要开发者根据实际物理尺寸自行计算各 slot 的偏移坐标。
---
## @resource 装饰器
```python
from unilabos.registry.decorators import resource
@resource(
id="my_resource_id", # 注册表唯一标识(必填)
category=["bottles"], # 分类标签列表(必填)
description="资源描述",
icon="", # 图标
version="1.0.0",
handles=[...], # 端口列表InputHandle / OutputHandle
model={...}, # 3D 模型配置
class_type="pylabrobot", # "python" / "pylabrobot" / "unilabos"
)
```
---
## 创建规范
### 命名规则
1. **`name` 参数作为前缀**:所有工厂函数必须接受 `name: str` 参数,创建子物料时以 `name` 作为前缀,确保实例名在运行时全局唯一
2. **Bottle 命名约定**:试剂瓶-Bottle烧杯-Beaker烧瓶-Flask小瓶-Vial
3. **函数名 = `@resource(id=...)`**:工厂函数名与注册表 id 保持一致
### 子物料命名示例
```python
# Carrier 内部的 sites 用 name 前缀
for k, v in sites.items():
v.name = f"{name}_{v.name}" # "堆栈1左_A01", "堆栈1左_B02" ...
# Carrier 中放置 Bottle 时用 name 前缀
carrier[0] = My_Reagent_Bottle(f"{name}_flask_1") # "堆栈1左_flask_1"
carrier[i] = My_Solid_Vial(f"{name}_vial_{ordering[i]}") # "堆栈1左_vial_A1"
# create_homogeneous_resources 使用 name_prefix
sites=create_homogeneous_resources(
klass=ResourceHolder,
locations=[...],
name_prefix=name, # 自动生成 "{name}_0", "{name}_1" ...
)
# Deck setup 中用仓库名称作为 name 传入
self.warehouses = {
"堆栈1左": my_warehouse_4x4("堆栈1左"), # WareHouse.name = "堆栈1左"
"试剂堆栈": my_reagent_stack("试剂堆栈"), # WareHouse.name = "试剂堆栈"
}
```
### 其他规范
- **max_volume 单位为 μL**500mL = 500000
- **尺寸单位为 mm**`diameter`, `height`, `size_x/y/z`, `dx/dy/dz`
- **BottleCarrier 必须设置 `num_items_x/y/z`**:用于前端渲染布局
- **Deck 的 `__init__` 必须接受 `setup=False`**:图文件中 `config.setup=true` 触发 `setup()`
- **按项目分组文件**:同一工作站的资源放在 `unilabos/resources/<project>/`
- **`__init__` 必须接受 `serialize()` 输出的所有字段**`serialize()` 输出会作为 `config` 回传到 `__init__`,因此必须通过显式参数或 `**kwargs` 接受,否则反序列化会报错
- **持久化运行时状态用 `serialize_state()`**:通过 `_unilabos_state` 字典存储可变信息(如物料内容、液体量),只存 JSON 可序列化的基本类型
---
## 资源模板
### Bottle
```python
from unilabos.registry.decorators import resource
from unilabos.resources.itemized_carrier import Bottle
@resource(id="My_Reagent_Bottle", category=["bottles"], description="我的试剂瓶")
def My_Reagent_Bottle(
name: str,
diameter: float = 70.0,
height: float = 120.0,
max_volume: float = 500000.0,
barcode: str = None,
) -> Bottle:
return Bottle(
name=name,
diameter=diameter,
height=height,
max_volume=max_volume,
barcode=barcode,
model="My_Reagent_Bottle",
)
```
**Bottle 参数:**
- `name`: 实例名称(运行时唯一,由上层 Carrier 以前缀方式传入)
- `diameter`: 瓶体直径 (mm)
- `height`: 瓶体高度 (mm)
- `max_volume`: 最大容积(**μL**500mL = 500000
- `barcode`: 条形码(可选)
### BottleCarrier
```python
from pylabrobot.resources import ResourceHolder
from pylabrobot.resources.carrier import create_ordered_items_2d
from unilabos.resources.itemized_carrier import BottleCarrier
from unilabos.registry.decorators import resource
@resource(id="My_6SlotCarrier", category=["bottle_carriers"], description="六槽位载架")
def My_6SlotCarrier(name: str) -> BottleCarrier:
sites = create_ordered_items_2d(
klass=ResourceHolder,
num_items_x=3, num_items_y=2,
dx=10.0, dy=10.0, dz=5.0,
item_dx=42.0, item_dy=35.0,
size_x=20.0, size_y=20.0, size_z=50.0,
)
# 子 site 用 name 作为前缀
for k, v in sites.items():
v.name = f"{name}_{v.name}"
carrier = BottleCarrier(
name=name, size_x=146.0, size_y=80.0, size_z=55.0,
sites=sites, model="My_6SlotCarrier",
)
carrier.num_items_x = 3
carrier.num_items_y = 2
carrier.num_items_z = 1
# 放置 Bottle 时用 name 作为前缀
ordering = ["A1", "B1", "A2", "B2", "A3", "B3"]
for i in range(6):
carrier[i] = My_Reagent_Bottle(f"{name}_vial_{ordering[i]}")
return carrier
```
### WareHouse / Deck 放置位
WareHouse 和 Site 本质上是同一概念都是定义一组固定放置位slot根据物理尺寸自行批量计算偏移坐标。WareHouse 只是多嵌套了一层 Deck 而已。推荐开发者直接根据实物测量数据计算各 slot 偏移量。
#### WareHouse使用 warehouse_factory
```python
from unilabos.resources.warehouse import warehouse_factory
from unilabos.registry.decorators import resource
@resource(id="my_warehouse_4x4", category=["warehouse"], description="4x4 堆栈仓库")
def my_warehouse_4x4(name: str) -> "WareHouse":
return warehouse_factory(
name=name,
num_items_x=4, num_items_y=4, num_items_z=1,
dx=10.0, dy=10.0, dz=10.0, # 第一个 slot 的起始偏移
item_dx=147.0, item_dy=106.0, item_dz=130.0, # slot 间距
resource_size_x=127.0, resource_size_y=85.0, resource_size_z=100.0, # slot 尺寸
model="my_warehouse_4x4",
col_offset=0, # 列标签起始偏移0 → A01, 4 → A05
layout="row-major", # "row-major" 行优先 / "col-major" 列优先 / "vertical-col-major" 竖向
)
```
`warehouse_factory` 参数说明:
- `dx/dy/dz`:第一个 slot 相对 WareHouse 原点的偏移mm
- `item_dx/item_dy/item_dz`:相邻 slot 间距mm需根据实际物理间距测量
- `resource_size_x/y/z`:每个 slot 的可放置区域尺寸
- `layout`:影响 slot 标签和坐标映射
- `"row-major"`A01,A02,...,B01,B02,...(行优先,适合横向排列)
- `"col-major"`A01,B01,...,A02,B02,...(列优先)
- `"vertical-col-major"`竖向排列y 坐标反向
#### Deck 组装 WareHouse
Deck 通过 `setup()` 将多个 WareHouse 放置到指定坐标:
```python
from pylabrobot.resources import Deck, Coordinate
from unilabos.registry.decorators import resource
@resource(id="MyStation_Deck", category=["deck"], description="我的工作站 Deck")
class MyStation_Deck(Deck):
def __init__(self, name="MyStation_Deck", size_x=2700.0, size_y=1080.0, size_z=1500.0,
category="deck", setup=False, **kwargs) -> None:
super().__init__(name=name, size_x=size_x, size_y=size_y, size_z=size_z)
if setup:
self.setup()
def setup(self) -> None:
self.warehouses = {
"堆栈1左": my_warehouse_4x4("堆栈1左"),
"堆栈1右": my_warehouse_4x4("堆栈1右"),
}
self.warehouse_locations = {
"堆栈1左": Coordinate(-200.0, 400.0, 0.0), # 自行测量计算
"堆栈1右": Coordinate(2350.0, 400.0, 0.0),
}
for wh_name, wh in self.warehouses.items():
self.assign_child_resource(wh, location=self.warehouse_locations[wh_name])
```
#### Site 模式(前端定向放置)
适用于有固定孔位/槽位的设备(如移液站 PRCXI 9300Deck 通过 `sites` 列表定义前端展示的放置位,前端据此渲染可拖拽的孔位布局:
```python
import collections
from typing import Any, Dict, List, Optional
from pylabrobot.resources import Deck, Resource, Coordinate
from unilabos.registry.decorators import resource
@resource(id="MyLabDeck", category=["deck"], description="带 Site 定向放置的 Deck")
class MyLabDeck(Deck):
# 根据设备台面实测批量计算各 slot 坐标偏移
_DEFAULT_SITE_POSITIONS = [
(0, 0, 0), (138, 0, 0), (276, 0, 0), (414, 0, 0), # T1-T4
(0, 96, 0), (138, 96, 0), (276, 96, 0), (414, 96, 0), # T5-T8
]
_DEFAULT_SITE_SIZE = {"width": 128.0, "height": 86.0, "depth": 0}
_DEFAULT_CONTENT_TYPE = ["plate", "tip_rack", "tube_rack", "adaptor"]
def __init__(self, name: str, size_x: float, size_y: float, size_z: float,
sites: Optional[List[Dict[str, Any]]] = None, **kwargs):
super().__init__(size_x, size_y, size_z, name)
if sites is not None:
self.sites = [dict(s) for s in sites]
else:
self.sites = []
for i, (x, y, z) in enumerate(self._DEFAULT_SITE_POSITIONS):
self.sites.append({
"label": f"T{i + 1}", # 前端显示的槽位标签
"visible": True, # 是否在前端可见
"position": {"x": x, "y": y, "z": z}, # 槽位物理坐标
"size": dict(self._DEFAULT_SITE_SIZE), # 槽位尺寸
"content_type": list(self._DEFAULT_CONTENT_TYPE), # 允许放入的物料类型
})
self._ordering = collections.OrderedDict(
(site["label"], None) for site in self.sites
)
def assign_child_resource(self, resource: Resource,
location: Optional[Coordinate] = None,
reassign: bool = True,
spot: Optional[int] = None):
idx = spot
if spot is None:
for i, site in enumerate(self.sites):
if site.get("label") == resource.name:
idx = i
break
if idx is None:
for i in range(len(self.sites)):
if self._get_site_resource(i) is None:
idx = i
break
if idx is None:
raise ValueError(f"No available site for '{resource.name}'")
loc = Coordinate(**self.sites[idx]["position"])
super().assign_child_resource(resource, location=loc, reassign=reassign)
def serialize(self) -> dict:
data = super().serialize()
sites_out = []
for i, site in enumerate(self.sites):
occupied = self._get_site_resource(i)
sites_out.append({
"label": site["label"],
"visible": site.get("visible", True),
"occupied_by": occupied.name if occupied else None,
"position": site["position"],
"size": site["size"],
"content_type": site["content_type"],
})
data["sites"] = sites_out
return data
```
**Site 字段说明:**
| 字段 | 类型 | 说明 |
|------|------|------|
| `label` | str | 槽位标签(如 `"T1"`),前端显示名称,也用于匹配 resource.name |
| `visible` | bool | 是否在前端可见 |
| `position` | dict | 物理坐标 `{x, y, z}`mm需自行测量计算偏移 |
| `size` | dict | 槽位尺寸 `{width, height, depth}`mm |
| `content_type` | list | 允许放入的物料类型,如 `["plate", "tip_rack", "tube_rack", "adaptor"]` |
**参考实现:** `unilabos/devices/liquid_handling/prcxi/prcxi.py` 中的 `PRCXI9300Deck`4x4 共 16 个 site
---
## 文件位置
```
unilabos/resources/
├── <project>/ # 按项目分组
│ ├── bottles.py # Bottle 工厂函数
│ ├── bottle_carriers.py # Carrier 工厂函数
│ ├── warehouses.py # WareHouse 工厂函数
│ └── decks.py # Deck 类定义
```
---
## 验证
```bash
# 资源可导入
python -c "from unilabos.resources.my_project.bottles import My_Reagent_Bottle; print(My_Reagent_Bottle('test'))"
# 启动测试AST 自动扫描)
unilab -g <graph>.json
```
仅在以下情况仍需 YAML第三方库资源如 pylabrobot 内置资源,无 `@resource` 装饰器)。
---
## 关键路径
| 内容 | 路径 |
|------|------|
| Bottle/Carrier 基类 | `unilabos/resources/itemized_carrier.py` |
| WareHouse 基类 + 工厂 | `unilabos/resources/warehouse.py` |
| PLR 注册 | `unilabos/resources/plr_additional_res_reg.py` |
| 装饰器定义 | `unilabos/registry/decorators.py` |

View File

@@ -0,0 +1,292 @@
# 资源高级参考
本文件是 SKILL.md 的补充,包含类继承体系、序列化/反序列化、Bioyond 物料同步、非瓶类资源和仓库工厂模式。Agent 在需要实现这些功能时按需阅读。
---
## 1. 类继承体系
```
PyLabRobot
├── Resource (PLR 基类)
│ ├── Well
│ │ └── Bottle (unilabos) → 瓶/小瓶/烧杯/反应器
│ ├── Deck
│ │ └── 自定义 Deck 类 (unilabos) → 工作站台面
│ ├── ResourceHolder → 槽位占位符
│ └── Container
│ └── Battery (unilabos) → 组装好的电池
├── ItemizedCarrier (unilabos, 继承 Resource)
│ ├── BottleCarrier (unilabos) → 瓶载架
│ └── WareHouse (unilabos) → 堆栈仓库
├── ItemizedResource (PLR)
│ └── MagazineHolder (unilabos) → 子弹夹载架
└── ResourceStack (PLR)
└── Magazine (unilabos) → 子弹夹洞位
```
### Bottle 类细节
```python
class Bottle(Well):
def __init__(self, name, diameter, height, max_volume,
size_x=0.0, size_y=0.0, size_z=0.0,
barcode=None, category="container", model=None, **kwargs):
super().__init__(
name=name,
size_x=diameter, # PLR 用 diameter 作为 size_x/size_y
size_y=diameter,
size_z=height, # PLR 用 height 作为 size_z
max_volume=max_volume,
category=category,
model=model,
bottom_type="flat",
cross_section_type="circle"
)
```
注意 `size_x = size_y = diameter``size_z = height`
### ItemizedCarrier 核心方法
| 方法 | 说明 |
|------|------|
| `__getitem__(identifier)` | 通过索引或 Excel 标识(如 `"A01"`)访问槽位 |
| `__setitem__(identifier, resource)` | 向槽位放入资源 |
| `get_child_identifier(child)` | 获取子资源的标识符 |
| `capacity` | 总槽位数 |
| `sites` | 所有槽位字典 |
---
## 2. 序列化与反序列化
### PLR ↔ UniLab 转换
| 函数 | 位置 | 方向 |
|------|------|------|
| `ResourceTreeSet.from_plr_resources(resources)` | `resource_tracker.py` | PLR → UniLab |
| `ResourceTreeSet.to_plr_resources()` | `resource_tracker.py` | UniLab → PLR |
### `from_plr_resources` 流程
```
PLR Resource
↓ build_uuid_mapping (递归生成 UUID)
↓ resource.serialize() → dict
↓ resource.serialize_all_state() → states
↓ resource_plr_inner (递归构建 ResourceDictInstance)
ResourceTreeSet
```
关键:每个 PLR 资源通过 `unilabos_uuid` 属性携带 UUID`unilabos_extra` 携带扩展数据(如 `class` 名)。
### `to_plr_resources` 流程
```
ResourceTreeSet
↓ collect_node_data (收集 UUID、状态、扩展数据)
↓ node_to_plr_dict (转为 PLR 字典格式)
↓ find_subclass(type_name, PLRResource) (查找 PLR 子类)
↓ sub_cls.deserialize(plr_dict) (反序列化)
↓ loop_set_uuid, loop_set_extra (递归设置 UUID 和扩展)
PLR Resource
```
### Bottle 序列化
```python
class Bottle(Well):
def serialize(self) -> dict:
data = super().serialize()
return {**data, "diameter": self.diameter, "height": self.height}
@classmethod
def deserialize(cls, data: dict, allow_marshal=False):
barcode_data = data.pop("barcode", None)
instance = super().deserialize(data, allow_marshal=allow_marshal)
if barcode_data and isinstance(barcode_data, str):
instance.barcode = barcode_data
return instance
```
---
## 3. Bioyond 物料同步
### 双向转换函数
| 函数 | 位置 | 方向 |
|------|------|------|
| `resource_bioyond_to_plr(materials, type_mapping, deck)` | `graphio.py` | Bioyond → PLR |
| `resource_plr_to_bioyond(resources, type_mapping, warehouse_mapping)` | `graphio.py` | PLR → Bioyond |
### `resource_bioyond_to_plr` 流程
```
Bioyond 物料列表
↓ reverse_type_mapping: {typeName → (model, UUID)}
↓ 对每个物料:
typeName → 查映射 → model (如 "BIOYOND_PolymerStation_Reactor")
initialize_resource({"name": unique_name, "class": model})
↓ 设置 unilabos_extra (material_bioyond_id, material_bioyond_name 等)
↓ 处理 detail (子物料/坐标)
↓ 按 locationName 放入 deck.warehouses 对应槽位
PLR 资源列表
```
### `resource_plr_to_bioyond` 流程
```
PLR 资源列表
↓ 遍历每个资源:
载架(capacity > 1): 生成 details 子物料 + 坐标
单瓶: 直接映射
↓ type_mapping 查找 typeId
↓ warehouse_mapping 查找位置 UUID
↓ 组装 Bioyond 格式 (name, typeName, typeId, quantity, Parameters, locations)
Bioyond 物料列表
```
### BioyondResourceSynchronizer
工作站通过 `ResourceSynchronizer` 自动同步物料:
```python
class BioyondResourceSynchronizer(ResourceSynchronizer):
def sync_from_external(self) -> bool:
all_data = []
all_data.extend(api_client.stock_material('{"typeMode": 0}')) # 耗材
all_data.extend(api_client.stock_material('{"typeMode": 1}')) # 样品
all_data.extend(api_client.stock_material('{"typeMode": 2}')) # 试剂
unilab_resources = resource_bioyond_to_plr(
all_data,
type_mapping=self.workstation.bioyond_config["material_type_mappings"],
deck=self.workstation.deck
)
# 更新 deck 上的资源
```
---
## 4. 非瓶类资源
### ElectrodeSheet极片
路径:`unilabos/resources/battery/electrode_sheet.py`
```python
class ElectrodeSheet(ResourcePLR):
"""片状材料(极片、隔膜、弹片、垫片等)"""
_unilabos_state = {
"diameter": 0.0,
"thickness": 0.0,
"mass": 0.0,
"material_type": "",
"color": "",
"info": "",
}
```
工厂函数:`PositiveCan`, `PositiveElectrode`, `NegativeCan`, `NegativeElectrode`, `SpringWasher`, `FlatWasher`, `AluminumFoil`
### Battery电池
```python
class Battery(Container):
"""组装好的电池"""
_unilabos_state = {
"color": "",
"electrolyte_name": "",
"open_circuit_voltage": 0.0,
}
```
### Magazine / MagazineHolder子弹夹
```python
class Magazine(ResourceStack):
"""子弹夹洞位,可堆叠 ElectrodeSheet"""
# direction, max_sheets
class MagazineHolder(ItemizedResource):
"""多洞位子弹夹"""
# hole_diameter, hole_depth, max_sheets_per_hole
```
工厂函数 `magazine_factory()``create_homogeneous_resources` 生成洞位,可选预填 `ElectrodeSheet``Battery`
---
## 5. 仓库工厂模式参考
### 实际 warehouse 工厂函数示例
```python
# 行优先 4x4 仓库
def bioyond_warehouse_1x4x4(name: str) -> WareHouse:
return warehouse_factory(
name=name,
num_items_x=4, num_items_y=4, num_items_z=1,
dx=10.0, dy=10.0, dz=10.0,
item_dx=147.0, item_dy=106.0, item_dz=130.0,
layout="row-major", # A01,A02,A03,A04, B01,...
)
# 右侧 4x4 仓库(列名偏移)
def bioyond_warehouse_1x4x4_right(name: str) -> WareHouse:
return warehouse_factory(
name=name,
num_items_x=4, num_items_y=4, num_items_z=1,
dx=10.0, dy=10.0, dz=10.0,
item_dx=147.0, item_dy=106.0, item_dz=130.0,
col_offset=4, # A05,A06,A07,A08
layout="row-major",
)
# 竖向仓库(站内试剂存放)
def bioyond_warehouse_reagent_storage(name: str) -> WareHouse:
return warehouse_factory(
name=name,
num_items_x=1, num_items_y=2, num_items_z=1,
dx=10.0, dy=10.0, dz=10.0,
item_dx=147.0, item_dy=106.0, item_dz=130.0,
layout="vertical-col-major",
)
# 行偏移F 行开始)
def bioyond_warehouse_5x3x1(name: str, row_offset: int = 0) -> WareHouse:
return warehouse_factory(
name=name,
num_items_x=3, num_items_y=5, num_items_z=1,
dx=10.0, dy=10.0, dz=10.0,
item_dx=159.0, item_dy=183.0, item_dz=130.0,
row_offset=row_offset, # 0→A行起5→F行起
layout="row-major",
)
```
### layout 类型说明
| layout | 命名顺序 | 适用场景 |
|--------|---------|---------|
| `col-major` (默认) | A01,B01,C01,D01, A02,B02,... | 列优先,标准堆栈 |
| `row-major` | A01,A02,A03,A04, B01,B02,... | 行优先Bioyond 前端展示 |
| `vertical-col-major` | 竖向排列,标签从底部开始 | 竖向仓库(试剂存放、测密度) |
---
## 6. 关键路径
| 内容 | 路径 |
|------|------|
| Bottle/Carrier 基类 | `unilabos/resources/itemized_carrier.py` |
| WareHouse 类 + 工厂 | `unilabos/resources/warehouse.py` |
| ResourceTreeSet 转换 | `unilabos/resources/resource_tracker.py` |
| Bioyond 物料转换 | `unilabos/resources/graphio.py` |
| Bioyond 仓库定义 | `unilabos/resources/bioyond/warehouses.py` |
| 电池资源 | `unilabos/resources/battery/` |
| PLR 注册 | `unilabos/resources/plr_additional_res_reg.py` |

View File

@@ -0,0 +1,939 @@
"""
P1 关节数据 & 资源跟随桥接测试 — 全面覆盖 HostNode 关节回调 + resource_pose 回调的边缘 case。
不依赖 ROS2 运行时,通过 mock 模拟 msg 和 bridge。
测试分组:
E1: JointRepublisher JSON 输出格式 (已修复 str→json.dumps)
E2: 关节状态回调 — 从 /joint_states (JointState msg) 直接读取 name/position
E3: 资源跟随 (resource_pose) — 夹爪抓取/释放/多资源
E4: 联合流程 — 关节 + 资源一并通过 bridge 发送
E5: Bridge 调用验证
E6: 同类型设备多实例 — 重复关节名场景
E7: 吞吐优化 — 死区过滤、抑频、增量 resource_poses
"""
import json
import time
import pytest
from unittest.mock import MagicMock
from types import SimpleNamespace
from typing import Dict, Optional
# ==================== 辅助: 模拟 JointState msg ====================
def _make_joint_state_msg(names: list, positions: list, velocities=None, efforts=None):
"""构造模拟的 sensor_msgs/JointState 消息(不依赖 ROS2"""
msg = SimpleNamespace()
msg.name = names
msg.position = positions
msg.velocity = velocities or [0.0] * len(names)
msg.effort = efforts or [0.0] * len(names)
return msg
def _make_string_msg(data: str):
"""构造模拟的 std_msgs/String 消息"""
msg = SimpleNamespace()
msg.data = data
return msg
# ==================== 辅助: 提取 HostNode 核心逻辑用于隔离测试 ====================
class JointBridgeSimulator:
"""
模拟 HostNode 的关节桥接核心逻辑(提取自 host_node.py
不依赖 ROS2 Node、subscription 等基础设施。
包含吞吐优化逻辑:
- 死区过滤 (dead band): 关节变化 < 阈值时不发送
- 抑频 (throttle): 限制最大发送频率
- 增量 resource_poses: 仅在变化时附带
"""
JOINT_DEAD_BAND: float = 1e-4
JOINT_MIN_INTERVAL: float = 0.05 # 秒
def __init__(self, device_uuid_map: Dict[str, str],
dead_band: Optional[float] = None,
min_interval: Optional[float] = None):
self.device_uuid_map = device_uuid_map
self._device_ids_sorted = sorted(device_uuid_map.keys(), key=len, reverse=True)
self._resource_poses: Dict[str, str] = {}
self._resource_poses_dirty: bool = False
self._last_joint_values: Dict[str, float] = {}
self._last_send_time: float = -float("inf") # 确保首条消息总是通过
# 允许测试覆盖优化参数
if dead_band is not None:
self.JOINT_DEAD_BAND = dead_band
if min_interval is not None:
self.JOINT_MIN_INTERVAL = min_interval
def resource_pose_callback(self, msg) -> None:
"""模拟 HostNode._resource_pose_callback含变化检测"""
try:
data = json.loads(msg.data)
except (json.JSONDecodeError, ValueError):
return
if not isinstance(data, dict) or not data:
return
has_change = False
for k, v in data.items():
if self._resource_poses.get(k) != v:
has_change = True
break
if has_change:
self._resource_poses.update(data)
self._resource_poses_dirty = True
def joint_state_callback(self, msg, now: Optional[float] = None) -> dict:
"""
模拟 HostNode._joint_state_callback 核心逻辑(含优化)。
now 参数允许测试控制时间。
返回 {device_id: {"node_uuid": ..., "joint_states": {...}, "resource_poses": {...}}}。
返回 {} 表示被优化过滤。
"""
names = list(msg.name)
positions = list(msg.position)
if not names or len(names) != len(positions):
return {}
if now is None:
now = time.time()
resource_dirty = self._resource_poses_dirty
# 抑频检查
if not resource_dirty and (now - self._last_send_time) < self.JOINT_MIN_INTERVAL:
return {}
# 死区过滤
has_significant_change = False
for name, pos in zip(names, positions):
last_val = self._last_joint_values.get(name)
if last_val is None or abs(float(pos) - last_val) >= self.JOINT_DEAD_BAND:
has_significant_change = True
break
if not has_significant_change and not resource_dirty:
return {}
# 更新状态
for name, pos in zip(names, positions):
self._last_joint_values[name] = float(pos)
self._last_send_time = now
# 按设备 ID 分组关节数据
device_joints: Dict[str, Dict[str, float]] = {}
for name, pos in zip(names, positions):
matched_device = None
for device_id in self._device_ids_sorted:
if name.startswith(device_id + "_"):
matched_device = device_id
break
if matched_device:
if matched_device not in device_joints:
device_joints[matched_device] = {}
device_joints[matched_device][name] = float(pos)
elif len(self.device_uuid_map) == 1:
fallback_id = self._device_ids_sorted[0]
if fallback_id not in device_joints:
device_joints[fallback_id] = {}
device_joints[fallback_id][name] = float(pos)
# 构建设备级 resource_poses仅 dirty 时附带)
device_resource_poses: Dict[str, Dict[str, str]] = {}
if resource_dirty:
for resource_id, link_name in self._resource_poses.items():
matched_device = None
for device_id in self._device_ids_sorted:
if link_name.startswith(device_id + "_"):
matched_device = device_id
break
if matched_device:
if matched_device not in device_resource_poses:
device_resource_poses[matched_device] = {}
device_resource_poses[matched_device][resource_id] = link_name
elif len(self.device_uuid_map) == 1:
fallback_id = self._device_ids_sorted[0]
if fallback_id not in device_resource_poses:
device_resource_poses[fallback_id] = {}
device_resource_poses[fallback_id][resource_id] = link_name
self._resource_poses_dirty = False
result = {}
for device_id, joint_states in device_joints.items():
node_uuid = self.device_uuid_map.get(device_id)
if not node_uuid:
continue
result[device_id] = {
"node_uuid": node_uuid,
"joint_states": joint_states,
"resource_poses": device_resource_poses.get(device_id, {}),
}
return result
# 功能测试中禁用优化dead_band=0, min_interval=0确保逻辑正确性
def _make_sim(device_uuid_map: Dict[str, str]) -> JointBridgeSimulator:
"""创建禁用吞吐优化的模拟器(用于功能正确性测试)"""
return JointBridgeSimulator(device_uuid_map, dead_band=0.0, min_interval=0.0)
# ==================== E1: JointRepublisher JSON 输出 ====================
class TestJointRepublisherFormat:
"""验证 JointRepublisher 输出标准 JSON双引号而非 Python repr单引号"""
def test_output_is_valid_json(self):
"""str() 产生单引号json.dumps() 产生双引号"""
joint_dict = {
"name": ["joint1", "joint2"],
"position": [0.1, 0.2],
"velocity": [0.0, 0.0],
"effort": [0.0, 0.0],
}
result = json.dumps(joint_dict)
parsed = json.loads(result)
assert parsed["name"] == ["joint1", "joint2"]
assert parsed["position"] == [0.1, 0.2]
assert "'" not in result
def test_str_produces_invalid_json(self):
"""对比: str() 不是合法 JSON"""
joint_dict = {"name": ["joint1"], "position": [0.1]}
result = str(joint_dict)
with pytest.raises(json.JSONDecodeError):
json.loads(result)
# ==================== E2: 关节状态回调JointState msg 直接读取)====================
class TestJointStateCallback:
"""测试从 JointState msg 直接读取 name/position 的分组逻辑"""
def test_single_device_simple(self):
"""单设备,关节名有设备前缀"""
sim = _make_sim({"panda": "uuid-panda"})
msg = _make_joint_state_msg(
["panda_joint1", "panda_joint2"], [0.5, 1.0]
)
result = sim.joint_state_callback(msg)
assert "panda" in result
assert result["panda"]["joint_states"]["panda_joint1"] == 0.5
assert result["panda"]["joint_states"]["panda_joint2"] == 1.0
def test_single_device_no_prefix_fallback(self):
"""单设备,关节名无设备前缀 → 应归入唯一设备"""
sim = _make_sim({"robot1": "uuid-r1"})
msg = _make_joint_state_msg(["joint_a", "joint_b"], [1.0, 2.0])
result = sim.joint_state_callback(msg)
assert "robot1" in result
assert result["robot1"]["joint_states"]["joint_a"] == 1.0
assert result["robot1"]["joint_states"]["joint_b"] == 2.0
def test_multi_device_distinct_prefixes(self):
"""多设备,不同前缀,正确分组"""
sim = _make_sim({"arm1": "uuid-arm1", "arm2": "uuid-arm2"})
msg = _make_joint_state_msg(
["arm1_j1", "arm1_j2", "arm2_j1", "arm2_j2"],
[0.1, 0.2, 0.3, 0.4],
)
result = sim.joint_state_callback(msg)
assert result["arm1"]["joint_states"]["arm1_j1"] == 0.1
assert result["arm1"]["joint_states"]["arm1_j2"] == 0.2
assert result["arm2"]["joint_states"]["arm2_j1"] == 0.3
assert result["arm2"]["joint_states"]["arm2_j2"] == 0.4
def test_ambiguous_prefix_longest_wins(self):
"""前缀歧义: arm 和 arm_left — 最长前缀优先"""
sim = _make_sim({"arm": "uuid-arm", "arm_left": "uuid-arm-left"})
msg = _make_joint_state_msg(
["arm_j1", "arm_left_j1", "arm_left_j2"],
[0.1, 0.2, 0.3],
)
result = sim.joint_state_callback(msg)
assert result["arm"]["joint_states"]["arm_j1"] == 0.1
assert result["arm_left"]["joint_states"]["arm_left_j1"] == 0.2
assert result["arm_left"]["joint_states"]["arm_left_j2"] == 0.3
def test_multi_device_unmatched_joints_dropped(self):
"""多设备时,无法匹配前缀的关节应被丢弃(不 fallback"""
sim = _make_sim({"arm1": "uuid-arm1", "arm2": "uuid-arm2"})
msg = _make_joint_state_msg(
["arm1_j1", "unknown_j1"],
[0.1, 0.9],
)
result = sim.joint_state_callback(msg)
assert result["arm1"]["joint_states"]["arm1_j1"] == 0.1
for device_id, data in result.items():
assert "unknown_j1" not in data["joint_states"]
def test_empty_names(self):
"""空 name 列表"""
sim = _make_sim({"dev": "uuid-dev"})
msg = _make_joint_state_msg([], [])
result = sim.joint_state_callback(msg)
assert result == {}
def test_mismatched_lengths(self):
"""name 和 position 长度不一致"""
sim = _make_sim({"dev": "uuid-dev"})
msg = _make_joint_state_msg(["j1", "j2"], [0.1])
result = sim.joint_state_callback(msg)
assert result == {}
def test_no_devices(self):
"""无设备 UUID 映射"""
sim = _make_sim({})
msg = _make_joint_state_msg(["j1"], [0.1])
result = sim.joint_state_callback(msg)
assert result == {}
def test_numeric_prefix_device_ids(self):
"""数字化设备 ID (如 deck1, deck12) — deck12_slot1 不应匹配 deck1"""
sim = _make_sim({"deck1": "uuid-d1", "deck12": "uuid-d12"})
msg = _make_joint_state_msg(
["deck1_slot1", "deck12_slot1"],
[1.0, 2.0],
)
result = sim.joint_state_callback(msg)
assert result["deck1"]["joint_states"]["deck1_slot1"] == 1.0
assert result["deck12"]["joint_states"]["deck12_slot1"] == 2.0
def test_position_float_conversion(self):
"""position 值应强制转为 float即使输入为 int"""
sim = _make_sim({"arm": "uuid-arm"})
msg = _make_joint_state_msg(["arm_j1"], [1])
result = sim.joint_state_callback(msg)
assert result["arm"]["joint_states"]["arm_j1"] == 1.0
assert isinstance(result["arm"]["joint_states"]["arm_j1"], float)
def test_node_uuid_in_result(self):
"""结果中应携带正确的 node_uuid"""
sim = _make_sim({"panda": "uuid-panda-123"})
msg = _make_joint_state_msg(["panda_j1"], [0.5])
result = sim.joint_state_callback(msg)
assert result["panda"]["node_uuid"] == "uuid-panda-123"
def test_device_with_no_uuid_skipped(self):
"""device_uuid_map 中存在映射但值为空 → 跳过"""
sim = _make_sim({"arm": ""})
msg = _make_joint_state_msg(["arm_j1"], [0.5])
result = sim.joint_state_callback(msg)
assert result == {}
def test_many_joints_single_device(self):
"""单设备大量关节(如 7-DOF arm"""
sim = _make_sim({"panda": "uuid-panda"})
names = [f"panda_joint{i}" for i in range(1, 8)]
positions = [float(i) * 0.1 for i in range(1, 8)]
msg = _make_joint_state_msg(names, positions)
result = sim.joint_state_callback(msg)
assert len(result["panda"]["joint_states"]) == 7
assert result["panda"]["joint_states"]["panda_joint7"] == pytest.approx(0.7)
def test_duplicate_joint_names_last_wins(self):
"""同类型设备多个实例时如果关节名完全重复bug 场景),后出现的值覆盖前者"""
sim = _make_sim({"dev": "uuid-dev"})
msg = _make_joint_state_msg(["dev_j1", "dev_j1"], [1.0, 2.0])
result = sim.joint_state_callback(msg)
assert result["dev"]["joint_states"]["dev_j1"] == 2.0
def test_negative_positions(self):
"""关节角度为负数"""
sim = _make_sim({"arm": "uuid-arm"})
msg = _make_joint_state_msg(["arm_j1", "arm_j2"], [-1.57, -3.14])
result = sim.joint_state_callback(msg)
assert result["arm"]["joint_states"]["arm_j1"] == pytest.approx(-1.57)
assert result["arm"]["joint_states"]["arm_j2"] == pytest.approx(-3.14)
# ==================== E3: 资源跟随 (resource_pose) ====================
class TestResourcePoseCallback:
"""测试 resource_pose 回调 — 夹爪抓取/释放/多资源"""
def test_single_resource_attach(self):
"""单个资源挂载到夹爪 link"""
sim = _make_sim({"panda": "uuid-panda"})
msg = _make_string_msg(json.dumps({"plate_1": "panda_gripper_link"}))
sim.resource_pose_callback(msg)
assert sim._resource_poses == {"plate_1": "panda_gripper_link"}
assert sim._resource_poses_dirty is True
def test_multiple_resource_attach(self):
"""多个资源同时挂载到不同 link"""
sim = _make_sim({"panda": "uuid-panda"})
msg = _make_string_msg(json.dumps({
"plate_1": "panda_gripper_link",
"tip_rack": "panda_deck_link",
}))
sim.resource_pose_callback(msg)
assert sim._resource_poses["plate_1"] == "panda_gripper_link"
assert sim._resource_poses["tip_rack"] == "panda_deck_link"
def test_incremental_update(self):
"""增量更新:新消息合并到已有状态"""
sim = _make_sim({"panda": "uuid-panda"})
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate_1": "panda_deck_link"})))
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate_2": "panda_gripper_link"})))
assert len(sim._resource_poses) == 2
assert sim._resource_poses["plate_1"] == "panda_deck_link"
assert sim._resource_poses["plate_2"] == "panda_gripper_link"
def test_resource_reattach(self):
"""资源从 deck 移动到 gripper抓取操作"""
sim = _make_sim({"panda": "uuid-panda"})
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate_1": "panda_deck_link"})))
assert sim._resource_poses["plate_1"] == "panda_deck_link"
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate_1": "panda_gripper_link"})))
assert sim._resource_poses["plate_1"] == "panda_gripper_link"
def test_resource_release_back_to_world(self):
"""释放资源回到 world"""
sim = _make_sim({"panda": "uuid-panda"})
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate_1": "panda_gripper_link"})))
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate_1": "world"})))
assert sim._resource_poses["plate_1"] == "world"
def test_empty_dict_heartbeat_no_dirty(self):
"""空 dict心跳包不标记 dirty"""
sim = _make_sim({"panda": "uuid-panda"})
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate_1": "panda_link"})))
sim._resource_poses_dirty = False # 重置
sim.resource_pose_callback(_make_string_msg(json.dumps({})))
assert sim._resource_poses_dirty is False # 空 dict 不应标记 dirty
def test_same_value_no_dirty(self):
"""重复发送相同值不应标记 dirty"""
sim = _make_sim({"panda": "uuid-panda"})
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate_1": "panda_link"})))
sim._resource_poses_dirty = False
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate_1": "panda_link"})))
assert sim._resource_poses_dirty is False
def test_invalid_json_ignored(self):
"""非法 JSON 消息不影响状态"""
sim = _make_sim({"panda": "uuid-panda"})
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate_1": "panda_link"})))
sim.resource_pose_callback(_make_string_msg("not valid json {{{"))
assert sim._resource_poses["plate_1"] == "panda_link"
def test_non_dict_json_ignored(self):
"""JSON 但不是 dict如 list应被忽略"""
sim = _make_sim({"panda": "uuid-panda"})
sim.resource_pose_callback(_make_string_msg(json.dumps(["not", "a", "dict"])))
assert sim._resource_poses == {}
def test_python_repr_ignored(self):
"""Python repr 格式(单引号)应被忽略"""
sim = _make_sim({"panda": "uuid-panda"})
sim.resource_pose_callback(_make_string_msg("{'plate_1': 'panda_link'}"))
assert sim._resource_poses == {}
def test_multi_device_resource_attach(self):
"""多设备场景:不同设备的 link 挂载不同资源"""
sim = _make_sim({"arm1": "uuid-arm1", "arm2": "uuid-arm2"})
sim.resource_pose_callback(_make_string_msg(json.dumps({
"plate_A": "arm1_gripper_link",
"plate_B": "arm2_gripper_link",
})))
assert sim._resource_poses["plate_A"] == "arm1_gripper_link"
assert sim._resource_poses["plate_B"] == "arm2_gripper_link"
# ==================== E4: 联合流程 — 关节 + 资源一并通过 bridge ====================
class TestJointWithResourcePoses:
"""测试关节状态回调时resource_poses 被正确按设备分组并包含在结果中"""
def test_single_device_joint_with_resource(self):
"""单设备:关节更新时携带已挂载的资源"""
sim = _make_sim({"panda": "uuid-panda"})
sim.resource_pose_callback(_make_string_msg(json.dumps({
"plate_1": "panda_gripper_link",
})))
msg = _make_joint_state_msg(["panda_j1", "panda_j2"], [0.5, 1.0])
result = sim.joint_state_callback(msg)
assert result["panda"]["resource_poses"] == {"plate_1": "panda_gripper_link"}
def test_single_device_no_resource(self):
"""单设备:无资源挂载时 resource_poses 为空 dict"""
sim = _make_sim({"panda": "uuid-panda"})
msg = _make_joint_state_msg(["panda_j1"], [0.5])
result = sim.joint_state_callback(msg)
assert result["panda"]["resource_poses"] == {}
def test_multi_device_resource_routing(self):
"""多设备:资源按 link 前缀路由到正确设备"""
sim = _make_sim({"arm1": "uuid-arm1", "arm2": "uuid-arm2"})
sim.resource_pose_callback(_make_string_msg(json.dumps({
"plate_A": "arm1_gripper_link",
"plate_B": "arm2_gripper_link",
"tube_1": "arm1_tool_link",
})))
msg = _make_joint_state_msg(
["arm1_j1", "arm2_j1"],
[0.1, 0.2],
)
result = sim.joint_state_callback(msg)
assert result["arm1"]["resource_poses"] == {
"plate_A": "arm1_gripper_link",
"tube_1": "arm1_tool_link",
}
assert result["arm2"]["resource_poses"] == {"plate_B": "arm2_gripper_link"}
def test_resource_on_world_frame_not_routed(self):
"""资源挂在 world frame已释放— 多设备时无法匹配任何设备前缀"""
sim = _make_sim({"arm1": "uuid-arm1", "arm2": "uuid-arm2"})
sim.resource_pose_callback(_make_string_msg(json.dumps({
"plate_A": "world",
})))
msg = _make_joint_state_msg(["arm1_j1"], [0.1])
result = sim.joint_state_callback(msg)
assert result["arm1"]["resource_poses"] == {}
def test_resource_world_frame_single_device_fallback(self):
"""单设备时 world frame 的资源走 fallback"""
sim = _make_sim({"panda": "uuid-panda"})
sim.resource_pose_callback(_make_string_msg(json.dumps({
"plate_A": "world",
})))
msg = _make_joint_state_msg(["panda_j1"], [0.1])
result = sim.joint_state_callback(msg)
assert result["panda"]["resource_poses"] == {"plate_A": "world"}
def test_grab_and_move_sequence(self):
"""完整夹取序列: 资源在 deck → gripper 抓取 → arm 移动 → 放下"""
sim = _make_sim({"panda": "uuid-panda"})
# 初始: plate 在 deck
sim.resource_pose_callback(_make_string_msg(json.dumps({
"plate_1": "panda_deck_third_link",
})))
msg = _make_joint_state_msg(
["panda_j1", "panda_j2", "panda_j3"],
[0.0, -0.5, 1.0],
)
result = sim.joint_state_callback(msg)
assert result["panda"]["resource_poses"]["plate_1"] == "panda_deck_third_link"
# 抓取: plate 从 deck → gripper
sim.resource_pose_callback(_make_string_msg(json.dumps({
"plate_1": "panda_gripper_link",
})))
msg = _make_joint_state_msg(
["panda_j1", "panda_j2", "panda_j3"],
[1.57, 0.0, -0.5],
)
result = sim.joint_state_callback(msg)
assert result["panda"]["resource_poses"]["plate_1"] == "panda_gripper_link"
assert result["panda"]["joint_states"]["panda_j1"] == pytest.approx(1.57)
# 放下: plate 从 gripper → 目标 deck
sim.resource_pose_callback(_make_string_msg(json.dumps({
"plate_1": "panda_deck_first_link",
})))
msg = _make_joint_state_msg(
["panda_j1", "panda_j2", "panda_j3"],
[0.0, 0.0, 0.0],
)
result = sim.joint_state_callback(msg)
assert result["panda"]["resource_poses"]["plate_1"] == "panda_deck_first_link"
def test_simultaneous_grab_multiple_resources(self):
"""同时持有多个资源(如双夹爪)"""
sim = _make_sim({"panda": "uuid-panda"})
sim.resource_pose_callback(_make_string_msg(json.dumps({
"plate_1": "panda_left_gripper",
"plate_2": "panda_right_gripper",
"tip_rack": "panda_deck_link",
})))
msg = _make_joint_state_msg(["panda_j1"], [0.5])
result = sim.joint_state_callback(msg)
assert len(result["panda"]["resource_poses"]) == 3
def test_resource_with_ambiguous_link_prefix(self):
"""link 前缀歧义: arm_left_gripper 应匹配 arm_left 而非 arm"""
sim = _make_sim({"arm": "uuid-arm", "arm_left": "uuid-arm-left"})
sim.resource_pose_callback(_make_string_msg(json.dumps({
"plate_A": "arm_gripper_link",
"plate_B": "arm_left_gripper_link",
})))
msg = _make_joint_state_msg(
["arm_j1", "arm_left_j1"],
[0.1, 0.2],
)
result = sim.joint_state_callback(msg)
assert result["arm"]["resource_poses"] == {"plate_A": "arm_gripper_link"}
assert result["arm_left"]["resource_poses"] == {"plate_B": "arm_left_gripper_link"}
# ==================== E5: Bridge 调用验证 ====================
class TestBridgeCalls:
"""验证完整桥接流: callback → bridge.publish_joint_state 调用"""
def test_bridge_called_per_device(self):
"""每个设备调用一次 publish_joint_state"""
device_uuid_map = {"arm1": "uuid-111", "arm2": "uuid-222"}
sim = _make_sim(device_uuid_map)
bridge = MagicMock()
bridge.publish_joint_state = MagicMock()
msg = _make_joint_state_msg(
["arm1_j1", "arm2_j1"],
[1.0, 2.0],
)
result = sim.joint_state_callback(msg)
for device_id, data in result.items():
bridge.publish_joint_state(
data["node_uuid"], data["joint_states"], data["resource_poses"]
)
assert bridge.publish_joint_state.call_count == 2
call_uuids = {c[0][0] for c in bridge.publish_joint_state.call_args_list}
assert call_uuids == {"uuid-111", "uuid-222"}
def test_bridge_called_with_resource_poses(self):
"""bridge 调用时携带 resource_poses"""
sim = _make_sim({"panda": "uuid-panda"})
sim.resource_pose_callback(_make_string_msg(json.dumps({
"plate_1": "panda_gripper_link",
})))
bridge = MagicMock()
msg = _make_joint_state_msg(["panda_j1"], [0.5])
result = sim.joint_state_callback(msg)
for device_id, data in result.items():
bridge.publish_joint_state(
data["node_uuid"], data["joint_states"], data["resource_poses"]
)
bridge.publish_joint_state.assert_called_once_with(
"uuid-panda",
{"panda_j1": 0.5},
{"plate_1": "panda_gripper_link"},
)
def test_bridge_no_call_for_empty_joints(self):
"""无关节数据时不调用 bridge"""
sim = _make_sim({"panda": "uuid-panda"})
bridge = MagicMock()
msg = _make_joint_state_msg([], [])
result = sim.joint_state_callback(msg)
for device_id, data in result.items():
bridge.publish_joint_state(
data["node_uuid"], data["joint_states"], data["resource_poses"]
)
bridge.publish_joint_state.assert_not_called()
def test_bridge_resource_poses_empty_when_no_resources(self):
"""无资源挂载时resource_poses 参数为空 dict"""
sim = _make_sim({"panda": "uuid-panda"})
bridge = MagicMock()
msg = _make_joint_state_msg(["panda_j1"], [0.5])
result = sim.joint_state_callback(msg)
for device_id, data in result.items():
bridge.publish_joint_state(
data["node_uuid"], data["joint_states"], data["resource_poses"]
)
bridge.publish_joint_state.assert_called_once_with(
"uuid-panda",
{"panda_j1": 0.5},
{},
)
def test_multi_bridge_all_called(self):
"""多个 bridge 都应被调用"""
sim = _make_sim({"arm": "uuid-arm"})
bridges = [MagicMock(), MagicMock()]
msg = _make_joint_state_msg(["arm_j1"], [0.5])
result = sim.joint_state_callback(msg)
for device_id, data in result.items():
for bridge in bridges:
bridge.publish_joint_state(
data["node_uuid"], data["joint_states"], data["resource_poses"]
)
for bridge in bridges:
bridge.publish_joint_state.assert_called_once()
# ==================== E6: 同类型设备多个实例 — 重复关节名场景 ====================
class TestDuplicateDeviceTypes:
"""
多个同类型设备(如 2 个 OT-2 移液器),关节名格式为 {device_id}_{joint_name}
设备 ID 不同(如 ot2_left, ot2_right但底层关节名相同如 pipette_j1
"""
def test_same_type_different_id(self):
"""同类型设备不同 ID"""
sim = _make_sim({
"ot2_left": "uuid-ot2-left",
"ot2_right": "uuid-ot2-right",
})
msg = _make_joint_state_msg(
["ot2_left_pipette_j1", "ot2_left_pipette_j2",
"ot2_right_pipette_j1", "ot2_right_pipette_j2"],
[0.1, 0.2, 0.3, 0.4],
)
result = sim.joint_state_callback(msg)
assert result["ot2_left"]["joint_states"]["ot2_left_pipette_j1"] == 0.1
assert result["ot2_left"]["joint_states"]["ot2_left_pipette_j2"] == 0.2
assert result["ot2_right"]["joint_states"]["ot2_right_pipette_j1"] == 0.3
assert result["ot2_right"]["joint_states"]["ot2_right_pipette_j2"] == 0.4
def test_same_type_with_resources_routed_correctly(self):
"""同类型设备各自抓取资源,按 link 前缀正确路由"""
sim = _make_sim({
"ot2_left": "uuid-ot2-left",
"ot2_right": "uuid-ot2-right",
})
sim.resource_pose_callback(_make_string_msg(json.dumps({
"plate_A": "ot2_left_gripper",
"plate_B": "ot2_right_gripper",
})))
msg = _make_joint_state_msg(
["ot2_left_j1", "ot2_right_j1"],
[0.5, 0.6],
)
result = sim.joint_state_callback(msg)
assert result["ot2_left"]["resource_poses"] == {"plate_A": "ot2_left_gripper"}
assert result["ot2_right"]["resource_poses"] == {"plate_B": "ot2_right_gripper"}
def test_numbered_devices_no_confusion(self):
"""编号设备: robot1 不应匹配 robot10 的关节"""
sim = _make_sim({
"robot1": "uuid-r1",
"robot10": "uuid-r10",
})
msg = _make_joint_state_msg(
["robot1_j1", "robot10_j1"],
[1.0, 10.0],
)
result = sim.joint_state_callback(msg)
assert result["robot1"]["joint_states"]["robot1_j1"] == 1.0
assert result["robot10"]["joint_states"]["robot10_j1"] == 10.0
def test_three_same_type_devices(self):
"""三个同类型设备"""
sim = _make_sim({
"pump_a": "uuid-pa",
"pump_b": "uuid-pb",
"pump_c": "uuid-pc",
})
msg = _make_joint_state_msg(
["pump_a_flow", "pump_b_flow", "pump_c_flow",
"pump_a_pressure", "pump_b_pressure"],
[1.0, 2.0, 3.0, 0.1, 0.2],
)
result = sim.joint_state_callback(msg)
assert len(result["pump_a"]["joint_states"]) == 2
assert len(result["pump_b"]["joint_states"]) == 2
assert len(result["pump_c"]["joint_states"]) == 1
# ==================== E7: 吞吐优化测试 ====================
class TestThroughputOptimizations:
"""测试死区过滤、抑频、增量 resource_poses 等优化行为"""
# --- 死区过滤 (Dead Band) ---
def test_dead_band_filters_tiny_change(self):
"""关节变化小于死区阈值 → 被过滤"""
sim = JointBridgeSimulator({"arm": "uuid-arm"}, dead_band=0.01, min_interval=0.0)
msg1 = _make_joint_state_msg(["arm_j1"], [1.0])
result1 = sim.joint_state_callback(msg1, now=0.0)
assert "arm" in result1
# 微小变化 (0.001 < 0.01 死区)
msg2 = _make_joint_state_msg(["arm_j1"], [1.001])
result2 = sim.joint_state_callback(msg2, now=1.0)
assert result2 == {}
def test_dead_band_passes_significant_change(self):
"""关节变化大于死区阈值 → 通过"""
sim = JointBridgeSimulator({"arm": "uuid-arm"}, dead_band=0.01, min_interval=0.0)
msg1 = _make_joint_state_msg(["arm_j1"], [1.0])
sim.joint_state_callback(msg1, now=0.0)
msg2 = _make_joint_state_msg(["arm_j1"], [1.05])
result2 = sim.joint_state_callback(msg2, now=1.0)
assert "arm" in result2
assert result2["arm"]["joint_states"]["arm_j1"] == pytest.approx(1.05)
def test_dead_band_first_message_always_passes(self):
"""首次消息总是通过(无历史值)"""
sim = JointBridgeSimulator({"arm": "uuid-arm"}, dead_band=1000.0, min_interval=0.0)
msg = _make_joint_state_msg(["arm_j1"], [0.001])
result = sim.joint_state_callback(msg, now=0.0)
assert "arm" in result
def test_dead_band_any_joint_change_triggers(self):
"""多关节中只要有一个超过死区就全部发送"""
sim = JointBridgeSimulator({"arm": "uuid-arm"}, dead_band=0.01, min_interval=0.0)
msg1 = _make_joint_state_msg(["arm_j1", "arm_j2"], [1.0, 2.0])
sim.joint_state_callback(msg1, now=0.0)
# j1 微变化j2 大变化
msg2 = _make_joint_state_msg(["arm_j1", "arm_j2"], [1.001, 2.5])
result2 = sim.joint_state_callback(msg2, now=1.0)
assert "arm" in result2
# 两个关节的值都应包含在结果中
assert result2["arm"]["joint_states"]["arm_j1"] == pytest.approx(1.001)
assert result2["arm"]["joint_states"]["arm_j2"] == pytest.approx(2.5)
# --- 抑频 (Throttle) ---
def test_throttle_filters_rapid_messages(self):
"""发送间隔内的消息被过滤"""
sim = JointBridgeSimulator({"arm": "uuid-arm"}, dead_band=0.0, min_interval=0.1)
msg1 = _make_joint_state_msg(["arm_j1"], [1.0])
result1 = sim.joint_state_callback(msg1, now=0.0)
assert "arm" in result1
# 0.05s < 0.1s 间隔
msg2 = _make_joint_state_msg(["arm_j1"], [2.0])
result2 = sim.joint_state_callback(msg2, now=0.05)
assert result2 == {}
def test_throttle_passes_after_interval(self):
"""超过发送间隔后消息通过"""
sim = JointBridgeSimulator({"arm": "uuid-arm"}, dead_band=0.0, min_interval=0.1)
msg1 = _make_joint_state_msg(["arm_j1"], [1.0])
sim.joint_state_callback(msg1, now=0.0)
msg2 = _make_joint_state_msg(["arm_j1"], [2.0])
result2 = sim.joint_state_callback(msg2, now=0.15)
assert "arm" in result2
def test_throttle_bypassed_by_resource_change(self):
"""resource_pose 变化时忽略抑频限制"""
sim = JointBridgeSimulator({"arm": "uuid-arm"}, dead_band=0.0, min_interval=1.0)
msg1 = _make_joint_state_msg(["arm_j1"], [1.0])
sim.joint_state_callback(msg1, now=0.0)
# 资源变化 → 强制发送
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate": "arm_gripper"})))
msg2 = _make_joint_state_msg(["arm_j1"], [1.0])
result2 = sim.joint_state_callback(msg2, now=0.01) # 远小于 1.0 间隔
assert "arm" in result2
assert result2["arm"]["resource_poses"] == {"plate": "arm_gripper"}
# --- 增量 resource_poses ---
def test_resource_poses_only_sent_when_dirty(self):
"""resource_poses 仅在 dirty 时附带,否则为空"""
sim = _make_sim({"panda": "uuid-panda"})
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate": "panda_gripper"})))
# 第一次发送dirty → 携带 resource_poses
msg1 = _make_joint_state_msg(["panda_j1"], [0.5])
result1 = sim.joint_state_callback(msg1)
assert result1["panda"]["resource_poses"] == {"plate": "panda_gripper"}
# dirty 已清除
assert sim._resource_poses_dirty is False
# 第二次发送not dirty → resource_poses 为空
msg2 = _make_joint_state_msg(["panda_j1"], [1.0])
result2 = sim.joint_state_callback(msg2)
assert result2["panda"]["resource_poses"] == {}
def test_resource_change_resets_dirty_after_send(self):
"""dirty 在发送后被重置,再次 resource_pose 变化后重新标记"""
sim = _make_sim({"panda": "uuid-panda"})
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate": "panda_deck"})))
msg = _make_joint_state_msg(["panda_j1"], [0.5])
sim.joint_state_callback(msg)
assert sim._resource_poses_dirty is False
# 再次资源变化
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate": "panda_gripper"})))
assert sim._resource_poses_dirty is True
msg2 = _make_joint_state_msg(["panda_j1"], [1.0])
result2 = sim.joint_state_callback(msg2)
assert result2["panda"]["resource_poses"] == {"plate": "panda_gripper"}
# --- 组合场景 ---
def test_dead_band_bypassed_by_resource_dirty(self):
"""关节无变化但 resource_pose 有变化 → 仍然发送"""
sim = JointBridgeSimulator({"arm": "uuid-arm"}, dead_band=0.01, min_interval=0.0)
msg1 = _make_joint_state_msg(["arm_j1"], [1.0])
sim.joint_state_callback(msg1, now=0.0)
sim.resource_pose_callback(_make_string_msg(json.dumps({"plate": "arm_gripper"})))
# 关节值完全不变
msg2 = _make_joint_state_msg(["arm_j1"], [1.0])
result2 = sim.joint_state_callback(msg2, now=1.0)
assert "arm" in result2
assert result2["arm"]["resource_poses"] == {"plate": "arm_gripper"}
def test_high_frequency_stream_only_significant_pass(self):
"""模拟高频流: 只有显著变化的消息通过"""
sim = JointBridgeSimulator({"arm": "uuid-arm"}, dead_band=0.01, min_interval=0.0)
t = 0.0
passed_count = 0
# 100 条消息,每条微小递增 0.001
for i in range(100):
t += 0.1
val = 1.0 + i * 0.001
msg = _make_joint_state_msg(["arm_j1"], [val])
result = sim.joint_state_callback(msg, now=t)
if result:
passed_count += 1
# 首次总通过 + 每 10 条左右(累计 0.01 变化)通过一次
assert passed_count < 20 # 远少于 100
assert passed_count >= 5 # 但不应为 0
def test_throttle_and_dead_band_combined(self):
"""同时受抑频和死区影响"""
sim = JointBridgeSimulator({"arm": "uuid-arm"}, dead_band=0.01, min_interval=0.5)
# 首条通过
msg1 = _make_joint_state_msg(["arm_j1"], [1.0])
assert sim.joint_state_callback(msg1, now=0.0) != {}
# 时间不够 + 变化不够 → 过滤
msg2 = _make_joint_state_msg(["arm_j1"], [1.001])
assert sim.joint_state_callback(msg2, now=0.1) == {}
# 时间够但变化不够 → 过滤
msg3 = _make_joint_state_msg(["arm_j1"], [1.002])
assert sim.joint_state_callback(msg3, now=1.0) == {}
# 时间够且变化够 → 通过
msg4 = _make_joint_state_msg(["arm_j1"], [1.05])
assert sim.joint_state_callback(msg4, now=1.5) != {}

View File

@@ -50,6 +50,17 @@ class BaseCommunicationClient(ABC):
"""
pass
def publish_joint_state(self, node_uuid: str, joint_states: dict, resource_poses: dict = None) -> None:
"""
发布高频关节状态数据push_joint_state action不写 DB
Args:
node_uuid: 设备节点的云端 UUID
joint_states: 关节名 → 角度/位置 的映射
resource_poses: 物料附着映射(可选)
"""
pass
@abstractmethod
def publish_job_status(
self, feedback_data: dict, job_id: str, status: str, return_info: Optional[dict] = None

View File

@@ -264,6 +264,12 @@ def parse_args():
default=False,
help="Test mode: all actions simulate execution and return mock results without running real hardware",
)
parser.add_argument(
"--external_devices_only",
action="store_true",
default=False,
help="Only load external device packages (--devices), skip built-in unilabos/devices/ scanning and YAML device registry",
)
parser.add_argument(
"--extra_resource",
action="store_true",
@@ -342,11 +348,18 @@ def main():
check_mode = args_dict.get("check_mode", False)
if not skip_env_check:
from unilabos.utils.environment_check import check_environment
from unilabos.utils.environment_check import check_environment, check_device_package_requirements
if not check_environment(auto_install=True):
print_status("环境检查失败,程序退出", "error")
os._exit(1)
# 第一次设备包依赖检查build_registry 之前,确保 import map 可用
devices_dirs_for_req = args_dict.get("devices", None)
if devices_dirs_for_req:
if not check_device_package_requirements(devices_dirs_for_req):
print_status("设备包依赖检查失败,程序退出", "error")
os._exit(1)
else:
print_status("跳过环境依赖检查", "warning")
@@ -477,19 +490,7 @@ def main():
BasicConfig.vis_2d_enable = args_dict["2d_vis"]
BasicConfig.check_mode = check_mode
from unilabos.resources.graphio import (
read_node_link_json,
read_graphml,
dict_from_graph,
)
from unilabos.app.communication import get_communication_client
from unilabos.registry.registry import build_registry
from unilabos.app.backend import start_backend
from unilabos.app.web import http_client
from unilabos.app.web import start_server
from unilabos.app.register import register_devices_and_resources
from unilabos.resources.graphio import modify_to_backend_format
from unilabos.resources.resource_tracker import ResourceTreeSet, ResourceDict
# 显示启动横幅
print_unilab_banner(args_dict)
@@ -498,12 +499,14 @@ def main():
# check_mode 和 upload_registry 都会执行实际 import 验证
devices_dirs = args_dict.get("devices", None)
complete_registry = args_dict.get("complete_registry", False) or check_mode
external_only = args_dict.get("external_devices_only", False)
lab_registry = build_registry(
registry_paths=args_dict["registry_path"],
devices_dirs=devices_dirs,
upload_registry=BasicConfig.upload_registry,
check_mode=check_mode,
complete_registry=complete_registry,
external_only=external_only,
)
# Check mode: 注册表验证完成后直接退出
@@ -513,6 +516,20 @@ def main():
print_status(f"Check mode: 注册表验证完成 ({device_count} 设备, {resource_count} 资源),退出", "info")
os._exit(0)
# 以下导入依赖 ROS2 环境check_mode 已退出不需要
from unilabos.resources.graphio import (
read_node_link_json,
read_graphml,
dict_from_graph,
modify_to_backend_format,
)
from unilabos.app.communication import get_communication_client
from unilabos.app.backend import start_backend
from unilabos.app.web import http_client
from unilabos.app.web import start_server
from unilabos.app.register import register_devices_and_resources
from unilabos.resources.resource_tracker import ResourceTreeSet, ResourceDict
# Step 1: 上传全部注册表到服务端,同步保存到 unilabos_data
if BasicConfig.upload_registry:
if BasicConfig.ak and BasicConfig.sk:
@@ -610,6 +627,10 @@ def main():
resource_tree_set.merge_remote_resources(remote_tree_set)
print_status("远端物料同步完成", "info")
# 第二次设备包依赖检查云端物料同步后community 包可能引入新的 requirements
# TODO: 当 community device package 功能上线后,在这里调用
# install_requirements_txt(community_pkg_path / "requirements.txt", label="community.xxx")
# 使用 ResourceTreeSet 代替 list
args_dict["resources_config"] = resource_tree_set
args_dict["devices_config"] = resource_tree_set

View File

@@ -1434,6 +1434,21 @@ class WebSocketClient(BaseCommunicationClient):
self.message_processor.send_message(message)
# logger.trace(f"[WebSocketClient] Device status published: {device_id}.{property_name}")
def publish_joint_state(self, node_uuid: str, joint_states: dict, resource_poses: dict = None) -> None:
"""发布高频关节状态push_joint_state不写 DB"""
if self.is_disabled or not self.is_connected():
return
message = {
"action": "push_joint_state",
"data": {
"node_uuid": node_uuid,
"joint_states": joint_states or {},
"resource_poses": resource_poses or {},
},
}
self.message_processor.send_message(message)
def publish_job_status(
self, feedback_data: dict, item: QueueItem, status: str, return_info: Optional[dict] = None
) -> None:

View File

@@ -0,0 +1,88 @@
"""虚拟样品演示设备 — 用于前端 sample tracking 功能的极简 demo"""
import asyncio
import logging
import random
import time
from typing import Any, Dict, List, Optional
class VirtualSampleDemo:
"""虚拟样品追踪演示设备,提供两种典型返回模式:
- measure_samples: 等长输入输出 (前端按 index 自动对齐)
- split_and_measure: 输出比输入长,附带 samples 列标注归属
"""
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
if device_id is None and "id" in kwargs:
device_id = kwargs.pop("id")
if config is None and "config" in kwargs:
config = kwargs.pop("config")
self.device_id = device_id or "unknown_sample_demo"
self.config = config or {}
self.logger = logging.getLogger(f"VirtualSampleDemo.{self.device_id}")
self.data: Dict[str, Any] = {"status": "Idle"}
# ------------------------------------------------------------------
# Action 1: 等长输入输出,无 samples 列
# ------------------------------------------------------------------
async def measure_samples(self, concentrations: List[float]) -> Dict[str, Any]:
"""模拟光度测量。absorbance = concentration * 0.05 + noise
入参和出参 list 长度相等,前端按 index 自动对齐。
"""
self.logger.info(f"measure_samples: concentrations={concentrations}")
absorbance = [round(c * 0.05 + random.gauss(0, 0.005), 4) for c in concentrations]
return {"concentrations": concentrations, "absorbance": absorbance}
# ------------------------------------------------------------------
# Action 2: 输出比输入长,带 samples 列
# ------------------------------------------------------------------
async def split_and_measure(self, volumes: List[float], split_count: int = 3) -> Dict[str, Any]:
"""将每个样品均分为 split_count 份后逐份测量。
返回的 list 长度 = len(volumes) * split_count
附带 samples 列标注每行属于第几个输入样品 (0-based index)。
"""
self.logger.info(f"split_and_measure: volumes={volumes}, split_count={split_count}")
out_volumes: List[float] = []
readings: List[float] = []
samples: List[int] = []
for idx, vol in enumerate(volumes):
split_vol = round(vol / split_count, 2)
for _ in range(split_count):
out_volumes.append(split_vol)
readings.append(round(random.uniform(0.1, 1.0), 4))
samples.append(idx)
return {"volumes": out_volumes, "readings": readings, "samples": samples}
# ------------------------------------------------------------------
# Action 3: 入参和出参都带 samples 列(不等长)
# ------------------------------------------------------------------
async def analyze_readings(self, readings: List[float], samples: List[int]) -> Dict[str, Any]:
"""对 split_and_measure 的输出做二次分析。
入参 readings/samples 长度相同但 > 原始样品数,
出参同样带 samples 列,长度与入参一致。
"""
self.logger.info(f"analyze_readings: readings={readings}, samples={samples}")
scores: List[float] = []
passed: List[bool] = []
threshold = 0.4
for r in readings:
score = round(r * 100 + random.gauss(0, 2), 2)
scores.append(score)
passed.append(r >= threshold)
return {"scores": scores, "passed": passed, "samples": samples}
# ------------------------------------------------------------------
# 状态属性
# ------------------------------------------------------------------
@property
def status(self) -> str:
return self.data.get("status", "Idle")

View File

@@ -139,6 +139,7 @@ def scan_directory(
executor: ThreadPoolExecutor = None,
exclude_files: Optional[set] = None,
cache: Optional[Dict[str, Any]] = None,
include_files: Optional[List[Union[str, Path]]] = None,
) -> Dict[str, Any]:
"""
Recursively scan .py files under *root_dir* for @device and @resource
@@ -164,6 +165,7 @@ def scan_directory(
exclude_files: 要排除的文件名集合 (如 {"lab_resources.py"})
cache: Mutable cache dict (``load_scan_cache()`` result). Hits are read
from here; misses are written back so the caller can persist later.
include_files: 指定扫描的文件列表,提供时跳过目录递归收集,直接扫描这些文件。
"""
if executor is None:
raise ValueError("executor is required and must not be None")
@@ -175,7 +177,10 @@ def scan_directory(
python_path = Path(python_path).resolve()
# --- Collect files (depth/count limited) ---
py_files = _collect_py_files(root_dir, max_depth=max_depth, max_files=max_files, exclude_files=exclude_files)
if include_files is not None:
py_files = [Path(f).resolve() for f in include_files if Path(f).resolve().exists()]
else:
py_files = _collect_py_files(root_dir, max_depth=max_depth, max_files=max_files, exclude_files=exclude_files)
cache_files: Dict[str, Any] = cache.get("files", {}) if cache else {}

View File

@@ -2804,6 +2804,294 @@ virtual_rotavap:
- vacuum_pressure
type: object
version: 1.0.0
virtual_sample_demo:
category:
- virtual_device
class:
action_value_mappings:
analyze_readings:
feedback: {}
goal:
readings: readings
samples: samples
goal_default:
readings: []
samples: []
handles:
input:
- data_key: readings
data_source: handle
data_type: sample_list
handler_key: readings_in
label: 测量读数
- data_key: samples
data_source: handle
data_type: sample_index
handler_key: samples_in
label: 样品索引
output:
- data_key: scores
data_source: executor
data_type: sample_list
handler_key: scores_out
label: 分析得分
- data_key: passed
data_source: executor
data_type: sample_list
handler_key: passed_out
label: 是否通过
- data_key: samples
data_source: executor
data_type: sample_index
handler_key: samples_result_out
label: 样品索引
placeholder_keys: {}
result:
passed: passed
samples: samples
scores: scores
schema:
description: 对 split_and_measure 输出做二次分析,入参和出参都带 samples 列
properties:
feedback:
properties: {}
required: []
title: AnalyzeReadings_Feedback
type: object
goal:
properties:
readings:
description: 测量读数(来自 split_and_measure
items:
type: number
type: array
samples:
description: 每行归属的输入样品 index (0-based)
items:
type: integer
type: array
required:
- readings
- samples
title: AnalyzeReadings_Goal
type: object
result:
properties:
passed:
description: 是否通过阈值
items:
type: boolean
type: array
samples:
description: 每行归属的输入样品 index (0-based)
items:
type: integer
type: array
scores:
description: 分析得分
items:
type: number
type: array
required:
- scores
- passed
- samples
title: AnalyzeReadings_Result
type: object
required:
- goal
title: AnalyzeReadings
type: object
type: UniLabJsonCommandAsync
auto-cleanup:
feedback: {}
goal: {}
goal_default: {}
handles: {}
placeholder_keys: {}
result: {}
schema:
description: cleanup的参数schema
properties:
feedback: {}
goal:
properties: {}
required: []
type: object
result: {}
required:
- goal
title: cleanup参数
type: object
type: UniLabJsonCommandAsync
measure_samples:
feedback: {}
goal:
concentrations: concentrations
goal_default:
concentrations: []
handles:
output:
- data_key: concentrations
data_source: executor
data_type: sample_list
handler_key: concentrations_out
label: 浓度列表
- data_key: absorbance
data_source: executor
data_type: sample_list
handler_key: absorbance_out
label: 吸光度列表
placeholder_keys: {}
result:
absorbance: absorbance
concentrations: concentrations
schema:
description: 模拟光度测量,入参出参等长
properties:
feedback:
properties: {}
required: []
title: MeasureSamples_Feedback
type: object
goal:
properties:
concentrations:
description: 样品浓度列表
items:
type: number
type: array
required:
- concentrations
title: MeasureSamples_Goal
type: object
result:
properties:
absorbance:
description: 吸光度列表(与浓度等长)
items:
type: number
type: array
concentrations:
description: 原始浓度列表
items:
type: number
type: array
required:
- concentrations
- absorbance
title: MeasureSamples_Result
type: object
required:
- goal
title: MeasureSamples
type: object
type: UniLabJsonCommandAsync
split_and_measure:
feedback: {}
goal:
split_count: split_count
volumes: volumes
goal_default:
split_count: 3
volumes: []
handles:
output:
- data_key: readings
data_source: executor
data_type: sample_list
handler_key: readings_out
label: 测量读数
- data_key: samples
data_source: executor
data_type: sample_index
handler_key: samples_out
label: 样品索引
- data_key: volumes
data_source: executor
data_type: sample_list
handler_key: volumes_out
label: 均分体积
placeholder_keys: {}
result:
readings: readings
samples: samples
volumes: volumes
schema:
description: 均分样品后逐份测量,输出带 samples 列标注归属
properties:
feedback:
properties: {}
required: []
title: SplitAndMeasure_Feedback
type: object
goal:
properties:
split_count:
description: 每个样品均分的份数
type: integer
volumes:
description: 样品体积列表
items:
type: number
type: array
required:
- volumes
title: SplitAndMeasure_Goal
type: object
result:
properties:
readings:
description: 测量读数
items:
type: number
type: array
samples:
description: 每行归属的输入样品 index (0-based)
items:
type: integer
type: array
volumes:
description: 均分后的体积列表
items:
type: number
type: array
required:
- volumes
- readings
- samples
title: SplitAndMeasure_Result
type: object
required:
- goal
title: SplitAndMeasure
type: object
type: UniLabJsonCommandAsync
module: unilabos.devices.virtual.virtual_sample_demo:VirtualSampleDemo
status_types:
status: str
type: python
config_info: []
description: Virtual sample tracking demo device
handles: []
icon: ''
init_param_schema:
config:
properties:
config:
type: string
device_id:
type: string
required: []
type: object
data:
properties:
status:
type: string
required:
- status
type: object
version: 1.0.0
virtual_separator:
category:
- virtual_device

View File

@@ -112,7 +112,7 @@ class Registry:
# 统一入口
# ------------------------------------------------------------------
def setup(self, devices_dirs=None, upload_registry=False, complete_registry=False):
def setup(self, devices_dirs=None, upload_registry=False, complete_registry=False, external_only=False):
"""统一构建注册表入口。"""
if self._setup_called:
logger.critical("[UniLab Registry] setup方法已被调用过不允许多次调用")
@@ -123,24 +123,27 @@ class Registry:
)
# 1. AST 静态扫描 (快速, 无需 import)
self._run_ast_scan(devices_dirs, upload_registry=upload_registry)
self._run_ast_scan(devices_dirs, upload_registry=upload_registry, external_only=external_only)
# 2. Host node 内置设备
self._setup_host_node()
# 3. YAML 注册表加载 (兼容旧格式)
self.registry_paths = [Path(path).absolute() for path in self.registry_paths]
for i, path in enumerate(self.registry_paths):
sys_path = path.parent
logger.trace(f"[UniLab Registry] Path {i+1}/{len(self.registry_paths)}: {sys_path}")
sys.path.append(str(sys_path))
self.load_device_types(path, complete_registry=complete_registry)
if BasicConfig.enable_resource_load:
self.load_resource_types(path, upload_registry, complete_registry=complete_registry)
else:
logger.warning(
"[UniLab Registry] 资源加载已禁用 (enable_resource_load=False),跳过资源注册表加载"
)
# 3. YAML 注册表加载 (兼容旧格式) — external_only 模式下跳过
if external_only:
logger.info("[UniLab Registry] external_only 模式: 跳过 YAML 注册表加载")
else:
self.registry_paths = [Path(path).absolute() for path in self.registry_paths]
for i, path in enumerate(self.registry_paths):
sys_path = path.parent
logger.trace(f"[UniLab Registry] Path {i+1}/{len(self.registry_paths)}: {sys_path}")
sys.path.append(str(sys_path))
self.load_device_types(path, complete_registry=complete_registry)
if BasicConfig.enable_resource_load:
self.load_resource_types(path, upload_registry, complete_registry=complete_registry)
else:
logger.warning(
"[UniLab Registry] 资源加载已禁用 (enable_resource_load=False),跳过资源注册表加载"
)
self._startup_executor.shutdown(wait=True)
self._startup_executor = None
self._setup_called = True
@@ -253,7 +256,7 @@ class Registry:
# AST 静态扫描
# ------------------------------------------------------------------
def _run_ast_scan(self, devices_dirs=None, upload_registry=False):
def _run_ast_scan(self, devices_dirs=None, upload_registry=False, external_only=False):
"""
执行 AST 静态扫描,从 Python 代码中提取 @device / @resource 装饰器元数据。
无需 import 任何驱动模块,速度极快。
@@ -298,16 +301,30 @@ class Registry:
extra_dirs.append(d_path)
# 主扫描
exclude_files = {"lab_resources.py"} if not BasicConfig.extra_resource else None
scan_result = scan_directory(
scan_root, python_path=python_path, executor=self._startup_executor,
exclude_files=exclude_files, cache=ast_cache,
)
if exclude_files:
logger.info(
f"[UniLab Registry] 排除扫描文件: {exclude_files} "
f"(可通过 --extra_resource 启用加载)"
if external_only:
core_files = [
pkg_root / "ros" / "nodes" / "presets" / "host_node.py",
pkg_root / "resources" / "container.py",
]
scan_result = scan_directory(
scan_root, python_path=python_path, executor=self._startup_executor,
cache=ast_cache, include_files=core_files,
)
logger.info(
f"[UniLab Registry] external_only 模式: 仅扫描核心文件 "
f"({', '.join(f.name for f in core_files)})"
)
else:
exclude_files = {"lab_resources.py"} if not BasicConfig.extra_resource else None
scan_result = scan_directory(
scan_root, python_path=python_path, executor=self._startup_executor,
exclude_files=exclude_files, cache=ast_cache,
)
if exclude_files:
logger.info(
f"[UniLab Registry] 排除扫描文件: {exclude_files} "
f"(可通过 --extra_resource 启用加载)"
)
# 合并缓存统计
total_stats = scan_result.pop("_cache_stats", {"hits": 0, "misses": 0, "total": 0})
@@ -1534,9 +1551,9 @@ class Registry:
del resource_info["config_info"]
if "file_path" in resource_info:
del resource_info["file_path"]
complete_data[resource_id] = copy.deepcopy(dict(sorted(resource_info.items())))
resource_info["registry_type"] = "resource"
resource_info["file_path"] = str(file.absolute()).replace("\\", "/")
complete_data[resource_id] = copy.deepcopy(dict(sorted(resource_info.items())))
for rid in skip_ids:
data.pop(rid, None)
@@ -2175,7 +2192,7 @@ class Registry:
lab_registry = Registry()
def build_registry(registry_paths=None, devices_dirs=None, upload_registry=False, check_mode=False, complete_registry=False):
def build_registry(registry_paths=None, devices_dirs=None, upload_registry=False, check_mode=False, complete_registry=False, external_only=False):
"""
构建或获取Registry单例实例
"""
@@ -2189,7 +2206,7 @@ def build_registry(registry_paths=None, devices_dirs=None, upload_registry=False
if path not in current_paths:
lab_registry.registry_paths.append(path)
lab_registry.setup(devices_dirs=devices_dirs, upload_registry=upload_registry, complete_registry=complete_registry)
lab_registry.setup(devices_dirs=devices_dirs, upload_registry=upload_registry, complete_registry=complete_registry, external_only=external_only)
# 将 AST 扫描的字符串类型替换为实际 ROS2 消息类(仅查找 ROS2 类型,不 import 设备模块)
lab_registry.resolve_all_types()

View File

@@ -9,6 +9,7 @@ from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Unio
from action_msgs.msg import GoalStatus
from geometry_msgs.msg import Point
from sensor_msgs.msg import JointState as JointStateMsg
from rclpy.action import ActionClient, get_action_server_names_and_types_by_node
from rclpy.service import Service
from typing_extensions import TypedDict
@@ -348,6 +349,10 @@ class HostNode(BaseROS2DeviceNode):
else:
self.lab_logger().warning(f"[Host Node] Device {device_id} already existed, skipping.")
self.update_device_status_subscriptions()
# 订阅 joint_state_repub topic桥接关节数据到云端
self._init_joint_state_bridge()
# TODO: 需要验证 初始化所有控制器节点
if controllers_config:
update_rate = controllers_config["controller_manager"]["ros__parameters"]["update_rate"]
@@ -782,6 +787,179 @@ class HostNode(BaseROS2DeviceNode):
else:
self.lab_logger().trace(f"Status updated: {device_id}.{property_name} = {msg.data}")
"""关节数据 & 资源跟随桥接"""
# 吞吐优化参数
_JOINT_DEAD_BAND: float = 1e-4 # 关节角度变化小于此值视为无变化
_JOINT_MIN_INTERVAL: float = 0.05 # 最小发送间隔 (秒),限制到 ~20Hz
def _init_joint_state_bridge(self):
"""
订阅 /joint_states (sensor_msgs/JointState) 和 resource_pose (String)
构建 device_id → uuid 映射,并维护 resource_poses 状态。
吞吐优化:
- 死区过滤 (dead band): 关节角度变化 < 阈值时不发送
- 抑频 (throttle): 限制最大发送频率,避免 ROS2 1kHz 打满 WS
- 增量 resource_poses: 仅在 resource_pose 实际变化时才附带发送
"""
# 构建 device_id → cloud_uuid 映射(从 devices_config 中获取)
self._device_uuid_map: Dict[str, str] = {}
for tree in self.devices_config.trees:
node = tree.root_node
if node.res_content.type == "device" and node.res_content.uuid:
self._device_uuid_map[node.res_content.id] = node.res_content.uuid
# 按 device_id 长度降序排列,最长前缀优先匹配(避免 arm 抢先匹配 arm_left_j1
self._device_ids_sorted = sorted(self._device_uuid_map.keys(), key=len, reverse=True)
# 资源挂载状态:{resource_id: parent_link_name}
self._resource_poses: Dict[str, str] = {}
# resource_pose 变化标志,仅在真正变化时随关节数据发送
self._resource_poses_dirty: bool = False
# 吞吐优化状态
self._last_joint_values: Dict[str, float] = {} # 上次发送的关节值(全局)
self._last_send_time: float = -float("inf") # 上次发送时间戳(初始为-inf确保首条通过
self._last_sent_resource_poses: Dict[str, str] = {} # 上次发送的 resource_poses 快照
if not self._device_uuid_map:
self.lab_logger().debug("[Host Node] 无设备 UUID 映射,跳过关节桥接")
return
# 直接订阅 /joint_statessensor_msgs/JointState无需经过 JointRepublisher
self.create_subscription(
JointStateMsg,
"/joint_states",
self._joint_state_callback,
10,
callback_group=self.callback_group,
)
# 订阅 resource_pose资源挂载变化由 ResourceMeshManager 发布)
from std_msgs.msg import String as StdString
self.create_subscription(
StdString,
"resource_pose",
self._resource_pose_callback,
10,
callback_group=self.callback_group,
)
self.lab_logger().info(
f"[Host Node] 已订阅 /joint_states 和 resource_pose设备映射: {list(self._device_uuid_map.keys())}"
)
def _resource_pose_callback(self, msg):
"""
接收 ResourceMeshManager 发布的资源挂载变更。
msg.data 格式: JSON dict{"tip_rack_A1": "gripper_link", "plate_1": "deck_link"}
空 dict {} 表示无变化(心跳包)。
"""
try:
data = json.loads(msg.data)
except (json.JSONDecodeError, ValueError):
return
if not isinstance(data, dict) or not data:
return
# 检测实际变化
has_change = False
for k, v in data.items():
if self._resource_poses.get(k) != v:
has_change = True
break
if has_change:
self._resource_poses.update(data)
self._resource_poses_dirty = True
def _joint_state_callback(self, msg: JointStateMsg):
"""
直接接收 /joint_states (sensor_msgs/JointState),按设备分组后通过 bridge 发送到云端。
吞吐优化:
1. 抑频: 距上次发送 < _JOINT_MIN_INTERVAL 则跳过(除非有 resource_pose 变化)
2. 死区: 所有关节角度变化 < _JOINT_DEAD_BAND 则跳过(除非有 resource_pose 变化)
3. 增量 resource_poses: 仅在 dirty 时附带,否则发空 dict
"""
names = list(msg.name)
positions = list(msg.position)
if not names or len(names) != len(positions):
return
now = time.time()
resource_dirty = self._resource_poses_dirty
# 抑频检查resource_pose 变化时强制发送
if not resource_dirty and (now - self._last_send_time) < self._JOINT_MIN_INTERVAL:
return
# 死区过滤:检测是否有关节值实质变化
has_significant_change = False
for name, pos in zip(names, positions):
last_val = self._last_joint_values.get(name)
if last_val is None or abs(float(pos) - last_val) >= self._JOINT_DEAD_BAND:
has_significant_change = True
break
# 无关节变化且无资源变化 → 跳过
if not has_significant_change and not resource_dirty:
return
# 更新上次发送的关节值
for name, pos in zip(names, positions):
self._last_joint_values[name] = float(pos)
self._last_send_time = now
# 按设备 ID 分组关节数据(最长前缀优先匹配)
device_joints: Dict[str, Dict[str, float]] = {}
for name, pos in zip(names, positions):
matched_device = None
for device_id in self._device_ids_sorted:
if name.startswith(device_id + "_"):
matched_device = device_id
break
if matched_device:
if matched_device not in device_joints:
device_joints[matched_device] = {}
device_joints[matched_device][name] = float(pos)
elif len(self._device_uuid_map) == 1:
fallback_id = self._device_ids_sorted[0]
if fallback_id not in device_joints:
device_joints[fallback_id] = {}
device_joints[fallback_id][name] = float(pos)
# 构建设备级 resource_poses仅在 dirty 时附带实际数据)
device_resource_poses: Dict[str, Dict[str, str]] = {}
if resource_dirty:
for resource_id, link_name in self._resource_poses.items():
matched_device = None
for device_id in self._device_ids_sorted:
if link_name.startswith(device_id + "_"):
matched_device = device_id
break
if matched_device:
if matched_device not in device_resource_poses:
device_resource_poses[matched_device] = {}
device_resource_poses[matched_device][resource_id] = link_name
elif len(self._device_uuid_map) == 1:
fallback_id = self._device_ids_sorted[0]
if fallback_id not in device_resource_poses:
device_resource_poses[fallback_id] = {}
device_resource_poses[fallback_id][resource_id] = link_name
self._resource_poses_dirty = False
# 通过 bridge 发送 push_joint_state含 resource_poses
for device_id, joint_states in device_joints.items():
node_uuid = self._device_uuid_map.get(device_id)
if not node_uuid:
continue
resource_poses = device_resource_poses.get(device_id, {})
for bridge in self.bridges:
if hasattr(bridge, "publish_joint_state"):
bridge.publish_joint_state(node_uuid, joint_states, resource_poses)
def send_goal(
self,
item: "QueueItem",

View File

@@ -41,7 +41,7 @@ class JointRepublisher(BaseROS2DeviceNode):
json_dict["velocity"] = list(msg.velocity)
json_dict["effort"] = list(msg.effort)
self.msg.data = str(json_dict)
self.msg.data = json.dumps(json_dict)
self.joint_repub.publish(self.msg)
# print('-'*20)
# print(self.msg.data)

View File

@@ -6,20 +6,180 @@
import argparse
import importlib
import locale
import shutil
import subprocess
import sys
from pathlib import Path
from typing import List, Optional
from unilabos.utils.banner_print import print_status
# ---------------------------------------------------------------------------
# 底层安装工具
# ---------------------------------------------------------------------------
def _is_chinese_locale() -> bool:
try:
lang = locale.getdefaultlocale()[0]
return bool(lang and ("zh" in lang.lower() or "chinese" in lang.lower()))
except Exception:
return False
_USE_UV: Optional[bool] = None
def _has_uv() -> bool:
global _USE_UV
if _USE_UV is None:
_USE_UV = shutil.which("uv") is not None
return _USE_UV
def _install_packages(
packages: List[str],
upgrade: bool = False,
label: str = "",
) -> bool:
"""
安装/升级一组包。优先 uv pip install回退 sys pip。
逐个安装,任意一个失败不影响后续包。
Returns:
True if all succeeded, False otherwise.
"""
if not packages:
return True
is_chinese = _is_chinese_locale()
use_uv = _has_uv()
failed: List[str] = []
for pkg in packages:
action_word = "升级" if upgrade else "安装"
if label:
print_status(f"[{label}] 正在{action_word} {pkg}...", "info")
else:
print_status(f"正在{action_word} {pkg}...", "info")
if use_uv:
cmd = ["uv", "pip", "install"]
if upgrade:
cmd.append("--upgrade")
cmd.append(pkg)
if is_chinese:
cmd.extend(["--index-url", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
else:
cmd = [sys.executable, "-m", "pip", "install"]
if upgrade:
cmd.append("--upgrade")
cmd.append(pkg)
if is_chinese:
cmd.extend(["-i", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode == 0:
installer = "uv" if use_uv else "pip"
print_status(f"{pkg} {action_word}成功 (via {installer})", "success")
else:
stderr_short = result.stderr.strip().split("\n")[-1] if result.stderr else "unknown error"
print_status(f"× {pkg} {action_word}失败: {stderr_short}", "error")
failed.append(pkg)
except subprocess.TimeoutExpired:
print_status(f"× {pkg} {action_word}超时 (300s)", "error")
failed.append(pkg)
except Exception as e:
print_status(f"× {pkg} {action_word}异常: {e}", "error")
failed.append(pkg)
if failed:
print_status(f"{len(failed)} 个包操作失败: {', '.join(failed)}", "error")
return False
return True
# ---------------------------------------------------------------------------
# requirements.txt 安装(可多次调用)
# ---------------------------------------------------------------------------
def install_requirements_txt(req_path: str | Path, label: str = "") -> bool:
"""
读取一个 requirements.txt 文件,检查缺失的包并安装。
Args:
req_path: requirements.txt 文件路径
label: 日志前缀标签(如 "device_package_sim"
Returns:
True if all ok, False if any install failed.
"""
req_path = Path(req_path)
if not req_path.exists():
return True
tag = label or req_path.parent.name
print_status(f"[{tag}] 检查依赖: {req_path}", "info")
reqs: List[str] = []
with open(req_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and not line.startswith("-"):
reqs.append(line)
if not reqs:
return True
missing: List[str] = []
for req in reqs:
pkg_import = req.split(">=")[0].split("==")[0].split("<")[0].split("[")[0].split(">")[0].strip()
pkg_import = pkg_import.replace("-", "_")
try:
importlib.import_module(pkg_import)
except ImportError:
missing.append(req)
if not missing:
print_status(f"[{tag}] ✓ 依赖检查通过 ({len(reqs)} 个包)", "success")
return True
print_status(f"[{tag}] 缺失 {len(missing)} 个依赖: {', '.join(missing)}", "warning")
return _install_packages(missing, label=tag)
def check_device_package_requirements(devices_dirs: list[str]) -> bool:
"""
检查 --devices 指定的所有外部设备包目录中的 requirements.txt。
对每个目录查找 requirements.txt先在目录内找再在父目录找
"""
if not devices_dirs:
return True
all_ok = True
for d in devices_dirs:
d_path = Path(d).resolve()
req_file = d_path / "requirements.txt"
if not req_file.exists():
req_file = d_path.parent / "requirements.txt"
if not req_file.exists():
continue
if not install_requirements_txt(req_file, label=d_path.name):
all_ok = False
return all_ok
# ---------------------------------------------------------------------------
# UniLabOS 核心环境检查
# ---------------------------------------------------------------------------
class EnvironmentChecker:
"""环境检查器"""
def __init__(self):
# 定义必需的包及其安装名称的映射
self.required_packages = {
# 包导入名 : pip安装名
# "pymodbus.framer.FramerType": "pymodbus==3.9.2",
"websockets": "websockets",
"msgcenterpy": "msgcenterpy",
"orjson": "orjson",
@@ -28,33 +188,17 @@ class EnvironmentChecker:
"crcmod": "crcmod-plus",
}
# 特殊安装包(需要特殊处理的包)
self.special_packages = {"pylabrobot": "git+https://github.com/Xuwznln/pylabrobot.git"}
# 包版本要求(包名: 最低版本)
self.version_requirements = {
"msgcenterpy": "0.1.8", # msgcenterpy 最低版本要求
"msgcenterpy": "0.1.8",
}
self.missing_packages = []
self.failed_installs = []
self.packages_need_upgrade = []
# 检测系统语言
self.is_chinese = self._is_chinese_locale()
def _is_chinese_locale(self) -> bool:
"""检测系统是否为中文环境"""
try:
lang = locale.getdefaultlocale()[0]
if lang and ("zh" in lang.lower() or "chinese" in lang.lower()):
return True
except Exception:
pass
return False
self.missing_packages: List[tuple] = []
self.failed_installs: List[tuple] = []
self.packages_need_upgrade: List[tuple] = []
def check_package_installed(self, package_name: str) -> bool:
"""检查包是否已安装"""
try:
importlib.import_module(package_name)
return True
@@ -62,7 +206,6 @@ class EnvironmentChecker:
return False
def get_package_version(self, package_name: str) -> str | None:
"""获取已安装包的版本"""
try:
module = importlib.import_module(package_name)
return getattr(module, "__version__", None)
@@ -70,88 +213,32 @@ class EnvironmentChecker:
return None
def compare_version(self, current: str, required: str) -> bool:
"""
比较版本号
Returns:
True: current >= required
False: current < required
"""
try:
current_parts = [int(x) for x in current.split(".")]
required_parts = [int(x) for x in required.split(".")]
# 补齐长度
max_len = max(len(current_parts), len(required_parts))
current_parts.extend([0] * (max_len - len(current_parts)))
required_parts.extend([0] * (max_len - len(required_parts)))
return current_parts >= required_parts
except Exception:
return True # 如果无法比较,假设版本满足要求
def install_package(self, package_name: str, pip_name: str, upgrade: bool = False) -> bool:
"""安装包"""
try:
action = "升级" if upgrade else "安装"
print_status(f"正在{action} {package_name} ({pip_name})...", "info")
# 构建安装命令
cmd = [sys.executable, "-m", "pip", "install"]
# 如果是升级操作,添加 --upgrade 参数
if upgrade:
cmd.append("--upgrade")
cmd.append(pip_name)
# 如果是中文环境,使用清华镜像源
if self.is_chinese:
cmd.extend(["-i", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
# 执行安装
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) # 5分钟超时
if result.returncode == 0:
print_status(f"{package_name} {action}成功", "success")
return True
else:
print_status(f"× {package_name} {action}失败: {result.stderr}", "error")
return False
except subprocess.TimeoutExpired:
print_status(f"× {package_name} {action}超时", "error")
return False
except Exception as e:
print_status(f"× {package_name} {action}异常: {str(e)}", "error")
return False
def upgrade_package(self, package_name: str, pip_name: str) -> bool:
"""升级包"""
return self.install_package(package_name, pip_name, upgrade=True)
return True
def check_all_packages(self) -> bool:
"""检查所有必需的包"""
print_status("开始检查环境依赖...", "info")
# 检查常规包
for import_name, pip_name in self.required_packages.items():
if not self.check_package_installed(import_name):
self.missing_packages.append((import_name, pip_name))
else:
# 检查版本要求
if import_name in self.version_requirements:
current_version = self.get_package_version(import_name)
required_version = self.version_requirements[import_name]
elif import_name in self.version_requirements:
current_version = self.get_package_version(import_name)
required_version = self.version_requirements[import_name]
if current_version and not self.compare_version(current_version, required_version):
print_status(
f"{import_name} 版本过低 (当前: {current_version}, 需要: >={required_version})",
"warning",
)
self.packages_need_upgrade.append((import_name, pip_name))
if current_version:
if not self.compare_version(current_version, required_version):
print_status(
f"{import_name} 版本过低 (当前: {current_version}, 需要: >={required_version})",
"warning",
)
self.packages_need_upgrade.append((import_name, pip_name))
# 检查特殊包
for package_name, install_url in self.special_packages.items():
if not self.check_package_installed(package_name):
self.missing_packages.append((package_name, install_url))
@@ -170,7 +257,6 @@ class EnvironmentChecker:
return False
def install_missing_packages(self, auto_install: bool = True) -> bool:
"""安装缺失的包"""
if not self.missing_packages and not self.packages_need_upgrade:
return True
@@ -178,62 +264,36 @@ class EnvironmentChecker:
if self.missing_packages:
print_status("缺失以下包:", "warning")
for import_name, pip_name in self.missing_packages:
print_status(f" - {import_name} (pip install {pip_name})", "warning")
print_status(f" - {import_name} ({pip_name})", "warning")
if self.packages_need_upgrade:
print_status("需要升级以下包:", "warning")
for import_name, pip_name in self.packages_need_upgrade:
print_status(f" - {import_name} (pip install --upgrade {pip_name})", "warning")
print_status(f" - {import_name} ({pip_name})", "warning")
return False
# 安装缺失的包
if self.missing_packages:
print_status(f"开始自动安装 {len(self.missing_packages)} 个缺失的包...", "info")
pkgs = [pip_name for _, pip_name in self.missing_packages]
if not _install_packages(pkgs, label="unilabos"):
self.failed_installs.extend(self.missing_packages)
success_count = 0
for import_name, pip_name in self.missing_packages:
if self.install_package(import_name, pip_name):
success_count += 1
else:
self.failed_installs.append((import_name, pip_name))
print_status(f"✓ 成功安装 {success_count}/{len(self.missing_packages)} 个包", "success")
# 升级需要更新的包
if self.packages_need_upgrade:
print_status(f"开始自动升级 {len(self.packages_need_upgrade)} 个包...", "info")
pkgs = [pip_name for _, pip_name in self.packages_need_upgrade]
if not _install_packages(pkgs, upgrade=True, label="unilabos"):
self.failed_installs.extend(self.packages_need_upgrade)
upgrade_success_count = 0
for import_name, pip_name in self.packages_need_upgrade:
if self.upgrade_package(import_name, pip_name):
upgrade_success_count += 1
else:
self.failed_installs.append((import_name, pip_name))
print_status(f"✓ 成功升级 {upgrade_success_count}/{len(self.packages_need_upgrade)} 个包", "success")
if self.failed_installs:
print_status(f"{len(self.failed_installs)} 个包操作失败:", "error")
for import_name, pip_name in self.failed_installs:
print_status(f" - {import_name} ({pip_name})", "error")
return False
return True
return not self.failed_installs
def verify_installation(self) -> bool:
"""验证安装结果"""
if not self.missing_packages and not self.packages_need_upgrade:
return True
print_status("验证安装结果...", "info")
failed_verification = []
# 验证新安装的包
for import_name, pip_name in self.missing_packages:
if not self.check_package_installed(import_name):
failed_verification.append((import_name, pip_name))
# 验证升级的包
for import_name, pip_name in self.packages_need_upgrade:
if not self.check_package_installed(import_name):
failed_verification.append((import_name, pip_name))
@@ -270,17 +330,14 @@ def check_environment(auto_install: bool = True, show_details: bool = True) -> b
"""
checker = EnvironmentChecker()
# 检查包
if checker.check_all_packages():
return True
# 安装缺失的包
if not checker.install_missing_packages(auto_install):
if show_details:
print_status("请手动安装缺失的包后重新启动程序", "error")
return False
# 验证安装
if not checker.verify_installation():
if show_details:
print_status("安装验证失败,请检查网络连接或手动安装", "error")
@@ -290,14 +347,12 @@ def check_environment(auto_install: bool = True, show_details: bool = True) -> b
if __name__ == "__main__":
# 命令行参数解析
parser = argparse.ArgumentParser(description="UniLabOS 环境依赖检查工具")
parser.add_argument("--no-auto-install", action="store_true", help="仅检查环境,不自动安装缺失的包")
parser.add_argument("--silent", action="store_true", help="静默模式,不显示详细信息")
args = parser.parse_args()
# 执行环境检查
auto_install = not args.no_auto_install
show_details = not args.silent

View File

@@ -82,7 +82,7 @@ def get_result_info_str(error: str, suc: bool, return_value=None) -> str:
"""
samples = None
if isinstance(return_value, dict):
if "samples" in return_value:
if "samples" in return_value and type(return_value["samples"]) in [list, tuple] and type(return_value["samples"][0]) == dict:
samples = return_value.pop("samples")
result_info = {"error": error, "suc": suc, "return_value": return_value, "samples": samples}