Compare commits

..

21 Commits

Author SHA1 Message Date
yxz321
f14e1bc4a0 plan: add modify reset plan 2026-05-21 19:53:58 +08:00
yxz321
247a0ee4c6 add and expose sync_from_external
从奔曜同步本地资源树+publish到deck
2026-05-21 19:37:02 +08:00
yxz321
a084031af0 improve: 使用调度状态接口做健康检查
- 将 Bioyond 启动健康检查从物料类型列表切换为 scheduler_status()。
- 避免 debug 日志输出完整物料类型列表。
- 保持连接监控响应更轻量、更易读。
2026-05-21 15:59:17 +08:00
yxz321
212f9ec448 plan: modify add_two_node plan for material unload 2026-05-20 19:19:02 +08:00
hanhua@dp.tech
2fd8f0d3f1 add plan 2026-05-20 11:55:08 +08:00
hanhua@dp.tech
a4678b7aa8 udpate reset 2026-05-18 18:26:16 +08:00
hanhua@dp.tech
72495bfc74 fix bug 2026-05-18 14:24:38 +08:00
hanhua@dp.tech
97ccc38c7f execute plan 2026-05-18 11:10:20 +08:00
yxz321
1df8fbd173 chore: cleanup peptide implementation to remove stale browser based pathway and other legacy helper functions. 2026-05-16 22:57:05 +08:00
yxz321
26155b8343 feat: PEP add Bioyond peptide station runtime
- Add the Bioyond peptide station package with the station-facing Day2 submission flow inlined into BioyondPeptideStation.
- Add LIMS sample Excel upload, Day2/Day3 order creation helpers, scheduler/reset controls, and manual-confirm start/reset actions.
- Register peptide material PLR resource classes and default peptide material type mappings for runtime resource synchronization.
- Add the Bioyond peptide deck definition and warehouse axis/key-axis metadata needed for peptide layout conversion.
- Update shared Bioyond warehouse/resource conversion helpers so peptide deck coordinates round-trip correctly.
- Include shared Bioyond raw-call debug logging support used by station actions, with a generic local debug output default.
- Register the peptide deck in PLR additional resources for deserialization/import visibility.
- Exclude private temp_benyao docs, HAR/API inputs, live diagnostics, and siRNA-only station/material files from this handoff commit.
2026-05-13 19:43:57 +08:00
Xuwznln
927c7e95f5 fix pack install 2 2026-05-09 01:22:42 +08:00
Xuwznln
16910fe25c fix pip install & git install failed 2026-05-08 23:50:00 +08:00
Xuwznln
c38987d94d fix pack build 1 2026-05-08 23:49:32 +08:00
Junhan Chang
e4132111bc Update SKILL.md 2026-05-08 00:08:04 +08:00
Junhan Chang
211ee3027d Update Skills 2026-05-07 23:01:37 +08:00
Xuwznln
32c195d875 Update registry for all param desc 2026-04-27 20:47:52 +08:00
Xuwznln
f145dc04bb Support display_name & desc in new registry system
(cherry picked from commit f71ea2a258)
2026-04-27 20:28:54 +08:00
Xuwznln
195fad9398 upgrade to 0.11.1 2026-04-22 19:54:16 +08:00
Xuwznln
898ed5d34b use gitee to install pylabrobot
fix virtual import
2026-04-22 19:51:10 +08:00
Xuwznln
60cbedc4b2 v0.11.0
(cherry picked from commit 67a74172dc)
2026-04-22 19:50:42 +08:00
Xuwznln
2d6a9f7db9 fix possible conversion error 2026-04-22 00:09:06 +08:00
58 changed files with 6402 additions and 407 deletions

View File

@@ -3,7 +3,7 @@
package:
name: unilabos
version: 0.10.19
version: 0.11.1
source:
path: ../../unilabos
@@ -54,7 +54,7 @@ requirements:
- pymodbus
- matplotlib
- pylibftdi
- uni-lab::unilabos-env ==0.10.19
- uni-lab::unilabos-env ==0.11.1
about:
repository: https://github.com/deepmodeling/Uni-Lab-OS

View File

@@ -2,7 +2,7 @@
package:
name: unilabos-env
version: 0.10.19
version: 0.11.1
build:
noarch: generic

View File

@@ -3,7 +3,7 @@
package:
name: unilabos-full
version: 0.10.19
version: 0.11.1
build:
noarch: generic
@@ -11,7 +11,7 @@ build:
requirements:
run:
# Base unilabos package (includes unilabos-env)
- uni-lab::unilabos ==0.10.19
- uni-lab::unilabos ==0.11.1
# Documentation tools
- sphinx
- sphinx_rtd_theme

View File

@@ -71,6 +71,22 @@ from unilabos.registry.decorators import action
- `_` 开头的方法 → 不扫描
- `@not_action` 标记的方法 → 排除
### 参数文档 → JSON Schema 元数据
`__init__` 和 action 方法 docstring 的 `Args:` 小节里,使用以下格式生成入参 schema 的显示信息:
```python
"""
Args:
param[显示名称]: 参数说明,会写入 JSON Schema 的 description。
"""
```
- `param[显示名称]` 的显示名称会写入 goal property 的 `title`
- `:` 后面的说明会写入 goal property 的 `description`
- 如果只写 `param: 参数说明``title` 会兜底为字段名,`description` 使用参数说明。
- 如果没有写参数文档,生成器也会兜底补齐 `title=<字段名>``description=""`,但新设备应优先写清楚显示名和说明。
### @topic_config — 状态属性配置
```python
@@ -105,13 +121,27 @@ import logging
from typing import Any, Dict, Optional
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
from unilabos.registry.decorators import device, action, topic_config, not_action
from unilabos.registry.decorators import action, device, not_action, topic_config
@device(id="my_device", category=["my_category"], description="设备描述")
@device(
id="my_device",
category=["my_category"],
description="设备描述",
display_name="设备显示名",
)
class MyDevice:
"""设备类说明。"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
"""
初始化设备。
Args:
device_id[设备ID]: 设备实例 ID默认使用 my_device。
config[设备配置]: 设备启动配置。
"""
self.device_id = device_id or "my_device"
self.config = config or {}
self.logger = logging.getLogger(f"MyDevice.{self.device_id}")
@@ -133,7 +163,13 @@ class MyDevice:
@action(description="执行操作")
def my_action(self, param: float = 0.0, name: str = "") -> Dict[str, Any]:
"""带 @action 装饰器 → 注册为 'my_action' 动作"""
"""
带 @action 装饰器 → 注册为 'my_action' 动作。
Args:
param[操作数值]: 操作使用的数值参数。
name[操作名称]: 操作名称或备注。
"""
return {"success": True}
def get_info(self) -> Dict[str, Any]:

View File

@@ -0,0 +1,450 @@
---
name: filter-workflow-by-tags
description: Query backend workflow list, aggregate all tags, and filter workflows by domain/scenario requirements using tags. Use when the user wants to search workflows, find workflows by tags, list available workflow tags, filter workflows by category/domain/scenario, or mentions 工作流筛选/标签查询/workflow tags/按领域查找工作流.
---
# Uni-Lab 工作流标签筛选指南
通过 Uni-Lab 云端 API 查询工作流列表汇总所有可用标签tags并根据领域和场景要求筛选工作流。
> **重要**:本指南中的 `Authorization: Lab <token>` 是 **Uni-Lab 平台专用的认证方式**`Lab` 是 Uni-Lab 的 auth scheme 关键字,**不是** HTTP Basic 认证。请勿将其替换为 `Basic`。
## 使用模式识别
**用户可能一开始就给出场景目标**(如"我要做有机合成实验"、"找柱层析相关的 protocol")。此时:
1. **识别场景关键词** → 映射到可能的 tags如 synthesis、organic、chromatography、purification
2. **直接执行完整流程**(获取 ak/sk/addr → 拉取所有工作流 → 汇总 tags → 按场景筛选)
3. **展示筛选结果** → 引导用户从候选 workflow 中**选择明确的实验 protocol**
4. **如果用户确认某个 workflow** → 记录 `workflow_uuid`,准备对接“与其他 Skill 的协作”
**如果用户未给场景目标**,则按标准 checklist 询问筛选条件。
---
## 前置条件
使用本指南前,**必须**先确认以下信息。如果缺少任何一项,**立即向用户询问并终止**,等补齐后再继续。
### 1. ak / sk → AUTH
询问用户的启动参数,从 `--ak` `--sk` 或 config.py 中获取。
生成 AUTH token
```bash
python -c "import base64,sys; print('Authorization: Lab ' + base64.b64encode(f'{sys.argv[1]}:{sys.argv[2]}'.encode()).decode())" <ak> <sk>
```
### 2. --addr → BASE URL
| `--addr` 值 | BASE |
| ------------- | ------------------------------------- |
| `test` | `https://leap-lab.test.bohrium.com` |
| `uat` | `https://leap-lab.uat.bohrium.com` |
| `local` | `http://127.0.0.1:48197` |
| 不传(默认) | `https://leap-lab.bohrium.com` |
确认后设置:
```bash
BASE="<根据 addr 确定的 URL>"
AUTH="Authorization: Lab <上面命令输出的 token>"
```
### 3. lab_uuid实验室 UUID
如果用户未提供 `lab_uuid`,通过以下 API 自动获取:
```bash
curl -s -X GET "$BASE/api/v1/edge/lab/info" -H "$AUTH"
```
返回 `data.uuid` 即为 `lab_uuid`
**三项全部就绪后才可开始。**
## Session State
在整个对话过程中agent 需要记住以下状态:
- `lab_uuid` — 实验室 UUID
- `all_workflows` — 完整工作流列表(分页获取后缓存到内存或临时文件)
- `all_tags` — 所有工作流的标签汇总
---
## API 端点
### 查询工作流列表(支持分页)
```
GET $BASE/api/v1/lab/workflow/owner/list?page=<page>&page_size=<page_size>&lab_uuid=$lab_uuid
```
**参数:**
- `page` — 页码,从 1 开始
- `page_size` — 每页数量,建议 1000
- `lab_uuid` — 实验室 UUID
**返回结构:**
```json
{
"code": 0,
"data": {
"has_more": true,
"data": [
{
"uuid": "9661bba2-1b9f-4687-a63d-910245df174b",
"name": "Untitled",
"description": "",
"user_id": "114211",
"published": false,
"tags": null
},
{
"uuid": "e0436638-190b-46bc-b1a1-2711d9602f6a",
"name": "Synthesis v2",
"user_id": "114211",
"published": true,
"tags": ["synthesis", "organic"]
}
]
}
}
```
**字段说明:**
- `has_more` — 若为 `true`,需要继续请求 `page+1`
- `tags` — 可能为 `null`、空数组或字符串数组;聚合时必须容忍 `null`
### 启动工作流(直接运行)
```
POST $BASE/api/v1/lab/workflow/<workflow_uuid>/run
```
**用途:** 直接启动一个 workflow 的默认执行(使用模板中预设的参数),无需创建 notebook。适用于快速测试或无参数变化的重复执行。
**请求体:** 空 JSON `{}` 或省略
**返回:**
```json
{
"code": 0,
"data": "<run_uuid>"
}
```
- `run_uuid` — 本次执行的唯一标识(不是 notebook UUID
**注意:**
- 该接口会使用 workflow 模板中保存的默认参数直接执行
- 如果 workflow 需要动态参数(如 CSV 路径、样品 UUID应使用 `POST /lab/notebook` 创建 notebook 并传入 `node_params`
- 返回的 `run_uuid` 可直接传入下方「查询任务状态」接口查询实时进度
### 查询任务状态
```
GET $BASE/api/v1/lab/mcp/task/<task_uuid>
```
**用途:** 查询由 `POST /lab/workflow/<uuid>/run` 返回的 `run_uuid`(即 task_uuid的实时执行状态包括整体状态和每个节点JOSJob On Station的执行详情。
**路径参数:**
- `task_uuid` — 等同于启动工作流接口返回的 `run_uuid`
**返回:**
```json
{
"code": 0,
"data": {
"status": "running",
"jos_status": [
{
"uuid": "d0e24bfe-8d99-450e-b19d-f25849dfbaad",
"node_name": "PRCXI_BioER_96_wellplate_slot_1",
"action_name": "create_resource",
"status": "success",
"return_info": {
"suc": true,
"error": "",
"return_value": { ... }
}
},
{
"uuid": "...",
"node_name": "...",
"action_name": "transfer_liquid",
"status": "pending",
"return_info": null
}
]
}
}
```
**字段说明:**
- `data.status` — 整体任务状态
- `running` — 正在执行(至少一个节点 pending 或 running
- `success` — 全部节点成功
- `failed` — 有节点失败
- `data.jos_status[]` — 节点级执行列表(按执行顺序)
- `uuid` — 节点执行实例 UUID
- `node_name` — 节点名称(资源/设备名或工位名)
- `action_name` — 动作类型(`create_resource``transfer_liquid``centrifuge`、等)
- `status` — 节点状态:`success``pending``running``failed`
- `return_info` — 执行返回,失败时 `suc=false``error` 有错误信息
**注意:**
- 此接口的 `task_uuid` **是** `POST /lab/workflow/<uuid>/run` 返回的 `run_uuid`,二者为同一个 ID 的不同称呼
- **不要**把 notebook UUID`POST /lab/notebook` 返回)传进来——那条路径用 `GET /lab/notebook/status` 查询
- `jos_status` 数组按节点执行顺序给出;从 pending 数量可以估算剩余进度
- 返回体可能较大(`return_info.return_value` 中可能包含完整 resource tree可在脚本中只提取 `status` + `node_name` + `action_name` 做摘要
**状态轮询示例:**
```bash
# 每 5 秒轮询一次直至完成
TASK="b183d97e-d2b5-4b24-b14b-820df04d87c0"
while :; do
st=$(curl -s -X GET "$BASE/api/v1/lab/mcp/task/$TASK" -H "$AUTH" \
| python3 -c "import json,sys; d=json.load(sys.stdin)['data']; \
print(d['status'], '|', sum(1 for j in d['jos_status'] if j['status']=='success'), '/', len(d['jos_status']))")
echo "$(date +%H:%M:%S) $st"
[[ "$st" == success* || "$st" == failed* ]] && break
sleep 5
done
```
---
## 完整工作流 Checklist
```
Task Progress:
- [ ] Step 0: 识别用户是否已给出场景目标(如"有机合成"、"柱层析"
- 若已给出 → 记录场景关键词,自动进入后续步骤
- 若未给出 → 在 Step 6 询问用户
- [ ] Step 1: 确认 ak/sk → 生成 AUTH token
- [ ] Step 2: 确认 --addr → 设置 BASE URL
- [ ] Step 3: GET /edge/lab/info → 获取 lab_uuid如用户未提供
- [ ] Step 4: 分页获取所有工作流(从 page=1 开始直到 has_more=false
- [ ] Step 5: 汇总所有非空 tags → 生成 all_tags去重、排序、附出现次数
- [ ] Step 6: 根据场景关键词Step 0 或新询问)在 all_tags 中做语义映射 → 确定候选 tags
- 若语义映射不唯一,列出候选 tags 让用户确认
- [ ] Step 7: 按候选 tags 筛选工作流(默认 any 模式,召回优先)
- [ ] Step 8: 展示筛选结果uuid、name、description、tags、published
- [ ] Step 9: 引导用户从结果中选择**明确的实验 protocol**
- 若结果只有 1 条 → 直接确认该 workflow_uuid
- 若结果 210 条 → 让用户按编号选择
- 若结果过多 → 提示收紧条件(加 tag、切换 all 模式、仅 published
- 若结果为空 → 放宽条件(去掉最稀有 tag或提示用户换关键词
- [ ] Step 10: 记录用户选中的 workflow_uuid并提示提交实验或查看详情
```
---
## 推荐路径:使用脚本
同目录下提供 `scripts/filter_workflows.py`,一次完成分页抓取、标签聚合与筛选:
```bash
# 1. 仅汇总标签(不筛选)
python scripts/filter_workflows.py \
--auth "<Lab base64token>" \
--base "$BASE" \
--lab-uuid "$lab_uuid" \
--summary-only
# 2. 按标签筛选ANY 模式:包含任一)
python scripts/filter_workflows.py \
--auth "<Lab base64token>" \
--base "$BASE" \
--lab-uuid "$lab_uuid" \
--tags synthesis organic \
--mode any
# 3. 按标签筛选ALL 模式:必须同时包含)
python scripts/filter_workflows.py \
--auth "<Lab base64token>" \
--base "$BASE" \
--lab-uuid "$lab_uuid" \
--tags synthesis organic \
--mode all \
--output filtered.json
# 4. 仅筛选已发布
python scripts/filter_workflows.py \
--auth "<Lab base64token>" \
--base "$BASE" \
--lab-uuid "$lab_uuid" \
--tags synthesis \
--published-only
```
**`--auth` 参数说明**:传入 `Authorization` 头中 `Lab` 之后的 base64 token不带 `Lab ` 前缀),脚本内部会自动补上 scheme。
**输出结构:**
```json
{
"total_workflows": 150,
"tag_counts": {"synthesis": 12, "organic": 8, "analysis": 5},
"all_tags": ["analysis", "organic", "synthesis"],
"filter": {"tags": ["synthesis", "organic"], "mode": "any"},
"filtered_workflows": [
{"uuid": "...", "name": "...", "description": "...", "tags": [...], "published": true}
]
}
```
---
## 手动路径curl + jq
如果脚本不可用或环境缺少 Python可用 shell 实现。
### 1. 分页抓取(写入 `all_workflows.json`
```bash
page=1
echo "[]" > all_workflows.json
while :; do
resp=$(curl -s -X GET \
"$BASE/api/v1/lab/workflow/owner/list?page=$page&page_size=1000&lab_uuid=$lab_uuid" \
-H "$AUTH")
page_data=$(echo "$resp" | jq -c '.data.data // []')
jq -c --argjson p "$page_data" '. + $p' all_workflows.json > _tmp.json && mv _tmp.json all_workflows.json
has_more=$(echo "$resp" | jq -r '.data.has_more')
[ "$has_more" != "true" ] && break
page=$((page + 1))
done
echo "Total: $(jq 'length' all_workflows.json)"
```
### 2. 汇总所有标签(含出现次数)
```bash
jq '[.[].tags // [] | .[]] | group_by(.) | map({tag: .[0], count: length}) | sort_by(-.count)' \
all_workflows.json
```
### 3. 按标签筛选
```bash
# ANY包含任一指定标签
jq --argjson want '["synthesis","organic"]' \
'[.[] | select((.tags // []) | any(. as $t | $want | index($t)))]' \
all_workflows.json
# ALL同时包含所有指定标签
jq --argjson want '["synthesis","organic"]' \
'[.[] | select(($want | all(. as $w | (.tags // []) | index($w))))]' \
all_workflows.json
```
---
## 筛选策略
agent 拿到用户的「领域 + 场景」自然语言描述时,按如下顺序选择 tag
1. **优先用户显式指定的 tags**:若用户明确给出标签词,直接精确匹配。
2. **从 all_tags 中做语义映射**:若用户描述是自然语言(如"有机合成、柱层析"),在 all_tags 中找语义相关项(如 `synthesis``organic``chromatography`)。必要时展示候选 tag 让用户确认。
3. **模式选择**
- 默认 `any`(更多召回),给出 tag 集合的并集匹配
- 用户强调"必须同时满足"时用 `all`
4. **空结果兜底**:如果筛选为空,放宽条件(去掉最稀有 tag、切换 any 模式),并提醒用户。
---
## 引导到明确的 Protocol
筛选完成后,**最终目标是让用户确认一个具体的 workflow_uuid**,而不是停留在"一堆候选"上。按结果数量采取不同策略:
| 结果数量 | 策略 |
| --------- | ---------------------------------------------------------------------------------------------------------------------------------- |
| 0 条 | 放宽筛选(去掉最稀有 tag → 切换 any 模式 → 去掉 `--published-only`)。仍为空则提示换关键词,或列出 `all_tags` 让用户重新选。 |
| 1 条 | 直接确认:"找到唯一匹配:`<name>` (uuid `<uuid>`),是否用它?"用户确认后记录 `workflow_uuid`。 |
| 210 条 | 编号列表展示,让用户选编号。每项给出 name、tags、description 摘要、published 状态。 |
| 1030 条 | 先展示 tag 分布帮助用户进一步收紧:列出匹配结果中最常见的子标签,提示"加一个 tag 可将结果缩小到 N 条"。 |
| >30 条 | 强制要求用户补充条件:仅 published、指定具体 tag 组合、或按名称关键词过滤。 |
**确认 workflow 后**
1.`workflow_uuid` 写入 session state
2. 提示用户下一步可用的 skill
- 提交实验 → 引导到“与其他 Skill 的协作”
- 查看 workflow 详细节点 → `GET /api/v1/lab/workflow/template/detail/<workflow_uuid>`
3. 若用户想换一个,回到筛选步骤。
---
## 展示结果
推荐格式(表格 + 汇总统计):
```
共 150 个工作流,其中 32 个匹配筛选条件 [tags: synthesis OR organic]
| UUID (短) | 名称 | Tags | 已发布 |
|-----------|--------------------------|------------------------------|--------|
| e0436638 | Synthesis v2 | synthesis, organic | ✓ |
| 5b60dbb8 | Grignard Protocol | synthesis, organometallic | ✓ |
| ... | ... | ... | ... |
所有可用标签(按频次):
synthesis (12), organic (8), analysis (5), purification (4), ...
```
如果用户下一步想执行某工作流 → 引导到“与其他 Skill 的协作”。
---
## 常见问题
### Q: tags 为 null 的工作流要不要展示?
默认**不展示**在筛选结果中(因为无法按 tag 匹配)。但在 `--summary-only` 或无筛选条件时,这些工作流仍会计入总数,并在输出中单独列出"未打标签"计数。
### Q: 如何按名称/描述做模糊匹配?
脚本未内置,但可在 jq 中组合:
```bash
jq '[.[] | select((.name + " " + (.description // "")) | test("organic"; "i"))]' all_workflows.json
```
### Q: `page_size=1000` 是否会被服务端限制?
接口通常允许最大 1000如果返回量少于 1000 且 `has_more=false`,说明已到末页。极端情况下若服务端返回错误,可降到 200 或 500 再试。
### Q: 工作流数量极大(>10k怎么办
1. 先跑 `--summary-only` 了解 tag 分布
2. 提示用户先限定 `--published-only` 或指定 tag
3. 考虑将 `all_workflows.json` 缓存到本地,下次直接复用
---
## 与其他 Skill 的协作
- 正常情况下,找到 workflow 之后可以直接用它提交实验(启动工作流的 api 端点 POST $BASE/api/v1/lab/workflow/<workflow_uuid>/run不用别的 skill
- **仅当需要进行多次实验时,使用 batch-submit-experiment** — 筛选到目标工作流后,`workflow_uuid` 直接用于实验提交
## 脚本依赖
`scripts/filter_workflows.py` 仅使用 Python 标准库(`urllib``json``argparse`),无需额外安装。

View File

@@ -0,0 +1,191 @@
#!/usr/bin/env python3
"""分页拉取 Uni-Lab 工作流列表,汇总 tags 并按 tag 筛选。
使用示例:
python filter_workflows.py \
--auth <base64token> \
--base https://leap-lab.test.bohrium.com \
--lab-uuid a9059772-... \
--tags synthesis organic --mode any
仅依赖 Python 标准库。
"""
from __future__ import annotations
import argparse
import json
import sys
import urllib.error
import urllib.parse
import urllib.request
from collections import Counter
def fetch_all_workflows(base: str, auth_token: str, lab_uuid: str, page_size: int = 1000) -> list[dict]:
"""分页拉取所有 owner 工作流,直到 has_more=false。"""
workflows: list[dict] = []
page = 1
while True:
query = urllib.parse.urlencode(
{"page": page, "page_size": page_size, "lab_uuid": lab_uuid}
)
url = f"{base.rstrip('/')}/api/v1/lab/workflow/owner/list?{query}"
req = urllib.request.Request(
url,
headers={
"Authorization": f"Lab {auth_token}",
"Accept": "application/json",
},
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
payload = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
sys.exit(f"[ERROR] HTTP {e.code} on page {page}: {e.read().decode('utf-8', 'ignore')}")
except urllib.error.URLError as e:
sys.exit(f"[ERROR] URL error on page {page}: {e.reason}")
if payload.get("code") != 0:
sys.exit(f"[ERROR] API returned non-zero code: {payload}")
data = payload.get("data") or {}
page_items = data.get("data") or []
workflows.extend(page_items)
if not data.get("has_more"):
break
page += 1
# 防御性兜底,避免接口异常导致无限循环
if page > 1000:
print(f"[WARN] page count exceeded 1000, stopping early", file=sys.stderr)
break
return workflows
def aggregate_tags(workflows: list[dict]) -> tuple[list[str], dict[str, int], int]:
"""返回 (sorted_tags, tag_counts, untagged_count)。"""
counter: Counter[str] = Counter()
untagged = 0
for wf in workflows:
tags = wf.get("tags")
if not tags:
untagged += 1
continue
for t in tags:
if isinstance(t, str) and t.strip():
counter[t.strip()] += 1
return sorted(counter.keys()), dict(counter), untagged
def filter_workflows(
workflows: list[dict],
want_tags: list[str],
mode: str,
published_only: bool,
) -> list[dict]:
"""按 tag 筛选。mode 取值 any / all。"""
want_set = {t.strip() for t in want_tags if t.strip()}
out: list[dict] = []
for wf in workflows:
if published_only and not wf.get("published"):
continue
if not want_set:
out.append(wf)
continue
tags = wf.get("tags") or []
tag_set = {t for t in tags if isinstance(t, str)}
if mode == "all":
if want_set.issubset(tag_set):
out.append(wf)
else: # any
if want_set & tag_set:
out.append(wf)
return out
def project_workflow(wf: dict) -> dict:
"""精简输出字段。"""
return {
"uuid": wf.get("uuid"),
"name": wf.get("name"),
"description": wf.get("description", ""),
"tags": wf.get("tags") or [],
"published": bool(wf.get("published")),
"user_id": wf.get("user_id"),
}
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Fetch & filter Uni-Lab workflows by tags.")
p.add_argument("--auth", required=True, help="Base64 token (the part after `Lab `).")
p.add_argument("--base", required=True, help="Base URL, e.g. https://leap-lab.test.bohrium.com")
p.add_argument("--lab-uuid", required=True, help="Lab UUID.")
p.add_argument("--tags", nargs="*", default=[], help="Tags to filter by (space separated).")
p.add_argument(
"--mode",
choices=["any", "all"],
default="any",
help="any: workflow contains at least one tag; all: workflow contains every tag.",
)
p.add_argument("--published-only", action="store_true", help="Only include published workflows.")
p.add_argument("--page-size", type=int, default=1000, help="Page size, default 1000.")
p.add_argument(
"--summary-only",
action="store_true",
help="Print tag summary without applying filter (still fetches everything).",
)
p.add_argument("--output", help="Write JSON result to this path. If omitted, print to stdout.")
return p.parse_args()
def main() -> None:
args = parse_args()
workflows = fetch_all_workflows(
base=args.base,
auth_token=args.auth,
lab_uuid=args.lab_uuid,
page_size=args.page_size,
)
sorted_tags, tag_counts, untagged = aggregate_tags(workflows)
if args.summary_only:
result = {
"total_workflows": len(workflows),
"untagged_count": untagged,
"tag_counts": tag_counts,
"all_tags": sorted_tags,
}
else:
filtered = filter_workflows(
workflows,
want_tags=args.tags,
mode=args.mode,
published_only=args.published_only,
)
result = {
"total_workflows": len(workflows),
"untagged_count": untagged,
"tag_counts": tag_counts,
"all_tags": sorted_tags,
"filter": {
"tags": args.tags,
"mode": args.mode,
"published_only": args.published_only,
},
"matched_count": len(filtered),
"filtered_workflows": [project_workflow(wf) for wf in filtered],
}
payload = json.dumps(result, ensure_ascii=False, indent=2)
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
f.write(payload)
print(f"Wrote {len(workflows)} workflows summary → {args.output}", file=sys.stderr)
else:
print(payload)
if __name__ == "__main__":
main()

View File

@@ -38,7 +38,7 @@ jobs:
- name: Install ROS dependencies, uv and unilabos-msgs
run: |
echo Installing ROS dependencies...
mamba install -n check-env conda-forge::uv conda-forge::opencv robostack-staging::ros-humble-ros-core robostack-staging::ros-humble-action-msgs robostack-staging::ros-humble-std-msgs robostack-staging::ros-humble-geometry-msgs robostack-staging::ros-humble-control-msgs robostack-staging::ros-humble-nav2-msgs uni-lab::ros-humble-unilabos-msgs robostack-staging::ros-humble-cv-bridge robostack-staging::ros-humble-vision-opencv robostack-staging::ros-humble-tf-transformations robostack-staging::ros-humble-moveit-msgs robostack-staging::ros-humble-tf2-ros robostack-staging::ros-humble-tf2-ros-py conda-forge::transforms3d -c robostack-staging -c conda-forge -c uni-lab -y
mamba install -n check-env --override-channels -c robostack-staging -c conda-forge -c uni-lab conda-forge::uv conda-forge::opencv robostack-staging::ros-humble-ros-core robostack-staging::ros-humble-action-msgs robostack-staging::ros-humble-std-msgs robostack-staging::ros-humble-geometry-msgs robostack-staging::ros-humble-control-msgs robostack-staging::ros-humble-nav2-msgs uni-lab::ros-humble-unilabos-msgs robostack-staging::ros-humble-cv-bridge robostack-staging::ros-humble-vision-opencv robostack-staging::ros-humble-tf-transformations robostack-staging::ros-humble-moveit-msgs robostack-staging::ros-humble-tf2-ros robostack-staging::ros-humble-tf2-ros-py conda-forge::transforms3d -y
- name: Install pip dependencies and unilabos
run: |

View File

@@ -1,6 +1,10 @@
name: Build Conda-Pack Environment
on:
# 在 UniLabOS Conda Build 成功上传后自动构建非全量 conda-pack
workflow_run:
workflows: ["UniLabOS Conda Build"]
types: [completed]
workflow_dispatch:
inputs:
branch:
@@ -21,6 +25,16 @@ on:
jobs:
build-conda-pack:
if: |
github.event_name == 'workflow_dispatch' ||
(
github.event_name == 'workflow_run' &&
github.event.workflow_run.conclusion == 'success' &&
github.event.workflow_run.event == 'workflow_run'
)
env:
BUILD_FULL: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.build_full == 'true' }}
PACKAGE_REF: ${{ github.event.inputs.branch || github.event.workflow_run.head_sha || github.ref_name }}
strategy:
fail-fast: false
matrix:
@@ -54,7 +68,9 @@ jobs:
id: should_build
shell: bash
run: |
if [[ -z "${{ github.event.inputs.platforms }}" ]]; then
if [[ "${{ github.event_name }}" != "workflow_dispatch" ]]; then
echo "should_build=true" >> $GITHUB_OUTPUT
elif [[ -z "${{ github.event.inputs.platforms }}" ]]; then
echo "should_build=true" >> $GITHUB_OUTPUT
elif [[ "${{ github.event.inputs.platforms }}" == *"${{ matrix.platform }}"* ]]; then
echo "should_build=true" >> $GITHUB_OUTPUT
@@ -65,7 +81,7 @@ jobs:
- uses: actions/checkout@v6
if: steps.should_build.outputs.should_build == 'true'
with:
ref: ${{ github.event.inputs.branch }}
ref: ${{ github.event.inputs.branch || github.event.workflow_run.head_sha || github.ref }}
fetch-depth: 0
- name: Setup Miniforge (with mamba)
@@ -75,7 +91,7 @@ jobs:
miniforge-version: latest
use-mamba: true
python-version: '3.11.14'
channels: conda-forge,robostack-staging,uni-lab,defaults
channels: conda-forge,robostack-staging,uni-lab
channel-priority: flexible
activate-environment: unilab
auto-update-conda: false
@@ -86,13 +102,13 @@ jobs:
run: |
echo Installing unilabos and dependencies to unilab environment...
echo Using mamba for faster and more reliable dependency resolution...
echo Build full: ${{ github.event.inputs.build_full }}
if "${{ github.event.inputs.build_full }}"=="true" (
echo Build full: ${{ env.BUILD_FULL }}
if "${{ env.BUILD_FULL }}"=="true" (
echo Installing unilabos-full ^(complete package^)...
mamba install -n unilab uni-lab::unilabos-full conda-pack -c uni-lab -c robostack-staging -c conda-forge -y
mamba install -n unilab --override-channels -c uni-lab -c robostack-staging -c conda-forge uni-lab::unilabos-full conda-pack zstandard -y
) else (
echo Installing unilabos ^(minimal package^)...
mamba install -n unilab uni-lab::unilabos conda-pack -c uni-lab -c robostack-staging -c conda-forge -y
mamba install -n unilab --override-channels -c uni-lab -c robostack-staging -c conda-forge uni-lab::unilabos conda-pack zstandard -y
)
- name: Install conda-pack, unilabos and dependencies (Unix)
@@ -101,13 +117,13 @@ jobs:
run: |
echo "Installing unilabos and dependencies to unilab environment..."
echo "Using mamba for faster and more reliable dependency resolution..."
echo "Build full: ${{ github.event.inputs.build_full }}"
if [[ "${{ github.event.inputs.build_full }}" == "true" ]]; then
echo "Build full: ${{ env.BUILD_FULL }}"
if [[ "${{ env.BUILD_FULL }}" == "true" ]]; then
echo "Installing unilabos-full (complete package)..."
mamba install -n unilab uni-lab::unilabos-full conda-pack -c uni-lab -c robostack-staging -c conda-forge -y
mamba install -n unilab --override-channels -c uni-lab -c robostack-staging -c conda-forge uni-lab::unilabos-full conda-pack zstandard -y
else
echo "Installing unilabos (minimal package)..."
mamba install -n unilab uni-lab::unilabos conda-pack -c uni-lab -c robostack-staging -c conda-forge -y
mamba install -n unilab --override-channels -c uni-lab -c robostack-staging -c conda-forge uni-lab::unilabos conda-pack zstandard -y
fi
- name: Get latest ros-humble-unilabos-msgs version (Windows)
@@ -134,27 +150,27 @@ jobs:
if: steps.should_build.outputs.should_build == 'true' && matrix.platform == 'win-64'
run: |
echo Checking for available ros-humble-unilabos-msgs versions...
mamba search ros-humble-unilabos-msgs -c uni-lab -c robostack-staging -c conda-forge || echo Search completed
mamba search --override-channels -c uni-lab -c robostack-staging -c conda-forge ros-humble-unilabos-msgs || echo Search completed
echo.
echo Updating ros-humble-unilabos-msgs to latest version...
mamba update -n unilab ros-humble-unilabos-msgs -c uni-lab -c robostack-staging -c conda-forge -y || echo Already at latest version
mamba update -n unilab --override-channels -c uni-lab -c robostack-staging -c conda-forge ros-humble-unilabos-msgs -y || echo Already at latest version
- name: Check for newer ros-humble-unilabos-msgs (Unix)
if: steps.should_build.outputs.should_build == 'true' && matrix.platform != 'win-64'
shell: bash
run: |
echo "Checking for available ros-humble-unilabos-msgs versions..."
mamba search ros-humble-unilabos-msgs -c uni-lab -c robostack-staging -c conda-forge || echo "Search completed"
mamba search --override-channels -c uni-lab -c robostack-staging -c conda-forge ros-humble-unilabos-msgs || echo "Search completed"
echo ""
echo "Updating ros-humble-unilabos-msgs to latest version..."
mamba update -n unilab ros-humble-unilabos-msgs -c uni-lab -c robostack-staging -c conda-forge -y || echo "Already at latest version"
mamba update -n unilab --override-channels -c uni-lab -c robostack-staging -c conda-forge ros-humble-unilabos-msgs -y || echo "Already at latest version"
- name: Install latest unilabos from source (Windows)
if: steps.should_build.outputs.should_build == 'true' && matrix.platform == 'win-64'
run: |
echo Uninstalling existing unilabos...
mamba run -n unilab pip uninstall unilabos -y || echo unilabos not installed via pip
echo Installing unilabos from source (branch: ${{ github.event.inputs.branch }})...
echo Installing unilabos from source (ref: ${{ env.PACKAGE_REF }})...
mamba run -n unilab pip install .
echo Verifying installation...
mamba run -n unilab pip show unilabos
@@ -165,7 +181,7 @@ jobs:
run: |
echo "Uninstalling existing unilabos..."
mamba run -n unilab pip uninstall unilabos -y || echo "unilabos not installed via pip"
echo "Installing unilabos from source (branch: ${{ github.event.inputs.branch }})..."
echo "Installing unilabos from source (ref: ${{ env.PACKAGE_REF }})..."
mamba run -n unilab pip install .
echo "Verifying installation..."
mamba run -n unilab pip show unilabos
@@ -226,7 +242,9 @@ jobs:
if: steps.should_build.outputs.should_build == 'true' && matrix.platform == 'win-64'
run: |
echo Packing unilab environment with conda-pack...
mamba activate unilab && conda pack -n unilab -o unilab-env-${{ matrix.platform }}.tar.gz --ignore-missing-files
for /f "delims=" %%i in ('mamba run -n unilab python -c "import os; print(os.environ['CONDA_PREFIX'])"') do set "UNILAB_PREFIX=%%i"
echo Packing environment at: %UNILAB_PREFIX%
mamba run -n unilab conda-pack -p "%UNILAB_PREFIX%" -o unilab-env-${{ matrix.platform }}.tar.gz --ignore-missing-files
echo Pack file created:
dir unilab-env-${{ matrix.platform }}.tar.gz
@@ -235,8 +253,9 @@ jobs:
shell: bash
run: |
echo "Packing unilab environment with conda-pack..."
mamba install conda-pack -c conda-forge -y
conda pack -n unilab -o unilab-env-${{ matrix.platform }}.tar.gz --ignore-missing-files
UNILAB_PREFIX="$(mamba run -n unilab python -c 'import os; print(os.environ["CONDA_PREFIX"])')"
echo "Packing environment at: $UNILAB_PREFIX"
mamba run -n unilab conda-pack -p "$UNILAB_PREFIX" -o unilab-env-${{ matrix.platform }}.tar.gz --ignore-missing-files
echo "Pack file created:"
ls -lh unilab-env-${{ matrix.platform }}.tar.gz
@@ -267,7 +286,7 @@ jobs:
rem Create README using Python script
echo Creating: README.txt
python scripts\create_readme.py ${{ matrix.platform }} ${{ github.event.inputs.branch }} dist-package\README.txt
python scripts\create_readme.py ${{ matrix.platform }} ${{ env.PACKAGE_REF }} dist-package\README.txt
echo.
echo Distribution package contents:
@@ -303,7 +322,7 @@ jobs:
# Create README using Python script
echo "Creating: README.txt"
python scripts/create_readme.py ${{ matrix.platform }} ${{ github.event.inputs.branch }} dist-package/README.txt
python scripts/create_readme.py ${{ matrix.platform }} ${{ env.PACKAGE_REF }} dist-package/README.txt
echo ""
echo "Distribution package contents:"
@@ -314,7 +333,7 @@ jobs:
if: steps.should_build.outputs.should_build == 'true'
uses: actions/upload-artifact@v6
with:
name: unilab-pack-${{ matrix.platform }}-${{ github.event.inputs.branch }}
name: unilab-pack-${{ matrix.platform }}-${{ env.PACKAGE_REF }}
path: dist-package/
retention-days: 90
if-no-files-found: error
@@ -326,9 +345,9 @@ jobs:
echo Build Summary
echo ==========================================
echo Platform: ${{ matrix.platform }}
echo Branch: ${{ github.event.inputs.branch }}
echo Branch: ${{ env.PACKAGE_REF }}
echo Python version: 3.11.14
if "${{ github.event.inputs.build_full }}"=="true" (
if "${{ env.BUILD_FULL }}"=="true" (
echo Package: unilabos-full ^(complete^)
) else (
echo Package: unilabos ^(minimal^)
@@ -337,7 +356,7 @@ jobs:
echo Distribution package contents:
dir dist-package
echo.
echo Artifact name: unilab-pack-${{ matrix.platform }}-${{ github.event.inputs.branch }}
echo Artifact name: unilab-pack-${{ matrix.platform }}-${{ env.PACKAGE_REF }}
echo.
echo After download, extract the ZIP and run:
echo install_unilab.bat
@@ -351,9 +370,9 @@ jobs:
echo "Build Summary"
echo "=========================================="
echo "Platform: ${{ matrix.platform }}"
echo "Branch: ${{ github.event.inputs.branch }}"
echo "Branch: ${{ env.PACKAGE_REF }}"
echo "Python version: 3.11.14"
if [[ "${{ github.event.inputs.build_full }}" == "true" ]]; then
if [[ "${{ env.BUILD_FULL }}" == "true" ]]; then
echo "Package: unilabos-full (complete)"
else
echo "Package: unilabos (minimal)"
@@ -362,7 +381,7 @@ jobs:
echo "Distribution package contents:"
ls -lh dist-package/
echo ""
echo "Artifact name: unilab-pack-${{ matrix.platform }}-${{ github.event.inputs.branch }}"
echo "Artifact name: unilab-pack-${{ matrix.platform }}-${{ env.PACKAGE_REF }}"
echo ""
echo "After download:"
echo " install_unilab.sh"

View File

@@ -56,7 +56,7 @@ jobs:
miniforge-version: latest
use-mamba: true
python-version: '3.11.14'
channels: conda-forge,robostack-staging,uni-lab,defaults
channels: conda-forge,robostack-staging,uni-lab
channel-priority: flexible
activate-environment: unilab
auto-update-conda: false
@@ -66,7 +66,7 @@ jobs:
run: |
echo "Installing unilabos and dependencies to unilab environment..."
echo "Using mamba for faster and more reliable dependency resolution..."
mamba install -n unilab uni-lab::unilabos -c uni-lab -c robostack-staging -c conda-forge -y
mamba install -n unilab --override-channels -c uni-lab -c robostack-staging -c conda-forge uni-lab::unilabos -y
- name: Install latest unilabos from source
run: |
@@ -105,7 +105,7 @@ jobs:
test -f docs/_build/html/index.html && echo "✓ index.html exists" || echo "✗ index.html missing"
- name: Upload build artifacts
uses: actions/upload-pages-artifact@v5
uses: actions/upload-pages-artifact@v4
if: |
github.event.workflow_run.head_branch == 'main' ||
(github.event_name == 'workflow_dispatch' && github.event.inputs.deploy_to_pages == 'true')

View File

@@ -10,6 +10,9 @@ on:
# 支持 tag 推送(不依赖 CI Check
push:
tags: ['v*']
# GitHub Release 发布时自动构建并上传
release:
types: [published]
# 手动触发
workflow_dispatch:
inputs:
@@ -80,7 +83,7 @@ jobs:
- uses: actions/checkout@v6
with:
# 如果是 workflow_run 触发,使用触发 CI Check 的 commit
ref: ${{ github.event.workflow_run.head_sha || github.ref }}
ref: ${{ github.event.workflow_run.head_sha || github.event.release.tag_name || github.ref }}
fetch-depth: 0
- name: Check if platform should be built
@@ -96,12 +99,13 @@ jobs:
echo "should_build=false" >> $GITHUB_OUTPUT
fi
- name: Setup Miniconda
- name: Setup Miniforge
if: steps.should_build.outputs.should_build == 'true'
uses: conda-incubator/setup-miniconda@v3
with:
miniconda-version: 'latest'
channels: conda-forge,robostack-staging,defaults
miniforge-version: latest
use-mamba: true
channels: conda-forge,robostack-staging
channel-priority: strict
activate-environment: build-env
auto-update-conda: false
@@ -110,7 +114,7 @@ jobs:
- name: Install rattler-build and anaconda-client
if: steps.should_build.outputs.should_build == 'true'
run: |
conda install -c conda-forge rattler-build anaconda-client
mamba install --override-channels -c conda-forge rattler-build anaconda-client -y
- name: Show environment info
if: steps.should_build.outputs.should_build == 'true'
@@ -157,7 +161,13 @@ jobs:
retention-days: 30
- name: Upload to Anaconda.org (unilab organization)
if: steps.should_build.outputs.should_build == 'true' && github.event.inputs.upload_to_anaconda == 'true'
if: |
steps.should_build.outputs.should_build == 'true' &&
(
github.event_name == 'release' ||
startsWith(github.ref, 'refs/tags/') ||
github.event.inputs.upload_to_anaconda == 'true'
)
run: |
for package in $(find ./output -name "*.conda"); do
echo "Uploading $package to unilab organization..."

View File

@@ -1,14 +1,10 @@
name: UniLabOS Conda Build
on:
# 在 CI Check 成功后自动触发
# 在 Multi-Platform Conda Build 成功上传 msgs 后自动触发
workflow_run:
workflows: ["CI Check"]
workflows: ["Multi-Platform Conda Build"]
types: [completed]
branches: [main, dev]
# 标签推送时直接触发(发布版本)
push:
tags: ['v*']
# 手动触发
workflow_dispatch:
inputs:
@@ -33,30 +29,30 @@ on:
type: boolean
jobs:
# 等待 CI Check 完成的 job (仅用于 workflow_run 触发)
wait-for-ci:
# 等待上游 msgs 构建完成的 job (仅用于 workflow_run 触发)
wait-for-upstream:
runs-on: ubuntu-latest
if: github.event_name == 'workflow_run'
outputs:
should_continue: ${{ steps.check.outputs.should_continue }}
steps:
- name: Check CI status
- name: Check upstream workflow status
id: check
run: |
if [[ "${{ github.event.workflow_run.conclusion }}" == "success" ]]; then
if [[ "${{ github.event.workflow_run.conclusion }}" == "success" && ( "${{ github.event.workflow_run.event }}" == "release" || "${{ github.event.workflow_run.event }}" == "push" ) ]]; then
echo "should_continue=true" >> $GITHUB_OUTPUT
echo "CI Check passed, proceeding with build"
echo "Multi-Platform Conda Build passed for release/tag, proceeding with UniLabOS build"
else
echo "should_continue=false" >> $GITHUB_OUTPUT
echo "CI Check did not succeed (status: ${{ github.event.workflow_run.conclusion }}), skipping build"
echo "Upstream workflow is not a successful release/tag build (status: ${{ github.event.workflow_run.conclusion }}, event: ${{ github.event.workflow_run.event }}), skipping build"
fi
build:
needs: [wait-for-ci]
# 运行条件workflow_run 触发且 CI 成功,或者其他触发方式
needs: [wait-for-upstream]
# 运行条件workflow_run 触发且上游成功,或者手动触发
if: |
always() &&
(needs.wait-for-ci.result == 'skipped' || needs.wait-for-ci.outputs.should_continue == 'true')
(needs.wait-for-upstream.result == 'skipped' || needs.wait-for-upstream.outputs.should_continue == 'true')
strategy:
fail-fast: false
matrix:
@@ -79,7 +75,7 @@ jobs:
steps:
- uses: actions/checkout@v6
with:
# 如果是 workflow_run 触发,使用触发 CI Check 的 commit
# 如果是 workflow_run 触发,使用上游 conda 包构建的 commit
ref: ${{ github.event.workflow_run.head_sha || github.ref }}
fetch-depth: 0
@@ -96,12 +92,13 @@ jobs:
echo "should_build=false" >> $GITHUB_OUTPUT
fi
- name: Setup Miniconda
- name: Setup Miniforge
if: steps.should_build.outputs.should_build == 'true'
uses: conda-incubator/setup-miniconda@v3
with:
miniconda-version: 'latest'
channels: conda-forge,robostack-staging,uni-lab,defaults
miniforge-version: latest
use-mamba: true
channels: conda-forge,robostack-staging,uni-lab
channel-priority: strict
activate-environment: build-env
auto-update-conda: false
@@ -110,7 +107,7 @@ jobs:
- name: Install rattler-build and anaconda-client
if: steps.should_build.outputs.should_build == 'true'
run: |
conda install -c conda-forge rattler-build anaconda-client
mamba install --override-channels -c conda-forge rattler-build anaconda-client -y
- name: Show environment info
if: steps.should_build.outputs.should_build == 'true'
@@ -119,11 +116,11 @@ jobs:
conda list | grep -E "(rattler-build|anaconda-client)"
echo "Platform: ${{ matrix.platform }}"
echo "OS: ${{ matrix.os }}"
echo "Build full package: ${{ github.event.inputs.build_full || 'false' }}"
echo "Build full package: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.build_full == 'true' }}"
echo "Building packages:"
echo " - unilabos-env (environment dependencies)"
echo " - unilabos (with pip package)"
if [[ "${{ github.event.inputs.build_full }}" == "true" ]]; then
if [[ "${{ github.event_name == 'workflow_dispatch' && github.event.inputs.build_full == 'true' }}" == "true" ]]; then
echo " - unilabos-full (complete package)"
fi
@@ -134,7 +131,12 @@ jobs:
rattler-build build -r .conda/environment/recipe.yaml -c uni-lab -c robostack-staging -c conda-forge
- name: Upload unilabos-env to Anaconda.org (if enabled)
if: steps.should_build.outputs.should_build == 'true' && github.event.inputs.upload_to_anaconda == 'true'
if: |
steps.should_build.outputs.should_build == 'true' &&
(
github.event_name == 'workflow_run' ||
github.event.inputs.upload_to_anaconda == 'true'
)
run: |
echo "Uploading unilabos-env to uni-lab organization..."
for package in $(find ./output -name "unilabos-env*.conda"); do
@@ -149,7 +151,12 @@ jobs:
rattler-build build -r .conda/base/recipe.yaml -c uni-lab -c robostack-staging -c conda-forge --channel ./output
- name: Upload unilabos to Anaconda.org (if enabled)
if: steps.should_build.outputs.should_build == 'true' && github.event.inputs.upload_to_anaconda == 'true'
if: |
steps.should_build.outputs.should_build == 'true' &&
(
github.event_name == 'workflow_run' ||
github.event.inputs.upload_to_anaconda == 'true'
)
run: |
echo "Uploading unilabos to uni-lab organization..."
for package in $(find ./output -name "unilabos-0*.conda" -o -name "unilabos-[0-9]*.conda"); do
@@ -159,6 +166,7 @@ jobs:
- name: Build unilabos-full - Only when explicitly requested
if: |
steps.should_build.outputs.should_build == 'true' &&
github.event_name == 'workflow_dispatch' &&
github.event.inputs.build_full == 'true'
run: |
echo "Building unilabos-full package on ${{ matrix.platform }}..."
@@ -167,6 +175,7 @@ jobs:
- name: Upload unilabos-full to Anaconda.org (if enabled)
if: |
steps.should_build.outputs.should_build == 'true' &&
github.event_name == 'workflow_dispatch' &&
github.event.inputs.build_full == 'true' &&
github.event.inputs.upload_to_anaconda == 'true'
run: |

View File

@@ -0,0 +1,576 @@
# Peptide Station 新增三个节点:等待订单完成 + 下料确认 + take-out 同步
> 日期: 2026-05-20
> 目标文件: [peptide_station.py](../unilabos/devices/workstation/bioyond_studio/peptide_station/peptide_station.py)
> 参考实现:
> - [bioyond_cell_workstation.py](/Users/dp/python/yxz/unilabos/devices/workstation/bioyond_studio/bioyond_cell/bioyond_cell_workstation.py)`wait_for_order_finish`、`get_material_info`
> - [bioyond_rpc.py L782-824](../unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py)`take_out`
> - [workstation_architecture.md](../docs/developer_guide/examples/workstation_architecture.md)HTTP 报送进入 workstation运行态记录保存在 workstation 内存)
> 状态: 仅需求草稿,不写代码
---
## 一、需求背景
`BioyondPeptideStation` 当前实验流程在 `start_experiment`manual_confirm 启动调度器)之后即结束,缺少:
1. **等待奔耀回报实验完成**:调度器跑完后,奔耀通过 LIMS 推送 `POST /report/order_finish` 回调;目前 peptide_station 没有把这条推送封装成 action 节点,下游无法在工作流图上等待结果,也拿不到 `usedMaterials` 等下游所需信息。
2. **下料引导**:实验完成后操作员需要把样品/产物从仓位里取出,下料前需要看到每个物料对应的 **仓库 / 位置 / 物料名称 / 数量**;下料完成后还需要回写奔耀(调用 `take-out` 接口),让奔耀清空相应库位状态。
本轮新增三个 action 节点,串在 `start_experiment` 之后:
```text
submit_experiment_dayN
-> start_experiment(manual_confirm 上料)
-> wait_for_order_finish (等回调 + 生成 unloadTable)
-> confirm_unload_materials (manual_confirm 下料确认)
-> take_out_materials (调用 take-out 同步奔耀)
```
---
## 二、关键设计决策
### D1. 本轮只支持单订单,`order_ids` 只做占位
当前不实现多订单等待、乱序回调缓存、并发 wait 隔离。节点输入以 `order_id` 为主,`order_code` 可作为调试兜底。
`order_ids` 可在 handle/返回值中保留为占位字段,但实现只处理第一笔或直接忽略多订单列表。多订单、乱序回调、跨节点重跑复用缓存放到后续迭代。
### D2. `start_experiment` 需要显式透传订单字段
当前 `start_experiment` 只有输入 handles缺少输出 handles如果下游 wait 节点要接在 `start_experiment` 之后,必须让 `start_experiment` 透传:
- `order_id`
- `order_ids`(占位)
- `order_code`
- `resultTable`
实现时给 `start_experiment` 增加对应 `ActionOutputHandle`,并在返回值里保留这些字段。`submit_experiment_dayN``start_experiment` 嵌套字典也应包含 `order_code`,便于工作流编辑器连线。
### D3. `unloadTable` 必须与 `resultTable` 字段一致
`unloadTable` 不新增 `posX/posY/posZ/unit` 列,直接复用现有 `RESULT_TABLE_COLUMNS` 的四列:
```python
RESULT_TABLE_COLUMNS = [
{"name": "设备", "key": "whName"},
{"name": "位置", "key": "locationCode"},
{"name": "物料名称", "key": "materialName"},
{"name": "数量", "key": "quantity"},
]
```
`submit_experiment_dayN` 现有上料确认表 `resultTable` 形状如下;`unloadTable` 也必须保持同一 shape只改 `tableName` 和行数据来源:
```json
{
"data": [
{
"whName": "自动化堆栈",
"locationCode": "A1",
"materialName": "96孔板",
"quantity": "1"
}
],
"columns": [
{"name": "设备", "key": "whName"},
{"name": "位置", "key": "locationCode"},
{"name": "物料名称", "key": "materialName"},
{"name": "数量", "key": "quantity"}
],
"tableName": "resultTable"
}
```
`material-info` 官方 schema 中位置坐标字段是 `locations[].x/y/z`,不是 `posX/posY/posZ`。本轮下料表不展示坐标。
### D4. manual_confirm 只做人确认take-out 放到普通 action
`confirm_unload_materials` 只负责展示下料表、等待操作员确认并透传数据;真正的 `take-out` 调用放到后续普通 action `take_out_materials`
这样更接近 UniLab manual_confirm 的推荐模式manual_confirm 是人机确认检查点,副作用由独立普通 action 执行。
### D5. 本轮不做 unload context 缓存
虽然 workstation architecture 文档支持在 workstation 内存保存 HTTP 报送记录,但本轮暂不实现 `unload_context_cache` 或 order report 缓存。
因此本轮限制如下:
- `wait_for_order_finish` 只等待本次进入节点之后到达的 `/report/order_finish`
- 如果 push 早于 wait 节点到达,本轮不自动补救。
- 如果用户在 `confirm_unload_materials` approve 时忘记勾选,节点失败;当前架构不支持原地重新弹出同一个 manual_confirm也不在本轮实现失败节点重跑。
- 后续若要支持重跑复用,应在 `BioyondPeptideStation` 实例上新增 station runtime 的 `unload_context_cache`,按 `orderCode` 缓存 `unloadTable/material_ids/order_id` 等上下文。
---
## 三、节点 1`wait_for_order_finish`(等推送 + 生成 unloadTable
### 行为
1. 解析单订单目标:
- 首选 `order_id`
- 如果没有 `order_code`,通过 `self.hardware_interface.order_report(order_id)` 取返回数据中的 `code` 作为 `orderCode`
- `order_ids` 仅占位,本轮不实现多订单循环。
2. 设置 `self.last_order_code = order_code``self.last_order_report = None`,并 `self.order_finish_event.clear()`
3. 阻塞在 `self.order_finish_event.wait(timeout=timeout_seconds)` 等 LIMS 推送。
4. peptide_station override `process_order_finish_report(report_request, used_materials)`
- 先调用 `super().process_order_finish_report(...)` 保留父类行为(状态发布、物料同步等)。
-`report_request.data.orderCode == self.last_order_code` 时,把 `report_request.data` 存入 `self.last_order_report`,并 `set()` event。
- 非当前订单推送只记录日志,本轮不缓存。
5. 解除阻塞后解析 `status`
- `"30"` -> `success`
- `"-11"` -> `abnormal_stop`
- `"-12"` -> `manual_stop`
- 其它 -> `unknown_<status>`
- 超时 -> `timeout`
6.`report.usedMaterials[].materialId` 调用 `self.hardware_interface.material_info(material_id)`,带本地函数级 `material_info_cache` 避免重复请求。
7. 组装 `unloadTable``material_ids``preintake_ids``unload_summary` 并作为输出 handles 暴露。
### 入参goal_default
| 参数 | 类型 | 说明 |
|------|------|------|
| `order_id` | `str` | 来自 `start_experiment` 透传输出,必填优先 |
| `order_code` | `str` | 调试兜底;若已知 orderCode 可跳过 `order_report` 反查 |
| `order_ids` | `List[str]` | 占位字段;本轮不实现多订单 |
| `timeout_seconds` | `int` | 默认 `36000`10h |
| `poll_mode` | `bool` | 默认 `False`;如需要可沿用 bioyond_cell 的轮询等待风格 |
### 输出 handles
| key | data_type | 说明 |
|-----|-----------|------|
| `order_finish_status` | `str` | `success` / `abnormal_stop` / `manual_stop` / `timeout` / `unknown_*` |
| `order_finish_report` | `json` | 完整 `report_request.data` |
| `used_materials` | `json` | JSON 化后的 `usedMaterials` 列表 |
| `material_ids` | `json` | 从 `used_materials` 抽出的 `materialId` 列表,可为空 |
| `preintake_ids` | `json` | 本轮默认 `[]`,保留扩展点 |
| `unloadTable` | `table` | 下料表,字段与 `resultTable` 一致 |
| `unload_summary` | `json` | `{ "order_code": ..., "total_items": N, "missing_material_info": [...] }` |
| `order_id` | `str` | 透传给后续节点 |
| `order_code` | `str` | 透传给后续节点 |
| `order_ids` | `json` | 占位透传 |
### `unloadTable` 组装规则
返回结构:
```json
{
"data": [
{
"whName": "自动化堆栈",
"locationCode": "A1",
"materialName": "多肽产物",
"quantity": "10 mg"
}
],
"columns": [
{"name": "设备", "key": "whName"},
{"name": "位置", "key": "locationCode"},
{"name": "物料名称", "key": "materialName"},
{"name": "数量", "key": "quantity"}
],
"tableName": "unloadTable"
}
```
每行字段:
| key | 数据来源 |
|-----|----------|
| `whName` | `material_info.locations` 中匹配 `usedMaterials.locationId` 的 location 的 `whName`;匹配不到取第一条 location 的 `whName`;失败为空串 |
| `locationCode` | 匹配 location 的 `code`;匹配不到取第一条 location 的 `code`;再兜底 `usedMaterials.locationId` |
| `materialName` | `material_info.name`;失败为空串 |
| `quantity` | `usedMaterials.usedQuantity`,若 `material_info.unit` 存在则拼成字符串(如 `"10 mg"` |
`material-info` 失败时不抛异常,对应行尽量保留 `locationCode` / `quantity``whName``materialName` 用空串,并把 `materialId` 放入 `unload_summary.missing_material_info`
### 接口依赖
| 接口 | 调用方式 | 用途 |
|------|----------|------|
| `process_order_finish_report` 钩子 | 基类 HTTP 服务已注册 | 接 LIMS push |
| `POST /api/lims/order/order-report` | `self.hardware_interface.order_report(order_id)` | 从 `order_id` 反查 `orderCode` |
| `POST /api/lims/storage/material-info` | `self.hardware_interface.material_info(material_id)` | 查 `whName/locationCode/materialName/unit` |
#### `order_report(order_id)` API 形式
请求:
```json
{
"apiKey": "B10B5995",
"requestTime": "2026-05-20T10:50:00.123Z",
"data": "<orderId UUID>"
}
```
响应中本节点只依赖 `data.code`
```json
{
"code": 1,
"message": null,
"timestamp": 1779255000000,
"data": {
"id": "<orderId UUID>",
"name": "实验260520-103000",
"code": "EXP260520-103000",
"status": 30,
"statusName": "完成"
}
}
```
`BioyondV1RPC.order_report(order_id)` 已经返回响应中的 `data`,所以实现中应读取 `raw.get("code")`,不是 `raw["data"]["code"]`
#### `material_info(material_id)` API 形式
请求:
```json
{
"apiKey": "B10B5995",
"requestTime": "2026-05-20T10:50:00.123Z",
"data": "<materialId UUID>"
}
```
响应中本节点依赖:
```json
{
"id": "<materialId UUID>",
"name": "多肽产物",
"unit": "mg",
"locations": [
{
"id": "<locationId UUID>",
"whid": "<warehouse UUID>",
"whName": "自动化堆栈",
"code": "A1",
"x": 1,
"y": 1,
"z": 1,
"quantity": 10
}
]
}
```
`BioyondV1RPC.material_info(material_id)` 已经返回响应中的 `data`。字段映射:
| unloadTable key | 来源 |
|-----------------|------|
| `whName` | 匹配 `locationId``locations[].whName` |
| `locationCode` | 匹配 `locationId``locations[].code` |
| `materialName` | `name` |
| `quantity` | `usedMaterials[].usedQuantity` + `unit` |
### 实现要点
- `BioyondPeptideStation.__init__` 末尾追加 `self.order_finish_event = threading.Event()``self.last_order_code = None``self.last_order_report = None`
- 新增 `process_order_finish_report` override`super()`,再做单订单匹配。
- `used_materials` 参数是 `MaterialUsage` dataclass 列表;输出前必须转成 JSON dict。
- `unloadTable` 复用 `RESULT_TABLE_COLUMNS`,不新增 `UNLOAD_TABLE_COLUMNS`
---
## 四、节点 2`confirm_unload_materials`(人工下料确认)
### 行为
1. 接收节点 1 输出的 `order_id` / `order_code` / `material_ids` / `preintake_ids` / `unloadTable`
2. 进入 `NodeType.MANUAL_CONFIRM` 阻塞,操作员根据 `unloadTable` 物理下料。
3. 操作员勾选 `materials_unloaded=True` 并 approve 后,节点函数体继续。
4. 校验 `materials_unloaded == True`
- 为 True返回确认结果并透传 `order_id/material_ids/preintake_ids/unloadTable` 给节点 3。
- 为 False`RuntimeError("下料未确认,拒绝继续 take-out")`
### 入参goal_default
| 参数 | 类型 | 说明 |
|------|------|------|
| `order_id` | `str` | 来自节点 1必填 |
| `order_code` | `str` | 来自节点 1日志/排错用 |
| `material_ids` | `List[str]` | 来自节点 1可为空 |
| `preintake_ids` | `List[str]` | 来自节点 1默认 `[]` |
| `unloadTable` | `table` | 来自节点 1供人工确认展示 |
| `materials_unloaded` | `bool` | manual_confirm 勾选字段,默认 `False` |
| `timeout_seconds` | `int` | 默认 `3600` |
| `assignee_user_ids` | `List[str]` | `placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}` |
### 输出 handles
| key | data_type | 说明 |
|-----|-----------|------|
| `unload_confirmed` | `bool` | 是否已人工确认下料 |
| `order_id` | `str` | 透传 |
| `order_code` | `str` | 透传 |
| `material_ids` | `json` | 透传 |
| `preintake_ids` | `json` | 透传 |
| `unloadTable` | `table` | 透传 |
### 实现要点
- 装饰器使用 `node_type=NodeType.MANUAL_CONFIRM`
- `always_free=True``placeholder_keys``feedback_interval=300` 与现有 `start_experiment` 保持一致。
- 本节点不调用 `take_out`,只做确认与透传。
- 忘记勾选后不会自动重新显示下料指引;本轮不实现缓存或失败节点原地重跑。
---
## 五、节点 3`take_out_materials`(调用 take-out 同步奔耀)
### 行为
1. 接收节点 2 透传的 `order_id` / `material_ids` / `preintake_ids`
2. 校验 `order_id` 非空。
3. 调用 `self.hardware_interface.take_out(order_id, preintake_ids=preintake_ids, material_ids=material_ids)`
4. 返回 `take_out_result``unloaded_count``success`
### 入参
| 参数 | 类型 | 说明 |
|------|------|------|
| `order_id` | `str` | 必填 |
| `material_ids` | `List[str]` | 可为空;为空时由奔耀按 `orderId` 处理的能力以后现场确认 |
| `preintake_ids` | `List[str]` | 可为空,默认 `[]` |
| `order_code` | `str` | 日志/排错用 |
### 输出 handles
| key | data_type | 说明 |
|-----|-----------|------|
| `take_out_result` | `json` | `take-out` 原始响应 `{code, message, timestamp, data}` |
| `unloaded_count` | `int` | `len(material_ids)` |
| `success` | `bool` | `take_out_result.code == 1``data` 不为 False |
### 接口依赖
| 接口 | 调用方式 | 用途 |
|------|----------|------|
| `POST /api/lims/order/take-out` | `self.hardware_interface.take_out(order_id, preintake_ids, material_ids)` | 通知奔耀同步取出 |
请求体 schemahelper script 已核对):
```json
{
"apiKey": "B10B5995",
"requestTime": "2026-05-20T10:50:00.123Z",
"data": {
"orderId": "<UUID>",
"preintakeIds": [],
"materialIds": ["<UUID-1>", "<UUID-2>"]
}
}
```
响应 schema
```json
{
"code": 1,
"message": null,
"timestamp": 1779255000000,
"data": true
}
```
`BioyondV1RPC.take_out(...)` 返回完整响应包,因此 `take_out_materials` 应保留原始包到 `take_out_result`
### 实现要点
- 本轮不修改 `sample_waste_removal`,保持 backward compatibility。
- 新节点只调用现有完整能力的 `take_out(...)`
- `preintake_ids` / `material_ids` 都按可选列表处理,默认 `[]`
- `take-out` 返回 `code != 1` 时返回 `success=False` 并记录 warning是否抛异常留作开放问题。
---
## 六、端到端工作流连线
```mermaid
flowchart LR
submit["submit_experiment_dayN"] --> start["start_experiment<br/>manual_confirm: 上料"]
start -->|order_id, order_code| wait["wait_for_order_finish<br/>等 order_finish + 生成 unloadTable"]
wait -->|order_id, material_ids,<br/>preintake_ids, unloadTable| confirm["confirm_unload_materials<br/>manual_confirm: 操作员下料确认"]
confirm -->|order_id, material_ids,<br/>preintake_ids| takeout["take_out_materials<br/>调用 take-out 同步奔耀"]
bioyond["奔耀 LIMS"] -.HTTP POST /report/order_finish.-> wait
wait -.material-info.-> bioyond
takeout -.take-out.-> bioyond
```
---
## 七、影响面与兼容性
- **`peptide_station.py`**
- 修改 `start_experiment`:增加 `order_id/order_code/order_ids/resultTable` 输出 handles并在返回值透传。
- 增加 `wait_for_order_finish``confirm_unload_materials``take_out_materials` 三个 action。
- 增加 `process_order_finish_report` override。
- 增加 `_build_unload_table(...)` 等私有辅助方法。
- **`bioyond_rpc.py` 不动**
- `take_out` 已有完整 schema 能力。
- `sample_waste_removal` 本轮不改,保持兼容。
- **基类 `station.py` 不动**
- override 中保留 `super().process_order_finish_report(...)` 调用。
- **HTTP 服务不动**
- `WorkstationHTTPService` 已支持 `/report/order_finish`
- **本轮不做缓存**
- 不新增 `unload_context_cache`
- 不支持 push 早于 wait 的自动补救。
- 不支持失败 manual_confirm 原地重跑。
- **测试**:补在现有路径 `unilabos/devices/workstation/bioyond_studio/peptide_station/tests/test_peptide_station_contracts.py`
1. `start_experiment` 输出 handles/返回值透传 `order_id/order_code/order_ids/resultTable`
2. `process_order_finish_report` orderCode 匹配 / 不匹配时 event 是否触发。
3. `wait_for_order_finish` 单订单成功、超时、状态映射、`used_materials` JSON 化。
4. `_build_unload_table` 列顺序严格等于 `RESULT_TABLE_COLUMNS`,且无 `posX/posY/posZ/unit` 列。
5. `material-info` 失败时不抛异常,`missing_material_info` 正确记录。
6. `confirm_unload_materials` 未勾选时报错,勾选后透传下游字段且不调用 `take_out`
7. `take_out_materials` 调用 `hardware_interface.take_out(order_id, preintake_ids, material_ids)`,不调用 `sample_waste_removal`
---
## 八、待人类确认的开放问题
1. **过滤产物 vs 全量**`usedMaterials` 同时包含试剂、耗材、样品(`typeMode` 区分),下料表是否需要默认排除试剂/耗材?当前默认全量列出。
2. **take-out 失败是否阻塞工作流**:本计划暂定返回 `success=False` 并 warning不抛异常如果希望奔耀仓位状态必须一致可改为抛 `RuntimeError`
3. **后续缓存/重跑能力**:如果要支持 push 早到、忘勾选后重跑复用 `unloadTable`,后续应在 `BioyondPeptideStation` station runtime 上实现 `unload_context_cache`,但本轮不做。
4. **多订单**:本轮只保留 `order_ids` 占位,不实现多订单等待、乱序回调或并发 wait。
---
## 附录 AAPI schema 核对摘要
使用 `temp_benyao/scripts/api_helper.py --root temp_benyao/peptide` 核对:
### A.1 `POST /api/lims/storage/material-info`
请求:
```json
{
"apiKey": "B10B5995",
"requestTime": "2026-05-20T10:50:00.123Z",
"data": "<materialId UUID>"
}
```
响应关键字段:
```json
{
"id": "<materialId UUID>",
"typeName": "样品",
"code": "MAT-001",
"barCode": "BC-001",
"name": "多肽产物",
"quantity": 10,
"lockQuantity": 0,
"unit": "mg",
"status": 1,
"isUse": true,
"locations": [
{
"id": "<locationId UUID>",
"whid": "<warehouse UUID>",
"whName": "自动化堆栈",
"code": "A1",
"x": 1,
"y": 1,
"z": 1,
"quantity": 10
}
],
"detail": []
}
```
注意schema 没有 `posX/posY/posZ`,本轮也不展示坐标。
### A.2 `POST /api/lims/order/take-out`
请求:
```json
{
"apiKey": "B10B5995",
"requestTime": "2026-05-20T10:50:00.123Z",
"data": {
"orderId": "<orderId UUID>",
"preintakeIds": [],
"materialIds": ["<materialId UUID>"]
}
}
```
响应:
```json
{
"code": 1,
"message": null,
"timestamp": 1779255000000,
"data": true
}
```
源码中已有 `BioyondV1RPC.take_out(order_id, preintake_ids=None, material_ids=None)`,本轮复用它。
### A.3 `/report/order_finish`
`/report/order_finish` 不在 Peptide JSON OpenAPI specs 中schema 依据:
- `unilabos/devices/workstation/workstation_http_service.py`
- `temp_benyao/peptide/docs/reference/api_manual.md`
关键字段:
```json
{
"token": "token-from-lims",
"request_time": "2026-05-20 10:50:00.123",
"data": {
"orderCode": "EXP260520-103000",
"orderName": "实验260520-103000",
"startTime": "2026-05-20 09:00:00",
"endTime": "2026-05-20 10:50:00",
"status": "30",
"usedMaterials": [
{
"materialId": "<materialId UUID>",
"locationId": "<locationId UUID>",
"typeMode": "1",
"usedQuantity": 10
}
]
}
}
```
`WorkstationHTTPService` 会把 `usedMaterials[]` 转成 `MaterialUsage` dataclass 列表传给 `process_order_finish_report(report_request, used_materials)`peptide 输出 `used_materials` handle 前需要转回 JSON dict
```json
[
{
"materialId": "<materialId UUID>",
"locationId": "<locationId UUID>",
"typeMode": "1",
"usedQuantity": 10
}
]
```
---
## 附录 B本轮不实现的内容
- 不做 station runtime 的 `unload_context_cache`
- 不做多订单。
- 不做 push 早到后的补救。
- 不做 failed manual_confirm 原地重跑。
- 不改前端。
- 不改 `sample_waste_removal`

View File

@@ -0,0 +1,461 @@
# Peptide Four-Checkbox Reset Plan
Date: 2026-05-21 16:30
Status: Proposal only / not executed
## Scope
This plan replaces `2026-05-21_1556_peptide_reset_sirna_reference_plan.md` for Peptide reset work.
User direction captured here:
- `take_out` is unnecessary for Peptide reset.
- Do not add a material-cache refresh checkbox.
- Change reset to four checkbox-controlled operations:
- 调度器复位
- 订单状态复位
- 库位复位
- 仪器复位
- The first three checkboxes default to checked.
- The fourth checkbox, 仪器复位 / `reset_devices`, defaults to unchecked.
- Replace the current public `reset` action with:
- `reset_auto`: normal ILab action node. This is the renamed/replaced version of the current reset implementation.
- `reset_manual`: manual-confirm action node with a physical cleanup confirmation message.
## Evidence Summary
Current Peptide source:
- Reset action code is currently in `unilabos/devices/workstation/bioyond_studio/peptide_station/peptide_station.py`.
- Current Peptide reset selects `scheduler_reset`, `reset_order_status`, and `reset_location`, and passes ids to order/location resets.
- `BioyondV1RPC.reset_devices()` already calls `/api/lims/device/reset-devices` with only `apiKey` and `requestTime`.
- `BioyondV1RPC.scheduler_reset()` already calls `/api/lims/scheduler/reset` with only `apiKey` and `requestTime`.
- `BioyondV1RPC.reset_order_status(order_id)` and `reset_location(location_id)` currently send `data`, but live probes showed that omitted `data` succeeds.
Live Peptide no-data reset probes using `temp_benyao/peptide/peptide_station_config.example.json`:
- `POST /api/lims/order/reset-order-status` with request keys `["apiKey", "requestTime"]` returned HTTP 200 and `code=1`.
- `POST /api/lims/scheduler/reset` with request keys `["apiKey", "requestTime"]` returned HTTP 200 and `code=1`.
- `POST /api/lims/storage/reset-location` with request keys `["apiKey", "requestTime"]` returned HTTP 200 and `code=1`.
- `reset-devices` was not live-probed in this session, but the current RPC wrapper already sends no `data`.
Raw findings:
- `temp_benyao/peptide/_findings/2026-05-21_1613_reset_order_status_no_data_live.md`
- `temp_benyao/peptide/_findings/2026-05-21_1615_remaining_resets_no_data_live.md`
## Proposed Public Actions
### `reset_auto`
Normal action node. This is the auto/no-manual-confirm path. It replaces the current public `reset` action; do not leave a second public `reset` action unless a later compatibility request explicitly asks for an alias.
Checkbox schema rule:
- Use plain `bool` annotations in the action signature.
- Do not use `Annotated[bool, Field(...)]` for these checkbox params in this implementation plan.
- The current AST registry schema path does not unwrap `Annotated[...]`; plain `bool` is required so generated JSON Schema marks the fields as boolean and the renderer can show checkboxes.
- Put human-facing labels/descriptions in the method docstring or action description. If field-level `Field(description=...)` metadata is required later, add registry `Annotated` support and a schema test as a separate change.
Decorator shape:
```python
@action(
always_free=True,
goal_default={
"reset_scheduler": True,
"reset_order_status": True,
"reset_location": True,
"reset_devices": False,
},
description="自动复位调度器/订单状态/库位,可选仪器复位",
)
def reset_auto(
self,
reset_scheduler: bool = True,
reset_order_status: bool = True,
reset_location: bool = True,
reset_devices: bool = False,
**kwargs: Any,
) -> Dict[str, Any]:
"""自动复位调度器/订单状态/库位,可选仪器复位。
Args:
reset_scheduler[调度器复位]: 调用 /api/lims/scheduler/reset默认勾选。
reset_order_status[订单状态复位]: 调用 /api/lims/order/reset-order-status默认勾选。
reset_location[库位复位]: 调用 /api/lims/storage/reset-location默认勾选。
reset_devices[仪器复位]: 调用 /api/lims/device/reset-devices默认不勾选。
"""
...
```
Implementation notes:
- Use real plain-`bool` parameters, not hidden `**kwargs` and not `Annotated`, so the action renderer can expose four checkboxes.
- Rename/replace the existing `reset` action as `reset_auto`; the implementation should not keep the old id-shaped `reset` action as another public path by default.
- Keep the three routine reset defaults checked.
- Keep `reset_devices` unchecked because it can be broader and more disruptive.
- Do not require or resolve order ids or location ids.
- Do not call `take_out`.
- Do not call `refresh_material_cache`.
### `reset_manual`
Manual-confirm node. It should show the operator a physical cleanup warning, then execute the same reset helper as `reset_auto` after the operator confirms.
Actual manual-confirm decorator pattern in this repo:
- Use `@action(node_type=NodeType.MANUAL_CONFIRM)`.
- Set `always_free=True`.
- Add `placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}`.
- Include `timeout_seconds: int` and `assignee_user_ids: list[str]`.
- Add `goal_default` for `timeout_seconds` and `assignee_user_ids`.
- Manual-confirm actions are normally side-effect-light, but existing Peptide `start_experiment` is already a `MANUAL_CONFIRM` action that performs scheduler start after the operator gate, so a reset-after-confirm pattern is compatible with current Peptide style.
Proposed confirmation text:
```text
请确认G3、CEM、Tecan、撕膜机、封膜机、打标机、旋转堆栈上下料位、3个转台等位置的物料已清理完毕
请开门检查冰箱、IDOT、酶标仪、离心机、LCMS内部没有遗留物料。
```
Decorator/function shape:
```python
RESET_MANUAL_CONFIRM_MESSAGE = (
"请确认G3、CEM、Tecan、撕膜机、封膜机、打标机、旋转堆栈上下料位、3个转台等位置的物料已清理完毕\n"
"请开门检查冰箱、IDOT、酶标仪、离心机、LCMS内部没有遗留物料。"
)
@action(
always_free=True,
node_type=NodeType.MANUAL_CONFIRM,
placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"},
goal_default={
"reset_scheduler": True,
"reset_order_status": True,
"reset_location": True,
"reset_devices": False,
"physical_cleanup_confirmed": False,
"timeout_seconds": 3600,
"assignee_user_ids": [],
},
feedback_interval=300,
description=RESET_MANUAL_CONFIRM_MESSAGE,
)
def reset_manual(
self,
reset_scheduler: bool = True,
reset_order_status: bool = True,
reset_location: bool = True,
reset_devices: bool = False,
physical_cleanup_confirmed: bool = False,
timeout_seconds: int = 3600,
assignee_user_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""人工确认物理清理后执行复位。
Args:
reset_scheduler[调度器复位]: 调用 /api/lims/scheduler/reset默认勾选。
reset_order_status[订单状态复位]: 调用 /api/lims/order/reset-order-status默认勾选。
reset_location[库位复位]: 调用 /api/lims/storage/reset-location默认勾选。
reset_devices[仪器复位]: 调用 /api/lims/device/reset-devices默认不勾选。
physical_cleanup_confirmed[物理清理确认]: 确认清理提示中的物料检查已经完成,默认不勾选。
"""
...
```
Execution rule:
- If `physical_cleanup_confirmed` is false, return a blocked result and do not call any reset API.
- If it is true, call the same internal helper as `reset_auto`.
- Return `confirmation_message` in the result payload so call logs preserve the exact operator instruction text.
Renderer caveat:
- `description` should carry the warning in generated action metadata.
- `physical_cleanup_confirmed` must remain a plain `bool` so it renders as a checkbox.
- The cleanup warning should be carried by the action `description` and the docstring param description. Do not rely on `Field(description=...)` unless registry `Annotated` support has been implemented and tested.
- If the current frontend does not show action descriptions or docstring field descriptions reliably, add a read-only string parameter such as `confirmation_message: str = RESET_MANUAL_CONFIRM_MESSAGE` with `goal_default`, or use a handle-based display only after renderer behavior is verified.
## Shared Internal Helper
Both public actions should delegate to one helper, for example:
```python
def _execute_reset_operations(
self,
*,
reset_scheduler: bool,
reset_order_status: bool,
reset_location: bool,
reset_devices: bool,
) -> Dict[str, Any]:
...
```
Call order:
1. `scheduler_reset`
2. `reset_order_status`
3. `reset_location`
4. `reset_devices`
Result shape:
```python
{
"selected_operations": [
{"key": "reset_scheduler", "label": "调度器复位", "selected": True},
{"key": "reset_order_status", "label": "订单状态复位", "selected": True},
{"key": "reset_location", "label": "库位复位", "selected": True},
{"key": "reset_devices", "label": "仪器复位", "selected": False},
],
"executed_calls": [
{"operation": "scheduler_reset", "endpoint": "/api/lims/scheduler/reset", "result": {"code": 1}},
],
"skipped_operations": [
{"operation": "reset_devices", "reason": "checkbox_disabled"},
],
"warnings": [],
}
```
Failure handling:
- Execute selected operations sequentially and record each result.
- If an operation returns non-`1` code, add a warning and continue unless the caller later requests fail-fast.
- If an RPC method raises, catch it, record an error entry, and continue to the next selected operation unless fail-fast is introduced.
## RPC Wrapper Adjustment
Adjust the two id-shaped wrappers to no-data calls:
- `BioyondV1RPC.reset_order_status()` should no longer require `order_id`.
- `BioyondV1RPC.reset_location()` should no longer require `location_id`.
Current no-data wrappers already exist:
- `scheduler_reset()`
- `reset_devices()`
Suggested RPC signatures:
```python
def scheduler_reset(self) -> int: ...
def reset_order_status(self) -> int: ...
def reset_location(self) -> int: ...
def reset_devices(self) -> int: ...
```
Compatibility option:
```python
def reset_order_status(self, order_id: Optional[str] = None) -> int:
del order_id
...
def reset_location(self, location_id: Optional[str] = None) -> int:
del location_id
...
```
This keeps older code from crashing while making the actual wire request no-data.
## Adjusted Runtime API Schemas
These are the schemas Peptide reset code should target at runtime after the live no-data probes. They intentionally omit `data`, even though OpenAPI models nullable `data` for these endpoints.
All four requests use:
```json
{
"apiKey": "string",
"requestTime": "date-time"
}
```
No `data` field should be sent by default.
All four responses use:
```json
{
"code": 1,
"message": "",
"timestamp": 0
}
```
### 调度器复位
Endpoint:
```text
POST /api/lims/scheduler/reset
```
Adjusted request:
```json
{
"apiKey": "B10B5995",
"requestTime": "2026-05-21T08:15:16.494Z"
}
```
Live response:
```json
{
"code": 1,
"message": "",
"timestamp": 1779351316072
}
```
Notes:
- OpenAPI says `data` is nullable int32.
- Live Peptide accepted omitted `data`.
### 订单状态复位
Endpoint:
```text
POST /api/lims/order/reset-order-status
```
Adjusted request:
```json
{
"apiKey": "B10B5995",
"requestTime": "2026-05-21T08:13:34.750Z"
}
```
Live response:
```json
{
"code": 1,
"message": "",
"timestamp": 1779351214422
}
```
Notes:
- OpenAPI says `data` is nullable string.
- Live Peptide accepted omitted `data`.
- Do not model this as order-id scoped unless Bioyond confirms backend behavior.
### 库位复位
Endpoint:
```text
POST /api/lims/storage/reset-location
```
Adjusted request:
```json
{
"apiKey": "B10B5995",
"requestTime": "2026-05-21T08:15:18.924Z"
}
```
Live response:
```json
{
"code": 1,
"message": "",
"timestamp": 1779351318565
}
```
Notes:
- OpenAPI says `data` is nullable string.
- Live Peptide accepted omitted `data`.
- Do not model this as location-id scoped unless Bioyond confirms backend behavior.
### 仪器复位
Endpoint:
```text
POST /api/lims/device/reset-devices
```
Adjusted request:
```json
{
"apiKey": "B10B5995",
"requestTime": "date-time"
}
```
Expected response shape:
```json
{
"code": 1,
"message": "",
"timestamp": 0
}
```
Notes:
- OpenAPI says `data` is nullable string.
- Current `BioyondV1RPC.reset_devices()` already sends no `data`.
- This endpoint was not live-probed in the no-data reset session.
- Keep checkbox default unchecked.
## Tests To Add Before Implementation
1. `reset_auto` is not `NodeType.MANUAL_CONFIRM`.
2. `reset_manual` has `node_type=NodeType.MANUAL_CONFIRM`.
3. `reset_manual` metadata includes:
- `always_free=True`
- `placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}`
- `timeout_seconds=3600`
- `assignee_user_ids=[]`
- `physical_cleanup_confirmed=False`
4. Both reset actions expose four real boolean params:
- `reset_scheduler`
- `reset_order_status`
- `reset_location`
- `reset_devices`
5. The generated registry schema marks those reset params as JSON Schema `type: boolean`, not `object` or `string`, so the frontend can render checkboxes.
6. `reset_auto` replaces the current public `reset` action. Unless a later compatibility request adds an alias, no old id-shaped public `reset` action remains.
7. Goal defaults are:
- first three reset checkboxes `True`
- `reset_devices=False`
8. `reset_manual(..., physical_cleanup_confirmed=False)` does not call any RPC reset method.
9. `reset_auto()` with defaults calls:
- `scheduler_reset()`
- `reset_order_status()`
- `reset_location()`
- not `reset_devices()`
10. `reset_auto(reset_devices=True)` also calls `reset_devices()`.
11. `reset_order_status()` and `reset_location()` RPC wrappers send no `data` key.
12. No reset path calls `take_out`.
13. No reset path calls `refresh_material_cache`.
## Non-Goals
- Do not implement `take_out` in reset.
- Do not refresh `material_cache` from reset.
- Do not resolve order ids or location ids for reset.
- Do not add Project/cache/browser cleanup routes.
- Do not make `reset_devices` default-on.
- Do not execute this plan during planning.

View File

@@ -1,5 +1,5 @@
channel_sources:
- robostack,robostack-staging,conda-forge,defaults
- robostack,robostack-staging,conda-forge
gazebo:
- '11'

View File

@@ -1,6 +1,6 @@
package:
name: ros-humble-unilabos-msgs
version: 0.10.19
version: 0.11.1
source:
path: ../../unilabos_msgs
target_directory: src

View File

@@ -1,6 +1,6 @@
package:
name: unilabos
version: "0.10.19"
version: "0.11.1"
source:
path: ../..

View File

@@ -4,7 +4,7 @@ package_name = 'unilabos'
setup(
name=package_name,
version='0.10.19',
version='0.11.1',
packages=find_packages(),
include_package_data=True,
install_requires=['setuptools'],

View File

@@ -1 +1 @@
__version__ = "0.10.19"
__version__ = "0.11.1"

View File

@@ -12,6 +12,15 @@ from typing import Dict, Any, List
import networkx as nx
import yaml
# Windows 中文系统 stdout 默认 GBK无法编码 banner / emoji 日志中的 Unicode 字符
# 强制 stdout/stderr 用 UTF-8避免 print 触发 UnicodeEncodeError 导致进程崩溃
if sys.platform == "win32":
for _stream in (sys.stdout, sys.stderr):
try:
_stream.reconfigure(encoding="utf-8", errors="replace") # type: ignore[attr-defined]
except (AttributeError, OSError):
pass
# 首先添加项目根目录到路径
current_dir = os.path.dirname(os.path.abspath(__file__))
unilabos_dir = os.path.dirname(os.path.dirname(current_dir))

View File

@@ -2,6 +2,8 @@ import time
import logging
from typing import Union, Dict, Optional
from unilabos.registry.decorators import topic_config
class VirtualMultiwayValve:
"""
@@ -41,13 +43,11 @@ class VirtualMultiwayValve:
def target_position(self) -> int:
return self._target_position
def get_current_position(self) -> int:
"""获取当前阀门位置 📍"""
return self._current_position
def get_current_port(self) -> str:
"""获取当前连接的端口名称 🔌"""
return self._current_position
@property
@topic_config()
def current_port(self) -> str:
"""当前连接的端口名称 🔌"""
return self.port
def set_position(self, command: Union[int, str]):
"""
@@ -169,12 +169,14 @@ class VirtualMultiwayValve:
self._status = "Idle"
self._valve_state = "Closed"
close_msg = f"🔒 阀门已关闭,保持在位置 {self._current_position} ({self.get_current_port()})"
close_msg = f"🔒 阀门已关闭,保持在位置 {self._current_position} ({self.port})"
self.logger.info(close_msg)
return close_msg
def get_valve_position(self) -> int:
"""获取阀门位置 - 兼容性方法 📍"""
@property
@topic_config()
def valve_position(self) -> int:
"""阀门位置 📍"""
return self._current_position
def set_valve_position(self, command: Union[int, str]):
@@ -229,19 +231,16 @@ class VirtualMultiwayValve:
self.logger.info(f"🔄 从端口 {self._current_position} 切换到泵位置...")
return self.set_to_pump_position()
def get_flow_path(self) -> str:
"""获取当前流路路径描述 🌊"""
current_port = self.get_current_port()
@property
@topic_config()
def flow_path(self) -> str:
"""当前流路路径描述 🌊"""
if self._current_position == 0:
flow_path = f"🚰 转移泵已连接 (位置 {self._current_position})"
else:
flow_path = f"🔌 端口 {self._current_position} 已连接 ({current_port})"
# 删除debug日志self.logger.debug(f"🌊 当前流路: {flow_path}")
return flow_path
return f"🚰 转移泵已连接 (位置 {self._current_position})"
return f"🔌 端口 {self._current_position} 已连接 ({self.current_port})"
def __str__(self):
current_port = self.get_current_port()
current_port = self.current_port
status_emoji = "" if self._status == "Idle" else "🔄" if self._status == "Busy" else ""
return f"🔄 VirtualMultiwayValve({status_emoji} 位置: {self._current_position}/{self.max_positions}, 端口: {current_port}, 状态: {self._status})"
@@ -253,7 +252,7 @@ if __name__ == "__main__":
print("🔄 === 虚拟九通阀门测试 === ✨")
print(f"🏠 初始状态: {valve}")
print(f"🌊 当前流路: {valve.get_flow_path()}")
print(f"🌊 当前流路: {valve.flow_path}")
# 切换到试剂瓶11号位
print(f"\n🔌 切换到1号位: {valve.set_position(1)}")

View File

@@ -3,6 +3,7 @@ import logging
import time as time_module
from typing import Dict, Any
from unilabos.registry.decorators import topic_config
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualStirrer:
@@ -314,9 +315,11 @@ class VirtualStirrer:
def min_speed(self) -> float:
return self._min_speed
def get_device_info(self) -> Dict[str, Any]:
"""获取设备状态信息 📊"""
info = {
@property
@topic_config()
def device_info(self) -> Dict[str, Any]:
"""设备状态快照信息 📊"""
return {
"device_id": self.device_id,
"status": self.status,
"operation_mode": self.operation_mode,
@@ -325,12 +328,9 @@ class VirtualStirrer:
"is_stirring": self.is_stirring,
"remaining_time": self.remaining_time,
"max_speed": self._max_speed,
"min_speed": self._min_speed
"min_speed": self._min_speed,
}
# self.logger.debug(f"📊 设备信息: 模式={self.operation_mode}, 速度={self.current_speed} RPM, 搅拌={self.is_stirring}")
return info
def __str__(self):
status_emoji = "" if self.operation_mode == "Idle" else "🌪️" if self.operation_mode == "Stirring" else "🛑" if self.operation_mode == "Settling" else ""
return f"🌪️ VirtualStirrer({status_emoji} {self.device_id}: {self.operation_mode}, {self.current_speed} RPM)"

View File

@@ -4,6 +4,7 @@ from enum import Enum
from typing import Union, Optional
import logging
from unilabos.registry.decorators import topic_config
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
@@ -385,8 +386,10 @@ class VirtualTransferPump:
"""获取当前体积"""
return self._current_volume
def get_remaining_capacity(self) -> float:
"""获取剩余容量"""
@property
@topic_config()
def remaining_capacity(self) -> float:
"""剩余容量 (ml)"""
return self.max_volume - self._current_volume
def is_empty(self) -> bool:

View File

@@ -14,20 +14,30 @@ Virtual Workbench Device - 模拟工作台设备
import logging
import time
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from enum import Enum
from threading import Lock, RLock
from typing import Any, Dict, List, Optional, cast
from typing_extensions import TypedDict
from unilabos.registry.decorators import (
device, action, ActionInputHandle, ActionOutputHandle, DataSource, topic_config, not_action, NodeType
ActionInputHandle,
ActionOutputHandle,
DataSource,
NodeType,
action,
device,
not_action,
topic_config,
)
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode
from unilabos.resources.resource_tracker import SampleUUIDsType, LabSample, ResourceTreeSet
from unilabos.resources.resource_tracker import (
SampleUUIDsType,
LabSample,
ResourceTreeSet,
)
# ============ TypedDict 返回类型定义 ============
@@ -112,6 +122,7 @@ class HeatingStation:
@device(
id="virtual_workbench",
display_name="虚拟工作台",
category=["virtual_device"],
description="Virtual Workbench with 1 robotic arm and 3 heating stations for concurrent material processing",
)
@@ -137,7 +148,19 @@ class VirtualWorkbench:
HEATING_TIME: float = 60.0 # 加热时间(秒)
NUM_HEATING_STATIONS: int = 3 # 加热台数量
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
def __init__(
self,
device_id: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
**kwargs,
):
"""
初始化虚拟工作台。
Args:
device_id[设备ID]: 工作台设备实例 ID默认使用 virtual_workbench。
config[设备配置]: 可包含 arm_operation_time、heating_time、num_heating_stations。
"""
# 处理可能的不同调用方式
if device_id is None and "id" in kwargs:
device_id = kwargs.pop("id")
@@ -151,9 +174,13 @@ class VirtualWorkbench:
self.data: Dict[str, Any] = {}
# 从config中获取可配置参数
self.ARM_OPERATION_TIME = float(self.config.get("arm_operation_time", self.ARM_OPERATION_TIME))
self.ARM_OPERATION_TIME = float(
self.config.get("arm_operation_time", self.ARM_OPERATION_TIME)
)
self.HEATING_TIME = float(self.config.get("heating_time", self.HEATING_TIME))
self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", self.NUM_HEATING_STATIONS))
self.NUM_HEATING_STATIONS = int(
self.config.get("num_heating_stations", self.NUM_HEATING_STATIONS)
)
# 机械臂状态和锁
self._arm_lock = Lock()
@@ -162,7 +189,8 @@ class VirtualWorkbench:
# 加热台状态
self._heating_stations: Dict[int, HeatingStation] = {
i: HeatingStation(station_id=i) for i in range(1, self.NUM_HEATING_STATIONS + 1)
i: HeatingStation(station_id=i)
for i in range(1, self.NUM_HEATING_STATIONS + 1)
}
self._stations_lock = RLock()
@@ -292,45 +320,113 @@ class VirtualWorkbench:
self.logger.info(f"机械臂已释放 (完成: {task})")
@action(
always_free=True, node_type=NodeType.MANUAL_CONFIRM, placeholder_keys={
"assignee_user_ids": "unilabos_manual_confirm"
}, goal_default={
"timeout_seconds": 3600,
"assignee_user_ids": []
}, feedback_interval=300,
always_free=True,
node_type=NodeType.MANUAL_CONFIRM,
placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"},
goal_default={"timeout_seconds": 3600, "assignee_user_ids": []},
feedback_interval=300,
handles=[
ActionInputHandle(key="target_device", data_type="device_id",
label="目标设备", data_key="target_device", data_source=DataSource.HANDLE),
ActionInputHandle(key="resource", data_type="resource",
label="待转移资源", data_key="resource", data_source=DataSource.HANDLE),
ActionInputHandle(key="mount_resource", data_type="resource",
label="目标孔位", data_key="mount_resource", data_source=DataSource.HANDLE),
ActionInputHandle(key="collector_mass", data_type="collector_mass",
label="极流体质量", data_key="collector_mass", data_source=DataSource.HANDLE),
ActionInputHandle(key="active_material", data_type="active_material",
label="活性物质含量", data_key="active_material", data_source=DataSource.HANDLE),
ActionInputHandle(key="capacity", data_type="capacity",
label="克容量", data_key="capacity", data_source=DataSource.HANDLE),
ActionInputHandle(key="battery_system", data_type="battery_system",
label="电池体系", data_key="battery_system", data_source=DataSource.HANDLE),
ActionInputHandle(
key="target_device",
data_type="device_id",
label="目标设备",
data_key="target_device",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="resource",
data_type="resource",
label="待转移资源",
data_key="resource",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="mount_resource",
data_type="resource",
label="目标孔位",
data_key="mount_resource",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="collector_mass",
data_type="collector_mass",
label="极流体质量",
data_key="collector_mass",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="active_material",
data_type="active_material",
label="活性物质含量",
data_key="active_material",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="capacity",
data_type="capacity",
label="克容量",
data_key="capacity",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="battery_system",
data_type="battery_system",
label="电池体系",
data_key="battery_system",
data_source=DataSource.HANDLE,
),
# transfer使用
ActionOutputHandle(key="target_device", data_type="device_id",
label="目标设备", data_key="target_device", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="resource", data_type="resource",
label="待转移资源", data_key="resource.@flatten", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="mount_resource", data_type="resource",
label="目标孔位", data_key="mount_resource.@flatten", data_source=DataSource.EXECUTOR),
ActionOutputHandle(
key="target_device",
data_type="device_id",
label="目标设备",
data_key="target_device",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="resource",
data_type="resource",
label="待转移资源",
data_key="resource.@flatten",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="mount_resource",
data_type="resource",
label="目标孔位",
data_key="mount_resource.@flatten",
data_source=DataSource.EXECUTOR,
),
# test使用
ActionOutputHandle(key="collector_mass", data_type="collector_mass",
label="极流体质量", data_key="collector_mass", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="active_material", data_type="active_material",
label="活性物质含量", data_key="active_material", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="capacity", data_type="capacity",
label="克容量", data_key="capacity", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="battery_system", data_type="battery_system",
label="电池体系", data_key="battery_system", data_source=DataSource.EXECUTOR),
]
ActionOutputHandle(
key="collector_mass",
data_type="collector_mass",
label="极流体质量",
data_key="collector_mass",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="active_material",
data_type="active_material",
label="活性物质含量",
data_key="active_material",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="capacity",
data_type="capacity",
label="克容量",
data_key="capacity",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="battery_system",
data_type="battery_system",
label="电池体系",
data_key="battery_system",
data_source=DataSource.EXECUTOR,
),
],
)
def manual_confirm(
self,
@@ -343,67 +439,156 @@ class VirtualWorkbench:
battery_system: List[str],
timeout_seconds: int,
assignee_user_ids: list[str],
**kwargs
**kwargs,
) -> dict:
"""
timeout_seconds: 超时时间默认3600秒
collector_mass: 极流体质量
active_material: 活性物质含量
capacity: 克容量mAh/g
battery_system: 电池体系
修改的结果无效,是只读的
人工确认资源转移和扣电测试参数。
Args:
resource[待转移资源]: 需要人工确认的资源列表。
target_device[目标设备]: 资源要转移到的目标设备 ID。
mount_resource[目标孔位]: 资源要挂载到的目标孔位列表。
collector_mass[极流体质量]: 每个样品对应的极流体质量。
active_material[活性物质含量]: 每个样品对应的活性物质含量。
capacity[克容量]: 每个样品对应的克容量,单位 mAh/g。
battery_system[电池体系]: 每个样品对应的电池体系名称。
timeout_seconds[超时时间]: 人工确认超时时间,单位秒。
assignee_user_ids[确认人]: 指定处理人工确认任务的用户 ID 列表。
Note:
修改的结果无效,是只读的。
"""
resource = ResourceTreeSet.from_plr_resources(resource).dump()
mount_resource = ResourceTreeSet.from_plr_resources(mount_resource).dump()
resource_tree = ResourceTreeSet.from_plr_resources(cast(Any, resource)).dump()
mount_resource_tree = ResourceTreeSet.from_plr_resources(cast(Any, mount_resource)).dump()
kwargs.update(locals())
kwargs.pop("kwargs")
kwargs.pop("self")
kwargs["resource"] = resource_tree
kwargs["mount_resource"] = mount_resource_tree
kwargs.pop("resource_tree")
kwargs.pop("mount_resource_tree")
return kwargs
@action(
description="转移物料",
handles=[
ActionInputHandle(key="target_device", data_type="device_id",
label="目标设备", data_key="target_device", data_source=DataSource.HANDLE),
ActionInputHandle(key="resource", data_type="resource",
label="待转移资源", data_key="resource", data_source=DataSource.HANDLE),
ActionInputHandle(key="mount_resource", data_type="resource",
label="目标孔位", data_key="mount_resource", data_source=DataSource.HANDLE),
]
ActionInputHandle(
key="target_device",
data_type="device_id",
label="目标设备",
data_key="target_device",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="resource",
data_type="resource",
label="待转移资源",
data_key="resource",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="mount_resource",
data_type="resource",
label="目标孔位",
data_key="mount_resource",
data_source=DataSource.HANDLE,
),
],
)
async def transfer(self, resource: List[ResourceSlot], target_device: DeviceSlot, mount_resource: List[ResourceSlot]):
future = ROS2DeviceNode.run_async_func(self._ros_node.transfer_resource_to_another, True,
async def transfer(
self,
resource: List[ResourceSlot],
target_device: DeviceSlot,
mount_resource: List[ResourceSlot],
):
"""
转移资源到目标设备。
Args:
resource[待转移资源]: 待转移的资源列表。
target_device[目标设备]: 接收资源的目标设备 ID。
mount_resource[目标孔位]: 目标设备上的挂载孔位列表。
"""
future = ROS2DeviceNode.run_async_func(
self._ros_node.transfer_resource_to_another,
True,
**{
"plr_resources": resource,
"target_device_id": target_device,
"target_resources": mount_resource,
"sites": [None] * len(mount_resource),
})
},
)
result = await future
return result
@action(
description="扣电测试启动",
handles=[
ActionInputHandle(key="resource", data_type="resource",
label="待转移资源", data_key="resource", data_source=DataSource.HANDLE),
ActionInputHandle(key="mount_resource", data_type="resource",
label="目标孔位", data_key="mount_resource", data_source=DataSource.HANDLE),
ActionInputHandle(key="collector_mass", data_type="collector_mass",
label="极流体质量", data_key="collector_mass", data_source=DataSource.HANDLE),
ActionInputHandle(key="active_material", data_type="active_material",
label="活性物质含量", data_key="active_material", data_source=DataSource.HANDLE),
ActionInputHandle(key="capacity", data_type="capacity",
label="克容量", data_key="capacity", data_source=DataSource.HANDLE),
ActionInputHandle(key="battery_system", data_type="battery_system",
label="电池体系", data_key="battery_system", data_source=DataSource.HANDLE),
]
ActionInputHandle(
key="resource",
data_type="resource",
label="待转移资源",
data_key="resource",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="mount_resource",
data_type="resource",
label="目标孔位",
data_key="mount_resource",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="collector_mass",
data_type="collector_mass",
label="极流体质量",
data_key="collector_mass",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="active_material",
data_type="active_material",
label="活性物质含量",
data_key="active_material",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="capacity",
data_type="capacity",
label="克容量",
data_key="capacity",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="battery_system",
data_type="battery_system",
label="电池体系",
data_key="battery_system",
data_source=DataSource.HANDLE,
),
],
)
async def test(
self, resource: List[ResourceSlot], mount_resource: List[ResourceSlot], collector_mass: List[float], active_material: List[float], capacity: List[float], battery_system: list[str]
self,
resource: List[ResourceSlot],
mount_resource: List[ResourceSlot],
collector_mass: List[float],
active_material: List[float],
capacity: List[float],
battery_system: list[str],
):
"""
启动扣电测试。
Args:
resource[待测试资源]: 需要进行扣电测试的资源列表。
mount_resource[测试孔位]: 扣电测试使用的目标孔位列表。
collector_mass[极流体质量]: 每个样品对应的极流体质量。
active_material[活性物质含量]: 每个样品对应的活性物质含量。
capacity[克容量]: 每个样品对应的克容量,单位 mAh/g。
battery_system[电池体系]: 每个样品对应的电池体系名称。
"""
print(resource)
print(mount_resource)
print(collector_mass)
@@ -415,16 +600,11 @@ class VirtualWorkbench:
auto_prefix=True,
description="批量准备物料 - 虚拟起始节点, 生成A1-A5物料, 输出5个handle供后续节点使用",
handles=[
ActionOutputHandle(key="channel_1", data_type="workbench_material",
label="实验1", data_key="material_1", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="channel_2", data_type="workbench_material",
label="实验2", data_key="material_2", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="channel_3", data_type="workbench_material",
label="实验3", data_key="material_3", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="channel_4", data_type="workbench_material",
label="实验4", data_key="material_4", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="channel_5", data_type="workbench_material",
label="实验5", data_key="material_5", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="channel_1", data_type="workbench_material", label="实验1", data_key="material_1", data_source=DataSource.EXECUTOR), # noqa: E501
ActionOutputHandle(key="channel_2", data_type="workbench_material", label="实验2", data_key="material_2", data_source=DataSource.EXECUTOR), # noqa: E501
ActionOutputHandle(key="channel_3", data_type="workbench_material", label="实验3", data_key="material_3", data_source=DataSource.EXECUTOR), # noqa: E501
ActionOutputHandle(key="channel_4", data_type="workbench_material", label="实验4", data_key="material_4", data_source=DataSource.EXECUTOR), # noqa: E501
ActionOutputHandle(key="channel_5", data_type="workbench_material", label="实验5", data_key="material_5", data_source=DataSource.EXECUTOR), # noqa: E501
],
)
def prepare_materials(
@@ -437,6 +617,9 @@ class VirtualWorkbench:
作为工作流的起始节点, 生成指定数量的物料编号供后续节点使用。
输出5个handle (material_1 ~ material_5), 分别对应实验1~5。
Args:
count[物料数量]: 要生成的物料数量,默认生成 5 个。
"""
materials = [i for i in range(1, count + 1)]
@@ -457,7 +640,11 @@ class VirtualWorkbench:
LabSample(
sample_uuid=sample_uuid,
oss_path="",
extra={"material_uuid": content} if isinstance(content, str) else (content.serialize() if content else {}),
extra=(
{"material_uuid": content}
if isinstance(content, str)
else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
],
@@ -467,12 +654,27 @@ class VirtualWorkbench:
auto_prefix=True,
description="将物料从An位置移动到空闲加热台, 返回分配的加热台ID",
handles=[
ActionInputHandle(key="material_input", data_type="workbench_material",
label="物料编号", data_key="material_number", data_source=DataSource.HANDLE),
ActionOutputHandle(key="heating_station_output", data_type="workbench_station",
label="加热台ID", data_key="station_id", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="material_number_output", data_type="workbench_material",
label="物料编号", data_key="material_number", data_source=DataSource.EXECUTOR),
ActionInputHandle(
key="material_input",
data_type="workbench_material",
label="物料编号",
data_key="material_number",
data_source=DataSource.HANDLE,
),
ActionOutputHandle(
key="heating_station_output",
data_type="workbench_station",
label="加热台ID",
data_key="station_id",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="material_number_output",
data_type="workbench_material",
label="物料编号",
data_key="material_number",
data_source=DataSource.EXECUTOR,
),
],
)
def move_to_heating_station(
@@ -484,6 +686,9 @@ class VirtualWorkbench:
将物料从An位置移动到加热台
多线程并发调用时, 会竞争机械臂使用权, 并自动查找空闲加热台
Args:
material_number[物料编号]: 要移动的物料编号,对应 A1、A2 等起始位置。
"""
material_id = f"A{material_number}"
task_desc = f"移动{material_id}到加热台"
@@ -546,7 +751,8 @@ class VirtualWorkbench:
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
if isinstance(content, str)
else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
@@ -569,7 +775,8 @@ class VirtualWorkbench:
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
if isinstance(content, str)
else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
@@ -581,14 +788,34 @@ class VirtualWorkbench:
always_free=True,
description="启动指定加热台的加热程序",
handles=[
ActionInputHandle(key="station_id_input", data_type="workbench_station",
label="加热台ID", data_key="station_id", data_source=DataSource.HANDLE),
ActionInputHandle(key="material_number_input", data_type="workbench_material",
label="物料编号", data_key="material_number", data_source=DataSource.HANDLE),
ActionOutputHandle(key="heating_done_station", data_type="workbench_station",
label="加热完成-加热台ID", data_key="station_id", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="heating_done_material", data_type="workbench_material",
label="加热完成-物料编号", data_key="material_number", data_source=DataSource.EXECUTOR),
ActionInputHandle(
key="station_id_input",
data_type="workbench_station",
label="加热台ID",
data_key="station_id",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="material_number_input",
data_type="workbench_material",
label="物料编号",
data_key="material_number",
data_source=DataSource.HANDLE,
),
ActionOutputHandle(
key="heating_done_station",
data_type="workbench_station",
label="加热完成-加热台ID",
data_key="station_id",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="heating_done_material",
data_type="workbench_material",
label="加热完成-物料编号",
data_key="material_number",
data_source=DataSource.EXECUTOR,
),
],
)
def start_heating(
@@ -599,6 +826,10 @@ class VirtualWorkbench:
) -> StartHeatingResult:
"""
启动指定加热台的加热程序
Args:
station_id[加热台ID]: 要启动加热的加热台编号。
material_number[物料编号]: 当前加热台上的物料编号。
"""
self.logger.info(f"[加热台{station_id}] 开始加热")
@@ -615,7 +846,8 @@ class VirtualWorkbench:
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
if isinstance(content, str)
else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
@@ -638,7 +870,8 @@ class VirtualWorkbench:
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
if isinstance(content, str)
else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
@@ -658,7 +891,8 @@ class VirtualWorkbench:
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
if isinstance(content, str)
else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
@@ -698,7 +932,9 @@ class VirtualWorkbench:
self._update_data_status(f"加热台{station_id}加热中: {progress:.1f}%")
if time.time() - last_countdown_log >= 5.0:
self.logger.info(f"[加热台{station_id}] {material_id} 剩余 {remaining:.1f}s")
self.logger.info(
f"[加热台{station_id}] {material_id} 剩余 {remaining:.1f}s"
)
last_countdown_log = time.time()
if elapsed >= self.HEATING_TIME:
@@ -715,7 +951,9 @@ class VirtualWorkbench:
self._active_tasks[material_id]["status"] = "heating_completed"
self._update_data_status(f"加热台{station_id}加热完成")
self.logger.info(f"[加热台{station_id}] {material_id}加热完成 (用时{self.HEATING_TIME}s)")
self.logger.info(
f"[加热台{station_id}] {material_id}加热完成 (用时{self.HEATING_TIME}s)"
)
return {
"success": True,
@@ -729,7 +967,8 @@ class VirtualWorkbench:
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
if isinstance(content, str)
else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
@@ -740,10 +979,20 @@ class VirtualWorkbench:
auto_prefix=True,
description="将物料从加热台移动到输出位置Cn",
handles=[
ActionInputHandle(key="output_station_input", data_type="workbench_station",
label="加热台ID", data_key="station_id", data_source=DataSource.HANDLE),
ActionInputHandle(key="output_material_input", data_type="workbench_material",
label="物料编号", data_key="material_number", data_source=DataSource.HANDLE),
ActionInputHandle(
key="output_station_input",
data_type="workbench_station",
label="加热台ID",
data_key="station_id",
data_source=DataSource.HANDLE,
),
ActionInputHandle(
key="output_material_input",
data_type="workbench_material",
label="物料编号",
data_key="material_number",
data_source=DataSource.HANDLE,
),
],
)
def move_to_output(
@@ -754,6 +1003,10 @@ class VirtualWorkbench:
) -> MoveToOutputResult:
"""
将物料从加热台移动到输出位置Cn
Args:
station_id[加热台ID]: 已完成加热的加热台编号。
material_number[物料编号]: 要移动到输出位置的物料编号,对应 Cn。
"""
output_number = material_number
@@ -770,7 +1023,8 @@ class VirtualWorkbench:
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
if isinstance(content, str)
else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
@@ -794,7 +1048,8 @@ class VirtualWorkbench:
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
if isinstance(content, str)
else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
@@ -814,7 +1069,8 @@ class VirtualWorkbench:
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
if isinstance(content, str)
else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()
@@ -896,7 +1152,8 @@ class VirtualWorkbench:
oss_path="",
extra=(
{"material_uuid": content}
if isinstance(content, str) else (content.serialize() if content else {})
if isinstance(content, str)
else (content.serialize() if content else {})
),
)
for sample_uuid, content in sample_uuids.items()

View File

@@ -779,6 +779,49 @@ class BioyondV1RPC(BaseRequest):
return response.get("data", {})
def take_out(
self,
order_id: str,
preintake_ids: list[str] | None = None,
material_ids: list[str] | None = None,
) -> dict:
"""取出订单关联通量/物料
参数:
order_id: 订单ID
preintake_ids: 通量ID列表可为空
material_ids: 物料ID列表可为空
返回值:
dict: 服务端响应包,失败返回空字典
"""
if not order_id:
self._logger.error("取出订单关联通量/物料错误: 缺少订单ID")
return {}
params = {
"orderId": order_id,
"preintakeIds": list(preintake_ids or []),
"materialIds": list(material_ids or []),
}
response = self.post(
url=f'{self.host}/api/lims/order/take-out',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params,
})
if not response:
return {}
if response['code'] != 1:
self._logger.error(f"取出订单关联通量/物料错误: {response.get('message', '')}")
return response
return response
def cancel_order(self, json_str: str) -> bool:
"""取消指定任务

View File

@@ -0,0 +1,459 @@
"""Per-action raw call/response log for Bioyond stations.
When a debug session is active, ``wrap_rpc_http`` replaces a ``BioyondV1RPC``
instance's ``post`` / ``get`` methods with closures that perform the HTTP
transport themselves, capture the request/response details, and append a record
to the active session before returning exactly what ``BaseRequest`` would have
returned. Outside of an active session the wrapped method delegates to the
original (unwrapped) implementation, leaving non-debug behavior intact.
The session writes a Markdown file under ``out_dir`` mirroring the format of
``bioyond_debug_records/2026-04-30_160316_day3_samplefile_only_raw_calls.md``
minus the "Raw Payload Argument" section.
This module has no dependency on ``BioyondV1RPC`` itself; the only contract is
that the wrapped instance descends from ``BaseRequest`` (i.e. has a logger
returned by ``self.get_logger()``).
"""
from __future__ import annotations
import contextvars
import copy
import inspect
import json
import re
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Iterator, List, Optional
import requests
__all__ = [
"CallRecord",
"CallLogContext",
"session",
"wrap_rpc_http",
"active_session",
]
_DEFAULT_TIMEOUT_GET = 30
_DEFAULT_TIMEOUT_POST = 120
@dataclass
class CallRecord:
"""One captured HTTP call inside a debug session."""
index: int
method: str
url: str
path: str
source: str
transport: str
http_status: Optional[int]
request_body: Any
response_body: Any
error: Optional[str] = None
@dataclass
class CallLogContext:
"""State for a single ``session()`` block.
A session lazily creates its file on the first appended record. Actions
that abort before any RPC produce no file.
"""
action: str
out_dir: Path
started_at: datetime
calls: List[CallRecord] = field(default_factory=list)
file_path: Optional[Path] = None
def append(self, record: CallRecord) -> None:
record.index = len(self.calls) + 1
self.calls.append(record)
self._write_file()
# -- file I/O -------------------------------------------------------------
def _resolve_file_path(self) -> Path:
if self.file_path is not None:
return self.file_path
timestamp = self.started_at.strftime("%Y-%m-%d_%H%M%S")
slug = _slugify_action(self.action)
candidate = self.out_dir / f"{timestamp}_{slug}_raw_calls.md"
suffix = 2
while candidate.exists():
candidate = (
self.out_dir
/ f"{timestamp}_{slug}_raw_calls_{suffix:02d}.md"
)
suffix += 1
self.file_path = candidate
return self.file_path
def _write_file(self) -> None:
path = self._resolve_file_path()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(_render_markdown(self), encoding="utf-8")
_active_session: contextvars.ContextVar[Optional[CallLogContext]] = (
contextvars.ContextVar("_active_session", default=None)
)
def active_session() -> Optional[CallLogContext]:
"""Return the currently active :class:`CallLogContext`, if any."""
return _active_session.get()
@contextmanager
def session(action: str, out_dir: Path) -> Iterator[CallLogContext]:
"""Open a per-action debug session.
On entry, sets the module-level ``_active_session`` ContextVar so any
``wrap_rpc_http``'d clients on the same thread/task record their calls.
On exit, the previous active session (if any) is restored.
"""
ctx = CallLogContext(
action=str(action),
out_dir=Path(out_dir),
started_at=datetime.now(),
)
token = _active_session.set(ctx)
try:
yield ctx
finally:
_active_session.reset(token)
def wrap_rpc_http(rpc: Any) -> None:
"""Idempotently wrap ``rpc.post`` / ``rpc.get``.
When a session is active (``_active_session.get() is not None``), the
wrapped methods perform the HTTP call themselves with ``requests`` and
record the call before returning the same value ``BaseRequest`` would have
returned. When no session is active, the wrapped methods delegate to the
original implementation, preserving stock ``BaseRequest`` behavior.
Calling this twice on the same instance is a no-op. The wrapper does not
alter ``rpc.form_post`` (no Sirna action calls it as of plan 3).
"""
if rpc is None:
return
if getattr(rpc, "_debug_call_log_wrapped", False):
return
rpc._orig_post = rpc.post
rpc._orig_get = rpc.get
def _wrapped_post(
url: str,
params: Any = None,
files: Any = None,
headers: Optional[dict] = None,
) -> Any:
ctx = _active_session.get()
if ctx is None:
kwargs = {}
if params is not None:
kwargs["params"] = params
if files is not None:
kwargs["files"] = files
if headers is not None:
kwargs["headers"] = headers
return rpc._orig_post(url, **kwargs)
effective_params = params if params is not None else {}
effective_headers = (
headers
if headers is not None
else {"Content-Type": "application/json"}
)
source = _detect_source(rpc)
request_body = _redact(effective_params)
record = CallRecord(
index=0,
method="POST",
url=str(url),
path=_url_path(url),
source=source,
transport=_pick_transport(effective_params),
http_status=None,
request_body=request_body,
response_body=None,
error=None,
)
return_value: Any = None
try:
response = requests.post(
url,
data=json.dumps(effective_params) if effective_params else None,
headers=effective_headers,
timeout=_DEFAULT_TIMEOUT_POST,
files=files,
)
except Exception as exc: # pragma: no cover - delegated to logger
record.error = f"transport error: {exc}"
try:
rpc.get_logger().error(f"Request ERROR: {exc}")
except Exception:
pass
ctx.append(record)
return None
record.http_status = response.status_code
record.response_body, parse_error = _decode_response_body(response)
try:
rpc.get_logger().debug(
f"Request >>> : {response.request.body} "
f"{response.status_code} {response.text}"
)
except Exception:
pass
if response.status_code == 200:
if parse_error is not None:
record.error = f"json parse error: {parse_error}"
return_value = None
else:
return_value = record.response_body
else:
record.error = f"HTTP {response.status_code}: {response.text}"
try:
rpc.get_logger().error(
f"Request ERROR: ('Request ERROR:', {response.text!r})"
)
except Exception:
pass
return_value = None
ctx.append(record)
return return_value
def _wrapped_get(
url: str,
params: Any = None,
headers: Optional[dict] = None,
) -> Any:
ctx = _active_session.get()
if ctx is None:
kwargs = {}
if params is not None:
kwargs["params"] = params
if headers is not None:
kwargs["headers"] = headers
return rpc._orig_get(url, **kwargs)
effective_params = params if params is not None else {}
effective_headers = (
headers
if headers is not None
else {"Content-Type": "application/json"}
)
source = _detect_source(rpc)
request_body = _redact(effective_params)
record = CallRecord(
index=0,
method="GET",
url=str(url),
path=_url_path(url),
source=source,
transport="params",
http_status=None,
request_body=request_body,
response_body=None,
error=None,
)
return_value: Any = None
try:
response = requests.get(
url,
params=effective_params,
headers=effective_headers,
timeout=_DEFAULT_TIMEOUT_GET,
)
except Exception as exc: # pragma: no cover - delegated to logger
record.error = f"transport error: {exc}"
try:
rpc.get_logger().error(f"Request ERROR: {exc}")
except Exception:
pass
ctx.append(record)
return None
record.http_status = response.status_code
record.response_body, parse_error = _decode_response_body(response)
try:
rpc.get_logger().debug(
f"Request >>> : {effective_params} "
f"{response.status_code} {response.text}"
)
except Exception:
pass
if response.status_code == 200:
if parse_error is not None:
record.error = f"json parse error: {parse_error}"
return_value = None
else:
return_value = record.response_body
ctx.append(record)
return return_value
rpc.post = _wrapped_post
rpc.get = _wrapped_get
rpc._debug_call_log_wrapped = True
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_URL_PATH_RE = re.compile(r"https?://[^/]+(/.*)?$")
_SLUG_RE = re.compile(r"[^A-Za-z0-9._-]+")
def _slugify_action(action: str) -> str:
slug = _SLUG_RE.sub("_", str(action)).strip("_")
return slug or "action"
def _url_path(url: Any) -> str:
text = str(url or "")
match = _URL_PATH_RE.match(text)
if match and match.group(1):
return match.group(1)
if text.startswith("/"):
return text
return text
def _pick_transport(params: Any) -> str:
if isinstance(params, dict) and "data" in params:
return "data"
return "params"
def _detect_source(rpc: Any) -> str:
"""Walk the call stack to find the outermost frame whose ``self`` is rpc."""
try:
stack = inspect.stack()
except Exception:
return ""
candidate = ""
try:
for frame_info in stack:
frame = frame_info.frame
if frame.f_locals.get("self", None) is rpc:
candidate = frame_info.function
return candidate
finally:
del stack
def _redact(params: Any) -> Any:
"""Return a copy of ``params`` with ``apiKey`` redacted."""
try:
cloned = copy.deepcopy(params)
except Exception:
return params
_redact_in_place(cloned)
return cloned
def _redact_in_place(value: Any) -> None:
if isinstance(value, dict):
for key in list(value.keys()):
if isinstance(key, str) and key.lower() == "apikey":
value[key] = "<redacted>"
else:
_redact_in_place(value[key])
elif isinstance(value, list):
for item in value:
_redact_in_place(item)
def _decode_response_body(response: Any) -> tuple[Any, Optional[str]]:
"""Best-effort response decoding used for both record + return value."""
text = getattr(response, "text", "")
try:
return response.json(), None
except Exception as exc:
if text:
return {"raw_text": text}, str(exc)
return None, str(exc)
# ---------------------------------------------------------------------------
# Markdown rendering
# ---------------------------------------------------------------------------
def _render_markdown(ctx: CallLogContext) -> str:
title = f"# {ctx.action} Raw Call/Response Log"
parts: List[str] = [title, ""]
parts.append("## LIMS Calls")
parts.append("")
parts.append("| # | Method | Path | Source | HTTP |")
parts.append("|---|---|---|---|---|")
for record in ctx.calls:
anchor = _row_anchor(record)
http = (
f"`{record.http_status}`"
if record.http_status is not None
else "`-`"
)
parts.append(
f"| [{record.index}](#{anchor}) | `{record.method}` | "
f"`{record.path}` | `{record.source}` | {http} |"
)
parts.append("")
for record in ctx.calls:
parts.append(f"## {record.index} {record.method} {record.path}")
parts.append("")
parts.append(f"- Source: `{record.source}`")
parts.append(f"- Transport: `{record.transport}`")
if record.http_status is not None:
parts.append(f"- HTTP status: `{record.http_status}`")
else:
parts.append("- HTTP status: `-`")
if record.error:
parts.append(f"- Error: {record.error}")
parts.append("")
parts.append("### Request Body")
parts.append("")
parts.append("```json")
parts.append(_to_json_block(record.request_body))
parts.append("```")
parts.append("")
parts.append("### Response Body")
parts.append("")
parts.append("```json")
parts.append(_to_json_block(record.response_body))
parts.append("```")
parts.append("")
return "\n".join(parts).rstrip() + "\n"
def _row_anchor(record: CallRecord) -> str:
"""Build a GitHub-style anchor matching ``## N METHOD /path``."""
raw = f"{record.index}-{record.method}-{record.path}"
raw = raw.lower()
raw = re.sub(r"[^a-z0-9]+", "-", raw)
return raw.strip("-")
def _to_json_block(value: Any) -> str:
try:
return json.dumps(value, ensure_ascii=False, indent=2, sort_keys=True)
except TypeError:
return json.dumps(str(value), ensure_ascii=False, indent=2)

View File

@@ -0,0 +1,3 @@
from .peptide_station import BioyondPeptideStation, fetch_workflow_list, load_peptide_config
__all__ = ["BioyondPeptideStation", "fetch_workflow_list", "load_peptide_config"]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,681 @@
"""多肽站 AST/参数/结果表 离线契约测试。"""
from __future__ import annotations
import importlib
import inspect
import json
import sys
from pathlib import Path
from typing import Any, Dict, List
from unittest.mock import MagicMock
import pytest
REPO_ROOT = Path(__file__).resolve().parents[6]
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))
MODULE_PATH = "unilabos.devices.workstation.bioyond_studio.peptide_station.peptide_station"
CLASS_NAME = "BioyondPeptideStation"
ORDER_GUID = "3a20eabe-bad5-ef95-49bd-7ffbd5df189d"
CREATE_ALLOCATION = {
ORDER_GUID: [
{
"materialId": "mat-tip",
"materialName": "200μL枪头盒",
"materialCode": "0008-00105",
"quantity": "1个",
"materialTypeMode": "Consumables",
"locationCode": "1-01",
"locationShowName": "1-01",
},
{
"materialId": "mat-plate",
"materialName": "96孔板",
"materialCode": "PLATE-96",
"quantity": "1",
"materialTypeMode": "Sample",
"locationCode": "A1",
"locationShowName": "A1-show",
},
{
"materialId": "mat-extra",
"materialName": "未知耗材",
"materialCode": "X-1",
"quantity": "2",
"materialTypeMode": "Future",
"locationCode": "Z9",
"locationShowName": "",
},
]
}
FLATTENED_LIVE = [
{"step": "39c78d4b-b5d3-f721-2001-9d52000084c3", "step_name": "S1", "Key": "SampleFile", "m": 0, "n": 0, "Value": "", "DisplayValue": "", "TaskDisplayable": 1},
{"step": "39c78d4b-b5d3-f721-2001-9d52000084c3", "step_name": "S1", "Key": "Example", "m": 0, "n": 0, "Value": "x", "DisplayValue": "x", "TaskDisplayable": 1},
{"step": "39c78d4b-b5d3-f721-2001-9d52000084c4", "step_name": "S2", "Key": "protocol", "m": 14, "n": 28, "Value": "", "DisplayValue": "", "TaskDisplayable": 1},
{"step": "39c78d4b-b5d3-f721-2001-9d52000084c5", "step_name": "S3", "Key": "CEMMethodFileName", "m": 0, "n": 0, "Value": "", "DisplayValue": "", "TaskDisplayable": 1},
]
def _import_module() -> Any:
return importlib.import_module(MODULE_PATH)
def _make_station() -> Any:
module = _import_module()
cls = getattr(module, CLASS_NAME)
station = object.__new__(cls)
station.bioyond_config = {"api_host": "http://test", "api_key": "k", "warehouse_mapping": {}}
rpc = MagicMock()
rpc.host = "http://test"
rpc.api_key = "k"
rpc.material_info.return_value = {"locations": [{"whName": "自动化堆栈", "code": "1-01"}]}
station.hardware_interface = rpc
return station
# ---------------------------------------------------------------------------
# 1. AST/导入面
# ---------------------------------------------------------------------------
def test_required_actions_exposed() -> None:
cls = getattr(_import_module(), CLASS_NAME)
required = {
"upload_sample_excel",
"list_sample_excels",
"get_step_parameters",
"submit_experiment",
"submit_experiment_day1",
"submit_experiment_day2",
"submit_experiment_day3",
"submit_experiment_day4",
"submit_experiment_day4_LCMS",
"start_experiment",
"reset",
"scheduler_start",
"scheduler_stop",
"scheduler_pause",
"scheduler_continue",
"get_order_list",
"get_order_report",
"get_aggregated_order_report",
}
have = {name for name, _ in inspect.getmembers(cls, inspect.isfunction)}
missing = sorted(required - have)
assert not missing, f"缺少动作: {missing}"
def test_manual_confirm_node_types() -> None:
module = _import_module()
cls = getattr(module, CLASS_NAME)
manual = {"submit_experiment_day1", "start_experiment"}
normal = {
"submit_experiment",
"submit_experiment_day2",
"submit_experiment_day3",
"submit_experiment_day4",
"submit_experiment_day4_LCMS",
"reset",
"scheduler_start",
"list_sample_excels",
"get_step_parameters",
"get_order_list",
"get_order_report",
}
for name in manual:
meta = getattr(getattr(cls, name), "_action_registry_meta", {})
assert meta.get("node_type") == module.NodeType.MANUAL_CONFIRM, name
for name in normal:
meta = getattr(getattr(cls, name), "_action_registry_meta", {})
assert meta.get("node_type") != module.NodeType.MANUAL_CONFIRM, name
def test_submit_and_reset_signatures_exclude_legacy_manual_confirm() -> None:
cls = getattr(_import_module(), CLASS_NAME)
for name in (
"submit_experiment",
"submit_experiment_day2",
"submit_experiment_day3",
"submit_experiment_day4",
"submit_experiment_day4_LCMS",
"reset",
):
params = inspect.signature(getattr(cls, name)).parameters
assert "timeout_seconds" not in params, name
assert "assignee_user_ids" not in params, name
def test_day1_submit_accepts_manual_confirm_kwargs() -> None:
"""plan: Day1 是 MANUAL_CONFIRM框架会注入 timeout_seconds/assignee_user_ids函数必须能接收。"""
cls = getattr(_import_module(), CLASS_NAME)
sig = inspect.signature(cls.submit_experiment_day1)
has_kwargs = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values())
assert has_kwargs, "submit_experiment_day1 必须有 **kwargs 以容纳人工确认框架字段"
def test_typed_dicts_present() -> None:
module = _import_module()
for cls_name in (
"PeptideGenericSubmitRequiredParams",
"PeptideGenericSubmitOptionalParams",
"PeptideDay1RequiredParams",
"PeptideDay1OptionalParams",
"PeptideDay2RequiredParams",
"PeptideDay2OptionalParams",
"PeptideDay3RequiredParams",
"PeptideDay3OptionalParams",
"PeptideDay4RequiredParams",
"PeptideDay4OptionalParams",
"PeptideDay4LCMSRequiredParams",
"PeptideDay4LCMSOptionalParams",
):
assert hasattr(module, cls_name), cls_name
def test_workflow_constants_split() -> None:
module = _import_module()
assert module.DAY4_PEPTIDE_WORKFLOW_NAME == "Day4环肽酰化-酶标"
assert module.DAY4_LCMS_PEPTIDE_WORKFLOW_NAME == "Day4环肽酰化-酶标+LCMS"
assert module.DAY_WORKFLOW_BINDINGS["day4_lcms"]["sub_name"] == "Day4环肽酰化-酶标LCMS"
assert module.DAY1_CEM_METHOD_DEFAULT == "5microdouble-20250911.MPM"
# ---------------------------------------------------------------------------
# 2. Sample Excel
# ---------------------------------------------------------------------------
def test_list_sample_excels_modes() -> None:
station = _make_station()
records = [
{"fileName": "DPR019-a.xlsx", "relativePath": "upload\\sample\\DPR019-a.xlsx"},
{"fileName": "DPR019-b.xlsx", "relativePath": "upload\\sample\\DPR019-b.xlsx"},
]
station._list_sample_excels = MagicMock(return_value=records) # type: ignore[method-assign]
info = station.list_sample_excels(sample_excel_pattern="DPR019-a", deterministic_resolve=False)
assert "sample_excel_data" in info
assert "sample_excel_relative_path" not in info
resolved = station.list_sample_excels(sample_excel_pattern="DPR019-a", deterministic_resolve=True)
assert resolved["sample_excel_relative_path"] == "upload\\sample\\DPR019-a.xlsx"
with pytest.raises(Exception):
station.list_sample_excels(sample_excel_pattern="DPR019", deterministic_resolve=True)
def test_resolve_submit_sample_file_direct_path() -> None:
station = _make_station()
relative, selected = station._resolve_submit_sample_file({}, {}, "upload/sample/x.xlsx")
assert relative == "upload\\sample\\x.xlsx"
assert selected["fileName"] == "x.xlsx"
def test_filename_matches_pattern_substring_and_glob() -> None:
station = _make_station()
assert station._filename_matches_pattern("DPR019-20260421-thrombin-5.xlsx", "DPR019")
assert station._filename_matches_pattern("a.xlsx", "*.xlsx")
assert not station._filename_matches_pattern("a.xlsx", "*.docx")
assert station._filename_matches_pattern("a.xlsx", "")
# ---------------------------------------------------------------------------
# 3. Step parameter helper
# ---------------------------------------------------------------------------
def test_filter_step_parameters_preserves_zero_and_skips_unknown() -> None:
station = _make_station()
records = [
{"TaskDisplayable": 1, "Value": 0, "DisplayValue": ""},
{"TaskDisplayable": 1, "Value": "", "DisplayValue": ""},
{"TaskDisplayable": 0, "Value": "", "DisplayValue": ""},
{"TaskDisplayable": None, "Value": "", "DisplayValue": ""},
]
filtered = station._filter_step_parameter_records(records, True, True, True)
assert {(r.get("Value"), r.get("TaskDisplayable")) for r in filtered} == {(0, 1), ("", 1), ("", 0)}
def test_get_step_parameters_zero_match_returns_status() -> None:
station = _make_station()
station._query_workflow_records = MagicMock(return_value=[]) # type: ignore[method-assign]
out = station.get_step_parameters(workflow_name_filter="不存在")
status = out["step_parameters_raw_json"]
assert status.get("code") == -1
assert out["filtered_subworkflows"] == []
def test_get_step_parameters_multi_match_returns_status() -> None:
station = _make_station()
station._query_workflow_records = MagicMock(return_value=[ # type: ignore[method-assign]
{"workflowId": "w1", "workflowName": "A", "subworkflowId": "s1", "subworkflowName": "A1"},
{"workflowId": "w1", "workflowName": "A", "subworkflowId": "s2", "subworkflowName": "A2"},
])
out = station.get_step_parameters(workflow_name_filter="A")
assert out["step_parameters_raw_json"].get("code") == 0
assert len(out["filtered_subworkflows"]) == 2
def test_get_step_parameters_direct_sub_workflow_id() -> None:
station = _make_station()
station._query_step_parameters = MagicMock(return_value={ # type: ignore[method-assign]
"39c78d4b-b5d3-f721-2001-9d52000084c3": [
{"name": "S1", "m": 0, "n": 0, "parameterList": [
{"Key": "SampleFile", "TaskDisplayable": 1, "Value": "", "DisplayValue": ""},
]},
]
})
out = station.get_step_parameters(sub_workflow_id="39c78d4b-b5d3-f721-2001-9d52000084c3")
augmented = out["step_parameters_raw_json"]
assert augmented["code"] == 1
assert any(p["Key"] == "SampleFile" for p in augmented["data"]["filteredParameters"])
# ---------------------------------------------------------------------------
# 4. Partial parameter entries + live resolution
# ---------------------------------------------------------------------------
def test_partial_entries_inject_samplefile_and_overrides() -> None:
station = _make_station()
entries, warnings = station._build_partial_parameter_entries(
sample_excel_relative_path="upload\\sample\\f.xlsx",
day_key="day2",
parameter_overrides=[{"Key": "Example", "Value": 0}],
)
assert entries[0] == {"Key": "SampleFile", "Value": "upload\\sample\\f.xlsx"}
assert any(e["Key"] == "Example" and e["Value"] == 0 for e in entries)
assert warnings == []
def test_day1_partial_entries_inject_cem_default() -> None:
station = _make_station()
entries, _ = station._build_partial_parameter_entries(
sample_excel_relative_path="upload\\sample\\f.xlsx",
day_key="day1",
extra_autofill=[{"Key": "CEMMethodFileName", "Value": "5microdouble-20250911.MPM"}],
)
assert any(e["Key"] == "CEMMethodFileName" and e["Value"] == "5microdouble-20250911.MPM" for e in entries)
def test_overrides_duplicate_last_write_wins_warning() -> None:
station = _make_station()
entries, warnings = station._build_partial_parameter_entries(
sample_excel_relative_path="x",
day_key="day2",
parameter_overrides=[
{"Key": "Example", "m": 0, "n": 0, "Value": "first"},
{"Key": "Example", "m": 0, "n": 0, "Value": "second"},
],
)
example_entries = [e for e in entries if e["Key"] == "Example"]
assert len(example_entries) == 1
assert example_entries[0]["Value"] == "second"
assert any("重复" in w for w in warnings)
def test_resolve_against_live_unique_match_and_failure() -> None:
station = _make_station()
resolved = station._resolve_parameter_entries_against_live_steps(
[{"Key": "SampleFile", "Value": "upload\\sample\\f.xlsx"}], FLATTENED_LIVE
)
assert resolved[0]["step"] == "39c78d4b-b5d3-f721-2001-9d52000084c3"
assert resolved[0]["m"] == 0 and resolved[0]["n"] == 0
# 没有 protocol 在 m/n=0/0 处 → 0 匹配
with pytest.raises(Exception) as exc:
station._resolve_parameter_entries_against_live_steps(
[{"Key": "protocol", "m": 0, "n": 0, "Value": "v"}], FLATTENED_LIVE
)
assert "0 条" in str(exc.value)
def test_group_resolved_entries_uses_lowercase_keys() -> None:
station = _make_station()
grouped = station._group_resolved_entries_to_param_values([
{"step": "39c78d4b-b5d3-f721-2001-9d52000084c3", "Key": "SampleFile", "m": 0, "n": 0, "Value": "x"},
])
step_entries = grouped["39c78d4b-b5d3-f721-2001-9d52000084c3"]
assert step_entries[0] == {"key": "SampleFile", "value": "x", "m": 0, "n": 0}
def test_create_order_payload_shape() -> None:
station = _make_station()
payload = station._create_order_payload(
order_code="EXP260518-103000",
order_name="实验260518-103000",
sub_workflow_id="3a1d35f9-63ce-67d6-1784-9f6abcca4eda",
param_values={"39c78d4b-b5d3-f721-2001-9d52000084c3": [{"key": "SampleFile", "value": "x", "m": 0, "n": 0}]},
border_number=1,
extend_properties=None,
)
assert isinstance(payload, list) and len(payload) == 1
item = payload[0]
assert item["workFlowId"] == "3a1d35f9-63ce-67d6-1784-9f6abcca4eda"
assert item["paramValues"]
assert item["extendProperties"] == ""
assert item["borderNumber"] == 1
def test_order_identity_format() -> None:
station = _make_station()
code, name = station._build_order_identity("day2")
assert code.startswith("EXP") and len(code) == 16 # EXP + YYMMDD-HHmmss
assert name.startswith("实验")
code2, name2 = station._build_order_identity("day2", "自定义")
assert name2 == "自定义"
# ---------------------------------------------------------------------------
# 5. Generic submit / day wrappers (含会抦住 BUG 1 的用例)
# ---------------------------------------------------------------------------
def _wire_submit_pipeline(station: Any) -> None:
station._resolve_workflow_binding_from_names = MagicMock(return_value={ # type: ignore[method-assign]
"workflow_name": "DAY2多肽定量",
"root_workflow_id": "3a1d35f0-9436-895b-2eda-039a5465275e",
"sub_workflow_id": "3a1d35f0-9f7e-c2c1-0bc0-8d94b81d90ca",
"sub_workflow_name": "DAY2多肽定量",
"raw": {},
})
station._resolve_workflow_binding = MagicMock(side_effect=lambda day_key: station._resolve_workflow_binding_from_names("DAY2多肽定量")) # type: ignore[method-assign]
station._query_step_parameters = MagicMock(return_value={}) # type: ignore[method-assign]
station._flatten_step_parameters = MagicMock(return_value=FLATTENED_LIVE) # type: ignore[method-assign]
station._create_order = MagicMock(return_value=json.dumps(CREATE_ALLOCATION)) # type: ignore[method-assign]
def test_submit_experiment_generic_succeeds() -> None:
"""plan §「Generic And Day 1 Submit」line 919-924这条同时抦住 BUG 1binding= 关键字)。"""
station = _make_station()
_wire_submit_pipeline(station)
result = station.submit_experiment(
{"workflow_name": "DAY2多肽定量", "sample_excel_pattern": ""},
{"parameter_overrides": []},
sample_excel_relative_path="upload/sample/f.xlsx",
)
assert result["success"] is True
assert result["order_id"] == ORDER_GUID
assert result["resultTable"]["tableName"] == "resultTable"
def test_submit_experiment_rejects_day1_alias() -> None:
station = _make_station()
with pytest.raises(Exception):
station.submit_experiment(
{"workflow_name": "Day1线肽合成", "sample_excel_pattern": "x"},
{},
sample_excel_relative_path="upload/sample/f.xlsx",
)
def test_submit_experiment_day2_calls_pipeline() -> None:
station = _make_station()
_wire_submit_pipeline(station)
result = station.submit_experiment_day2(
{"sample_excel_pattern": ""},
{"parameter_overrides": []},
sample_excel_relative_path="upload/sample/f.xlsx",
)
assert result["success"] is True
assert result["order_ids"] == [ORDER_GUID]
assert result["auto_register_materials"] is True
assert result["material_registration"]["status"] == "not_implemented"
def test_day1_placeholder_does_not_call_create_order() -> None:
station = _make_station()
station._resolve_workflow_binding = MagicMock(return_value={ # type: ignore[method-assign]
"workflow_name": "Day1线肽合成",
"root_workflow_id": "rid",
"sub_workflow_id": "sid",
"sub_workflow_name": "Day1线肽合成",
"raw": {},
})
station._create_order = MagicMock(side_effect=AssertionError("Day1 不应触达 create_order")) # type: ignore[method-assign]
out = station.submit_experiment_day1(
{"sample_excel_pattern": "", "cem_method_file_name": ""},
{},
sample_excel_relative_path="upload/sample/f.xlsx",
# 模拟人工确认框架注入的字段(这条会抦住 BUG 3
timeout_seconds=3600,
assignee_user_ids=[],
materials_loaded=False,
)
assert out["status"] == "manual_confirm_placeholder"
assert out["cem_method_file_name"] == "5microdouble-20250911.MPM"
assert isinstance(out["partial_parameter_entries"], list)
# ---------------------------------------------------------------------------
# 6. Allocation map parsing + resultTable
# ---------------------------------------------------------------------------
def test_parse_allocation_map_extracts_order_id_and_groups() -> None:
station = _make_station()
parsed = station._parse_create_order_allocation_map(json.dumps(CREATE_ALLOCATION))
assert parsed["order_ids"] == [ORDER_GUID]
assert len(parsed["allocation_rows"]) == 3
assert set(parsed["materials_by_type"].keys()) == {"Consumables", "Sample", "Future"}
def test_parse_allocation_map_handles_python_str_repr() -> None:
"""RPC.create_order 返回的是 str(dict),含单引号。"""
station = _make_station()
parsed = station._parse_create_order_allocation_map(str(CREATE_ALLOCATION))
assert parsed["order_ids"] == [ORDER_GUID]
def test_parse_allocation_map_empty() -> None:
station = _make_station()
parsed = station._parse_create_order_allocation_map("{}")
assert parsed["allocation_rows"] == []
assert parsed["order_ids"] == []
def test_build_result_table_order_and_columns() -> None:
station = _make_station()
parsed = station._parse_create_order_allocation_map(json.dumps(CREATE_ALLOCATION))
table = station._build_result_table(parsed["materials_by_type"])
assert table["tableName"] == "resultTable"
assert [c["key"] for c in table["columns"]] == ["whName", "locationCode", "materialName", "quantity"]
# 顺序Sample → Consumables → Future未知 mode 保留在末尾)
names = [row["materialName"] for row in table["data"]]
assert names == ["96孔板", "200μL枪头盒", "未知耗材"]
# locationShowName 优先 locationCode
assert table["data"][0]["locationCode"] == "A1-show"
assert table["data"][1]["locationCode"] == "1-01"
def test_build_result_table_empty_returns_empty_data() -> None:
station = _make_station()
table = station._build_result_table({})
assert table["data"] == []
assert [c["key"] for c in table["columns"]] == ["whName", "locationCode", "materialName", "quantity"]
def test_resolve_wh_name_handles_material_info_failure() -> None:
station = _make_station()
station.hardware_interface.material_info.side_effect = RuntimeError("HTTP 500")
cache: Dict[str, Dict[str, Any]] = {}
assert station._resolve_wh_name_by_material_id("mat-1", cache) == ""
def test_submit_returns_warning_when_allocation_empty() -> None:
station = _make_station()
_wire_submit_pipeline(station)
station._create_order = MagicMock(return_value="{}") # type: ignore[method-assign]
result = station.submit_experiment_day2(
{"sample_excel_pattern": ""},
{},
sample_excel_relative_path="upload/sample/f.xlsx",
)
assert "create_order_allocation_unavailable_for_result_table" in result["warnings"]
# ---------------------------------------------------------------------------
# 7. Reports + workflow records
# ---------------------------------------------------------------------------
def test_get_order_list_passes_json_string() -> None:
station = _make_station()
station.hardware_interface.order_query.return_value = {"items": [], "totalCount": 0}
station.get_order_list(filter_text="abc", page_count=10)
args, kwargs = station.hardware_interface.order_query.call_args
payload = json.loads(args[0])
assert payload["filter"] == "abc"
assert payload["pageCount"] == 10
def test_get_order_report_calls_typed_rpc() -> None:
station = _make_station()
station.hardware_interface.order_report.return_value = {"id": ORDER_GUID, "name": "x", "preIntakes": [], "resultList": []}
out = station.get_order_report(ORDER_GUID)
station.hardware_interface.order_report.assert_called_once_with(ORDER_GUID)
assert out["success"] is True
assert out["summary"]["id"] == ORDER_GUID
def test_get_aggregated_order_report_is_todo_placeholder() -> None:
station = _make_station()
out = station.get_aggregated_order_report(ORDER_GUID)
assert out["status"] == "not_implemented"
def test_query_workflow_records_filters_unsaved_subworkflows() -> None:
station = _make_station()
station.hardware_interface.query_workflow.return_value = {
"items": [
{
"id": "rid",
"name": "Day3线肽环化",
"subWorkflows": [
{"id": "saved-id", "name": "Day3线肽环化", "isSaved": True},
{"id": "draft-id", "name": "Day3线肽环化-草稿", "isSaved": False},
],
}
]
}
records = station._query_workflow_records("Day3线肽环化")
assert [r["subworkflowId"] for r in records] == ["saved-id"]
# ---------------------------------------------------------------------------
# 8. Debug / fetch_workflow_list 守护
# ---------------------------------------------------------------------------
def test_module_fetch_workflow_list_is_debug_guarded() -> None:
module = _import_module()
assert module.DEBUG_CLI_ENABLED is False
with pytest.raises(AssertionError):
module.fetch_workflow_list(config={"api_host": "http://x", "api_key": "k"})
def test_station_fetch_workflow_list_uses_rpc() -> None:
station = _make_station()
station.hardware_interface.query_workflow.return_value = {"items": [], "totalCount": 0}
station.fetch_workflow_list(filter_text="Day2")
args, _ = station.hardware_interface.query_workflow.call_args
payload = json.loads(args[0])
assert payload["filter"] == "Day2"
assert payload["includeDetail"] is True
# ---------------------------------------------------------------------------
# 9. start_experiment 装载闸门
# ---------------------------------------------------------------------------
def test_start_experiment_blocks_when_materials_not_loaded() -> None:
station = _make_station()
station.hardware_interface.scheduler_start.return_value = 1
with pytest.raises(RuntimeError):
station.start_experiment(
order_id=ORDER_GUID,
resultTable={"data": [{"materialName": "x"}]},
materials_loaded=False,
)
def test_start_experiment_starts_when_table_empty() -> None:
station = _make_station()
station.hardware_interface.scheduler_start.return_value = 1
result = station.start_experiment(order_id=ORDER_GUID, resultTable={"data": []})
assert result["success"] is True
assert result["order_ids"] == [ORDER_GUID]
# ---------------------------------------------------------------------------
# 10. Reset
# ---------------------------------------------------------------------------
def test_reset_signature_drops_legacy_params_and_uses_literal() -> None:
"""plan 调整:删除 dry_run/order_id/location_idreset_operations 用 Literal 注解。"""
cls = getattr(_import_module(), CLASS_NAME)
sig = inspect.signature(cls.reset)
params = sig.parameters
for legacy in ("dry_run", "order_id", "location_id"):
assert legacy not in params, f"reset 不应再有 {legacy} 入参"
assert "reset_operations" in params
assert any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values()), \
"reset 必须保留 **kwargs 以兜底 reset_order_id/reset_location_id"
annotation = params["reset_operations"].annotation
rendered = annotation if isinstance(annotation, str) else repr(annotation)
for op in ("scheduler_reset", "reset_order_status", "reset_location"):
assert op in rendered, f"reset_operations 的 Literal 必须包含 {op}"
def test_reset_goal_default_contains_all_operations() -> None:
"""像 sirna 一样goal_default 默认勾选全部三个 reset 操作。"""
cls = getattr(_import_module(), CLASS_NAME)
meta = getattr(cls.reset, "_action_registry_meta", {})
goal_default = meta.get("goal_default") or {}
assert goal_default.get("reset_operations") == [
"scheduler_reset",
"reset_order_status",
"reset_location",
]
def test_reset_executes_typed_rpc_calls() -> None:
station = _make_station()
station.hardware_interface.scheduler_reset.return_value = 1
station.hardware_interface.reset_order_status.return_value = 1
station.hardware_interface.reset_location.return_value = 1
out = station.reset(
reset_operations=["scheduler_reset", "reset_order_status", "reset_location"],
reset_order_id=ORDER_GUID,
reset_location_id="loc-1",
)
station.hardware_interface.scheduler_reset.assert_called_once_with()
station.hardware_interface.reset_order_status.assert_called_once_with(ORDER_GUID)
station.hardware_interface.reset_location.assert_called_once_with("loc-1")
assert out["selected_operations"] == [
"scheduler_reset",
"reset_order_status",
"reset_location",
]
assert len(out["executed_calls"]) == 3
assert out["skipped_operations"] == []
def test_reset_skips_when_ids_missing() -> None:
"""没有 order_id / location_id 时应该 skip 而不是抛错。"""
station = _make_station()
station.hardware_interface.scheduler_reset.return_value = 1
out = station.reset(
reset_operations=["scheduler_reset", "reset_order_status", "reset_location"],
)
station.hardware_interface.scheduler_reset.assert_called_once_with()
station.hardware_interface.reset_order_status.assert_not_called()
station.hardware_interface.reset_location.assert_not_called()
skipped_ops = {item["operation"] for item in out["skipped_operations"]}
assert skipped_ops == {"reset_order_status", "reset_location"}

View File

@@ -7,6 +7,7 @@ Bioyond Workstation Implementation
import time
import traceback
import threading
from contextlib import contextmanager
from datetime import datetime
from typing import Dict, Any, List, Optional, Union
import json
@@ -14,6 +15,7 @@ from pathlib import Path
from unilabos.devices.workstation.workstation_base import WorkstationBase, ResourceSynchronizer
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC
from unilabos.devices.workstation.bioyond_studio import debug_call_log
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from unilabos.resources.warehouse import WareHouse
from unilabos.utils.log import logger
@@ -54,13 +56,17 @@ class ConnectionMonitor:
def _monitor_loop(self):
while self._running:
try:
# 使用 lightweight API 检查连接
# query_matial_type_list 是比较快的查询
start_time = time.time()
result = self.workstation.hardware_interface.material_type_list()
# 使用轻量级调度状态接口检查连接,避免启动时打印完整物料类型列表。
result = self.workstation.hardware_interface.scheduler_status()
status = "online" if result else "offline"
msg = "Connection established" if status == "online" else "Failed to get material type list"
if status == "online":
msg = (
f"Scheduler status={result.get('status')}, "
f"hasTask={result.get('hasTask')}"
)
else:
msg = "Failed to get scheduler status"
if status != self._last_status:
logger.info(f"Bioyond连接状态变更: {self._last_status} -> {status}")
@@ -174,6 +180,8 @@ class BioyondResourceSynchronizer(ResourceSynchronizer):
logger.warning("从Bioyond获取的物料数据为空")
return False
self._update_material_cache_from_stock(all_bioyond_data)
# 转换为UniLab格式
unilab_resources = resource_bioyond_to_plr(
all_bioyond_data,
@@ -187,6 +195,29 @@ class BioyondResourceSynchronizer(ResourceSynchronizer):
logger.error(f"从Bioyond同步物料数据失败: {e}")
return False
def _update_material_cache_from_stock(self, materials: List[Dict[str, Any]]) -> None:
"""用本次库存查询结果同步 RPC 的 name -> material id 缓存。"""
material_cache = getattr(self.bioyond_api_client, "material_cache", None)
if not isinstance(material_cache, dict):
return
before_count = len(material_cache)
for material in materials:
material_name = material.get("name")
material_id = material.get("id")
if material_name and material_id:
material_cache[material_name] = material_id
for detail_material in material.get("detail", []) or []:
detail_name = detail_material.get("name")
detail_id = detail_material.get("detailMaterialId") or detail_material.get("id")
if detail_name and detail_id:
material_cache[detail_name] = detail_id
logger.debug(
f"已用Bioyond库存同步物料缓存: {before_count} -> {len(material_cache)}"
)
def sync_to_external(self, resource: Any) -> bool:
"""将本地物料数据变更同步到Bioyond系统"""
try:
@@ -678,6 +709,70 @@ class BioyondWorkstation(WorkstationBase):
集成Bioyond物料管理的工作站实现
"""
# 子类(如 sirna / peptide覆写以指定默认 raw-call 日志目录。
# 路径相对仓库根;为 None 时若 debug_log=True 仍会写入临时位置。
_DEBUG_LOG_DEFAULT_DIR: Optional[str] = None
def _create_bioyond_rpc(self, config: Dict[str, Any]) -> BioyondV1RPC:
"""创建 Bioyond RPC 客户端并应用调试包装。
所有创建 ``BioyondV1RPC`` 的路径饿汉初始化、Sirna 延迟初始化、
以及未来的前端重新配置路径)都应通过该 helper
以确保 debug_log 包装与命名/日志策略保持一致。
"""
rpc = BioyondV1RPC(config)
debug_call_log.wrap_rpc_http(rpc)
return rpc
def _set_hardware_interface(self, rpc: BioyondV1RPC) -> BioyondV1RPC:
"""将已构造的 RPC 客户端设置到 ``self.hardware_interface``,并应用调试包装。"""
debug_call_log.wrap_rpc_http(rpc)
self.hardware_interface = rpc
return rpc
def _debug_log_resolved_dir(self) -> Path:
"""解析 ``debug_log_dir`` 为绝对路径。"""
configured = (getattr(self, "bioyond_config", {}) or {}).get("debug_log_dir")
default_dir = getattr(self, "_DEBUG_LOG_DEFAULT_DIR", None)
candidate = configured or default_dir or "bioyond_debug_records"
path = Path(candidate)
if not path.is_absolute():
repo_root = Path(__file__).resolve().parents[4]
path = repo_root / path
return path
def _ensure_debug_log_state(self) -> None:
"""从 ``self.bioyond_config`` 派生 ``_debug_log_enabled`` / ``_debug_log_dir``。
每次进入 ``_debug_call_session`` 时都重新解析,以兼容前端在运行时
修改 ``bioyond_config['debug_log']`` 或目录的场景;同时也容忍
子类(如 Sirna 延迟初始化)在 ``__init__`` 早期未触发本方法。
"""
cfg = getattr(self, "bioyond_config", {}) or {}
self._debug_log_enabled = bool(cfg.get("debug_log"))
self._debug_log_dir = self._debug_log_resolved_dir()
@contextmanager
def _debug_call_session(self, action_name: str):
"""在 action 体外加一层 debug 会话上下文。
- ``debug_log`` 关闭时是空上下文,开销为 0。
- ``debug_log`` 开启时进入 :func:`debug_call_log.session`,所有
已被 ``wrap_rpc_http`` 包装过的 RPC 客户端都会捕获本次 action
产生的 HTTP 调用并写入 Markdown 文件。
子类(如 ``end_experiment``、``manual_unload`` 等)可以直接在
action 体里以 ``with self._debug_call_session("action_name"):`` 包裹。
"""
cfg = getattr(self, "bioyond_config", {}) or {}
enabled = bool(cfg.get("debug_log"))
if not enabled:
yield None
return
out_dir = BioyondWorkstation._debug_log_resolved_dir(self)
with debug_call_log.session(action_name, out_dir) as ctx:
yield ctx
def _publish_task_status(
self,
task_id: str,
@@ -862,7 +957,7 @@ class BioyondWorkstation(WorkstationBase):
self.bioyond_config = {}
print("警告: 未提供 bioyond_config请确保在 JSON 配置文件中提供完整配置")
self.hardware_interface = BioyondV1RPC(self.bioyond_config)
self.hardware_interface = self._create_bioyond_rpc(self.bioyond_config)
def resource_tree_add(self, resources: List[ResourcePLR]) -> None:
"""添加资源到资源树并更新ROS节点
@@ -1338,11 +1433,7 @@ class BioyondWorkstation(WorkstationBase):
if self.hardware_interface:
self.hardware_interface.scheduler_reset()
# 新物料缓存
if self.hardware_interface:
self.hardware_interface.refresh_material_cache()
# 重新同步资源
# 重新同步资源,并用同一次库存查询结果更新物料缓存
if self.resource_synchronizer:
self.resource_synchronizer.sync_from_external()

View File

@@ -32,7 +32,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union
MAX_SCAN_DEPTH = 10 # 最大目录递归深度
MAX_SCAN_FILES = 1000 # 最大扫描文件数量
_CACHE_VERSION = 1 # 缓存格式版本号,格式变更时递增
_CACHE_VERSION = 2 # 缓存格式版本号,格式变更时递增
# 合法的装饰器来源模块
_REGISTRY_DECORATOR_MODULE = "unilabos.registry.decorators"
@@ -258,8 +258,6 @@ def scan_directory(
}
# ---------------------------------------------------------------------------
# File-level parsing
# ---------------------------------------------------------------------------
@@ -361,6 +359,7 @@ def _parse_file(
"actions": class_body.get("actions", {}),
"status_properties": class_body.get("status_properties", {}),
"init_params": class_body.get("init_params", []),
"init_docstring": class_body.get("init_docstring"),
"auto_methods": class_body.get("auto_methods", {}),
"import_map": import_map,
}
@@ -497,7 +496,6 @@ def _collect_imports(tree: ast.Module, module_path: str = "") -> Dict[str, str]:
return import_map
# ---------------------------------------------------------------------------
# Decorator finding & argument extraction
# ---------------------------------------------------------------------------
@@ -768,6 +766,7 @@ def _extract_class_body(
"actions": {}, # method_name -> action_info
"status_properties": {}, # prop_name -> status_info
"init_params": [], # [{"name": ..., "type": ..., "default": ...}, ...]
"init_docstring": None,
"auto_methods": {}, # method_name -> method_info (no @action decorator)
}
@@ -780,6 +779,7 @@ def _extract_class_body(
# --- __init__ ---
if method_name == "__init__":
result["init_params"] = _extract_method_params(item, import_map)
result["init_docstring"] = ast.get_docstring(item)
continue
# --- Skip private/dunder ---

View File

@@ -51,14 +51,18 @@ Qone_nmr:
properties:
check_interval:
default: 60
description: 检查间隔时间默认60秒
type: string
expected_count:
default: 1
description: 期望生成的.nmr文件数量默认1个
type: string
monitor_dir:
description: 要监督的目录路径如果未指定则使用self.monitor_directory
type: string
stability_checks:
default: 3
description: 文件大小稳定性检查次数默认3次
type: string
required: []
type: object
@@ -85,11 +89,14 @@ Qone_nmr:
goal:
properties:
output_dir:
description: 输出目录如果未指定使用self.output_directory
type: string
string_list:
description: 字符串列表
type: string
txt_encoding:
default: utf-8
description: 文件编码
type: string
required:
- string_list
@@ -151,6 +158,13 @@ Qone_nmr:
additionalProperties: false
properties:
string:
description: '包含多个字符串的输入数据,支持两种格式:
1. 逗号分隔:如 "A 1 B 2 C 3, X 10 Y 20 Z 30"
2. 换行分隔:如 "A 1 B 2 C 3
X 10 Y 20 Z 30"'
type: string
title: StrSingleInput_Goal
type: object

View File

@@ -491,14 +491,17 @@ bioyond_cell:
goal:
properties:
material_names:
description: 物料名称列表;默认使用 [LiPF6, LiDFOB, DTD, LiFSI, LiPO2F2]
items:
type: string
type: array
type_id:
default: 3a190ca0-b2f6-9aeb-8067-547e72c11469
description: 物料类型ID
type: string
warehouse_name:
default: 粉末加样头堆栈
description: 目标仓库名(用于取位置信息)
type: string
required: []
type: object
@@ -527,12 +530,16 @@ bioyond_cell:
goal:
properties:
location_name_or_id:
description: 具体库位名称(如 A01或库位 UUID由用户指定。
type: string
material_name:
description: 物料名称(会优先匹配配置模板)。
type: string
type_id:
description: 物料类型 ID若为空则尝试从配置推断
type: string
warehouse_name:
description: 需要入库的仓库名称;若为空则仅创建不入库。
type: string
required:
- material_name
@@ -661,15 +668,20 @@ bioyond_cell:
goal:
properties:
board_type:
description: 板类型,如 "5ml分液瓶板"、"配液瓶(小)板"
type: string
bottle_type:
description: 瓶类型,如 "5ml分液瓶"、"配液瓶(小)"
type: string
location_code:
description: 库位编号,例如 "A01"
type: string
name:
description: 物料名称
type: string
warehouse_name:
default: 手动堆栈
description: 仓库名称,默认为 "手动堆栈",支持 "自动堆栈-左"、"自动堆栈-右" 等
type: string
required:
- name
@@ -1956,19 +1968,19 @@ bioyond_cell:
properties:
source_wh_id:
default: 3a19debc-84b4-0359-e2d4-b3beea49348b
description: 来源仓库ID
description: 来源仓库 Id (默认为3号仓库)
type: string
source_x:
default: 1
description: 来源位置X坐标
description: 来源位置 X 坐标
type: integer
source_y:
default: 1
description: 来源位置Y坐标
description: 来源位置 Y 坐标
type: integer
source_z:
default: 1
description: 来源位置Z坐标
description: 来源位置 Z 坐标
type: integer
required: []
type: object
@@ -2061,9 +2073,11 @@ bioyond_cell:
goal:
properties:
order_code:
description: 任务编号
type: string
timeout:
default: 36000
description: 超时时间(秒)
type: integer
required:
- order_code
@@ -2092,12 +2106,15 @@ bioyond_cell:
goal:
properties:
order_code:
description: 任务编号
type: string
poll_interval:
default: 0.5
description: 轮询间隔(秒),默认 0.5 秒
type: number
timeout:
default: 36000
description: 超时时间(秒)
type: integer
required:
- order_code
@@ -2154,10 +2171,15 @@ bioyond_cell:
config:
properties:
bioyond_config:
description: '从 JSON 文件加载的 bioyond 配置字典
包含 api_host, api_key, HTTP_host, HTTP_port 等配置'
type: object
deck:
description: Deck 配置(可选,会从 JSON 中自动处理)
type: string
protocol_type:
description: 协议类型(可选)
type: string
required: []
type: object

View File

@@ -47,8 +47,10 @@ bioyond_dispensing_station:
goal:
properties:
report_request:
description: WorkstationReportRequest 对象,包含任务完成信息
type: string
used_materials:
description: 物料使用记录列表
type: string
required:
- report_request
@@ -102,6 +104,7 @@ bioyond_dispensing_station:
goal:
properties:
material_name:
description: 物料名称
type: string
required:
- material_name
@@ -611,10 +614,10 @@ bioyond_dispensing_station:
goal:
properties:
target_device_id:
description: 目标反应站设备ID(从设备列表中选择,所有转移组使用同一个目标设备
description: 目标反应站设备ID(所有转移组使用同一个设备)
type: string
transfer_groups:
description: 转移任务组列表每组包含物料名称、目标堆栈和目标库位,可以添加多组
description: '转移任务组列表,每组包含:'
type: array
required:
- target_device_id
@@ -694,10 +697,13 @@ bioyond_dispensing_station:
config:
properties:
config:
description: 配置字典,应包含material_type_mappings等配置
type: object
deck:
description: Deck对象
type: string
protocol_type:
description: 协议类型(由ROS系统传递,此处忽略)
type: string
required: []
type: object

View File

@@ -150,15 +150,15 @@ coincellassemblyworkstation_device:
properties:
assembly_pressure:
default: 4200
description: 电池压制力(N)
description: 电池压制力 (N)
type: integer
assembly_type:
default: 7
description: 组装类型(7=不用铝箔垫, 8=使用铝箔垫)
description: 组装类型 (7=不用铝箔垫, 8=使用铝箔垫)
type: integer
battery_clean_ignore:
default: false
description: 是否忽略电池清洁步骤
description: 是否忽略电池清洁
type: boolean
battery_pressure_mode:
default: true
@@ -166,29 +166,29 @@ coincellassemblyworkstation_device:
type: boolean
dual_drop_first_volume:
default: 25
description: 二次滴液第一次排液体积(μL)
description: 二次滴液第一次排液体积 (μL)
type: integer
dual_drop_mode:
default: false
description: 电解液添加模式(false=单次滴液, true=二次滴液)
description: 电解液添加模式 (False=单次滴液, True=二次滴液)
type: boolean
dual_drop_start_timing:
default: false
description: 二次滴液开始滴液时机(false=正极片前, true=正极片后)
description: 二次滴液开始滴液时机 (False=正极片前, True=正极片后)
type: boolean
dual_drop_suction_timing:
default: false
description: 二次滴液吸液时机(false=正常吸液, true=先吸液)
description: 二次滴液吸液时机 (False=正常吸液, True=先吸液)
type: boolean
elec_num:
description: 电解液瓶数
type: string
elec_use_num:
description: 每瓶电解液组装电池数
description: 每瓶电解液组装电池数
type: string
elec_vol:
default: 50
description: 电解液吸液量(μL)
description: 电解液吸液量 (μL)
type: integer
file_path:
default: /Users/sml/work
@@ -196,7 +196,7 @@ coincellassemblyworkstation_device:
type: string
fujipian_juzhendianwei:
default: 0
description: 负极片矩阵点位。盘位置从1开始计数有效范围1-8, 13-20 (写入值比实际位置少1例如写0取盘位1写1取盘位2)
description: 负极片矩阵点位
type: integer
fujipian_panshu:
default: 0
@@ -204,7 +204,7 @@ coincellassemblyworkstation_device:
type: integer
gemo_juzhendianwei:
default: 0
description: 隔膜矩阵点位。盘位置从1开始计数有效范围1-8, 13-20 (写入值比实际位置少1例如写0取盘位1写1取盘位2)
description: 隔膜矩阵点位
type: integer
gemopanshu:
default: 0
@@ -216,7 +216,7 @@ coincellassemblyworkstation_device:
type: boolean
qiangtou_juzhendianwei:
default: 0
description: 枪头盒矩阵点位。盘位置从1开始计数有效范围1-32, 64-96 (写入值比实际位置少1例如写0取盘位1写1取盘位2)
description: 枪头盒矩阵点位
type: integer
required:
- elec_num
@@ -308,7 +308,13 @@ coincellassemblyworkstation_device:
properties:
material_search_enable:
default: false
description: 是否启用物料搜寻功能。设备初始化后会弹出物料搜寻确认弹窗,此参数控制自动点击"是"(启用)或"否"(不启用)。默认为false(不启用物料搜寻)
description: '是否启用物料搜寻功能。
设备初始化后会弹出物料搜寻确认弹窗,
此参数控制自动点击''是''(启用)或''否''(不启用)。
默认为False不启用物料搜寻。'
type: boolean
required: []
type: object
@@ -547,15 +553,15 @@ coincellassemblyworkstation_device:
properties:
assembly_pressure:
default: 4200
description: 电池压制力(N)
description: 电池压制力 (N)
type: integer
assembly_type:
default: 7
description: 组装类型(7=不用铝箔垫, 8=使用铝箔垫)
description: 组装类型 (7=不用铝箔垫, 8=使用铝箔垫)
type: integer
battery_clean_ignore:
default: false
description: 是否忽略电池清洁步骤
description: 是否忽略电池清洁
type: boolean
battery_pressure_mode:
default: true
@@ -563,29 +569,29 @@ coincellassemblyworkstation_device:
type: boolean
dual_drop_first_volume:
default: 25
description: 二次滴液第一次排液体积(μL)
description: 二次滴液第一次排液体积 (μL)
type: integer
dual_drop_mode:
default: false
description: 电解液添加模式(false=单次滴液, true=二次滴液)
description: 电解液添加模式 (False=单次滴液, True=二次滴液)
type: boolean
dual_drop_start_timing:
default: false
description: 二次滴液开始滴液时机(false=正极片前, true=正极片后)
description: 二次滴液开始滴液时机 (False=正极片前, True=正极片后)
type: boolean
dual_drop_suction_timing:
default: false
description: 二次滴液吸液时机(false=正常吸液, true=先吸液)
description: 二次滴液吸液时机 (False=正常吸液, True=先吸液)
type: boolean
elec_num:
description: 电解液瓶数如果在workflow中已通过handles连接上游(create_orders的bottle_count输出),则此参数会自动从上游获取,无需手动填写;如果单独使用此函数(没有上游连接),则必须手动填写电解液瓶数
description: 电解液瓶数
type: string
elec_use_num:
description: 每瓶电解液组装电池数
description: 每瓶电解液组装电池数
type: string
elec_vol:
default: 50
description: 电解液吸液量(μL)
description: 电解液吸液量 (μL)
type: integer
file_path:
default: /Users/sml/work
@@ -593,7 +599,7 @@ coincellassemblyworkstation_device:
type: string
fujipian_juzhendianwei:
default: 0
description: 负极片矩阵点位。盘位置从1开始计数有效范围1-8, 13-20 (写入值比实际位置少1例如写0取盘位1写1取盘位2)
description: 负极片矩阵点位
type: integer
fujipian_panshu:
default: 0
@@ -601,7 +607,7 @@ coincellassemblyworkstation_device:
type: integer
gemo_juzhendianwei:
default: 0
description: 隔膜矩阵点位。盘位置从1开始计数有效范围1-8, 13-20 (写入值比实际位置少1例如写0取盘位1写1取盘位2)
description: 隔膜矩阵点位
type: integer
gemopanshu:
default: 0
@@ -613,7 +619,7 @@ coincellassemblyworkstation_device:
type: boolean
qiangtou_juzhendianwei:
default: 0
description: 枪头盒矩阵点位。盘位置从1开始计数有效范围1-32, 64-96 (写入值比实际位置少1例如写0取盘位1写1取盘位2)
description: 枪头盒矩阵点位
type: integer
required:
- elec_num

View File

@@ -18,6 +18,7 @@ xyz_stepper_controller:
goal:
properties:
degrees:
description: 角度值
type: number
required:
- degrees
@@ -44,6 +45,7 @@ xyz_stepper_controller:
goal:
properties:
axis:
description: 电机轴
type: object
required:
- axis
@@ -71,6 +73,7 @@ xyz_stepper_controller:
properties:
enable:
default: true
description: True为使能False为失能
type: boolean
required: []
type: object
@@ -99,9 +102,11 @@ xyz_stepper_controller:
goal:
properties:
axis:
description: 电机轴
type: object
enable:
default: true
description: True为使能False为失能
type: boolean
required:
- axis
@@ -152,6 +157,7 @@ xyz_stepper_controller:
goal:
properties:
axis:
description: 电机轴
type: object
required:
- axis
@@ -183,16 +189,21 @@ xyz_stepper_controller:
properties:
acceleration:
default: 1000
description: 加速度(rpm/s)
type: integer
axis:
description: 电机轴
type: object
position:
description: 目标位置(步数)
type: integer
precision:
default: 100
description: 到位精度
type: integer
speed:
default: 5000
description: 运行速度(rpm)
type: integer
required:
- axis
@@ -225,16 +236,21 @@ xyz_stepper_controller:
properties:
acceleration:
default: 1000
description: 加速度
type: integer
axis:
description: 电机轴
type: object
degrees:
description: 目标角度(度)
type: number
precision:
default: 100
description: 精度
type: integer
speed:
default: 5000
description: 移动速度
type: integer
required:
- axis
@@ -267,16 +283,21 @@ xyz_stepper_controller:
properties:
acceleration:
default: 1000
description: 加速度
type: integer
axis:
description: 电机轴
type: object
precision:
default: 100
description: 精度
type: integer
revolutions:
description: 目标圈数
type: number
speed:
default: 5000
description: 移动速度
type: integer
required:
- axis
@@ -309,15 +330,20 @@ xyz_stepper_controller:
properties:
acceleration:
default: 1000
description: 加速度
type: integer
speed:
default: 5000
description: 运行速度
type: integer
x:
description: X轴目标位置
type: integer
y:
description: Y轴目标位置
type: integer
z:
description: Z轴目标位置
type: integer
required: []
type: object
@@ -350,15 +376,20 @@ xyz_stepper_controller:
properties:
acceleration:
default: 1000
description: 加速度
type: integer
speed:
default: 5000
description: 移动速度
type: integer
x_deg:
description: X轴目标角度
type: number
y_deg:
description: Y轴目标角度
type: number
z_deg:
description: Z轴目标角度
type: number
required: []
type: object
@@ -391,15 +422,20 @@ xyz_stepper_controller:
properties:
acceleration:
default: 1000
description: 加速度
type: integer
speed:
default: 5000
description: 移动速度
type: integer
x_rev:
description: X轴目标圈数
type: number
y_rev:
description: Y轴目标圈数
type: number
z_rev:
description: Z轴目标圈数
type: number
required: []
type: object
@@ -427,6 +463,7 @@ xyz_stepper_controller:
goal:
properties:
revolutions:
description: 圈数
type: number
required:
- revolutions
@@ -456,10 +493,13 @@ xyz_stepper_controller:
properties:
acceleration:
default: 1000
description: 加速度(rpm/s)
type: integer
axis:
description: 电机轴
type: object
speed:
description: 运行速度(rpm),正值正转,负值反转
type: integer
required:
- axis
@@ -487,6 +527,7 @@ xyz_stepper_controller:
goal:
properties:
steps:
description: 步数
type: integer
required:
- steps
@@ -513,6 +554,7 @@ xyz_stepper_controller:
goal:
properties:
steps:
description: 步数
type: integer
required:
- steps
@@ -564,9 +606,11 @@ xyz_stepper_controller:
goal:
properties:
axis:
description: 电机轴
type: object
timeout:
default: 30.0
description: 超时时间(秒)
type: number
required:
- axis
@@ -591,11 +635,14 @@ xyz_stepper_controller:
properties:
baudrate:
default: 115200
description: 波特率
type: integer
port:
description: 串口端口名
type: string
timeout:
default: 1.0
description: 通信超时时间
type: number
required:
- port

View File

@@ -510,9 +510,11 @@ liquid_handler:
goal:
properties:
msg:
description: information to be printed
type: string
seconds:
default: 0
description: seconds to wait
type: string
required: []
type: object
@@ -2963,15 +2965,22 @@ liquid_handler:
additionalProperties: false
properties:
channel:
description: int
maximum: 2147483647
minimum: -2147483648
type: integer
dis_to_top:
description: 'float
Height in mm to move to relative to the well top.'
maximum: 1.7976931348623157e+308
minimum: -1.7976931348623157e+308
type: number
well:
additionalProperties: false
description: 'Well
The target well.'
properties:
category:
type: string
@@ -4829,11 +4838,13 @@ liquid_handler:
config:
properties:
backend:
description: Backend to use.
type: object
channel_num:
default: 8
type: integer
deck:
description: Deck to use.
type: object
simulator:
default: false
@@ -4883,14 +4894,17 @@ liquid_handler.biomek:
bind_parent_id:
type: string
liquid_input_slot:
description: 液体输入槽列表
items:
type: integer
type: array
liquid_type:
description: 液体类型列表
items:
type: string
type: array
liquid_volume:
description: 液体体积列表
items:
type: integer
type: array
@@ -4901,6 +4915,7 @@ liquid_handler.biomek:
type: object
type: array
slot_on_deck:
description: 甲板上的槽位
type: integer
required:
- resource_tracker
@@ -5036,20 +5051,27 @@ liquid_handler.biomek:
additionalProperties: false
properties:
none_keys:
description: 需要设置为None的键列表
items:
type: string
type: array
protocol_author:
description: 协议作者
type: string
protocol_date:
description: 协议日期
type: string
protocol_description:
description: 协议描述
type: string
protocol_name:
description: 协议名称
type: string
protocol_type:
description: 协议类型
type: string
protocol_version:
description: 协议版本
type: string
title: LiquidHandlerProtocolCreation_Goal
type: object

View File

@@ -87,7 +87,7 @@ neware_battery_test_system:
properties:
filepath:
default: bts_status.json
description: 输出JSON文件路径
description: 输出文件路径
type: string
required: []
type: object
@@ -146,7 +146,7 @@ neware_battery_test_system:
goal:
properties:
plate_num:
description: 盘号 (1 或 2),如果为null则返回所有盘的状态
description: 盘号 (1 或 2),如果为None则返回所有盘的状态
type: integer
required: []
type: object
@@ -237,11 +237,11 @@ neware_battery_test_system:
goal:
properties:
csv_path:
description: 输入CSV文件的绝对路径
description: 输入CSV文件路径
type: string
output_dir:
default: .
description: 输出目录用于存储XML和备份文件),默认当前目录
description: 输出目录用于存储XML文件和备份,默认当前目录
type: string
required:
- csv_path
@@ -302,14 +302,14 @@ neware_battery_test_system:
goal:
properties:
backup_dir:
description: 备份目录路径默认使用最近一次submit_from_csvbackup_dir
description: 备份目录路径默认使用最近一次 submit_from_csvbackup_dir
type: string
file_pattern:
default: '*'
description: 文件通配符模式,例如 *.csv 或 Battery_*.nda
description: 文件通配符模式,默认 "*" 上传所有文件(例如 "*.csv" 仅上传 CSV 文件)
type: string
oss_prefix:
description: OSS对象路径前缀默认使用self.oss_prefix
description: OSS 对象前缀默认使用类初始化时的配置
type: string
required: []
type: object
@@ -336,19 +336,25 @@ neware_battery_test_system:
config:
properties:
devtype:
description: 设备类型标识
type: string
ip:
description: TCP服务器IP地址
type: string
machine_id:
default: 1
description: 机器ID
type: integer
oss_prefix:
default: neware_backup
description: OSS对象路径前缀默认"neware_backup"
type: string
oss_upload_enabled:
default: false
description: 是否启用OSS上传功能默认False
type: boolean
port:
description: TCP端口
type: integer
size_x:
default: 50
@@ -360,6 +366,7 @@ neware_battery_test_system:
default: 20
type: number
timeout:
description: 通信超时时间(秒)
type: integer
required: []
type: object

View File

@@ -207,8 +207,12 @@ separator.homemade:
goal:
properties:
condition:
description: The condition to be monitored, either 'delta' or 'time'.
type: string
value:
description: 'The threshold value for the condition.
`delta > 0.05`, `time > 60`'
type: string
required:
- condition
@@ -305,12 +309,17 @@ separator.homemade:
event:
type: string
settling_time:
description: The duration for which to settle after stirring, in
seconds. Defaults to 10.
type: string
stir_speed:
description: The speed of stirring, in RPM. Defaults to 300.
maximum: 1.7976931348623157e+308
minimum: -1.7976931348623157e+308
type: number
stir_time:
description: The duration for which to stir, in seconds. Defaults
to 10.
maximum: 1.7976931348623157e+308
minimum: -1.7976931348623157e+308
type: number

View File

@@ -456,6 +456,7 @@ syringe_pump_with_valve.runze.SY03B-T06:
goal:
properties:
volume:
description: 'absolute position of the plunger, unit: mL'
type: number
required:
- volume
@@ -481,6 +482,7 @@ syringe_pump_with_valve.runze.SY03B-T06:
goal:
properties:
volume:
description: 'absolute position of the plunger, unit: mL'
type: number
required:
- volume
@@ -687,8 +689,10 @@ syringe_pump_with_valve.runze.SY03B-T06:
goal:
properties:
max_velocity:
description: 'maximum velocity of the plunger, unit: ml/s'
type: number
position:
description: 'absolute position of the plunger, unit: ml'
type: number
required:
- position
@@ -1003,6 +1007,7 @@ syringe_pump_with_valve.runze.SY03B-T08:
goal:
properties:
volume:
description: 'absolute position of the plunger, unit: mL'
type: number
required:
- volume
@@ -1028,6 +1033,7 @@ syringe_pump_with_valve.runze.SY03B-T08:
goal:
properties:
volume:
description: 'absolute position of the plunger, unit: mL'
type: number
required:
- volume
@@ -1234,8 +1240,10 @@ syringe_pump_with_valve.runze.SY03B-T08:
goal:
properties:
max_velocity:
description: 'maximum velocity of the plunger, unit: ml/s'
type: number
position:
description: 'absolute position of the plunger, unit: ml'
type: number
required:
- position

View File

@@ -32,7 +32,7 @@ reaction_station.bioyond:
type: integer
end_point:
default: 0
description: 终点计时点 (Start=开始前, End=结束后)
description: 终点计时点 (Start=0, End=1)
type: integer
end_step_key:
default: ''
@@ -40,11 +40,11 @@ reaction_station.bioyond:
type: string
start_point:
default: 0
description: 起点计时点 (Start=开始前, End=结束后)
description: 起点计时点 (Start=0, End=1)
type: integer
start_step_key:
default: ''
description: 起点步骤Key (例如 "feeding", "liquid", 可选, 默认为空则自动选择)
description: 起点步骤Key (可选, 默认为空则自动选择)
type: string
required:
- duration
@@ -91,6 +91,7 @@ reaction_station.bioyond:
goal:
properties:
json_str:
description: 订单参数的JSON字符串
type: string
required:
- json_str
@@ -117,6 +118,7 @@ reaction_station.bioyond:
goal:
properties:
workflow_ids:
description: 要删除的工作流ID数组
items:
type: string
type: array
@@ -145,6 +147,7 @@ reaction_station.bioyond:
goal:
properties:
json_str:
description: 'JSON格式的字符串,包含:'
type: string
required:
- json_str
@@ -197,6 +200,7 @@ reaction_station.bioyond:
goal:
properties:
web_workflow_json:
description: JSON 格式的网页工作流列表
type: string
required:
- web_workflow_json
@@ -228,8 +232,10 @@ reaction_station.bioyond:
goal:
properties:
reactor_id:
description: 反应器编号 (1-5)
type: integer
temperature:
description: 目标温度 (°C)
type: number
required:
- reactor_id
@@ -257,6 +263,7 @@ reaction_station.bioyond:
goal:
properties:
preintake_id:
description: 通量ID
type: string
required:
- preintake_id
@@ -338,6 +345,7 @@ reaction_station.bioyond:
goal:
properties:
value:
description: 工作流 ID 列表
items:
type: string
type: array
@@ -365,6 +373,7 @@ reaction_station.bioyond:
goal:
properties:
workflow_id:
description: 工作流ID
type: string
required:
- workflow_id
@@ -424,11 +433,11 @@ reaction_station.bioyond:
goal:
properties:
assign_material_name:
description: 物料名称(不能为空)
description: 物料名称(液体种类)
type: string
temperature:
default: 25.0
description: 温度设定(°C)
description: 温度(C)
type: number
time:
default: '90'
@@ -436,14 +445,14 @@ reaction_station.bioyond:
type: string
titration_type:
default: '1'
description: 是否滴定(NO=, YES=)
description: 是否滴定(NO=1, YES=2)
type: string
torque_variation:
default: 2
description: 是否观察 (NO=, YES=)
description: 是否观察(NO=1, YES=2)
type: integer
volume:
description: 分液公式(mL)
description: 分液量(μL)
type: string
required:
- assign_material_name
@@ -525,11 +534,11 @@ reaction_station.bioyond:
properties:
assign_material_name:
default: BAPP
description: 物料名称
description: 物料名称(试剂瓶位)
type: string
temperature:
default: 25.0
description: 温度设定(°C)
description: 温度设定(C)
type: number
time:
default: '0'
@@ -537,15 +546,15 @@ reaction_station.bioyond:
type: string
titration_type:
default: '1'
description: 是否滴定(NO=, YES=)
description: 是否滴定(NO=1, YES=2)
type: string
torque_variation:
default: 1
description: 是否观察 (NO=否, YES=是)
description: 是否观察(int类型, 1=否, 2=是)
type: integer
volume:
default: '350'
description: 分液公式(mL)
description: 分液质量(g)
type: string
required: []
type: object
@@ -593,26 +602,28 @@ reaction_station.bioyond:
description: 物料名称
type: string
solvents:
description: '溶剂信息对象(可选),包含: additional_solvent(溶剂体积mL), total_liquid_volume(总液体体积mL)。如果提供,将自动计算volume'
description: '溶剂信息的字典或JSON字符串(可选),格式如下:
{'
type: string
temperature:
default: 25.0
description: 温度设定(°C),默认25.00
description: 温度设定(C)
type: number
time:
default: '360'
description: 观察时间(分钟),默认360
description: 观察时间(分钟)
type: string
titration_type:
default: '1'
description: 是否滴定(NO=, YES=是),默认NO
description: 是否滴定(NO=1, YES=2)
type: string
torque_variation:
default: 2
description: 是否观察 (NO=, YES=是),默认YES
description: 是否观察(NO=1, YES=2)
type: integer
volume:
description: 分液量(mL)。可直接提供,或通过solvents参数自动计算
description: 分液量(μL),直接指定体积(可选,如果提供solvents自动计算)
type: string
required:
- assign_material_name
@@ -671,33 +682,32 @@ reaction_station.bioyond:
description: 物料名称
type: string
extracted_actuals:
description: 从报告提取的实际加料量JSON字符串,包含actualTargetWeigh(m二酐滴定)和actualVolume(V二酐滴定)
description: 从报告提取的实际加料量JSON字符串,包含actualTargetWeigh和actualVolume
type: string
feeding_order_data:
description: 'feeding_order JSON对象,用于获取m二酐值(type为main_anhydride的amount)。示例:
{"feeding_order": [{"type": "main_anhydride", "amount": 1.915}]}'
description: feeding_order JSON字符串或对象,用于获取m二酐值
type: string
temperature:
default: 25.0
description: 温度设定(°C),默认25.00
description: 温度(C)
type: number
time:
default: '90'
description: 观察时间(分钟),默认90
description: 观察时间(分钟)
type: string
titration_type:
default: '2'
description: 是否滴定(NO=, YES=),默认YES
description: 是否滴定(NO=1, YES=2),默认2
type: string
torque_variation:
default: 2
description: 是否观察 (NO=, YES=是),默认YES
description: 是否观察(NO=1, YES=2)
type: integer
volume_formula:
description: 分液公式(mL)。可直接提供固定公式,或留空由系统根据x_value、feeding_order_data、extracted_actuals自动生成
description: 分液公式(μL),如果提供则直接使用,否则自动计算
type: string
x_value:
description: 公式中的x值,手工输入,格式为"{{1-2-3}}"(包含双花括号)。用于自动公式计算
description: 手工输入的x值,格式如 "1-2-3"
type: string
required:
- assign_material_name
@@ -738,7 +748,7 @@ reaction_station.bioyond:
type: string
temperature:
default: 25.0
description: 温度设定(°C)
description: 温度(C)
type: number
time:
default: '0'
@@ -746,14 +756,14 @@ reaction_station.bioyond:
type: string
titration_type:
default: '1'
description: 是否滴定(NO=, YES=)
description: 是否滴定(NO=1, YES=2)
type: string
torque_variation:
default: 1
description: 是否观察 (NO=, YES=)
description: 是否观察(NO=1, YES=2)
type: integer
volume_formula:
description: 分液公式(mL)
description: 分液公式(μL)
type: string
required:
- volume_formula
@@ -786,7 +796,7 @@ reaction_station.bioyond:
description: 任务名称
type: string
workflow_name:
description: 工作流名称
description: 合并后的工作流名称
type: string
required:
- workflow_name
@@ -819,15 +829,15 @@ reaction_station.bioyond:
goal:
properties:
assign_material_name:
description: 物料名称
description: 物料名称(不能为空)
type: string
cutoff:
default: '900000'
description: 粘度上限
description: 粘度上限(需为有效数字字符串,默认 "900000")
type: string
temperature:
default: -10.0
description: 温度设定(°C)
description: 温度设定(C,范围:-50.00 至 100.00)
type: number
required:
- assign_material_name
@@ -909,11 +919,11 @@ reaction_station.bioyond:
description: 物料名称(用于获取试剂瓶位ID)
type: string
material_id:
description: 粉末类型IDSalt=21分钟Flour=面粉27分钟BTDA=BTDA38分钟
description: 粉末类型ID, Salt=1, Flour=2, BTDA=3
type: string
temperature:
default: 25.0
description: 温度设定(°C)
description: 温度设定(C)
type: number
time:
default: '0'
@@ -921,7 +931,7 @@ reaction_station.bioyond:
type: string
torque_variation:
default: 1
description: 是否观察 (NO=, YES=)
description: 是否观察(NO=1, YES=2)
type: integer
required:
- material_id
@@ -945,10 +955,13 @@ reaction_station.bioyond:
config:
properties:
config:
description: 配置字典,应包含workflow_mappings等配置
type: object
deck:
description: Deck对象
type: string
protocol_type:
description: 协议类型(由ROS系统传递,此处忽略)
type: string
required: []
type: object

View File

@@ -198,6 +198,8 @@ robotic_arm.SCARA_with_slider.moveit.virtual:
additionalProperties: false
properties:
command:
description: A JSON-formatted string that includes option, target,
speed, lift_height, mt_height
type: string
title: SendCmd_Goal
type: object
@@ -241,6 +243,8 @@ robotic_arm.SCARA_with_slider.moveit.virtual:
additionalProperties: false
properties:
command:
description: A JSON-formatted string that includes quaternion, speed,
position
type: string
title: SendCmd_Goal
type: object
@@ -284,6 +288,7 @@ robotic_arm.SCARA_with_slider.moveit.virtual:
additionalProperties: false
properties:
command:
description: A JSON-formatted string that includes speed
type: string
title: SendCmd_Goal
type: object

View File

@@ -709,6 +709,8 @@ linear_motion.toyo_xyz.sim:
additionalProperties: false
properties:
command:
description: A JSON-formatted string that includes option, target,
speed, lift_height, mt_height
type: string
title: SendCmd_Goal
type: object
@@ -752,6 +754,8 @@ linear_motion.toyo_xyz.sim:
additionalProperties: false
properties:
command:
description: A JSON-formatted string that includes quaternion, speed,
position
type: string
title: SendCmd_Goal
type: object
@@ -795,6 +799,7 @@ linear_motion.toyo_xyz.sim:
additionalProperties: false
properties:
command:
description: A JSON-formatted string that includes speed
type: string
title: SendCmd_Goal
type: object

View File

@@ -2179,6 +2179,7 @@ virtual_multiway_valve:
goal:
properties:
port_number:
description: 端口号 (1-8)
type: integer
required:
- port_number
@@ -2225,6 +2226,7 @@ virtual_multiway_valve:
goal:
properties:
port_number:
description: 目标端口号 (1-8)
type: integer
required:
- port_number
@@ -2261,6 +2263,7 @@ virtual_multiway_valve:
additionalProperties: false
properties:
command:
description: 目标位置 (0-8) 或位置字符串
type: string
title: SendCmd_Goal
type: object
@@ -2304,6 +2307,7 @@ virtual_multiway_valve:
additionalProperties: false
properties:
command:
description: 目标位置 (0-8) 或位置字符串
type: string
title: SendCmd_Goal
type: object
@@ -3960,6 +3964,14 @@ virtual_separator:
io_type: source
label: bottom_phase_out
side: SOUTH
- data_key: top_outlet
data_source: executor
data_type: fluid
description: 上相(轻相)液体输出口
handler_key: topphaseout
io_type: source
label: top_phase_out
side: NORTH
- data_key: mechanical_port
data_source: handle
data_type: mechanical
@@ -4207,6 +4219,7 @@ virtual_solenoid_valve:
additionalProperties: false
properties:
string:
description: '"ON"/"OFF" 或 "OPEN"/"CLOSED"'
type: string
title: StrSingleInput_Goal
type: object
@@ -4250,6 +4263,7 @@ virtual_solenoid_valve:
additionalProperties: false
properties:
command:
description: '"OPEN"/"CLOSED" 或其他控制命令'
type: string
title: SendCmd_Goal
type: object
@@ -4410,16 +4424,20 @@ virtual_solid_dispenser:
event:
type: string
mass:
description: 质量字符串 (如 "2.9 g")
type: string
mol:
description: 摩尔数字符串 (如 "0.12 mol")
type: string
purpose:
description: 添加目的
type: string
rate_spec:
type: string
ratio:
type: string
reagent:
description: 试剂名称
type: string
stir:
type: boolean
@@ -4431,6 +4449,7 @@ virtual_solid_dispenser:
type: string
vessel:
additionalProperties: false
description: 目标容器
properties:
category:
type: string
@@ -5560,8 +5579,10 @@ virtual_transfer_pump:
goal:
properties:
velocity:
description: 拉取速度 (ml/s)
type: number
volume:
description: 要拉取的体积 (ml)
type: number
required:
- volume
@@ -5588,8 +5609,10 @@ virtual_transfer_pump:
goal:
properties:
velocity:
description: 推出速度 (ml/s)
type: number
volume:
description: 要推出的体积 (ml)
type: number
required:
- volume
@@ -5685,10 +5708,12 @@ virtual_transfer_pump:
additionalProperties: false
properties:
max_velocity:
description: 移动速度 (ml/s)
maximum: 1.7976931348623157e+308
minimum: -1.7976931348623157e+308
type: number
position:
description: 目标位置 (ml)
maximum: 1.7976931348623157e+308
minimum: -1.7976931348623157e+308
type: number
@@ -5837,8 +5862,10 @@ virtual_transfer_pump:
config:
properties:
config:
description: 配置字典包含max_volume, port等参数
type: object
device_id:
description: 设备ID
type: string
required: []
type: object

View File

@@ -409,11 +409,11 @@ xrd_d7mate:
properties:
end_theta:
default: 80.0
description: 结束角度≥5.5°且必须大于start_theta
description: 结束角度≥5.5°,且必须大于 start_theta
type: number
exp_time:
default: 0.1
description: 曝光时间0.1-5.0秒)
description: 曝光时间0.1-5.0 秒)
type: number
increment:
default: 0.05
@@ -421,7 +421,7 @@ xrd_d7mate:
type: number
sample_id:
default: ''
description: 样品标识符
description: 样品名称
type: string
start_theta:
default: 10.0
@@ -433,7 +433,7 @@ xrd_d7mate:
type: string
wait_minutes:
default: 3.0
description: 允许上样后等待分钟数
description: 允许上样后、发送样品准备完成前的等待分钟数(默认 3 分钟)
type: number
required: []
title: StartWorkflow_Goal
@@ -492,12 +492,15 @@ xrd_d7mate:
properties:
host:
default: 127.0.0.1
description: 设备IP地址
type: string
port:
default: 6001
description: 通信端口默认6001
type: string
timeout:
default: 10.0
description: 超时时间,单位秒
type: string
required: []
type: object

View File

@@ -217,6 +217,7 @@ zhida_gcms:
additionalProperties: false
properties:
string:
description: Base64编码的CSV数据ROS2参数名
type: string
title: StrSingleInput_Goal
type: object
@@ -257,6 +258,7 @@ zhida_gcms:
additionalProperties: false
properties:
string:
description: CSV文件路径ROS2参数名
type: string
title: StrSingleInput_Goal
type: object
@@ -289,12 +291,15 @@ zhida_gcms:
properties:
host:
default: 192.168.3.184
description: 设备IP地址本地部署时可使用'127.0.0.1'
type: string
port:
default: 5792
description: 通信端口默认5792
type: string
timeout:
default: 10.0
description: 超时时间,单位秒
type: string
required: []
type: object

View File

@@ -271,6 +271,7 @@ class Registry:
registry_cache.pkl 一个文件中,删除即可完全重置。
"""
import time as _time
from unilabos.registry.ast_registry_scanner import _CACHE_VERSION as AST_SCAN_CACHE_VERSION
from unilabos.registry.ast_registry_scanner import scan_directory
scan_t0 = _time.perf_counter()
@@ -286,6 +287,10 @@ class Registry:
# ---- 统一缓存:一个 pkl 包含所有数据 ----
unified_cache = self._load_config_cache()
ast_cache = unified_cache.setdefault("_ast_scan", {"files": {}})
if ast_cache.get("version") != AST_SCAN_CACHE_VERSION:
ast_cache = {"version": AST_SCAN_CACHE_VERSION, "files": {}}
unified_cache["_ast_scan"] = ast_cache
unified_cache.pop("_build_results", None)
# 默认:扫描 unilabos 包所在的父目录
pkg_root = Path(__file__).resolve().parent.parent # .../unilabos
@@ -561,13 +566,47 @@ class Registry:
return prop_schema
@staticmethod
def _apply_docstring_param_metadata(
schema: Dict[str, Any],
doc_info: Dict[str, Any],
field_to_param: Optional[Dict[str, str]] = None,
apply_defaults: bool = False,
) -> None:
"""Apply parsed docstring display names and descriptions to schema properties."""
if not schema or not doc_info:
return
props = schema.get("properties", {})
if not isinstance(props, dict):
return
param_descs = doc_info.get("params", {}) or {}
param_display_names = doc_info.get("param_display_names", {}) or {}
for field_name, prop_schema in props.items():
if not isinstance(prop_schema, dict):
continue
param_name = field_to_param.get(field_name, field_name) if field_to_param else field_name
if not isinstance(param_name, str):
continue
param_name = param_name.removesuffix("[]")
if param_name in param_display_names:
prop_schema["title"] = param_display_names[param_name]
elif apply_defaults and not prop_schema.get("title"):
prop_schema["title"] = field_name
if param_name in param_descs:
prop_schema["description"] = param_descs[param_name]
elif apply_defaults and "description" not in prop_schema:
prop_schema["description"] = ""
def _generate_unilab_json_command_schema(
self, method_args: list, docstring: Optional[str] = None,
import_map: Optional[Dict[str, str]] = None,
apply_doc_defaults: bool = False,
) -> Dict[str, Any]:
"""根据方法参数和 docstring 生成 UniLabJsonCommand schema"""
doc_info = parse_docstring(docstring)
param_descs = doc_info.get("params", {})
schema = {
"type": "object",
@@ -598,12 +637,10 @@ class Registry:
param_name, param_type, param_default, import_map=import_map
)
if param_name in param_descs:
schema["properties"][param_name]["description"] = param_descs[param_name]
if param_required:
schema["required"].append(param_name)
self._apply_docstring_param_metadata(schema, doc_info, apply_defaults=apply_doc_defaults)
return schema
def _generate_status_types_schema(self, status_methods: Dict[str, Any]) -> Dict[str, Any]:
@@ -799,6 +836,7 @@ class Registry:
type_str = "UniLabJsonCommandAsync" if is_async else "UniLabJsonCommand"
params = method_info.get("params", [])
method_doc = method_info.get("docstring")
method_doc_info = parse_docstring(method_doc)
goal_schema = self._generate_schema_from_ast_params(params, method_name, method_doc, imap)
if action_args is not None:
@@ -828,7 +866,11 @@ class Registry:
# action handles: 从 @action(handles=[...]) 提取并转换为标准格式
raw_handles = (action_args or {}).get("handles")
handles = normalize_ast_action_handles(raw_handles) if isinstance(raw_handles, list) else (raw_handles or {})
handles = (
normalize_ast_action_handles(raw_handles)
if isinstance(raw_handles, list)
else (raw_handles or {})
)
# placeholder_keys: 先从参数类型自动检测,再用装饰器显式配置覆盖/补充
pk = detect_placeholder_keys(params)
@@ -847,7 +889,12 @@ class Registry:
"goal": goal,
"feedback": (action_args or {}).get("feedback") or {},
"result": (action_args or {}).get("result") or {},
"schema": wrap_action_schema(goal_schema, action_name, result_schema=result_schema),
"schema": wrap_action_schema(
goal_schema,
action_name,
description=(action_args or {}).get("description") or method_doc_info.get("description", ""),
result_schema=result_schema,
),
"goal_default": goal_default,
"handles": handles,
"placeholder_keys": pk,
@@ -886,7 +933,11 @@ class Registry:
action_name = f"auto-{action_name}"
raw_handles = action_args.get("handles")
handles = normalize_ast_action_handles(raw_handles) if isinstance(raw_handles, list) else (raw_handles or {})
handles = (
normalize_ast_action_handles(raw_handles)
if isinstance(raw_handles, list)
else (raw_handles or {})
)
method_params = method_info.get("params", [])
@@ -979,7 +1030,10 @@ class Registry:
"schema": schema,
"goal_default": goal_default,
"handles": handles,
"placeholder_keys": {**detect_placeholder_keys(method_params), **(action_args.get("placeholder_keys") or {})},
"placeholder_keys": {
**detect_placeholder_keys(method_params),
**(action_args.get("placeholder_keys") or {}),
},
}
if action_args.get("always_free") or method_info.get("always_free"):
action_entry["always_free"] = True
@@ -988,13 +1042,22 @@ class Registry:
nt = normalize_enum_value(action_args.get("node_type"), NodeType)
if nt:
action_entry["node_type"] = nt
goal_schema_for_docs = action_entry.get("schema", {}).get("properties", {}).get("goal", {})
self._apply_docstring_param_metadata(
goal_schema_for_docs,
parse_docstring(method_info.get("docstring")),
goal,
apply_defaults=True,
)
action_value_mappings[action_name] = action_entry
action_value_mappings = dict(sorted(action_value_mappings.items()))
# --- init_param_schema = { config: <init_params>, data: <status_types> } ---
init_params = ast_meta.get("init_params", [])
config_schema = self._generate_schema_from_ast_params(init_params, "__init__", import_map=imap)
config_schema = self._generate_schema_from_ast_params(
init_params, "__init__", ast_meta.get("init_docstring"), import_map=imap
)
data_schema = self._generate_status_schema_from_ast(
ast_meta.get("status_properties", {}), imap
)
@@ -1042,7 +1105,6 @@ class Registry:
) -> Dict[str, Any]:
"""Generate JSON Schema from AST-extracted parameter list."""
doc_info = parse_docstring(docstring)
param_descs = doc_info.get("params", {})
schema: Dict[str, Any] = {
"type": "object",
@@ -1072,12 +1134,10 @@ class Registry:
pname, ptype, pdefault, import_map
)
if pname in param_descs:
schema["properties"][pname]["description"] = param_descs[pname]
if prequired:
schema["required"].append(pname)
self._apply_docstring_param_metadata(schema, doc_info, apply_defaults=True)
return schema
def _generate_status_schema_from_ast(
@@ -1807,7 +1867,7 @@ class Registry:
else:
action_key = f"auto-{k}"
goal_schema = self._generate_unilab_json_command_schema(
v["args"], import_map=enhanced_import_map
v["args"], docstring=v.get("docstring"), import_map=enhanced_import_map
)
ret_type = v.get("return_type", "")
result_schema = None
@@ -1816,7 +1876,13 @@ class Registry:
"result", ret_type, None, import_map=enhanced_import_map
)
old_cfg = old_action_configs.get(action_key) or old_action_configs.get(f"auto-{k}", {})
new_schema = wrap_action_schema(goal_schema, action_key, result_schema=result_schema)
doc_info = parse_docstring(v.get("docstring"))
new_schema = wrap_action_schema(
goal_schema,
action_key,
description=doc_info.get("description", ""),
result_schema=result_schema,
)
old_schema = old_cfg.get("schema", {})
if old_schema:
preserve_field_descriptions(new_schema, old_schema)
@@ -1882,6 +1948,12 @@ class Registry:
merged_pk = dict(old_cfg.get("placeholder_keys", {}))
merged_pk.update(detect_placeholder_keys(v["args"]))
goal_schema_for_docs = (
entry_schema.get("properties", {}).get("goal", {})
if isinstance(entry_schema, dict)
else {}
)
self._apply_docstring_param_metadata(goal_schema_for_docs, doc_info, entry_goal)
entry = {
"type": entry_type,
@@ -1902,7 +1974,8 @@ class Registry:
device_config["init_param_schema"] = {}
init_schema = self._generate_unilab_json_command_schema(
enhanced_info["init_params"], "__init__",
enhanced_info["init_params"],
docstring=enhanced_info.get("init_docstring"),
import_map=enhanced_import_map,
)
device_config["init_param_schema"]["config"] = init_schema
@@ -1949,7 +2022,9 @@ class Registry:
action_str_type_mapping[action_type_str] = target_type
if target_type is not None:
try:
action_config["goal_default"] = ROS2MessageInstance(target_type.Goal()).get_python_dict()
action_config["goal_default"] = ROS2MessageInstance(
target_type.Goal()
).get_python_dict()
except Exception:
action_config["goal_default"] = {}
prev_schema = action_config.get("schema", {})
@@ -2141,6 +2216,7 @@ class Registry:
"unilabos_device_id": {
"type": "string",
"default": "",
"title": "设备ID",
"description": "UniLabOS设备ID用于指定执行动作的具体设备实例",
},
**schema["properties"]["goal"]["properties"],
@@ -2212,7 +2288,14 @@ class Registry:
lab_registry = Registry()
def build_registry(registry_paths=None, devices_dirs=None, upload_registry=False, check_mode=False, complete_registry=False, external_only=False):
def build_registry(
registry_paths=None,
devices_dirs=None,
upload_registry=False,
check_mode=False,
complete_registry=False,
external_only=False,
):
"""
构建或获取Registry单例实例
"""
@@ -2226,7 +2309,12 @@ def build_registry(registry_paths=None, devices_dirs=None, upload_registry=False
if path not in current_paths:
lab_registry.registry_paths.append(path)
lab_registry.setup(devices_dirs=devices_dirs, upload_registry=upload_registry, complete_registry=complete_registry, external_only=external_only)
lab_registry.setup(
devices_dirs=devices_dirs,
upload_registry=upload_registry,
complete_registry=complete_registry,
external_only=external_only,
)
# 将 AST 扫描的字符串类型替换为实际 ROS2 消息类(仅查找 ROS2 类型,不 import 设备模块)
lab_registry.resolve_all_types()

View File

@@ -36,16 +36,40 @@ class ROSMsgNotFound(Exception):
# ---------------------------------------------------------------------------
_SECTION_RE = re.compile(r"^(\w[\w\s]*):\s*$")
_PARAM_HEADER_RE = re.compile(
r"^\s*(?P<name>\w[\w]*)\s*(?:\[(?P<display_name>[^\]]+)\])?(?:\s*\([^)]*\))?\s*$"
)
def _parse_docstring_param_header(param_part: str) -> Tuple[str, Optional[str]]:
"""Parse ``name[display_name]`` or Google-style ``name (type)``."""
match = _PARAM_HEADER_RE.match(param_part.strip())
if not match:
return param_part.strip().split("(")[0].strip(), None
display_name = match.group("display_name")
if display_name is not None:
display_name = display_name.strip() or None
return match.group("name").strip(), display_name
def parse_docstring(docstring: Optional[str]) -> Dict[str, Any]:
"""
解析 Google-style docstring提取描述和参数说明。
解析 docstring提取描述和参数说明。
支持:
- Google-style ``Args:`` / ``Parameters:`` 小节
- 直接参数行 ``field: desc``
- 带显示名参数行 ``field[Display Name]: desc``
Returns:
{"description": "短描述", "params": {"param1": "参数1描述", ...}}
{
"description": "短描述",
"params": {"param1": "参数1描述", ...},
"param_display_names": {"param1": "显示名", ...},
}
"""
result: Dict[str, Any] = {"description": "", "params": {}}
result: Dict[str, Any] = {"description": "", "params": {}, "param_display_names": {}}
if not docstring:
return result
@@ -53,33 +77,53 @@ def parse_docstring(docstring: Optional[str]) -> Dict[str, Any]:
if not lines:
return result
result["description"] = lines[0].strip()
in_args = False
current_section: Optional[str] = None
current_param: Optional[str] = None
current_display_name: Optional[str] = None
current_desc_parts: list = []
for line in lines[1:]:
def flush_current_param() -> None:
nonlocal current_param, current_display_name, current_desc_parts
if current_param is None:
return
result["params"][current_param] = "\n".join(current_desc_parts).strip()
if current_display_name:
result["param_display_names"][current_param] = current_display_name
current_param = None
current_display_name = None
current_desc_parts = []
first_line = lines[0].strip()
start_index = 0
if not _SECTION_RE.match(first_line) and ":" not in first_line:
result["description"] = first_line
start_index = 1
for line in lines[start_index:]:
stripped = line.strip()
if not stripped:
if current_param is not None:
current_desc_parts.append("")
continue
section_match = _SECTION_RE.match(stripped)
if section_match:
if current_param is not None:
result["params"][current_param] = "\n".join(current_desc_parts).strip()
current_param = None
current_desc_parts = []
section_name = section_match.group(1).lower()
in_args = section_name in ("args", "arguments", "parameters", "params")
flush_current_param()
current_section = section_match.group(1).lower()
in_args = current_section in ("args", "arguments", "parameters", "params")
continue
if not in_args:
parse_as_param = in_args or current_section is None
if not parse_as_param:
continue
if ":" in stripped and not stripped.startswith(" "):
if current_param is not None:
result["params"][current_param] = "\n".join(current_desc_parts).strip()
if ":" in stripped:
flush_current_param()
param_part, _, desc_part = stripped.partition(":")
param_name = param_part.strip().split("(")[0].strip()
param_name, display_name = _parse_docstring_param_header(param_part)
current_param = param_name
current_display_name = display_name
current_desc_parts = [desc_part.strip()]
elif current_param is not None:
aline = line
@@ -89,8 +133,7 @@ def parse_docstring(docstring: Optional[str]) -> Dict[str, Any]:
aline = aline[1:]
current_desc_parts.append(aline.strip())
if current_param is not None:
result["params"][current_param] = "\n".join(current_desc_parts).strip()
flush_current_param()
return result

View File

@@ -0,0 +1,9 @@
try:
from . import peptide_materials # noqa: F401 ensure @resource classes are importable for PLR deserialize
except Exception: # pragma: no cover - 允许轻量环境导入非资源辅助函数
peptide_materials = None # type: ignore[assignment]
try:
from . import sirna_materials # noqa: F401 ensure @resource classes are importable for PLR deserialize
except Exception: # pragma: no cover - 允许轻量环境导入非资源辅助函数
sirna_materials = None # type: ignore[assignment]

View File

@@ -1,6 +1,8 @@
from os import name
from pylabrobot.resources import Deck, Coordinate, Rotation
from unilabos.registry.decorators import resource
from unilabos.resources.bioyond.YB_warehouses import (
bioyond_warehouse_1x4x4,
bioyond_warehouse_1x4x4_right, # 新增:右侧仓库 (A05D08)
@@ -23,6 +25,11 @@ from unilabos.resources.bioyond.YB_warehouses import (
from unilabos.resources.bioyond.warehouses import (
bioyond_warehouse_tipbox_storage_left, # 新增Tip盒堆栈(左)
bioyond_warehouse_tipbox_storage_right, # 新增Tip盒堆栈(右)
bioyond_warehouse_sirna_automation_stack,
bioyond_warehouse_sirna_centrifuge_balance_plate_stack,
bioyond_warehouse_sirna_g3_liquid_handler,
bioyond_warehouse_numeric_stack, # 新增:数字编码堆栈 (用于多肽站)
bioyond_warehouse_live_grid,
)
@@ -101,6 +108,83 @@ class BIOYOND_PolymerPreparationStation_Deck(Deck):
for warehouse_name, warehouse in self.warehouses.items():
self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name])
@resource(
id="BIOYOND_SirnaStation_Deck",
category=["deck"],
description="BIOYOND 小核酸工作站 Deck",
icon="配液站.webp",
)
class BIOYOND_SirnaStation_Deck(Deck):
WAREHOUSE_BIOYOND_AXIS = {
"G3移液站": "xy_col_row",
"自动化堆栈": "xy_col_row",
"离心机配平板堆栈": "xy_col_row",
}
WAREHOUSE_BIOYOND_KEY_AXIS = {
"G3移液站": "col_row",
"自动化堆栈": "col_row",
"离心机配平板堆栈": "col_row",
}
# Bioyond warehouse UUID -> 本地仓库名称 映射。
# 留空时由配置station config 的 ``warehouse_bioyond_ids``)注入。
# graph 节点也可在 deck.config.warehouse_bioyond_ids 覆盖。
WAREHOUSE_BIOYOND_IDS: dict = {}
def __init__(
self,
name: str = "SirnaStation_Deck",
size_x: float = 2700.0,
size_y: float = 1080.0,
size_z: float = 1500.0,
category: str = "deck",
setup: bool = False,
warehouse_bioyond_ids: dict | None = None,
**kwargs,
) -> None:
super().__init__(name=name, size_x=size_x, size_y=size_y, size_z=size_z)
# 按需写入实例级覆盖;保留默认空 mapping避免改动模型常量。
self.warehouse_bioyond_ids: dict = dict(self.WAREHOUSE_BIOYOND_IDS)
if warehouse_bioyond_ids:
self.warehouse_bioyond_ids.update(warehouse_bioyond_ids)
if setup:
self.setup()
@classmethod
def deserialize(cls, data: dict, allow_marshal: bool = False):
if data.get("children") and data.get("setup") is True:
data = data.copy()
data["setup"] = False
result = super().deserialize(data, allow_marshal=allow_marshal)
result._ensure_sirna_warehouse_metadata()
return result
def _ensure_sirna_warehouse_metadata(self) -> None:
for child in getattr(self, "children", []):
name = getattr(child, "name", "")
axis = self.WAREHOUSE_BIOYOND_AXIS.get(name)
if axis and not hasattr(child, "bioyond_axis"):
child.bioyond_axis = axis
key_axis = self.WAREHOUSE_BIOYOND_KEY_AXIS.get(name)
if key_axis and not hasattr(child, "bioyond_key_axis"):
child.bioyond_key_axis = key_axis
def setup(self) -> None:
# Sirna 读接口 /api/storage/location/locations-by-type 返回完整固定堆栈清单。
# LIMS 在库物料接口仍使用相同的 自动化堆栈 名称和数字库位编码。
self.warehouses = {
"G3移液站": bioyond_warehouse_sirna_g3_liquid_handler(),
"自动化堆栈": bioyond_warehouse_sirna_automation_stack(),
"离心机配平板堆栈": bioyond_warehouse_sirna_centrifuge_balance_plate_stack(),
}
self.warehouse_locations = {
"G3移液站": Coordinate(0.0, 0.0, 0.0),
"自动化堆栈": Coordinate(220.0, 0.0, 0.0),
"离心机配平板堆栈": Coordinate(1740.0, 0.0, 0.0),
}
for warehouse_name, warehouse in self.warehouses.items():
self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name])
class BIOYOND_YB_Deck(Deck):
def __init__(
self,
@@ -150,12 +234,207 @@ class BIOYOND_YB_Deck(Deck):
for warehouse_name, warehouse in self.warehouses.items():
self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name])
@resource(
id="BIOYOND_PeptideStation_Deck",
category=["deck"],
description="BIOYOND 多肽工作站 Deck",
icon="preparation_station.webp",
)
class BIOYOND_PeptideStation_Deck(Deck):
WAREHOUSE_BIOYOND_AXIS = dict.fromkeys(
[
"自动化堆栈",
"低温冰箱仓库",
"Tecan移液站库",
"G3移液站库",
"IDOT移液站库",
"G3缓冲库",
"盖板缓冲库",
"配平板缓冲库",
"IDOT缓冲库",
"固相合成板底座缓冲位",
"离心机库位",
"热封膜机位",
],
"xy_col_row",
)
WAREHOUSE_BIOYOND_KEY_AXIS = dict.fromkeys(WAREHOUSE_BIOYOND_AXIS, "col_row")
def __init__(
self,
name: str = "PeptideStation_Deck",
size_x: float = 2700.0,
size_y: float = 2000.0,
size_z: float = 1500.0,
category: str = "deck",
setup: bool = False
) -> None:
super().__init__(name=name, size_x=size_x, size_y=size_y, size_z=size_z)
if setup:
self.setup()
@classmethod
def deserialize(cls, data: dict, allow_marshal: bool = False):
if data.get("children") and data.get("setup") is True:
data = data.copy()
data["setup"] = False
# 已有序列化子资源,跳过 setup 避免重复创建
result = super(BIOYOND_PeptideStation_Deck, cls).deserialize(data, allow_marshal=allow_marshal)
else:
result = super(BIOYOND_PeptideStation_Deck, cls).deserialize(data, allow_marshal=allow_marshal)
result._ensure_peptide_warehouse_metadata()
return result
def _ensure_peptide_warehouse_metadata(self) -> None:
for child in getattr(self, "children", []):
name = getattr(child, "name", "")
axis = self.WAREHOUSE_BIOYOND_AXIS.get(name)
if axis and not hasattr(child, "bioyond_axis"):
child.bioyond_axis = axis
key_axis = self.WAREHOUSE_BIOYOND_KEY_AXIS.get(name)
if key_axis and not hasattr(child, "bioyond_key_axis"):
child.bioyond_key_axis = key_axis
def _frontend_y_flipped_coordinate(self, display_x: float, display_y: float, child) -> Coordinate:
"""把期望显示坐标转换为兼容前端 y 轴翻转的存储坐标。"""
return Coordinate(display_x, self.get_size_y() - display_y - child.get_size_y(), 0.0)
def setup(self) -> None:
# 多肽工作站仓库配置
# 基于 2026-05-09 live API probe 发现的实际仓库拓扑 (12个仓库)
# 数据来源: Bioyond 现场仓库发现结果。
self.warehouses = {
# 主自动化堆栈 - live API: code 10-17 -> x=17, y=10显示为 17 行×10 列
"自动化堆栈": bioyond_warehouse_numeric_stack(
"自动化堆栈",
rows=17,
columns=10,
bioyond_axis="xy_col_row",
bioyond_key_axis="col_row",
frontend_y_flip=True,
),
# 低温存储
"低温冰箱仓库": bioyond_warehouse_live_grid(
"低温冰箱仓库",
rows=3,
columns=2,
slot_keys=["1", "2", "3", "4", "5", "6"],
bioyond_key_axis="col_row",
frontend_y_flip=True,
),
# 移液站库位
"Tecan移液站库": bioyond_warehouse_live_grid(
"Tecan移液站库",
rows=18,
columns=1,
slot_keys=[str(index) for index in range(1, 19)],
bioyond_key_axis="col_row",
frontend_y_flip=True,
),
"G3移液站库": bioyond_warehouse_live_grid(
"G3移液站库",
rows=18,
columns=1,
slot_keys=["1", "2", "3", "4", "垃圾桶", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18"],
bioyond_key_axis="col_row",
frontend_y_flip=True,
),
"IDOT移液站库": bioyond_warehouse_live_grid(
"IDOT移液站库",
rows=12,
columns=1,
slot_keys=[f"0009-{index:04d}" for index in range(1, 13)],
bioyond_key_axis="col_row",
frontend_y_flip=True,
),
# 缓冲库位
"G3缓冲库": bioyond_warehouse_live_grid(
"G3缓冲库",
rows=5,
columns=1,
slot_keys=[str(index) for index in range(1, 6)],
bioyond_key_axis="col_row",
frontend_y_flip=True,
),
"盖板缓冲库": bioyond_warehouse_live_grid(
"盖板缓冲库",
rows=7,
columns=1,
slot_keys=[str(index) for index in range(1, 8)],
bioyond_key_axis="col_row",
frontend_y_flip=True,
),
"配平板缓冲库": bioyond_warehouse_live_grid(
"配平板缓冲库",
rows=3,
columns=1,
slot_keys=[str(index) for index in range(1, 4)],
bioyond_key_axis="col_row",
frontend_y_flip=True,
),
"IDOT缓冲库": bioyond_warehouse_live_grid(
"IDOT缓冲库",
rows=2,
columns=1,
slot_keys=["1", "1"],
bioyond_key_axis="col_row",
frontend_y_flip=True,
),
"固相合成板底座缓冲位": bioyond_warehouse_live_grid(
"固相合成板底座缓冲位",
rows=4,
columns=1,
slot_keys=[f"0015-{index:04d}" for index in range(1, 5)],
bioyond_key_axis="col_row",
frontend_y_flip=True,
),
# 设备库位
"离心机库位": bioyond_warehouse_live_grid(
"离心机库位",
rows=4,
columns=1,
slot_keys=[f"0017-{index:04d}" for index in range(1, 5)],
bioyond_key_axis="col_row",
frontend_y_flip=True,
),
"热封膜机位": bioyond_warehouse_live_grid(
"热封膜机位",
rows=2,
columns=1,
slot_keys=[f"0016-{index:04d}" for index in range(1, 3)],
bioyond_key_axis="col_row",
frontend_y_flip=True,
),
}
# 仓库显示布局:紧凑排列;存储 y 坐标按前端兼容翻转预先反向。
display_layout = {
"自动化堆栈": (0.0, 0.0),
"Tecan移液站库": (1520.0, 0.0),
"G3移液站库": (1710.0, 0.0),
"IDOT移液站库": (1900.0, 0.0),
"G3缓冲库": (2090.0, 0.0),
"盖板缓冲库": (2090.0, 580.0),
"低温冰箱仓库": (2280.0, 0.0),
"配平板缓冲库": (2280.0, 370.0),
"IDOT缓冲库": (2470.0, 370.0),
"固相合成板底座缓冲位": (2280.0, 740.0),
"离心机库位": (2470.0, 740.0),
"热封膜机位": (2280.0, 1210.0),
}
self.warehouse_locations = {
name: self._frontend_y_flipped_coordinate(x, y, self.warehouses[name])
for name, (x, y) in display_layout.items()
}
for warehouse_name, warehouse in self.warehouses.items():
self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name])
def YB_Deck(name: str) -> Deck:
by=BIOYOND_YB_Deck(name=name)
by.setup()
return by

View File

@@ -0,0 +1,247 @@
"""Peptide Station Material Resource Definitions."""
from __future__ import annotations
from collections import OrderedDict
try:
from pylabrobot.resources import Container, Plate, TipRack
except Exception: # pragma: no cover - 允许无 pylabrobot 的轻量动作测试导入
class _FallbackResource:
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
class Container(_FallbackResource): # type: ignore[no-redef]
pass
class Plate(_FallbackResource): # type: ignore[no-redef]
pass
class TipRack(_FallbackResource): # type: ignore[no-redef]
pass
try:
from unilabos.registry.decorators import resource
except Exception: # pragma: no cover - 允许无完整 registry 依赖时导入常量
def resource(*args, **kwargs):
def decorator(cls):
return cls
return decorator
def _ensure_itemized_ordering(kwargs: dict) -> None:
if kwargs.get("ordering") is None and kwargs.get("ordered_items") is None:
kwargs["ordering"] = OrderedDict()
class _PeptideTipRack(TipRack):
def __init__(self, *args, **kwargs):
kwargs.setdefault("size_x", 127.76)
kwargs.setdefault("size_y", 85.48)
kwargs.setdefault("size_z", 64.0)
kwargs.setdefault("with_tips", True)
_ensure_itemized_ordering(kwargs)
super().__init__(*args, **kwargs)
class _PeptidePlate(Plate):
def __init__(self, *args, **kwargs):
kwargs.setdefault("size_x", 127.76)
kwargs.setdefault("size_y", 85.48)
kwargs.setdefault("size_z", 14.35)
kwargs.setdefault("plate_type", "skirted")
_ensure_itemized_ordering(kwargs)
super().__init__(*args, **kwargs)
@resource(
id="bioyond_peptide_1000ul_tip_rack",
category=["labware", "tip_rack"],
description="1000uL tip rack for Bioyond peptide station",
)
class BioyondPeptide_1000ul_TipRack(_PeptideTipRack):
def __init__(self, *args, **kwargs):
kwargs.setdefault("model", "bioyond_peptide_1000ul_tip_rack")
super().__init__(*args, **kwargs)
@resource(
id="bioyond_peptide_200ul_tip_rack",
category=["labware", "tip_rack"],
description="200uL tip rack for Bioyond peptide station",
)
class BioyondPeptide_200ul_TipRack(_PeptideTipRack):
def __init__(self, *args, **kwargs):
kwargs.setdefault("model", "bioyond_peptide_200ul_tip_rack")
super().__init__(*args, **kwargs)
@resource(
id="bioyond_peptide_50ul_tip_rack",
category=["labware", "tip_rack"],
description="50uL tip rack for Bioyond peptide station",
)
class BioyondPeptide_50ul_TipRack(_PeptideTipRack):
def __init__(self, *args, **kwargs):
kwargs.setdefault("model", "bioyond_peptide_50ul_tip_rack")
super().__init__(*args, **kwargs)
@resource(
id="bioyond_peptide_96_well_deep_well_plate",
category=["labware", "plate"],
description="96 well deep well plate for Bioyond peptide station",
)
class BioyondPeptide_96WellDeepWellPlate(_PeptidePlate):
def __init__(self, *args, **kwargs):
kwargs.setdefault("model", "bioyond_peptide_96_well_deep_well_plate")
kwargs.setdefault("size_z", 44.0)
super().__init__(*args, **kwargs)
@resource(
id="bioyond_peptide_96_well_synthesis_plate",
category=["labware", "plate"],
description="96 well solid-phase synthesis plate for Bioyond peptide station",
)
class BioyondPeptide_96WellSynthesisPlate(_PeptidePlate):
def __init__(self, *args, **kwargs):
kwargs.setdefault("model", "bioyond_peptide_96_well_synthesis_plate")
super().__init__(*args, **kwargs)
@resource(
id="bioyond_peptide_96_well_synthesis_plate_base",
category=["labware", "adapter"],
description="96 well solid-phase synthesis plate base for Bioyond peptide station",
)
class BioyondPeptide_96WellSynthesisPlateBase(_PeptidePlate):
def __init__(self, *args, **kwargs):
kwargs.setdefault("model", "bioyond_peptide_96_well_synthesis_plate_base")
kwargs.setdefault("size_z", 20.0)
super().__init__(*args, **kwargs)
@resource(
id="bioyond_peptide_96_well_balance_plate",
category=["labware", "plate"],
description="96 well balance plate for Bioyond peptide station",
)
class BioyondPeptide_96WellBalancePlate(_PeptidePlate):
def __init__(self, *args, **kwargs):
kwargs.setdefault("model", "bioyond_peptide_96_well_balance_plate")
super().__init__(*args, **kwargs)
@resource(
id="bioyond_peptide_384_well_plate",
category=["labware", "plate"],
description="384 well plate for Bioyond peptide station",
)
class BioyondPeptide_384WellPlate(_PeptidePlate):
def __init__(self, *args, **kwargs):
kwargs.setdefault("model", "bioyond_peptide_384_well_plate")
super().__init__(*args, **kwargs)
@resource(
id="bioyond_peptide_384_lcms_plate",
category=["labware", "plate"],
description="384 well LCMS plate for Bioyond peptide station",
)
class BioyondPeptide_384LCMSPlate(_PeptidePlate):
def __init__(self, *args, **kwargs):
kwargs.setdefault("model", "bioyond_peptide_384_lcms_plate")
super().__init__(*args, **kwargs)
@resource(
id="bioyond_peptide_384_balance_plate",
category=["labware", "plate"],
description="384 well balance plate for Bioyond peptide station",
)
class BioyondPeptide_384BalancePlate(_PeptidePlate):
def __init__(self, *args, **kwargs):
kwargs.setdefault("model", "bioyond_peptide_384_balance_plate")
super().__init__(*args, **kwargs)
@resource(
id="bioyond_peptide_cover_plate",
category=["labware", "cover"],
description="Cover plate for Bioyond peptide station",
)
class BioyondPeptide_CoverPlate(_PeptidePlate):
def __init__(self, *args, **kwargs):
kwargs.setdefault("model", "bioyond_peptide_cover_plate")
kwargs.setdefault("size_z", 8.0)
super().__init__(*args, **kwargs)
@resource(
id="bioyond_peptide_sealing_base",
category=["labware", "adapter"],
description="Sealing base for Bioyond peptide station",
)
class BioyondPeptide_SealingBase(_PeptidePlate):
def __init__(self, *args, **kwargs):
kwargs.setdefault("model", "bioyond_peptide_sealing_base")
kwargs.setdefault("size_z", 20.0)
super().__init__(*args, **kwargs)
@resource(
id="bioyond_peptide_reagent_trough",
category=["labware", "trough"],
description="Reagent trough for Bioyond peptide station",
)
class BioyondPeptide_ReagentTrough(Container):
def __init__(self, *args, **kwargs):
kwargs.setdefault("size_x", 127.76)
kwargs.setdefault("size_y", 85.48)
kwargs.setdefault("size_z", 44.0)
kwargs.setdefault("max_volume", 300000.0)
kwargs.setdefault("model", "bioyond_peptide_reagent_trough")
super().__init__(*args, **kwargs)
DEFAULT_PEPTIDE_MATERIAL_TYPE_MAPPINGS = {
"bioyond_peptide_1000ul_tip_rack": ["1000μL枪头盒", "3a1890bb-736e-cfdd-3213-eb314e8a60f9"],
"bioyond_peptide_200ul_tip_rack": ["200μL枪头盒", "3a1890bb-36d1-964a-18bd-0bf0f2877a7b"],
"bioyond_peptide_50ul_tip_rack": ["50μL枪头盒", "3a1890bc-5fae-361c-cc09-e6f2f6dcd71d"],
"bioyond_peptide_96_well_deep_well_plate": ["96孔深孔板", "3a1890bc-1fa8-fe39-9faa-12279ed4569b"],
"bioyond_peptide_96_well_synthesis_plate": ["96孔固相合成板", "3a1871cb-99f3-f01d-23e2-08dbbd0045b5"],
"bioyond_peptide_96_well_synthesis_plate_base": ["96孔固相合成板底座", "3a1b997e-241b-64f0-80d1-47bca08799d1"],
"bioyond_peptide_96_well_balance_plate": ["96孔配平板", "3a187661-2378-1e20-fa5c-a27d49fdc15d"],
"bioyond_peptide_384_well_plate": ["384孔酶标板", "3a1890bf-2148-ed20-92bd-d85869947d9a"],
"bioyond_peptide_384_lcms_plate": ["384孔LCMS板", "3a1e6a8b-cb61-74da-a089-8e6f197f80f0"],
"bioyond_peptide_384_balance_plate": ["384孔配平板", "3a18be7e-47cc-888c-fc68-055753286826"],
"bioyond_peptide_cover_plate": ["防挥发盖板", "3a19d5a6-b0e2-b486-e5eb-bcabc632f4de"],
"bioyond_peptide_sealing_base": ["封膜底座", "3a1d1d7b-e33b-6975-165d-c56cba5ed345"],
"bioyond_peptide_reagent_trough": ["12道试剂槽", "3a18b431-ac58-ca2e-9680-2a4f5880ea45"],
}
MATERIAL_TYPE_CODE_TO_CLASS = {
"0001": BioyondPeptide_96WellSynthesisPlate,
"0002": BioyondPeptide_96WellBalancePlate,
"0008": BioyondPeptide_200ul_TipRack,
"0009": BioyondPeptide_1000ul_TipRack,
"0011": BioyondPeptide_96WellDeepWellPlate,
"0012": BioyondPeptide_50ul_TipRack,
"0016": BioyondPeptide_384WellPlate,
"0018": BioyondPeptide_384WellPlate,
"0024": BioyondPeptide_ReagentTrough,
"0026": BioyondPeptide_384BalancePlate,
"0035": BioyondPeptide_CoverPlate,
"0039": BioyondPeptide_96WellSynthesisPlateBase,
"0041": BioyondPeptide_SealingBase,
"0049": BioyondPeptide_384LCMSPlate,
}
def get_material_class_by_type_code(type_code: str):
"""Return a peptide material class by Bioyond material type code."""
return MATERIAL_TYPE_CODE_TO_CLASS.get(type_code)

View File

@@ -1,5 +1,192 @@
from pylabrobot.resources import Coordinate
from pylabrobot.resources.carrier import ResourceHolder, create_homogeneous_resources
from unilabos.resources.warehouse import WareHouse, warehouse_factory
class BioyondWareHouse(WareHouse):
"""Bioyond 仓库,额外保存服务端 x/y 坐标和库位标签语义。"""
def __init__(self, *args, bioyond_axis: str = "xy_row_col", bioyond_key_axis: str = "row_col", **kwargs):
super().__init__(*args, **kwargs)
self.bioyond_axis = bioyond_axis
self.bioyond_key_axis = bioyond_key_axis
def serialize(self) -> dict:
data = super().serialize()
data["bioyond_axis"] = self.bioyond_axis
data["bioyond_key_axis"] = self.bioyond_key_axis
return data
def bioyond_warehouse_numeric_stack(
name: str,
rows: int = 10,
columns: int = 17,
bioyond_axis: str = "xy_row_col",
bioyond_key_axis: str = "row_col",
frontend_y_flip: bool = False,
) -> WareHouse:
"""创建 Bioyond 数字库位堆栈,库位名使用服务端返回的 行-列 格式。
bioyond_axis: 仓库级别的 Bioyond 坐标轴约定,供 graphio 的坐标映射使用。
- "xy_row_col" (default): Bioyond x→row, y→col (reaction/peptide 历史约定).
- "xy_col_row": Bioyond x→col, y→row (Sirna live API 实测约定).
bioyond_key_axis: 库位标签生成约定。
- "row_col" (default): 视觉行列和标签行列一致,例如 10 行 x 17 列 → 1-1..10-17。
- "col_row": 视觉行列转置,但标签仍保持 Bioyond row-col例如
17 行 x 10 列 → 1-1..10-17。
未设置时 graphio 回退到默认 "xy_row_col",其他调用方保持原行为。
"""
num_items_x = columns
num_items_y = rows
num_items_z = 1
dx = 10.0
dy = 10.0
dz = 10.0
item_dx = 147.0
item_dy = 106.0
item_dz = 130.0
resource_size_x = 127.0
resource_size_y = 86.0
resource_size_z = 25.0
size_y = dy + item_dy * num_items_y
locations = []
for row in range(num_items_y):
display_y = dy + row * item_dy
y = size_y - display_y - resource_size_y if frontend_y_flip else display_y
for col in range(num_items_x):
locations.append(Coordinate(dx + col * item_dx, y, dz))
holders = create_homogeneous_resources(
klass=ResourceHolder,
locations=locations,
resource_size_x=resource_size_x,
resource_size_y=resource_size_y,
resource_size_z=resource_size_z,
name_prefix=name,
)
if bioyond_key_axis == "row_col":
keys = [
f"{row + 1}-{col + 1}"
for row in range(num_items_y)
for col in range(num_items_x)
]
elif bioyond_key_axis == "col_row":
keys = [
f"{col + 1}-{row + 1}"
for row in range(num_items_y)
for col in range(num_items_x)
]
else:
raise ValueError(f"未知 Bioyond 库位标签约定: {bioyond_key_axis!r}")
warehouse = BioyondWareHouse(
name=name,
size_x=dx + item_dx * num_items_x,
size_y=size_y,
size_z=dz + item_dz * num_items_z,
num_items_x=num_items_x,
num_items_y=num_items_y,
num_items_z=num_items_z,
ordering_layout="row-major",
sites={key: holder for key, holder in zip(keys, holders.values())},
category="warehouse",
bioyond_axis=bioyond_axis,
bioyond_key_axis=bioyond_key_axis,
)
return warehouse
def bioyond_warehouse_live_grid(
name: str,
rows: int,
columns: int,
slot_keys: list[str] | None = None,
bioyond_axis: str = "xy_col_row",
bioyond_key_axis: str = "row_col",
frontend_y_flip: bool = False,
) -> WareHouse:
"""创建 Bioyond 实测库位网格,按服务端 code 保存位点标签。
默认用于 Peptide live API 返回的坐标x 是视觉列y 是视觉行。
当服务端 code 重复时,为保持 PLR ordering 唯一性,会给后续重复项追加 ``#N``。
"""
num_items_x = columns
num_items_y = rows
num_items_z = 1
dx = 10.0
dy = 10.0
dz = 10.0
item_dx = 147.0
item_dy = 106.0
item_dz = 130.0
resource_size_x = 127.0
resource_size_y = 86.0
resource_size_z = 25.0
size_y = dy + item_dy * num_items_y
locations = []
for row in range(num_items_y):
display_y = dy + row * item_dy
y = size_y - display_y - resource_size_y if frontend_y_flip else display_y
for col in range(num_items_x):
locations.append(Coordinate(dx + col * item_dx, y, dz))
holders = create_homogeneous_resources(
klass=ResourceHolder,
locations=locations,
resource_size_x=resource_size_x,
resource_size_y=resource_size_y,
resource_size_z=resource_size_z,
name_prefix=name,
)
keys = slot_keys or [str(index + 1) for index in range(num_items_x * num_items_y)]
if len(keys) != len(holders):
raise ValueError(f"{name} 库位数量不匹配: keys={len(keys)}, holders={len(holders)}")
seen: dict[str, int] = {}
unique_keys: list[str] = []
for key in keys:
count = seen.get(key, 0) + 1
seen[key] = count
unique_keys.append(key if count == 1 else f"{key}#{count}")
return BioyondWareHouse(
name=name,
size_x=dx + item_dx * num_items_x,
size_y=size_y,
size_z=dz + item_dz * num_items_z,
num_items_x=num_items_x,
num_items_y=num_items_y,
num_items_z=num_items_z,
ordering_layout="row-major",
sites={key: holder for key, holder in zip(unique_keys, holders.values())},
category="warehouse",
bioyond_axis=bioyond_axis,
bioyond_key_axis=bioyond_key_axis,
)
# ================ 小核酸工作站相关堆栈 ================
def bioyond_warehouse_sirna_g3_liquid_handler(name: str = "G3移液站") -> WareHouse:
"""创建小核酸 G3 移液站库位堆栈:显示为 14 行 x 1 列,标签保持 1-1..1-14。"""
return bioyond_warehouse_numeric_stack(
name, rows=14, columns=1, bioyond_axis="xy_col_row", bioyond_key_axis="col_row"
)
def bioyond_warehouse_sirna_automation_stack(name: str = "自动化堆栈") -> WareHouse:
"""创建小核酸自动化堆栈:显示为 17 行 x 10 列,标签保持 1-1..10-17。"""
return bioyond_warehouse_numeric_stack(
name, rows=17, columns=10, bioyond_axis="xy_col_row", bioyond_key_axis="col_row"
)
def bioyond_warehouse_sirna_centrifuge_balance_plate_stack(name: str = "离心机配平板堆栈") -> WareHouse:
"""创建小核酸离心机配平板堆栈:显示为 1 行 x 2 列,标签保持 1-1、2-1。"""
return bioyond_warehouse_numeric_stack(
name, rows=1, columns=2, bioyond_axis="xy_col_row", bioyond_key_axis="col_row"
)
# ================ 反应站相关堆栈 ================
def bioyond_warehouse_1x4x4(name: str) -> WareHouse:

View File

@@ -736,7 +736,7 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: Dict[st
logger.warning(f"物料 {unique_name} 不是有效的 ResourcePLR 实例,类型: {type(plr_material)}")
continue
plr_material.code = material.get("code", "") and material.get("barCode", "") or ""
plr_material.code = material.get("barCode") or material.get("code") or ""
plr_material.unilabos_uuid = str(uuid.uuid4())
# ⭐ 保存 Bioyond 原始信息到 unilabos_extra用于出库时查询
@@ -864,11 +864,22 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: Dict[st
warehouse = deck.warehouses[wh_name]
logger.debug(f"[Warehouse匹配] 找到warehouse: {wh_name} (容量: {warehouse.capacity}, 行×列: {warehouse.num_items_x}×{warehouse.num_items_y})")
# Bioyond坐标映射 (重要!): x→行(1=A,2=B...), y→列(1=01,2=02...), z→层(通常=1)
x = loc.get("x", 1) # 行号 (1-based: 1=A, 2=B, 3=C, 4=D)
y = loc.get("y", 1) # 列号 (1-based: 1=01, 2=02, 3=03...)
# Bioyond坐标映射:
# - 历史 row_col 仓库中 x/y 直接按行/列参与索引。
# - Sirna 的库位标签为 col-rowstock-material 返回 x=标签第二段、y=标签第一段。
# 因此 x=13,y=4 应落到 key=4-13而不是交换后落到 3-5。
x = loc.get("x", 1)
y = loc.get("y", 1)
z = loc.get("z", 1) # 层号 (1-based, 通常为1)
# 仓库级别的轴约定覆盖。
# 对旧的 row-col 视觉标签bioyond_axis="xy_col_row" 需要交换 x/y。
# 对 Sirna 的 col-row 库位标签,原始 x/y 已能直接索引到 code 对应位置,不再交换。
bioyond_axis = getattr(warehouse, "bioyond_axis", "xy_row_col")
bioyond_key_axis = getattr(warehouse, "bioyond_key_axis", "row_col")
if bioyond_axis == "xy_col_row" and bioyond_key_axis != "col_row":
x, y = y, x
# 如果是右侧堆栈,需要调整列号 (5→1, 6→2, 7→3, 8→4)
if wh_name == "堆栈1右":
y = y - 4 # 将5-8映射到1-4
@@ -912,10 +923,43 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: Dict[st
logger.debug(f"列优先warehouse {wh_name}: x={x}(行),y={y}(列) → row={row_idx},col={col_idx} → idx={idx}")
if 0 <= idx < warehouse.capacity:
if warehouse[idx] is None or isinstance(warehouse[idx], ResourceHolder):
slot_key = None
ordering = getattr(warehouse, "_ordering", {})
sites = getattr(warehouse, "sites", [])
if isinstance(ordering, dict) and idx < len(sites):
site_at_idx = sites[idx]
slot_key = next(
(key for key, site in ordering.items() if site is site_at_idx),
None,
)
current_resource = warehouse[idx]
if current_resource is None or isinstance(current_resource, (ResourceHolder, str)):
if isinstance(current_resource, str):
logger.warning(
f"⚠️ 物料 {unique_name} 覆盖 {wh_name}[{idx}]"
f"{f'({slot_key})' if slot_key else ''} 的旧占位 occupied_by={current_resource!r}"
)
# 物料尺寸已在放入warehouse前根据需要进行了交换
warehouse[idx] = plr_material
logger.debug(f"✅ 物料 {unique_name} 放置到 {wh_name}[{idx}] (Bioyond坐标: x={loc.get('x')}, y={loc.get('y')})")
logger.debug(
f"✅ 物料 {unique_name} 放置到 {wh_name}[{idx}]"
f"{f'({slot_key})' if slot_key else ''} "
f"(Bioyond坐标: x={loc.get('x')}, y={loc.get('y')})"
)
else:
parent = getattr(current_resource, "parent", None)
current_repr = repr(current_resource)
current_len = len(current_resource) if isinstance(current_resource, str) else None
logger.warning(
f"⚠️ 物料 {unique_name} 跳过放置到 {wh_name}[{idx}]"
f"{f'({slot_key})' if slot_key else ''}:目标库位已有 "
f"{type(current_resource).__name__}"
f"(value={current_repr}, len={current_len})"
f"(name={getattr(current_resource, 'name', None)}, "
f"parent={getattr(parent, 'name', None)}, "
f"uuid={getattr(current_resource, 'unilabos_uuid', None)})"
)
else:
logger.warning(f"❌ 物料 {unique_name} 的索引 {idx} 超出仓库 {wh_name} 容量 {warehouse.capacity}")
else:

View File

@@ -18,3 +18,7 @@ def register():
from unilabos.devices.liquid_handling.rviz_backend import UniLiquidHandlerRvizBackend
from unilabos.devices.liquid_handling.laiyu.backend.laiyu_v_backend import UniLiquidHandlerLaiyuBackend
# noinspection PyUnresolvedReferences
from unilabos.resources.bioyond.decks import BIOYOND_SirnaStation_Deck
# noinspection PyUnresolvedReferences
from unilabos.resources.bioyond.decks import BIOYOND_PeptideStation_Deck

View File

@@ -1971,10 +1971,15 @@ class BaseROS2DeviceNode(Node, Generic[T]):
mapped_plr_resources = []
for uuid in uuids_list:
found = None
for plr_resource in figured_resources:
r = self.resource_tracker.loop_find_with_uuid(plr_resource, uuid)
mapped_plr_resources.append(r)
break
if r is not None:
found = r
break
if found is None:
raise Exception(f"未能在已解析的资源树中找到 uuid={uuid} 对应的资源")
mapped_plr_resources.append(found)
return mapped_plr_resources

View File

@@ -33,10 +33,76 @@ _USE_UV: Optional[bool] = None
def _has_uv() -> bool:
global _USE_UV
if _USE_UV is None:
_USE_UV = shutil.which("uv") is not None
uv_path = shutil.which("uv")
if not uv_path:
_USE_UV = False
else:
try:
result = subprocess.run([uv_path, "--version"], capture_output=True, text=True, timeout=10)
_USE_UV = result.returncode == 0
except Exception:
_USE_UV = False
return _USE_UV
def _install_command(installer: str, package: str, upgrade: bool, is_chinese: bool) -> List[str]:
if installer == "uv":
cmd = ["uv", "pip", "install"]
if upgrade:
cmd.append("--upgrade")
cmd.append(package)
if is_chinese:
cmd.extend(["--index-url", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
return cmd
cmd = [sys.executable, "-m", "pip", "install", "--disable-pip-version-check"]
if upgrade:
cmd.append("--upgrade")
cmd.append(package)
if is_chinese:
cmd.extend(["-i", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
return cmd
def _installer_candidates() -> List[str]:
installers: List[str] = []
if _has_uv():
installers.append("uv")
installers.append("pip")
return installers
def _git_url_from_requirement(requirement: str) -> Optional[str]:
if not requirement.startswith("git+"):
return None
return requirement[4:].split("#", 1)[0]
def _repo_dir_name(git_url: str) -> str:
repo_name = git_url.rstrip("/").rsplit("/", 1)[-1]
return repo_name[:-4] if repo_name.endswith(".git") else repo_name
def _print_manual_git_install_hint(requirement: str) -> None:
git_url = _git_url_from_requirement(requirement)
if not git_url:
return
repo_dir = _repo_dir_name(git_url)
install_cmd = "uv pip install -e ." if _has_uv() else f"{sys.executable} -m pip install -e ."
if _is_chinese_locale() and not _has_uv():
install_cmd += " -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"
print_status("Git 依赖自动安装失败,通常是网络连接被重置或代码托管站点暂时不可达。", "warning")
print_status("可以手动拉取代码后在本地安装:", "warning")
print_status(f" git clone {git_url}", "warning")
print_status(f" cd {repo_dir}", "warning")
print_status(" git pull", "warning")
print_status(f" {install_cmd}", "warning")
print_status(f"如果目录 {repo_dir} 已存在,直接进入该目录执行 git pull 后再安装。", "warning")
print_status("如果 git clone 仍失败,请切换网络/代理,或从浏览器下载源码后进入源码目录执行本地安装命令。", "warning")
def _install_packages(
packages: List[str],
upgrade: bool = False,
@@ -53,7 +119,7 @@ def _install_packages(
return True
is_chinese = _is_chinese_locale()
use_uv = _has_uv()
installers = _installer_candidates()
failed: List[str] = []
for pkg in packages:
@@ -63,35 +129,30 @@ def _install_packages(
else:
print_status(f"正在{action_word} {pkg}...", "info")
if use_uv:
cmd = ["uv", "pip", "install"]
if upgrade:
cmd.append("--upgrade")
cmd.append(pkg)
if is_chinese:
cmd.extend(["--index-url", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
else:
cmd = [sys.executable, "-m", "pip", "install"]
if upgrade:
cmd.append("--upgrade")
cmd.append(pkg)
if is_chinese:
cmd.extend(["-i", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
pkg_installed = False
last_error = "unknown error"
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode == 0:
installer = "uv" if use_uv else "pip"
print_status(f"{pkg} {action_word}成功 (via {installer})", "success")
else:
stderr_short = result.stderr.strip().split("\n")[-1] if result.stderr else "unknown error"
print_status(f"× {pkg} {action_word}失败: {stderr_short}", "error")
failed.append(pkg)
except subprocess.TimeoutExpired:
print_status(f"× {pkg} {action_word}超时 (300s)", "error")
failed.append(pkg)
except Exception as e:
print_status(f"× {pkg} {action_word}异常: {e}", "error")
for installer in installers:
cmd = _install_command(installer, pkg, upgrade, is_chinese)
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode == 0:
print_status(f"{pkg} {action_word}成功 (via {installer})", "success")
pkg_installed = True
break
last_error = result.stderr.strip().split("\n")[-1] if result.stderr else "unknown error"
print_status(f"× {pkg} {action_word}失败 (via {installer}): {last_error}", "warning")
except subprocess.TimeoutExpired:
last_error = "timeout after 300s"
print_status(f"× {pkg} {action_word}超时 (via {installer}, 300s)", "warning")
except Exception as e:
last_error = str(e)
print_status(f"× {pkg} {action_word}异常 (via {installer}): {e}", "warning")
if not pkg_installed:
print_status(f"× {pkg} {action_word}失败: {last_error}", "error")
_print_manual_git_install_hint(pkg)
failed.append(pkg)
if failed:
@@ -188,7 +249,13 @@ class EnvironmentChecker:
"crcmod": "crcmod-plus",
}
self.special_packages = {"pylabrobot": "git+https://github.com/Xuwznln/pylabrobot.git"}
# 中文 locale 下走 Gitee 镜像,规避 GitHub 拉取失败
pylabrobot_url = (
"git+https://gitee.com/xuwznln/pylabrobot.git"
if _is_chinese_locale()
else "git+https://github.com/Xuwznln/pylabrobot.git"
)
self.special_packages = {"pylabrobot": pylabrobot_url}
self.version_requirements = {
"msgcenterpy": "0.1.8",

View File

@@ -206,6 +206,7 @@ class ImportManager:
"ast_analysis_success": False,
"import_map": {},
"init_params": [],
"init_docstring": None,
"status_methods": {},
"action_methods": {},
}
@@ -251,6 +252,7 @@ class ImportManager:
# 映射到统一字段名(与 registry.py complete_registry 消费端一致)
result["init_params"] = body.get("init_params", [])
result["init_docstring"] = body.get("init_docstring")
result["status_methods"] = body.get("status_properties", {})
result["action_methods"] = {
k: {

View File

@@ -2,7 +2,7 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>unilabos_msgs</name>
<version>0.10.19</version>
<version>0.11.1</version>
<description>ROS2 Messages package for unilabos devices</description>
<maintainer email="changjh@pku.edu.cn">Junhan Chang</maintainer>
<maintainer email="18435084+Xuwznln@users.noreply.github.com">Xuwznln</maintainer>