Compare commits

...

2 Commits

Author SHA1 Message Date
Junhan Chang
d776550a4b add virtual_sample_demo 样品追踪测试设备 2026-03-23 16:43:20 +08:00
Xuwznln
3d8123849a add external devices param
fix registry upload missing type
2026-03-23 15:01:16 +08:00
10 changed files with 1451 additions and 174 deletions

View File

@@ -0,0 +1,160 @@
---
name: add-device
description: Guide for adding new devices to Uni-Lab-OS (接入新设备). Uses @device decorator + AST auto-scanning instead of manual YAML. Walks through device category, communication protocol, driver creation with decorators, and graph file setup. Use when the user wants to add/integrate a new device, create a device driver, write a device class, or mentions 接入设备/添加设备/设备驱动/物模型.
---
# 添加新设备到 Uni-Lab-OS
**第一步:** 使用 Read 工具读取 `docs/ai_guides/add_device.md`,获取完整的设备接入指南。
该指南包含设备类别(物模型)列表、通信协议模板、常见错误检查清单等。搜索 `unilabos/devices/` 获取已有设备的实现参考。
---
## 装饰器参考
### @device — 设备类装饰器
```python
from unilabos.registry.decorators import device
# 单设备
@device(
id="my_device.vendor", # 注册表唯一标识(必填)
category=["temperature"], # 分类标签列表(必填)
description="设备描述", # 设备描述
display_name="显示名称", # UI 显示名称(默认用 id
icon="DeviceIcon.webp", # 图标文件名
version="1.0.0", # 版本号
device_type="python", # "python" 或 "ros2"
handles=[...], # 端口列表InputHandle / OutputHandle
model={...}, # 3D 模型配置
hardware_interface=HardwareInterface(...), # 硬件通信接口
)
# 多设备(同一个类注册多个设备 ID各自有不同的 handles 等配置)
@device(
ids=["pump.vendor.model_A", "pump.vendor.model_B"],
id_meta={
"pump.vendor.model_A": {"handles": [...], "description": "型号 A"},
"pump.vendor.model_B": {"handles": [...], "description": "型号 B"},
},
category=["pump_and_valve"],
)
```
### @action — 动作方法装饰器
```python
from unilabos.registry.decorators import action
@action # 无参:注册为 UniLabJsonCommand 动作
@action() # 同上
@action(description="执行操作") # 带描述
@action(
action_type=HeatChill, # 指定 ROS Action 消息类型
goal={"temperature": "temp"}, # Goal 字段映射
feedback={}, # Feedback 字段映射
result={}, # Result 字段映射
handles=[...], # 动作级别端口
goal_default={"temp": 25.0}, # Goal 默认值
placeholder_keys={...}, # 参数占位符
always_free=True, # 不受排队限制
auto_prefix=True, # 强制使用 auto- 前缀
parent=True, # 从父类 MRO 获取参数签名
)
```
**自动识别规则:**
-`@action` 的公开方法 → 注册为动作(方法名即动作名)
- **不带 `@action` 的公开方法** → 自动注册为 `auto-{方法名}` 动作
- `_` 开头的方法 → 不扫描
- `@not_action` 标记的方法 → 排除
### @topic_config — 状态属性配置
```python
from unilabos.registry.decorators import topic_config
@property
@topic_config(
period=5.0, # 发布周期(秒),默认 5.0
print_publish=False, # 是否打印发布日志
qos=10, # QoS 深度,默认 10
name="custom_name", # 自定义发布名称(默认用属性名)
)
def temperature(self) -> float:
return self.data.get("temperature", 0.0)
```
### 辅助装饰器
```python
from unilabos.registry.decorators import not_action, always_free
@not_action # 标记为非动作post_init、辅助方法等
@always_free # 标记为不受排队限制(查询类操作)
```
---
## 设备模板
```python
import logging
from typing import Any, Dict, Optional
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
from unilabos.registry.decorators import device, action, topic_config, not_action
@device(id="my_device", category=["my_category"], description="设备描述")
class MyDevice:
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
self.device_id = device_id or "my_device"
self.config = config or {}
self.logger = logging.getLogger(f"MyDevice.{self.device_id}")
self.data: Dict[str, Any] = {"status": "Idle"}
@not_action
def post_init(self, ros_node: BaseROS2DeviceNode) -> None:
self._ros_node = ros_node
@action
async def initialize(self) -> bool:
self.data["status"] = "Ready"
return True
@action
async def cleanup(self) -> bool:
self.data["status"] = "Offline"
return True
@action(description="执行操作")
def my_action(self, param: float = 0.0, name: str = "") -> Dict[str, Any]:
"""带 @action 装饰器 → 注册为 'my_action' 动作"""
return {"success": True}
def get_info(self) -> Dict[str, Any]:
"""无 @action → 自动注册为 'auto-get_info' 动作"""
return {"device_id": self.device_id}
@property
@topic_config()
def status(self) -> str:
return self.data.get("status", "Idle")
@property
@topic_config(period=2.0)
def temperature(self) -> float:
return self.data.get("temperature", 0.0)
```
### 要点
- `_ros_node: BaseROS2DeviceNode` 类型标注放在类体顶部
- `__init__` 签名固定为 `(self, device_id=None, config=None, **kwargs)`
- `post_init``@not_action` 标记,参数类型标注为 `BaseROS2DeviceNode`
- 运行时状态存储在 `self.data` 字典中
- 设备文件放在 `unilabos/devices/<category>/` 目录下

View File

@@ -0,0 +1,351 @@
---
name: add-resource
description: Guide for adding new resources (materials, bottles, carriers, decks, warehouses) to Uni-Lab-OS (添加新物料/资源). Uses @resource decorator for AST auto-scanning. Covers Bottle, Carrier, Deck, WareHouse definitions. Use when the user wants to add resources, define materials, create a deck layout, add bottles/carriers/plates, or mentions 物料/资源/resource/bottle/carrier/deck/plate/warehouse.
---
# 添加新物料资源
Uni-Lab-OS 的资源体系基于 PyLabRobot通过扩展实现 Bottle、Carrier、WareHouse、Deck 等实验室物料管理。使用 `@resource` 装饰器注册AST 自动扫描生成注册表条目。
---
## 资源类型
| 类型 | 基类 | 用途 | 示例 |
|------|------|------|------|
| **Bottle** | `Well` (PyLabRobot) | 单个容器(瓶、小瓶、烧杯、反应器) | 试剂瓶、粉末瓶 |
| **BottleCarrier** | `ItemizedCarrier` | 多槽位载架(放多个 Bottle | 6 位试剂架、枪头盒 |
| **WareHouse** | `ItemizedCarrier` | 堆栈/仓库(放多个 Carrier | 4x4 堆栈 |
| **Deck** | `Deck` (PyLabRobot) | 工作站台面(放多个 WareHouse | 反应站 Deck |
**层级关系:** `Deck``WareHouse``BottleCarrier``Bottle`
WareHouse 本质上和 Site 是同一概念 — 都是定义一组固定的放置位slot只不过 WareHouse 多嵌套了一层 Deck。两者都需要开发者根据实际物理尺寸自行计算各 slot 的偏移坐标。
---
## @resource 装饰器
```python
from unilabos.registry.decorators import resource
@resource(
id="my_resource_id", # 注册表唯一标识(必填)
category=["bottles"], # 分类标签列表(必填)
description="资源描述",
icon="", # 图标
version="1.0.0",
handles=[...], # 端口列表InputHandle / OutputHandle
model={...}, # 3D 模型配置
class_type="pylabrobot", # "python" / "pylabrobot" / "unilabos"
)
```
---
## 创建规范
### 命名规则
1. **`name` 参数作为前缀**:所有工厂函数必须接受 `name: str` 参数,创建子物料时以 `name` 作为前缀,确保实例名在运行时全局唯一
2. **Bottle 命名约定**:试剂瓶-Bottle烧杯-Beaker烧瓶-Flask小瓶-Vial
3. **函数名 = `@resource(id=...)`**:工厂函数名与注册表 id 保持一致
### 子物料命名示例
```python
# Carrier 内部的 sites 用 name 前缀
for k, v in sites.items():
v.name = f"{name}_{v.name}" # "堆栈1左_A01", "堆栈1左_B02" ...
# Carrier 中放置 Bottle 时用 name 前缀
carrier[0] = My_Reagent_Bottle(f"{name}_flask_1") # "堆栈1左_flask_1"
carrier[i] = My_Solid_Vial(f"{name}_vial_{ordering[i]}") # "堆栈1左_vial_A1"
# create_homogeneous_resources 使用 name_prefix
sites=create_homogeneous_resources(
klass=ResourceHolder,
locations=[...],
name_prefix=name, # 自动生成 "{name}_0", "{name}_1" ...
)
# Deck setup 中用仓库名称作为 name 传入
self.warehouses = {
"堆栈1左": my_warehouse_4x4("堆栈1左"), # WareHouse.name = "堆栈1左"
"试剂堆栈": my_reagent_stack("试剂堆栈"), # WareHouse.name = "试剂堆栈"
}
```
### 其他规范
- **max_volume 单位为 μL**500mL = 500000
- **尺寸单位为 mm**`diameter`, `height`, `size_x/y/z`, `dx/dy/dz`
- **BottleCarrier 必须设置 `num_items_x/y/z`**:用于前端渲染布局
- **Deck 的 `__init__` 必须接受 `setup=False`**:图文件中 `config.setup=true` 触发 `setup()`
- **按项目分组文件**:同一工作站的资源放在 `unilabos/resources/<project>/`
- **`__init__` 必须接受 `serialize()` 输出的所有字段**`serialize()` 输出会作为 `config` 回传到 `__init__`,因此必须通过显式参数或 `**kwargs` 接受,否则反序列化会报错
- **持久化运行时状态用 `serialize_state()`**:通过 `_unilabos_state` 字典存储可变信息(如物料内容、液体量),只存 JSON 可序列化的基本类型
---
## 资源模板
### Bottle
```python
from unilabos.registry.decorators import resource
from unilabos.resources.itemized_carrier import Bottle
@resource(id="My_Reagent_Bottle", category=["bottles"], description="我的试剂瓶")
def My_Reagent_Bottle(
name: str,
diameter: float = 70.0,
height: float = 120.0,
max_volume: float = 500000.0,
barcode: str = None,
) -> Bottle:
return Bottle(
name=name,
diameter=diameter,
height=height,
max_volume=max_volume,
barcode=barcode,
model="My_Reagent_Bottle",
)
```
**Bottle 参数:**
- `name`: 实例名称(运行时唯一,由上层 Carrier 以前缀方式传入)
- `diameter`: 瓶体直径 (mm)
- `height`: 瓶体高度 (mm)
- `max_volume`: 最大容积(**μL**500mL = 500000
- `barcode`: 条形码(可选)
### BottleCarrier
```python
from pylabrobot.resources import ResourceHolder
from pylabrobot.resources.carrier import create_ordered_items_2d
from unilabos.resources.itemized_carrier import BottleCarrier
from unilabos.registry.decorators import resource
@resource(id="My_6SlotCarrier", category=["bottle_carriers"], description="六槽位载架")
def My_6SlotCarrier(name: str) -> BottleCarrier:
sites = create_ordered_items_2d(
klass=ResourceHolder,
num_items_x=3, num_items_y=2,
dx=10.0, dy=10.0, dz=5.0,
item_dx=42.0, item_dy=35.0,
size_x=20.0, size_y=20.0, size_z=50.0,
)
# 子 site 用 name 作为前缀
for k, v in sites.items():
v.name = f"{name}_{v.name}"
carrier = BottleCarrier(
name=name, size_x=146.0, size_y=80.0, size_z=55.0,
sites=sites, model="My_6SlotCarrier",
)
carrier.num_items_x = 3
carrier.num_items_y = 2
carrier.num_items_z = 1
# 放置 Bottle 时用 name 作为前缀
ordering = ["A1", "B1", "A2", "B2", "A3", "B3"]
for i in range(6):
carrier[i] = My_Reagent_Bottle(f"{name}_vial_{ordering[i]}")
return carrier
```
### WareHouse / Deck 放置位
WareHouse 和 Site 本质上是同一概念都是定义一组固定放置位slot根据物理尺寸自行批量计算偏移坐标。WareHouse 只是多嵌套了一层 Deck 而已。推荐开发者直接根据实物测量数据计算各 slot 偏移量。
#### WareHouse使用 warehouse_factory
```python
from unilabos.resources.warehouse import warehouse_factory
from unilabos.registry.decorators import resource
@resource(id="my_warehouse_4x4", category=["warehouse"], description="4x4 堆栈仓库")
def my_warehouse_4x4(name: str) -> "WareHouse":
return warehouse_factory(
name=name,
num_items_x=4, num_items_y=4, num_items_z=1,
dx=10.0, dy=10.0, dz=10.0, # 第一个 slot 的起始偏移
item_dx=147.0, item_dy=106.0, item_dz=130.0, # slot 间距
resource_size_x=127.0, resource_size_y=85.0, resource_size_z=100.0, # slot 尺寸
model="my_warehouse_4x4",
col_offset=0, # 列标签起始偏移0 → A01, 4 → A05
layout="row-major", # "row-major" 行优先 / "col-major" 列优先 / "vertical-col-major" 竖向
)
```
`warehouse_factory` 参数说明:
- `dx/dy/dz`:第一个 slot 相对 WareHouse 原点的偏移mm
- `item_dx/item_dy/item_dz`:相邻 slot 间距mm需根据实际物理间距测量
- `resource_size_x/y/z`:每个 slot 的可放置区域尺寸
- `layout`:影响 slot 标签和坐标映射
- `"row-major"`A01,A02,...,B01,B02,...(行优先,适合横向排列)
- `"col-major"`A01,B01,...,A02,B02,...(列优先)
- `"vertical-col-major"`竖向排列y 坐标反向
#### Deck 组装 WareHouse
Deck 通过 `setup()` 将多个 WareHouse 放置到指定坐标:
```python
from pylabrobot.resources import Deck, Coordinate
from unilabos.registry.decorators import resource
@resource(id="MyStation_Deck", category=["deck"], description="我的工作站 Deck")
class MyStation_Deck(Deck):
def __init__(self, name="MyStation_Deck", size_x=2700.0, size_y=1080.0, size_z=1500.0,
category="deck", setup=False, **kwargs) -> None:
super().__init__(name=name, size_x=size_x, size_y=size_y, size_z=size_z)
if setup:
self.setup()
def setup(self) -> None:
self.warehouses = {
"堆栈1左": my_warehouse_4x4("堆栈1左"),
"堆栈1右": my_warehouse_4x4("堆栈1右"),
}
self.warehouse_locations = {
"堆栈1左": Coordinate(-200.0, 400.0, 0.0), # 自行测量计算
"堆栈1右": Coordinate(2350.0, 400.0, 0.0),
}
for wh_name, wh in self.warehouses.items():
self.assign_child_resource(wh, location=self.warehouse_locations[wh_name])
```
#### Site 模式(前端定向放置)
适用于有固定孔位/槽位的设备(如移液站 PRCXI 9300Deck 通过 `sites` 列表定义前端展示的放置位,前端据此渲染可拖拽的孔位布局:
```python
import collections
from typing import Any, Dict, List, Optional
from pylabrobot.resources import Deck, Resource, Coordinate
from unilabos.registry.decorators import resource
@resource(id="MyLabDeck", category=["deck"], description="带 Site 定向放置的 Deck")
class MyLabDeck(Deck):
# 根据设备台面实测批量计算各 slot 坐标偏移
_DEFAULT_SITE_POSITIONS = [
(0, 0, 0), (138, 0, 0), (276, 0, 0), (414, 0, 0), # T1-T4
(0, 96, 0), (138, 96, 0), (276, 96, 0), (414, 96, 0), # T5-T8
]
_DEFAULT_SITE_SIZE = {"width": 128.0, "height": 86.0, "depth": 0}
_DEFAULT_CONTENT_TYPE = ["plate", "tip_rack", "tube_rack", "adaptor"]
def __init__(self, name: str, size_x: float, size_y: float, size_z: float,
sites: Optional[List[Dict[str, Any]]] = None, **kwargs):
super().__init__(size_x, size_y, size_z, name)
if sites is not None:
self.sites = [dict(s) for s in sites]
else:
self.sites = []
for i, (x, y, z) in enumerate(self._DEFAULT_SITE_POSITIONS):
self.sites.append({
"label": f"T{i + 1}", # 前端显示的槽位标签
"visible": True, # 是否在前端可见
"position": {"x": x, "y": y, "z": z}, # 槽位物理坐标
"size": dict(self._DEFAULT_SITE_SIZE), # 槽位尺寸
"content_type": list(self._DEFAULT_CONTENT_TYPE), # 允许放入的物料类型
})
self._ordering = collections.OrderedDict(
(site["label"], None) for site in self.sites
)
def assign_child_resource(self, resource: Resource,
location: Optional[Coordinate] = None,
reassign: bool = True,
spot: Optional[int] = None):
idx = spot
if spot is None:
for i, site in enumerate(self.sites):
if site.get("label") == resource.name:
idx = i
break
if idx is None:
for i in range(len(self.sites)):
if self._get_site_resource(i) is None:
idx = i
break
if idx is None:
raise ValueError(f"No available site for '{resource.name}'")
loc = Coordinate(**self.sites[idx]["position"])
super().assign_child_resource(resource, location=loc, reassign=reassign)
def serialize(self) -> dict:
data = super().serialize()
sites_out = []
for i, site in enumerate(self.sites):
occupied = self._get_site_resource(i)
sites_out.append({
"label": site["label"],
"visible": site.get("visible", True),
"occupied_by": occupied.name if occupied else None,
"position": site["position"],
"size": site["size"],
"content_type": site["content_type"],
})
data["sites"] = sites_out
return data
```
**Site 字段说明:**
| 字段 | 类型 | 说明 |
|------|------|------|
| `label` | str | 槽位标签(如 `"T1"`),前端显示名称,也用于匹配 resource.name |
| `visible` | bool | 是否在前端可见 |
| `position` | dict | 物理坐标 `{x, y, z}`mm需自行测量计算偏移 |
| `size` | dict | 槽位尺寸 `{width, height, depth}`mm |
| `content_type` | list | 允许放入的物料类型,如 `["plate", "tip_rack", "tube_rack", "adaptor"]` |
**参考实现:** `unilabos/devices/liquid_handling/prcxi/prcxi.py` 中的 `PRCXI9300Deck`4x4 共 16 个 site
---
## 文件位置
```
unilabos/resources/
├── <project>/ # 按项目分组
│ ├── bottles.py # Bottle 工厂函数
│ ├── bottle_carriers.py # Carrier 工厂函数
│ ├── warehouses.py # WareHouse 工厂函数
│ └── decks.py # Deck 类定义
```
---
## 验证
```bash
# 资源可导入
python -c "from unilabos.resources.my_project.bottles import My_Reagent_Bottle; print(My_Reagent_Bottle('test'))"
# 启动测试AST 自动扫描)
unilab -g <graph>.json
```
仅在以下情况仍需 YAML第三方库资源如 pylabrobot 内置资源,无 `@resource` 装饰器)。
---
## 关键路径
| 内容 | 路径 |
|------|------|
| Bottle/Carrier 基类 | `unilabos/resources/itemized_carrier.py` |
| WareHouse 基类 + 工厂 | `unilabos/resources/warehouse.py` |
| PLR 注册 | `unilabos/resources/plr_additional_res_reg.py` |
| 装饰器定义 | `unilabos/registry/decorators.py` |

View File

@@ -0,0 +1,292 @@
# 资源高级参考
本文件是 SKILL.md 的补充,包含类继承体系、序列化/反序列化、Bioyond 物料同步、非瓶类资源和仓库工厂模式。Agent 在需要实现这些功能时按需阅读。
---
## 1. 类继承体系
```
PyLabRobot
├── Resource (PLR 基类)
│ ├── Well
│ │ └── Bottle (unilabos) → 瓶/小瓶/烧杯/反应器
│ ├── Deck
│ │ └── 自定义 Deck 类 (unilabos) → 工作站台面
│ ├── ResourceHolder → 槽位占位符
│ └── Container
│ └── Battery (unilabos) → 组装好的电池
├── ItemizedCarrier (unilabos, 继承 Resource)
│ ├── BottleCarrier (unilabos) → 瓶载架
│ └── WareHouse (unilabos) → 堆栈仓库
├── ItemizedResource (PLR)
│ └── MagazineHolder (unilabos) → 子弹夹载架
└── ResourceStack (PLR)
└── Magazine (unilabos) → 子弹夹洞位
```
### Bottle 类细节
```python
class Bottle(Well):
def __init__(self, name, diameter, height, max_volume,
size_x=0.0, size_y=0.0, size_z=0.0,
barcode=None, category="container", model=None, **kwargs):
super().__init__(
name=name,
size_x=diameter, # PLR 用 diameter 作为 size_x/size_y
size_y=diameter,
size_z=height, # PLR 用 height 作为 size_z
max_volume=max_volume,
category=category,
model=model,
bottom_type="flat",
cross_section_type="circle"
)
```
注意 `size_x = size_y = diameter``size_z = height`
### ItemizedCarrier 核心方法
| 方法 | 说明 |
|------|------|
| `__getitem__(identifier)` | 通过索引或 Excel 标识(如 `"A01"`)访问槽位 |
| `__setitem__(identifier, resource)` | 向槽位放入资源 |
| `get_child_identifier(child)` | 获取子资源的标识符 |
| `capacity` | 总槽位数 |
| `sites` | 所有槽位字典 |
---
## 2. 序列化与反序列化
### PLR ↔ UniLab 转换
| 函数 | 位置 | 方向 |
|------|------|------|
| `ResourceTreeSet.from_plr_resources(resources)` | `resource_tracker.py` | PLR → UniLab |
| `ResourceTreeSet.to_plr_resources()` | `resource_tracker.py` | UniLab → PLR |
### `from_plr_resources` 流程
```
PLR Resource
↓ build_uuid_mapping (递归生成 UUID)
↓ resource.serialize() → dict
↓ resource.serialize_all_state() → states
↓ resource_plr_inner (递归构建 ResourceDictInstance)
ResourceTreeSet
```
关键:每个 PLR 资源通过 `unilabos_uuid` 属性携带 UUID`unilabos_extra` 携带扩展数据(如 `class` 名)。
### `to_plr_resources` 流程
```
ResourceTreeSet
↓ collect_node_data (收集 UUID、状态、扩展数据)
↓ node_to_plr_dict (转为 PLR 字典格式)
↓ find_subclass(type_name, PLRResource) (查找 PLR 子类)
↓ sub_cls.deserialize(plr_dict) (反序列化)
↓ loop_set_uuid, loop_set_extra (递归设置 UUID 和扩展)
PLR Resource
```
### Bottle 序列化
```python
class Bottle(Well):
def serialize(self) -> dict:
data = super().serialize()
return {**data, "diameter": self.diameter, "height": self.height}
@classmethod
def deserialize(cls, data: dict, allow_marshal=False):
barcode_data = data.pop("barcode", None)
instance = super().deserialize(data, allow_marshal=allow_marshal)
if barcode_data and isinstance(barcode_data, str):
instance.barcode = barcode_data
return instance
```
---
## 3. Bioyond 物料同步
### 双向转换函数
| 函数 | 位置 | 方向 |
|------|------|------|
| `resource_bioyond_to_plr(materials, type_mapping, deck)` | `graphio.py` | Bioyond → PLR |
| `resource_plr_to_bioyond(resources, type_mapping, warehouse_mapping)` | `graphio.py` | PLR → Bioyond |
### `resource_bioyond_to_plr` 流程
```
Bioyond 物料列表
↓ reverse_type_mapping: {typeName → (model, UUID)}
↓ 对每个物料:
typeName → 查映射 → model (如 "BIOYOND_PolymerStation_Reactor")
initialize_resource({"name": unique_name, "class": model})
↓ 设置 unilabos_extra (material_bioyond_id, material_bioyond_name 等)
↓ 处理 detail (子物料/坐标)
↓ 按 locationName 放入 deck.warehouses 对应槽位
PLR 资源列表
```
### `resource_plr_to_bioyond` 流程
```
PLR 资源列表
↓ 遍历每个资源:
载架(capacity > 1): 生成 details 子物料 + 坐标
单瓶: 直接映射
↓ type_mapping 查找 typeId
↓ warehouse_mapping 查找位置 UUID
↓ 组装 Bioyond 格式 (name, typeName, typeId, quantity, Parameters, locations)
Bioyond 物料列表
```
### BioyondResourceSynchronizer
工作站通过 `ResourceSynchronizer` 自动同步物料:
```python
class BioyondResourceSynchronizer(ResourceSynchronizer):
def sync_from_external(self) -> bool:
all_data = []
all_data.extend(api_client.stock_material('{"typeMode": 0}')) # 耗材
all_data.extend(api_client.stock_material('{"typeMode": 1}')) # 样品
all_data.extend(api_client.stock_material('{"typeMode": 2}')) # 试剂
unilab_resources = resource_bioyond_to_plr(
all_data,
type_mapping=self.workstation.bioyond_config["material_type_mappings"],
deck=self.workstation.deck
)
# 更新 deck 上的资源
```
---
## 4. 非瓶类资源
### ElectrodeSheet极片
路径:`unilabos/resources/battery/electrode_sheet.py`
```python
class ElectrodeSheet(ResourcePLR):
"""片状材料(极片、隔膜、弹片、垫片等)"""
_unilabos_state = {
"diameter": 0.0,
"thickness": 0.0,
"mass": 0.0,
"material_type": "",
"color": "",
"info": "",
}
```
工厂函数:`PositiveCan`, `PositiveElectrode`, `NegativeCan`, `NegativeElectrode`, `SpringWasher`, `FlatWasher`, `AluminumFoil`
### Battery电池
```python
class Battery(Container):
"""组装好的电池"""
_unilabos_state = {
"color": "",
"electrolyte_name": "",
"open_circuit_voltage": 0.0,
}
```
### Magazine / MagazineHolder子弹夹
```python
class Magazine(ResourceStack):
"""子弹夹洞位,可堆叠 ElectrodeSheet"""
# direction, max_sheets
class MagazineHolder(ItemizedResource):
"""多洞位子弹夹"""
# hole_diameter, hole_depth, max_sheets_per_hole
```
工厂函数 `magazine_factory()``create_homogeneous_resources` 生成洞位,可选预填 `ElectrodeSheet``Battery`
---
## 5. 仓库工厂模式参考
### 实际 warehouse 工厂函数示例
```python
# 行优先 4x4 仓库
def bioyond_warehouse_1x4x4(name: str) -> WareHouse:
return warehouse_factory(
name=name,
num_items_x=4, num_items_y=4, num_items_z=1,
dx=10.0, dy=10.0, dz=10.0,
item_dx=147.0, item_dy=106.0, item_dz=130.0,
layout="row-major", # A01,A02,A03,A04, B01,...
)
# 右侧 4x4 仓库(列名偏移)
def bioyond_warehouse_1x4x4_right(name: str) -> WareHouse:
return warehouse_factory(
name=name,
num_items_x=4, num_items_y=4, num_items_z=1,
dx=10.0, dy=10.0, dz=10.0,
item_dx=147.0, item_dy=106.0, item_dz=130.0,
col_offset=4, # A05,A06,A07,A08
layout="row-major",
)
# 竖向仓库(站内试剂存放)
def bioyond_warehouse_reagent_storage(name: str) -> WareHouse:
return warehouse_factory(
name=name,
num_items_x=1, num_items_y=2, num_items_z=1,
dx=10.0, dy=10.0, dz=10.0,
item_dx=147.0, item_dy=106.0, item_dz=130.0,
layout="vertical-col-major",
)
# 行偏移F 行开始)
def bioyond_warehouse_5x3x1(name: str, row_offset: int = 0) -> WareHouse:
return warehouse_factory(
name=name,
num_items_x=3, num_items_y=5, num_items_z=1,
dx=10.0, dy=10.0, dz=10.0,
item_dx=159.0, item_dy=183.0, item_dz=130.0,
row_offset=row_offset, # 0→A行起5→F行起
layout="row-major",
)
```
### layout 类型说明
| layout | 命名顺序 | 适用场景 |
|--------|---------|---------|
| `col-major` (默认) | A01,B01,C01,D01, A02,B02,... | 列优先,标准堆栈 |
| `row-major` | A01,A02,A03,A04, B01,B02,... | 行优先Bioyond 前端展示 |
| `vertical-col-major` | 竖向排列,标签从底部开始 | 竖向仓库(试剂存放、测密度) |
---
## 6. 关键路径
| 内容 | 路径 |
|------|------|
| Bottle/Carrier 基类 | `unilabos/resources/itemized_carrier.py` |
| WareHouse 类 + 工厂 | `unilabos/resources/warehouse.py` |
| ResourceTreeSet 转换 | `unilabos/resources/resource_tracker.py` |
| Bioyond 物料转换 | `unilabos/resources/graphio.py` |
| Bioyond 仓库定义 | `unilabos/resources/bioyond/warehouses.py` |
| 电池资源 | `unilabos/resources/battery/` |
| PLR 注册 | `unilabos/resources/plr_additional_res_reg.py` |

View File

@@ -264,6 +264,12 @@ def parse_args():
default=False, default=False,
help="Test mode: all actions simulate execution and return mock results without running real hardware", help="Test mode: all actions simulate execution and return mock results without running real hardware",
) )
parser.add_argument(
"--external_devices_only",
action="store_true",
default=False,
help="Only load external device packages (--devices), skip built-in unilabos/devices/ scanning and YAML device registry",
)
parser.add_argument( parser.add_argument(
"--extra_resource", "--extra_resource",
action="store_true", action="store_true",
@@ -342,11 +348,18 @@ def main():
check_mode = args_dict.get("check_mode", False) check_mode = args_dict.get("check_mode", False)
if not skip_env_check: if not skip_env_check:
from unilabos.utils.environment_check import check_environment from unilabos.utils.environment_check import check_environment, check_device_package_requirements
if not check_environment(auto_install=True): if not check_environment(auto_install=True):
print_status("环境检查失败,程序退出", "error") print_status("环境检查失败,程序退出", "error")
os._exit(1) os._exit(1)
# 第一次设备包依赖检查build_registry 之前,确保 import map 可用
devices_dirs_for_req = args_dict.get("devices", None)
if devices_dirs_for_req:
if not check_device_package_requirements(devices_dirs_for_req):
print_status("设备包依赖检查失败,程序退出", "error")
os._exit(1)
else: else:
print_status("跳过环境依赖检查", "warning") print_status("跳过环境依赖检查", "warning")
@@ -477,19 +490,7 @@ def main():
BasicConfig.vis_2d_enable = args_dict["2d_vis"] BasicConfig.vis_2d_enable = args_dict["2d_vis"]
BasicConfig.check_mode = check_mode BasicConfig.check_mode = check_mode
from unilabos.resources.graphio import (
read_node_link_json,
read_graphml,
dict_from_graph,
)
from unilabos.app.communication import get_communication_client
from unilabos.registry.registry import build_registry from unilabos.registry.registry import build_registry
from unilabos.app.backend import start_backend
from unilabos.app.web import http_client
from unilabos.app.web import start_server
from unilabos.app.register import register_devices_and_resources
from unilabos.resources.graphio import modify_to_backend_format
from unilabos.resources.resource_tracker import ResourceTreeSet, ResourceDict
# 显示启动横幅 # 显示启动横幅
print_unilab_banner(args_dict) print_unilab_banner(args_dict)
@@ -498,12 +499,14 @@ def main():
# check_mode 和 upload_registry 都会执行实际 import 验证 # check_mode 和 upload_registry 都会执行实际 import 验证
devices_dirs = args_dict.get("devices", None) devices_dirs = args_dict.get("devices", None)
complete_registry = args_dict.get("complete_registry", False) or check_mode complete_registry = args_dict.get("complete_registry", False) or check_mode
external_only = args_dict.get("external_devices_only", False)
lab_registry = build_registry( lab_registry = build_registry(
registry_paths=args_dict["registry_path"], registry_paths=args_dict["registry_path"],
devices_dirs=devices_dirs, devices_dirs=devices_dirs,
upload_registry=BasicConfig.upload_registry, upload_registry=BasicConfig.upload_registry,
check_mode=check_mode, check_mode=check_mode,
complete_registry=complete_registry, complete_registry=complete_registry,
external_only=external_only,
) )
# Check mode: 注册表验证完成后直接退出 # Check mode: 注册表验证完成后直接退出
@@ -513,6 +516,20 @@ def main():
print_status(f"Check mode: 注册表验证完成 ({device_count} 设备, {resource_count} 资源),退出", "info") print_status(f"Check mode: 注册表验证完成 ({device_count} 设备, {resource_count} 资源),退出", "info")
os._exit(0) os._exit(0)
# 以下导入依赖 ROS2 环境check_mode 已退出不需要
from unilabos.resources.graphio import (
read_node_link_json,
read_graphml,
dict_from_graph,
modify_to_backend_format,
)
from unilabos.app.communication import get_communication_client
from unilabos.app.backend import start_backend
from unilabos.app.web import http_client
from unilabos.app.web import start_server
from unilabos.app.register import register_devices_and_resources
from unilabos.resources.resource_tracker import ResourceTreeSet, ResourceDict
# Step 1: 上传全部注册表到服务端,同步保存到 unilabos_data # Step 1: 上传全部注册表到服务端,同步保存到 unilabos_data
if BasicConfig.upload_registry: if BasicConfig.upload_registry:
if BasicConfig.ak and BasicConfig.sk: if BasicConfig.ak and BasicConfig.sk:
@@ -610,6 +627,10 @@ def main():
resource_tree_set.merge_remote_resources(remote_tree_set) resource_tree_set.merge_remote_resources(remote_tree_set)
print_status("远端物料同步完成", "info") print_status("远端物料同步完成", "info")
# 第二次设备包依赖检查云端物料同步后community 包可能引入新的 requirements
# TODO: 当 community device package 功能上线后,在这里调用
# install_requirements_txt(community_pkg_path / "requirements.txt", label="community.xxx")
# 使用 ResourceTreeSet 代替 list # 使用 ResourceTreeSet 代替 list
args_dict["resources_config"] = resource_tree_set args_dict["resources_config"] = resource_tree_set
args_dict["devices_config"] = resource_tree_set args_dict["devices_config"] = resource_tree_set

View File

@@ -0,0 +1,88 @@
"""虚拟样品演示设备 — 用于前端 sample tracking 功能的极简 demo"""
import asyncio
import logging
import random
import time
from typing import Any, Dict, List, Optional
class VirtualSampleDemo:
"""虚拟样品追踪演示设备,提供两种典型返回模式:
- measure_samples: 等长输入输出 (前端按 index 自动对齐)
- split_and_measure: 输出比输入长,附带 samples 列标注归属
"""
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
if device_id is None and "id" in kwargs:
device_id = kwargs.pop("id")
if config is None and "config" in kwargs:
config = kwargs.pop("config")
self.device_id = device_id or "unknown_sample_demo"
self.config = config or {}
self.logger = logging.getLogger(f"VirtualSampleDemo.{self.device_id}")
self.data: Dict[str, Any] = {"status": "Idle"}
# ------------------------------------------------------------------
# Action 1: 等长输入输出,无 samples 列
# ------------------------------------------------------------------
async def measure_samples(self, concentrations: List[float]) -> Dict[str, Any]:
"""模拟光度测量。absorbance = concentration * 0.05 + noise
入参和出参 list 长度相等,前端按 index 自动对齐。
"""
self.logger.info(f"measure_samples: concentrations={concentrations}")
absorbance = [round(c * 0.05 + random.gauss(0, 0.005), 4) for c in concentrations]
return {"concentrations": concentrations, "absorbance": absorbance}
# ------------------------------------------------------------------
# Action 2: 输出比输入长,带 samples 列
# ------------------------------------------------------------------
async def split_and_measure(self, volumes: List[float], split_count: int = 3) -> Dict[str, Any]:
"""将每个样品均分为 split_count 份后逐份测量。
返回的 list 长度 = len(volumes) * split_count
附带 samples 列标注每行属于第几个输入样品 (0-based index)。
"""
self.logger.info(f"split_and_measure: volumes={volumes}, split_count={split_count}")
out_volumes: List[float] = []
readings: List[float] = []
samples: List[int] = []
for idx, vol in enumerate(volumes):
split_vol = round(vol / split_count, 2)
for _ in range(split_count):
out_volumes.append(split_vol)
readings.append(round(random.uniform(0.1, 1.0), 4))
samples.append(idx)
return {"volumes": out_volumes, "readings": readings, "samples": samples}
# ------------------------------------------------------------------
# Action 3: 入参和出参都带 samples 列(不等长)
# ------------------------------------------------------------------
async def analyze_readings(self, readings: List[float], samples: List[int]) -> Dict[str, Any]:
"""对 split_and_measure 的输出做二次分析。
入参 readings/samples 长度相同但 > 原始样品数,
出参同样带 samples 列,长度与入参一致。
"""
self.logger.info(f"analyze_readings: readings={readings}, samples={samples}")
scores: List[float] = []
passed: List[bool] = []
threshold = 0.4
for r in readings:
score = round(r * 100 + random.gauss(0, 2), 2)
scores.append(score)
passed.append(r >= threshold)
return {"scores": scores, "passed": passed, "samples": samples}
# ------------------------------------------------------------------
# 状态属性
# ------------------------------------------------------------------
@property
def status(self) -> str:
return self.data.get("status", "Idle")

View File

@@ -139,6 +139,7 @@ def scan_directory(
executor: ThreadPoolExecutor = None, executor: ThreadPoolExecutor = None,
exclude_files: Optional[set] = None, exclude_files: Optional[set] = None,
cache: Optional[Dict[str, Any]] = None, cache: Optional[Dict[str, Any]] = None,
include_files: Optional[List[Union[str, Path]]] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Recursively scan .py files under *root_dir* for @device and @resource Recursively scan .py files under *root_dir* for @device and @resource
@@ -164,6 +165,7 @@ def scan_directory(
exclude_files: 要排除的文件名集合 (如 {"lab_resources.py"}) exclude_files: 要排除的文件名集合 (如 {"lab_resources.py"})
cache: Mutable cache dict (``load_scan_cache()`` result). Hits are read cache: Mutable cache dict (``load_scan_cache()`` result). Hits are read
from here; misses are written back so the caller can persist later. from here; misses are written back so the caller can persist later.
include_files: 指定扫描的文件列表,提供时跳过目录递归收集,直接扫描这些文件。
""" """
if executor is None: if executor is None:
raise ValueError("executor is required and must not be None") raise ValueError("executor is required and must not be None")
@@ -175,6 +177,9 @@ def scan_directory(
python_path = Path(python_path).resolve() python_path = Path(python_path).resolve()
# --- Collect files (depth/count limited) --- # --- Collect files (depth/count limited) ---
if include_files is not None:
py_files = [Path(f).resolve() for f in include_files if Path(f).resolve().exists()]
else:
py_files = _collect_py_files(root_dir, max_depth=max_depth, max_files=max_files, exclude_files=exclude_files) py_files = _collect_py_files(root_dir, max_depth=max_depth, max_files=max_files, exclude_files=exclude_files)
cache_files: Dict[str, Any] = cache.get("files", {}) if cache else {} cache_files: Dict[str, Any] = cache.get("files", {}) if cache else {}

View File

@@ -2804,6 +2804,294 @@ virtual_rotavap:
- vacuum_pressure - vacuum_pressure
type: object type: object
version: 1.0.0 version: 1.0.0
virtual_sample_demo:
category:
- virtual_device
class:
action_value_mappings:
analyze_readings:
feedback: {}
goal:
readings: readings
samples: samples
goal_default:
readings: []
samples: []
handles:
input:
- data_key: readings
data_source: handle
data_type: sample_list
handler_key: readings_in
label: 测量读数
- data_key: samples
data_source: handle
data_type: sample_index
handler_key: samples_in
label: 样品索引
output:
- data_key: scores
data_source: executor
data_type: sample_list
handler_key: scores_out
label: 分析得分
- data_key: passed
data_source: executor
data_type: sample_list
handler_key: passed_out
label: 是否通过
- data_key: samples
data_source: executor
data_type: sample_index
handler_key: samples_result_out
label: 样品索引
placeholder_keys: {}
result:
passed: passed
samples: samples
scores: scores
schema:
description: 对 split_and_measure 输出做二次分析,入参和出参都带 samples 列
properties:
feedback:
properties: {}
required: []
title: AnalyzeReadings_Feedback
type: object
goal:
properties:
readings:
description: 测量读数(来自 split_and_measure
items:
type: number
type: array
samples:
description: 每行归属的输入样品 index (0-based)
items:
type: integer
type: array
required:
- readings
- samples
title: AnalyzeReadings_Goal
type: object
result:
properties:
passed:
description: 是否通过阈值
items:
type: boolean
type: array
samples:
description: 每行归属的输入样品 index (0-based)
items:
type: integer
type: array
scores:
description: 分析得分
items:
type: number
type: array
required:
- scores
- passed
- samples
title: AnalyzeReadings_Result
type: object
required:
- goal
title: AnalyzeReadings
type: object
type: UniLabJsonCommandAsync
auto-cleanup:
feedback: {}
goal: {}
goal_default: {}
handles: {}
placeholder_keys: {}
result: {}
schema:
description: cleanup的参数schema
properties:
feedback: {}
goal:
properties: {}
required: []
type: object
result: {}
required:
- goal
title: cleanup参数
type: object
type: UniLabJsonCommandAsync
measure_samples:
feedback: {}
goal:
concentrations: concentrations
goal_default:
concentrations: []
handles:
output:
- data_key: concentrations
data_source: executor
data_type: sample_list
handler_key: concentrations_out
label: 浓度列表
- data_key: absorbance
data_source: executor
data_type: sample_list
handler_key: absorbance_out
label: 吸光度列表
placeholder_keys: {}
result:
absorbance: absorbance
concentrations: concentrations
schema:
description: 模拟光度测量,入参出参等长
properties:
feedback:
properties: {}
required: []
title: MeasureSamples_Feedback
type: object
goal:
properties:
concentrations:
description: 样品浓度列表
items:
type: number
type: array
required:
- concentrations
title: MeasureSamples_Goal
type: object
result:
properties:
absorbance:
description: 吸光度列表(与浓度等长)
items:
type: number
type: array
concentrations:
description: 原始浓度列表
items:
type: number
type: array
required:
- concentrations
- absorbance
title: MeasureSamples_Result
type: object
required:
- goal
title: MeasureSamples
type: object
type: UniLabJsonCommandAsync
split_and_measure:
feedback: {}
goal:
split_count: split_count
volumes: volumes
goal_default:
split_count: 3
volumes: []
handles:
output:
- data_key: readings
data_source: executor
data_type: sample_list
handler_key: readings_out
label: 测量读数
- data_key: samples
data_source: executor
data_type: sample_index
handler_key: samples_out
label: 样品索引
- data_key: volumes
data_source: executor
data_type: sample_list
handler_key: volumes_out
label: 均分体积
placeholder_keys: {}
result:
readings: readings
samples: samples
volumes: volumes
schema:
description: 均分样品后逐份测量,输出带 samples 列标注归属
properties:
feedback:
properties: {}
required: []
title: SplitAndMeasure_Feedback
type: object
goal:
properties:
split_count:
description: 每个样品均分的份数
type: integer
volumes:
description: 样品体积列表
items:
type: number
type: array
required:
- volumes
title: SplitAndMeasure_Goal
type: object
result:
properties:
readings:
description: 测量读数
items:
type: number
type: array
samples:
description: 每行归属的输入样品 index (0-based)
items:
type: integer
type: array
volumes:
description: 均分后的体积列表
items:
type: number
type: array
required:
- volumes
- readings
- samples
title: SplitAndMeasure_Result
type: object
required:
- goal
title: SplitAndMeasure
type: object
type: UniLabJsonCommandAsync
module: unilabos.devices.virtual.virtual_sample_demo:VirtualSampleDemo
status_types:
status: str
type: python
config_info: []
description: Virtual sample tracking demo device
handles: []
icon: ''
init_param_schema:
config:
properties:
config:
type: string
device_id:
type: string
required: []
type: object
data:
properties:
status:
type: string
required:
- status
type: object
version: 1.0.0
virtual_separator: virtual_separator:
category: category:
- virtual_device - virtual_device

View File

@@ -112,7 +112,7 @@ class Registry:
# 统一入口 # 统一入口
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def setup(self, devices_dirs=None, upload_registry=False, complete_registry=False): def setup(self, devices_dirs=None, upload_registry=False, complete_registry=False, external_only=False):
"""统一构建注册表入口。""" """统一构建注册表入口。"""
if self._setup_called: if self._setup_called:
logger.critical("[UniLab Registry] setup方法已被调用过不允许多次调用") logger.critical("[UniLab Registry] setup方法已被调用过不允许多次调用")
@@ -123,12 +123,15 @@ class Registry:
) )
# 1. AST 静态扫描 (快速, 无需 import) # 1. AST 静态扫描 (快速, 无需 import)
self._run_ast_scan(devices_dirs, upload_registry=upload_registry) self._run_ast_scan(devices_dirs, upload_registry=upload_registry, external_only=external_only)
# 2. Host node 内置设备 # 2. Host node 内置设备
self._setup_host_node() self._setup_host_node()
# 3. YAML 注册表加载 (兼容旧格式) # 3. YAML 注册表加载 (兼容旧格式) — external_only 模式下跳过
if external_only:
logger.info("[UniLab Registry] external_only 模式: 跳过 YAML 注册表加载")
else:
self.registry_paths = [Path(path).absolute() for path in self.registry_paths] self.registry_paths = [Path(path).absolute() for path in self.registry_paths]
for i, path in enumerate(self.registry_paths): for i, path in enumerate(self.registry_paths):
sys_path = path.parent sys_path = path.parent
@@ -253,7 +256,7 @@ class Registry:
# AST 静态扫描 # AST 静态扫描
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def _run_ast_scan(self, devices_dirs=None, upload_registry=False): def _run_ast_scan(self, devices_dirs=None, upload_registry=False, external_only=False):
""" """
执行 AST 静态扫描,从 Python 代码中提取 @device / @resource 装饰器元数据。 执行 AST 静态扫描,从 Python 代码中提取 @device / @resource 装饰器元数据。
无需 import 任何驱动模块,速度极快。 无需 import 任何驱动模块,速度极快。
@@ -298,6 +301,20 @@ class Registry:
extra_dirs.append(d_path) extra_dirs.append(d_path)
# 主扫描 # 主扫描
if external_only:
core_files = [
pkg_root / "ros" / "nodes" / "presets" / "host_node.py",
pkg_root / "resources" / "container.py",
]
scan_result = scan_directory(
scan_root, python_path=python_path, executor=self._startup_executor,
cache=ast_cache, include_files=core_files,
)
logger.info(
f"[UniLab Registry] external_only 模式: 仅扫描核心文件 "
f"({', '.join(f.name for f in core_files)})"
)
else:
exclude_files = {"lab_resources.py"} if not BasicConfig.extra_resource else None exclude_files = {"lab_resources.py"} if not BasicConfig.extra_resource else None
scan_result = scan_directory( scan_result = scan_directory(
scan_root, python_path=python_path, executor=self._startup_executor, scan_root, python_path=python_path, executor=self._startup_executor,
@@ -1534,9 +1551,9 @@ class Registry:
del resource_info["config_info"] del resource_info["config_info"]
if "file_path" in resource_info: if "file_path" in resource_info:
del resource_info["file_path"] del resource_info["file_path"]
complete_data[resource_id] = copy.deepcopy(dict(sorted(resource_info.items())))
resource_info["registry_type"] = "resource" resource_info["registry_type"] = "resource"
resource_info["file_path"] = str(file.absolute()).replace("\\", "/") resource_info["file_path"] = str(file.absolute()).replace("\\", "/")
complete_data[resource_id] = copy.deepcopy(dict(sorted(resource_info.items())))
for rid in skip_ids: for rid in skip_ids:
data.pop(rid, None) data.pop(rid, None)
@@ -2175,7 +2192,7 @@ class Registry:
lab_registry = Registry() lab_registry = Registry()
def build_registry(registry_paths=None, devices_dirs=None, upload_registry=False, check_mode=False, complete_registry=False): def build_registry(registry_paths=None, devices_dirs=None, upload_registry=False, check_mode=False, complete_registry=False, external_only=False):
""" """
构建或获取Registry单例实例 构建或获取Registry单例实例
""" """
@@ -2189,7 +2206,7 @@ def build_registry(registry_paths=None, devices_dirs=None, upload_registry=False
if path not in current_paths: if path not in current_paths:
lab_registry.registry_paths.append(path) lab_registry.registry_paths.append(path)
lab_registry.setup(devices_dirs=devices_dirs, upload_registry=upload_registry, complete_registry=complete_registry) lab_registry.setup(devices_dirs=devices_dirs, upload_registry=upload_registry, complete_registry=complete_registry, external_only=external_only)
# 将 AST 扫描的字符串类型替换为实际 ROS2 消息类(仅查找 ROS2 类型,不 import 设备模块) # 将 AST 扫描的字符串类型替换为实际 ROS2 消息类(仅查找 ROS2 类型,不 import 设备模块)
lab_registry.resolve_all_types() lab_registry.resolve_all_types()

View File

@@ -6,20 +6,180 @@
import argparse import argparse
import importlib import importlib
import locale import locale
import shutil
import subprocess import subprocess
import sys import sys
from pathlib import Path
from typing import List, Optional
from unilabos.utils.banner_print import print_status from unilabos.utils.banner_print import print_status
# ---------------------------------------------------------------------------
# 底层安装工具
# ---------------------------------------------------------------------------
def _is_chinese_locale() -> bool:
try:
lang = locale.getdefaultlocale()[0]
return bool(lang and ("zh" in lang.lower() or "chinese" in lang.lower()))
except Exception:
return False
_USE_UV: Optional[bool] = None
def _has_uv() -> bool:
global _USE_UV
if _USE_UV is None:
_USE_UV = shutil.which("uv") is not None
return _USE_UV
def _install_packages(
packages: List[str],
upgrade: bool = False,
label: str = "",
) -> bool:
"""
安装/升级一组包。优先 uv pip install回退 sys pip。
逐个安装,任意一个失败不影响后续包。
Returns:
True if all succeeded, False otherwise.
"""
if not packages:
return True
is_chinese = _is_chinese_locale()
use_uv = _has_uv()
failed: List[str] = []
for pkg in packages:
action_word = "升级" if upgrade else "安装"
if label:
print_status(f"[{label}] 正在{action_word} {pkg}...", "info")
else:
print_status(f"正在{action_word} {pkg}...", "info")
if use_uv:
cmd = ["uv", "pip", "install"]
if upgrade:
cmd.append("--upgrade")
cmd.append(pkg)
if is_chinese:
cmd.extend(["--index-url", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
else:
cmd = [sys.executable, "-m", "pip", "install"]
if upgrade:
cmd.append("--upgrade")
cmd.append(pkg)
if is_chinese:
cmd.extend(["-i", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode == 0:
installer = "uv" if use_uv else "pip"
print_status(f"{pkg} {action_word}成功 (via {installer})", "success")
else:
stderr_short = result.stderr.strip().split("\n")[-1] if result.stderr else "unknown error"
print_status(f"× {pkg} {action_word}失败: {stderr_short}", "error")
failed.append(pkg)
except subprocess.TimeoutExpired:
print_status(f"× {pkg} {action_word}超时 (300s)", "error")
failed.append(pkg)
except Exception as e:
print_status(f"× {pkg} {action_word}异常: {e}", "error")
failed.append(pkg)
if failed:
print_status(f"{len(failed)} 个包操作失败: {', '.join(failed)}", "error")
return False
return True
# ---------------------------------------------------------------------------
# requirements.txt 安装(可多次调用)
# ---------------------------------------------------------------------------
def install_requirements_txt(req_path: str | Path, label: str = "") -> bool:
"""
读取一个 requirements.txt 文件,检查缺失的包并安装。
Args:
req_path: requirements.txt 文件路径
label: 日志前缀标签(如 "device_package_sim"
Returns:
True if all ok, False if any install failed.
"""
req_path = Path(req_path)
if not req_path.exists():
return True
tag = label or req_path.parent.name
print_status(f"[{tag}] 检查依赖: {req_path}", "info")
reqs: List[str] = []
with open(req_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and not line.startswith("-"):
reqs.append(line)
if not reqs:
return True
missing: List[str] = []
for req in reqs:
pkg_import = req.split(">=")[0].split("==")[0].split("<")[0].split("[")[0].split(">")[0].strip()
pkg_import = pkg_import.replace("-", "_")
try:
importlib.import_module(pkg_import)
except ImportError:
missing.append(req)
if not missing:
print_status(f"[{tag}] ✓ 依赖检查通过 ({len(reqs)} 个包)", "success")
return True
print_status(f"[{tag}] 缺失 {len(missing)} 个依赖: {', '.join(missing)}", "warning")
return _install_packages(missing, label=tag)
def check_device_package_requirements(devices_dirs: list[str]) -> bool:
"""
检查 --devices 指定的所有外部设备包目录中的 requirements.txt。
对每个目录查找 requirements.txt先在目录内找再在父目录找
"""
if not devices_dirs:
return True
all_ok = True
for d in devices_dirs:
d_path = Path(d).resolve()
req_file = d_path / "requirements.txt"
if not req_file.exists():
req_file = d_path.parent / "requirements.txt"
if not req_file.exists():
continue
if not install_requirements_txt(req_file, label=d_path.name):
all_ok = False
return all_ok
# ---------------------------------------------------------------------------
# UniLabOS 核心环境检查
# ---------------------------------------------------------------------------
class EnvironmentChecker: class EnvironmentChecker:
"""环境检查器""" """环境检查器"""
def __init__(self): def __init__(self):
# 定义必需的包及其安装名称的映射
self.required_packages = { self.required_packages = {
# 包导入名 : pip安装名
# "pymodbus.framer.FramerType": "pymodbus==3.9.2",
"websockets": "websockets", "websockets": "websockets",
"msgcenterpy": "msgcenterpy", "msgcenterpy": "msgcenterpy",
"orjson": "orjson", "orjson": "orjson",
@@ -28,33 +188,17 @@ class EnvironmentChecker:
"crcmod": "crcmod-plus", "crcmod": "crcmod-plus",
} }
# 特殊安装包(需要特殊处理的包)
self.special_packages = {"pylabrobot": "git+https://github.com/Xuwznln/pylabrobot.git"} self.special_packages = {"pylabrobot": "git+https://github.com/Xuwznln/pylabrobot.git"}
# 包版本要求(包名: 最低版本)
self.version_requirements = { self.version_requirements = {
"msgcenterpy": "0.1.8", # msgcenterpy 最低版本要求 "msgcenterpy": "0.1.8",
} }
self.missing_packages = [] self.missing_packages: List[tuple] = []
self.failed_installs = [] self.failed_installs: List[tuple] = []
self.packages_need_upgrade = [] self.packages_need_upgrade: List[tuple] = []
# 检测系统语言
self.is_chinese = self._is_chinese_locale()
def _is_chinese_locale(self) -> bool:
"""检测系统是否为中文环境"""
try:
lang = locale.getdefaultlocale()[0]
if lang and ("zh" in lang.lower() or "chinese" in lang.lower()):
return True
except Exception:
pass
return False
def check_package_installed(self, package_name: str) -> bool: def check_package_installed(self, package_name: str) -> bool:
"""检查包是否已安装"""
try: try:
importlib.import_module(package_name) importlib.import_module(package_name)
return True return True
@@ -62,7 +206,6 @@ class EnvironmentChecker:
return False return False
def get_package_version(self, package_name: str) -> str | None: def get_package_version(self, package_name: str) -> str | None:
"""获取已安装包的版本"""
try: try:
module = importlib.import_module(package_name) module = importlib.import_module(package_name)
return getattr(module, "__version__", None) return getattr(module, "__version__", None)
@@ -70,88 +213,32 @@ class EnvironmentChecker:
return None return None
def compare_version(self, current: str, required: str) -> bool: def compare_version(self, current: str, required: str) -> bool:
"""
比较版本号
Returns:
True: current >= required
False: current < required
"""
try: try:
current_parts = [int(x) for x in current.split(".")] current_parts = [int(x) for x in current.split(".")]
required_parts = [int(x) for x in required.split(".")] required_parts = [int(x) for x in required.split(".")]
# 补齐长度
max_len = max(len(current_parts), len(required_parts)) max_len = max(len(current_parts), len(required_parts))
current_parts.extend([0] * (max_len - len(current_parts))) current_parts.extend([0] * (max_len - len(current_parts)))
required_parts.extend([0] * (max_len - len(required_parts))) required_parts.extend([0] * (max_len - len(required_parts)))
return current_parts >= required_parts return current_parts >= required_parts
except Exception: except Exception:
return True # 如果无法比较,假设版本满足要求
def install_package(self, package_name: str, pip_name: str, upgrade: bool = False) -> bool:
"""安装包"""
try:
action = "升级" if upgrade else "安装"
print_status(f"正在{action} {package_name} ({pip_name})...", "info")
# 构建安装命令
cmd = [sys.executable, "-m", "pip", "install"]
# 如果是升级操作,添加 --upgrade 参数
if upgrade:
cmd.append("--upgrade")
cmd.append(pip_name)
# 如果是中文环境,使用清华镜像源
if self.is_chinese:
cmd.extend(["-i", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
# 执行安装
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) # 5分钟超时
if result.returncode == 0:
print_status(f"{package_name} {action}成功", "success")
return True return True
else:
print_status(f"× {package_name} {action}失败: {result.stderr}", "error")
return False
except subprocess.TimeoutExpired:
print_status(f"× {package_name} {action}超时", "error")
return False
except Exception as e:
print_status(f"× {package_name} {action}异常: {str(e)}", "error")
return False
def upgrade_package(self, package_name: str, pip_name: str) -> bool:
"""升级包"""
return self.install_package(package_name, pip_name, upgrade=True)
def check_all_packages(self) -> bool: def check_all_packages(self) -> bool:
"""检查所有必需的包"""
print_status("开始检查环境依赖...", "info") print_status("开始检查环境依赖...", "info")
# 检查常规包
for import_name, pip_name in self.required_packages.items(): for import_name, pip_name in self.required_packages.items():
if not self.check_package_installed(import_name): if not self.check_package_installed(import_name):
self.missing_packages.append((import_name, pip_name)) self.missing_packages.append((import_name, pip_name))
else: elif import_name in self.version_requirements:
# 检查版本要求
if import_name in self.version_requirements:
current_version = self.get_package_version(import_name) current_version = self.get_package_version(import_name)
required_version = self.version_requirements[import_name] required_version = self.version_requirements[import_name]
if current_version and not self.compare_version(current_version, required_version):
if current_version:
if not self.compare_version(current_version, required_version):
print_status( print_status(
f"{import_name} 版本过低 (当前: {current_version}, 需要: >={required_version})", f"{import_name} 版本过低 (当前: {current_version}, 需要: >={required_version})",
"warning", "warning",
) )
self.packages_need_upgrade.append((import_name, pip_name)) self.packages_need_upgrade.append((import_name, pip_name))
# 检查特殊包
for package_name, install_url in self.special_packages.items(): for package_name, install_url in self.special_packages.items():
if not self.check_package_installed(package_name): if not self.check_package_installed(package_name):
self.missing_packages.append((package_name, install_url)) self.missing_packages.append((package_name, install_url))
@@ -170,7 +257,6 @@ class EnvironmentChecker:
return False return False
def install_missing_packages(self, auto_install: bool = True) -> bool: def install_missing_packages(self, auto_install: bool = True) -> bool:
"""安装缺失的包"""
if not self.missing_packages and not self.packages_need_upgrade: if not self.missing_packages and not self.packages_need_upgrade:
return True return True
@@ -178,62 +264,36 @@ class EnvironmentChecker:
if self.missing_packages: if self.missing_packages:
print_status("缺失以下包:", "warning") print_status("缺失以下包:", "warning")
for import_name, pip_name in self.missing_packages: for import_name, pip_name in self.missing_packages:
print_status(f" - {import_name} (pip install {pip_name})", "warning") print_status(f" - {import_name} ({pip_name})", "warning")
if self.packages_need_upgrade: if self.packages_need_upgrade:
print_status("需要升级以下包:", "warning") print_status("需要升级以下包:", "warning")
for import_name, pip_name in self.packages_need_upgrade: for import_name, pip_name in self.packages_need_upgrade:
print_status(f" - {import_name} (pip install --upgrade {pip_name})", "warning") print_status(f" - {import_name} ({pip_name})", "warning")
return False return False
# 安装缺失的包
if self.missing_packages: if self.missing_packages:
print_status(f"开始自动安装 {len(self.missing_packages)} 个缺失的包...", "info") pkgs = [pip_name for _, pip_name in self.missing_packages]
if not _install_packages(pkgs, label="unilabos"):
self.failed_installs.extend(self.missing_packages)
success_count = 0
for import_name, pip_name in self.missing_packages:
if self.install_package(import_name, pip_name):
success_count += 1
else:
self.failed_installs.append((import_name, pip_name))
print_status(f"✓ 成功安装 {success_count}/{len(self.missing_packages)} 个包", "success")
# 升级需要更新的包
if self.packages_need_upgrade: if self.packages_need_upgrade:
print_status(f"开始自动升级 {len(self.packages_need_upgrade)} 个包...", "info") pkgs = [pip_name for _, pip_name in self.packages_need_upgrade]
if not _install_packages(pkgs, upgrade=True, label="unilabos"):
self.failed_installs.extend(self.packages_need_upgrade)
upgrade_success_count = 0 return not self.failed_installs
for import_name, pip_name in self.packages_need_upgrade:
if self.upgrade_package(import_name, pip_name):
upgrade_success_count += 1
else:
self.failed_installs.append((import_name, pip_name))
print_status(f"✓ 成功升级 {upgrade_success_count}/{len(self.packages_need_upgrade)} 个包", "success")
if self.failed_installs:
print_status(f"{len(self.failed_installs)} 个包操作失败:", "error")
for import_name, pip_name in self.failed_installs:
print_status(f" - {import_name} ({pip_name})", "error")
return False
return True
def verify_installation(self) -> bool: def verify_installation(self) -> bool:
"""验证安装结果"""
if not self.missing_packages and not self.packages_need_upgrade: if not self.missing_packages and not self.packages_need_upgrade:
return True return True
print_status("验证安装结果...", "info") print_status("验证安装结果...", "info")
failed_verification = [] failed_verification = []
# 验证新安装的包
for import_name, pip_name in self.missing_packages: for import_name, pip_name in self.missing_packages:
if not self.check_package_installed(import_name): if not self.check_package_installed(import_name):
failed_verification.append((import_name, pip_name)) failed_verification.append((import_name, pip_name))
# 验证升级的包
for import_name, pip_name in self.packages_need_upgrade: for import_name, pip_name in self.packages_need_upgrade:
if not self.check_package_installed(import_name): if not self.check_package_installed(import_name):
failed_verification.append((import_name, pip_name)) failed_verification.append((import_name, pip_name))
@@ -270,17 +330,14 @@ def check_environment(auto_install: bool = True, show_details: bool = True) -> b
""" """
checker = EnvironmentChecker() checker = EnvironmentChecker()
# 检查包
if checker.check_all_packages(): if checker.check_all_packages():
return True return True
# 安装缺失的包
if not checker.install_missing_packages(auto_install): if not checker.install_missing_packages(auto_install):
if show_details: if show_details:
print_status("请手动安装缺失的包后重新启动程序", "error") print_status("请手动安装缺失的包后重新启动程序", "error")
return False return False
# 验证安装
if not checker.verify_installation(): if not checker.verify_installation():
if show_details: if show_details:
print_status("安装验证失败,请检查网络连接或手动安装", "error") print_status("安装验证失败,请检查网络连接或手动安装", "error")
@@ -290,14 +347,12 @@ def check_environment(auto_install: bool = True, show_details: bool = True) -> b
if __name__ == "__main__": if __name__ == "__main__":
# 命令行参数解析
parser = argparse.ArgumentParser(description="UniLabOS 环境依赖检查工具") parser = argparse.ArgumentParser(description="UniLabOS 环境依赖检查工具")
parser.add_argument("--no-auto-install", action="store_true", help="仅检查环境,不自动安装缺失的包") parser.add_argument("--no-auto-install", action="store_true", help="仅检查环境,不自动安装缺失的包")
parser.add_argument("--silent", action="store_true", help="静默模式,不显示详细信息") parser.add_argument("--silent", action="store_true", help="静默模式,不显示详细信息")
args = parser.parse_args() args = parser.parse_args()
# 执行环境检查
auto_install = not args.no_auto_install auto_install = not args.no_auto_install
show_details = not args.silent show_details = not args.silent

View File

@@ -82,7 +82,7 @@ def get_result_info_str(error: str, suc: bool, return_value=None) -> str:
""" """
samples = None samples = None
if isinstance(return_value, dict): if isinstance(return_value, dict):
if "samples" in return_value: if "samples" in return_value and type(return_value["samples"]) in [list, tuple] and type(return_value["samples"][0]) == dict:
samples = return_value.pop("samples") samples = return_value.pop("samples")
result_info = {"error": error, "suc": suc, "return_value": return_value, "samples": samples} result_info = {"error": error, "suc": suc, "return_value": return_value, "samples": samples}