Compare commits

..

5 Commits

Author SHA1 Message Date
hanhua@dp.tech
6cc693ed5f delete day3 default params 2026-05-22 16:38:48 +08:00
hanhua@dp.tech
c2c986105d update reset 2026-05-22 11:32:31 +08:00
yxz321
f14e1bc4a0 plan: add modify reset plan 2026-05-21 19:53:58 +08:00
yxz321
247a0ee4c6 add and expose sync_from_external
从奔曜同步本地资源树+publish到deck
2026-05-21 19:37:02 +08:00
yxz321
a084031af0 improve: 使用调度状态接口做健康检查
- 将 Bioyond 启动健康检查从物料类型列表切换为 scheduler_status()。
- 避免 debug 日志输出完整物料类型列表。
- 保持连接监控响应更轻量、更易读。
2026-05-21 15:59:17 +08:00
5 changed files with 1679 additions and 138 deletions

View File

@@ -0,0 +1,461 @@
# Peptide Four-Checkbox Reset Plan
Date: 2026-05-21 16:30
Status: Proposal only / not executed
## Scope
This plan replaces `2026-05-21_1556_peptide_reset_sirna_reference_plan.md` for Peptide reset work.
User direction captured here:
- `take_out` is unnecessary for Peptide reset.
- Do not add a material-cache refresh checkbox.
- Change reset to four checkbox-controlled operations:
- 调度器复位
- 订单状态复位
- 库位复位
- 仪器复位
- The first three checkboxes default to checked.
- The fourth checkbox, 仪器复位 / `reset_devices`, defaults to unchecked.
- Replace the current public `reset` action with:
- `reset_auto`: normal ILab action node. This is the renamed/replaced version of the current reset implementation.
- `reset_manual`: manual-confirm action node with a physical cleanup confirmation message.
## Evidence Summary
Current Peptide source:
- Reset action code is currently in `unilabos/devices/workstation/bioyond_studio/peptide_station/peptide_station.py`.
- Current Peptide reset selects `scheduler_reset`, `reset_order_status`, and `reset_location`, and passes ids to order/location resets.
- `BioyondV1RPC.reset_devices()` already calls `/api/lims/device/reset-devices` with only `apiKey` and `requestTime`.
- `BioyondV1RPC.scheduler_reset()` already calls `/api/lims/scheduler/reset` with only `apiKey` and `requestTime`.
- `BioyondV1RPC.reset_order_status(order_id)` and `reset_location(location_id)` currently send `data`, but live probes showed that omitted `data` succeeds.
Live Peptide no-data reset probes using `temp_benyao/peptide/peptide_station_config.example.json`:
- `POST /api/lims/order/reset-order-status` with request keys `["apiKey", "requestTime"]` returned HTTP 200 and `code=1`.
- `POST /api/lims/scheduler/reset` with request keys `["apiKey", "requestTime"]` returned HTTP 200 and `code=1`.
- `POST /api/lims/storage/reset-location` with request keys `["apiKey", "requestTime"]` returned HTTP 200 and `code=1`.
- `reset-devices` was not live-probed in this session, but the current RPC wrapper already sends no `data`.
Raw findings:
- `temp_benyao/peptide/_findings/2026-05-21_1613_reset_order_status_no_data_live.md`
- `temp_benyao/peptide/_findings/2026-05-21_1615_remaining_resets_no_data_live.md`
## Proposed Public Actions
### `reset_auto`
Normal action node. This is the auto/no-manual-confirm path. It replaces the current public `reset` action; do not leave a second public `reset` action unless a later compatibility request explicitly asks for an alias.
Checkbox schema rule:
- Use plain `bool` annotations in the action signature.
- Do not use `Annotated[bool, Field(...)]` for these checkbox params in this implementation plan.
- The current AST registry schema path does not unwrap `Annotated[...]`; plain `bool` is required so generated JSON Schema marks the fields as boolean and the renderer can show checkboxes.
- Put human-facing labels/descriptions in the method docstring or action description. If field-level `Field(description=...)` metadata is required later, add registry `Annotated` support and a schema test as a separate change.
Decorator shape:
```python
@action(
always_free=True,
goal_default={
"reset_scheduler": True,
"reset_order_status": True,
"reset_location": True,
"reset_devices": False,
},
description="自动复位调度器/订单状态/库位,可选仪器复位",
)
def reset_auto(
self,
reset_scheduler: bool = True,
reset_order_status: bool = True,
reset_location: bool = True,
reset_devices: bool = False,
**kwargs: Any,
) -> Dict[str, Any]:
"""自动复位调度器/订单状态/库位,可选仪器复位。
Args:
reset_scheduler[调度器复位]: 调用 /api/lims/scheduler/reset默认勾选。
reset_order_status[订单状态复位]: 调用 /api/lims/order/reset-order-status默认勾选。
reset_location[库位复位]: 调用 /api/lims/storage/reset-location默认勾选。
reset_devices[仪器复位]: 调用 /api/lims/device/reset-devices默认不勾选。
"""
...
```
Implementation notes:
- Use real plain-`bool` parameters, not hidden `**kwargs` and not `Annotated`, so the action renderer can expose four checkboxes.
- Rename/replace the existing `reset` action as `reset_auto`; the implementation should not keep the old id-shaped `reset` action as another public path by default.
- Keep the three routine reset defaults checked.
- Keep `reset_devices` unchecked because it can be broader and more disruptive.
- Do not require or resolve order ids or location ids.
- Do not call `take_out`.
- Do not call `refresh_material_cache`.
### `reset_manual`
Manual-confirm node. It should show the operator a physical cleanup warning, then execute the same reset helper as `reset_auto` after the operator confirms.
Actual manual-confirm decorator pattern in this repo:
- Use `@action(node_type=NodeType.MANUAL_CONFIRM)`.
- Set `always_free=True`.
- Add `placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}`.
- Include `timeout_seconds: int` and `assignee_user_ids: list[str]`.
- Add `goal_default` for `timeout_seconds` and `assignee_user_ids`.
- Manual-confirm actions are normally side-effect-light, but existing Peptide `start_experiment` is already a `MANUAL_CONFIRM` action that performs scheduler start after the operator gate, so a reset-after-confirm pattern is compatible with current Peptide style.
Proposed confirmation text:
```text
请确认G3、CEM、Tecan、撕膜机、封膜机、打标机、旋转堆栈上下料位、3个转台等位置的物料已清理完毕
请开门检查冰箱、IDOT、酶标仪、离心机、LCMS内部没有遗留物料。
```
Decorator/function shape:
```python
RESET_MANUAL_CONFIRM_MESSAGE = (
"请确认G3、CEM、Tecan、撕膜机、封膜机、打标机、旋转堆栈上下料位、3个转台等位置的物料已清理完毕\n"
"请开门检查冰箱、IDOT、酶标仪、离心机、LCMS内部没有遗留物料。"
)
@action(
always_free=True,
node_type=NodeType.MANUAL_CONFIRM,
placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"},
goal_default={
"reset_scheduler": True,
"reset_order_status": True,
"reset_location": True,
"reset_devices": False,
"physical_cleanup_confirmed": False,
"timeout_seconds": 3600,
"assignee_user_ids": [],
},
feedback_interval=300,
description=RESET_MANUAL_CONFIRM_MESSAGE,
)
def reset_manual(
self,
reset_scheduler: bool = True,
reset_order_status: bool = True,
reset_location: bool = True,
reset_devices: bool = False,
physical_cleanup_confirmed: bool = False,
timeout_seconds: int = 3600,
assignee_user_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""人工确认物理清理后执行复位。
Args:
reset_scheduler[调度器复位]: 调用 /api/lims/scheduler/reset默认勾选。
reset_order_status[订单状态复位]: 调用 /api/lims/order/reset-order-status默认勾选。
reset_location[库位复位]: 调用 /api/lims/storage/reset-location默认勾选。
reset_devices[仪器复位]: 调用 /api/lims/device/reset-devices默认不勾选。
physical_cleanup_confirmed[物理清理确认]: 确认清理提示中的物料检查已经完成,默认不勾选。
"""
...
```
Execution rule:
- If `physical_cleanup_confirmed` is false, return a blocked result and do not call any reset API.
- If it is true, call the same internal helper as `reset_auto`.
- Return `confirmation_message` in the result payload so call logs preserve the exact operator instruction text.
Renderer caveat:
- `description` should carry the warning in generated action metadata.
- `physical_cleanup_confirmed` must remain a plain `bool` so it renders as a checkbox.
- The cleanup warning should be carried by the action `description` and the docstring param description. Do not rely on `Field(description=...)` unless registry `Annotated` support has been implemented and tested.
- If the current frontend does not show action descriptions or docstring field descriptions reliably, add a read-only string parameter such as `confirmation_message: str = RESET_MANUAL_CONFIRM_MESSAGE` with `goal_default`, or use a handle-based display only after renderer behavior is verified.
## Shared Internal Helper
Both public actions should delegate to one helper, for example:
```python
def _execute_reset_operations(
self,
*,
reset_scheduler: bool,
reset_order_status: bool,
reset_location: bool,
reset_devices: bool,
) -> Dict[str, Any]:
...
```
Call order:
1. `scheduler_reset`
2. `reset_order_status`
3. `reset_location`
4. `reset_devices`
Result shape:
```python
{
"selected_operations": [
{"key": "reset_scheduler", "label": "调度器复位", "selected": True},
{"key": "reset_order_status", "label": "订单状态复位", "selected": True},
{"key": "reset_location", "label": "库位复位", "selected": True},
{"key": "reset_devices", "label": "仪器复位", "selected": False},
],
"executed_calls": [
{"operation": "scheduler_reset", "endpoint": "/api/lims/scheduler/reset", "result": {"code": 1}},
],
"skipped_operations": [
{"operation": "reset_devices", "reason": "checkbox_disabled"},
],
"warnings": [],
}
```
Failure handling:
- Execute selected operations sequentially and record each result.
- If an operation returns non-`1` code, add a warning and continue unless the caller later requests fail-fast.
- If an RPC method raises, catch it, record an error entry, and continue to the next selected operation unless fail-fast is introduced.
## RPC Wrapper Adjustment
Adjust the two id-shaped wrappers to no-data calls:
- `BioyondV1RPC.reset_order_status()` should no longer require `order_id`.
- `BioyondV1RPC.reset_location()` should no longer require `location_id`.
Current no-data wrappers already exist:
- `scheduler_reset()`
- `reset_devices()`
Suggested RPC signatures:
```python
def scheduler_reset(self) -> int: ...
def reset_order_status(self) -> int: ...
def reset_location(self) -> int: ...
def reset_devices(self) -> int: ...
```
Compatibility option:
```python
def reset_order_status(self, order_id: Optional[str] = None) -> int:
del order_id
...
def reset_location(self, location_id: Optional[str] = None) -> int:
del location_id
...
```
This keeps older code from crashing while making the actual wire request no-data.
## Adjusted Runtime API Schemas
These are the schemas Peptide reset code should target at runtime after the live no-data probes. They intentionally omit `data`, even though OpenAPI models nullable `data` for these endpoints.
All four requests use:
```json
{
"apiKey": "string",
"requestTime": "date-time"
}
```
No `data` field should be sent by default.
All four responses use:
```json
{
"code": 1,
"message": "",
"timestamp": 0
}
```
### 调度器复位
Endpoint:
```text
POST /api/lims/scheduler/reset
```
Adjusted request:
```json
{
"apiKey": "B10B5995",
"requestTime": "2026-05-21T08:15:16.494Z"
}
```
Live response:
```json
{
"code": 1,
"message": "",
"timestamp": 1779351316072
}
```
Notes:
- OpenAPI says `data` is nullable int32.
- Live Peptide accepted omitted `data`.
### 订单状态复位
Endpoint:
```text
POST /api/lims/order/reset-order-status
```
Adjusted request:
```json
{
"apiKey": "B10B5995",
"requestTime": "2026-05-21T08:13:34.750Z"
}
```
Live response:
```json
{
"code": 1,
"message": "",
"timestamp": 1779351214422
}
```
Notes:
- OpenAPI says `data` is nullable string.
- Live Peptide accepted omitted `data`.
- Do not model this as order-id scoped unless Bioyond confirms backend behavior.
### 库位复位
Endpoint:
```text
POST /api/lims/storage/reset-location
```
Adjusted request:
```json
{
"apiKey": "B10B5995",
"requestTime": "2026-05-21T08:15:18.924Z"
}
```
Live response:
```json
{
"code": 1,
"message": "",
"timestamp": 1779351318565
}
```
Notes:
- OpenAPI says `data` is nullable string.
- Live Peptide accepted omitted `data`.
- Do not model this as location-id scoped unless Bioyond confirms backend behavior.
### 仪器复位
Endpoint:
```text
POST /api/lims/device/reset-devices
```
Adjusted request:
```json
{
"apiKey": "B10B5995",
"requestTime": "date-time"
}
```
Expected response shape:
```json
{
"code": 1,
"message": "",
"timestamp": 0
}
```
Notes:
- OpenAPI says `data` is nullable string.
- Current `BioyondV1RPC.reset_devices()` already sends no `data`.
- This endpoint was not live-probed in the no-data reset session.
- Keep checkbox default unchecked.
## Tests To Add Before Implementation
1. `reset_auto` is not `NodeType.MANUAL_CONFIRM`.
2. `reset_manual` has `node_type=NodeType.MANUAL_CONFIRM`.
3. `reset_manual` metadata includes:
- `always_free=True`
- `placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}`
- `timeout_seconds=3600`
- `assignee_user_ids=[]`
- `physical_cleanup_confirmed=False`
4. Both reset actions expose four real boolean params:
- `reset_scheduler`
- `reset_order_status`
- `reset_location`
- `reset_devices`
5. The generated registry schema marks those reset params as JSON Schema `type: boolean`, not `object` or `string`, so the frontend can render checkboxes.
6. `reset_auto` replaces the current public `reset` action. Unless a later compatibility request adds an alias, no old id-shaped public `reset` action remains.
7. Goal defaults are:
- first three reset checkboxes `True`
- `reset_devices=False`
8. `reset_manual(..., physical_cleanup_confirmed=False)` does not call any RPC reset method.
9. `reset_auto()` with defaults calls:
- `scheduler_reset()`
- `reset_order_status()`
- `reset_location()`
- not `reset_devices()`
10. `reset_auto(reset_devices=True)` also calls `reset_devices()`.
11. `reset_order_status()` and `reset_location()` RPC wrappers send no `data` key.
12. No reset path calls `take_out`.
13. No reset path calls `refresh_material_cache`.
## Non-Goals
- Do not implement `take_out` in reset.
- Do not refresh `material_cache` from reset.
- Do not resolve order ids or location ids for reset.
- Do not add Project/cache/browser cleanup routes.
- Do not make `reset_devices` default-on.
- Do not execute this plan during planning.

View File

@@ -415,21 +415,25 @@ class BioyondV1RPC(BaseRequest):
return {}
return response.get("data", {})
def reset_location(self, location_id: str) -> int:
def reset_location(self, location_id: Optional[str] = None) -> int:
"""复位库位
现场实测 ``POST /api/lims/storage/reset-location`` 不传 ``data`` 即可成功
(见 ``temp_benyao/peptide/_findings/2026-05-21_1615_remaining_resets_no_data_live.md``
因此默认无 ``data`` 字段;保留 ``location_id`` 仅为兼容旧调用,传入会被忽略。
参数:
location_id: 库位ID
location_id: 兼容入参,已被忽略;新逻辑不再以 location 为粒度复位。
返回值:
int: 成功返回1失败返回0
"""
del location_id
response = self.post(
url=f'{self.host}/api/lims/storage/reset-location',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": location_id,
})
if not response or response['code'] != 1:
return 0
@@ -929,21 +933,25 @@ class BioyondV1RPC(BaseRequest):
return {}
return response.get("data", {})
def reset_order_status(self, order_id: str) -> int:
def reset_order_status(self, order_id: Optional[str] = None) -> int:
"""复位订单状态
现场实测 ``POST /api/lims/order/reset-order-status`` 不传 ``data`` 即可成功
(见 ``temp_benyao/peptide/_findings/2026-05-21_1613_reset_order_status_no_data_live.md``
因此默认无 ``data`` 字段;保留 ``order_id`` 仅为兼容旧调用,传入会被忽略。
参数:
order_id: 订单ID
order_id: 兼容入参,已被忽略;新逻辑不再以单订单为粒度复位。
返回值:
int: 成功返回1失败返回0
"""
del order_id
response = self.post(
url=f'{self.host}/api/lims/order/reset-order-status',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": order_id,
})
if not response or response['code'] != 1:
return 0

View File

@@ -8,10 +8,12 @@ import copy
import json
import mimetypes
import sys
import threading
import time
from contextlib import nullcontext
from datetime import datetime, timezone
from pathlib import Path
from typing import Annotated, Any, Dict, Iterable, List, Literal, Optional, Tuple
from typing import Annotated, Any, Dict, Iterable, List, Optional, Tuple
from uuid import UUID
import requests
@@ -94,13 +96,46 @@ except Exception as exc: # pragma: no cover
DEBUG_CLI_ENABLED = False
DEFAULT_RESET_OPERATIONS = ("scheduler_reset", "reset_order_status", "reset_location")
RESET_OPERATION_KEYS: Tuple[str, ...] = (
"reset_scheduler",
"reset_order_status",
"reset_location",
"reset_devices",
)
RESET_OPERATION_LABELS: Dict[str, str] = {
"reset_scheduler": "调度器复位",
"reset_order_status": "订单状态复位",
"reset_location": "库位复位",
"reset_devices": "仪器复位",
}
RESET_OPERATION_ENDPOINTS: Dict[str, str] = {
"reset_scheduler": "/api/lims/scheduler/reset",
"reset_order_status": "/api/lims/order/reset-order-status",
"reset_location": "/api/lims/storage/reset-location",
"reset_devices": "/api/lims/device/reset-devices",
}
RESET_MANUAL_CONFIRM_MESSAGE = (
"请确认G3、CEM、Tecan、撕膜机、封膜机、打标机、旋转堆栈上下料位、3个转台等位置的物料已清理完毕\n"
"请开门检查冰箱、IDOT、酶标仪、离心机、LCMS内部没有遗留物料。"
)
RESULT_TABLE_COLUMNS = [
{"name": "设备", "key": "whName"},
{"name": "位置", "key": "locationCode"},
{"name": "物料名称", "key": "materialName"},
{"name": "数量", "key": "quantity"},
]
UNLOAD_TABLE_COLUMNS = [
{"name": "仓库名称", "key": "whName"},
{"name": "坐标 X", "key": "posX"},
{"name": "坐标 Y", "key": "posY"},
{"name": "坐标 Z", "key": "posZ"},
{"name": "单位", "key": "unit"},
{"name": "物料名称", "key": "materialName"},
]
UNLOAD_TABLE_COLUMNS_MULTI_ORDER = [
{"name": "订单编号", "key": "orderCode"},
*UNLOAD_TABLE_COLUMNS,
]
MATERIAL_TYPE_ORDER = ("Sample", "Consumables", "Reagent")
PEPTIDE_SAMPLE_FILE_KEY = "SampleFile"
DAY1_CEM_METHOD_KEY = "CEMMethodFileName"
@@ -133,8 +168,12 @@ class PeptideCommonSubmitOptionalParams(TypedDict, total=False):
parameter_overrides: Annotated[
List[Dict[str, Any]],
Field(
default=[{"m": 0, "n": 0, "Key": "Example", "Value": "example value"}],
description="参数覆盖列表Key 和 Value 必填m/n 可选;省略 m/n 时 Key 必须唯一匹配。",
default_factory=list,
description=(
"参数覆盖列表,默认留空(不覆盖)。"
"如需覆盖子工作流某个步骤参数,按 [{\"Key\": \"参数名\", \"Value\": \"\", \"m\": 0, \"n\": 0}] 格式填写。"
"Key 必须与 Bioyond 子工作流里某个 step 参数名精确匹配m/n 可选,省略时 Key 在工作流内必须唯一。"
),
),
]
border_number: Annotated[int, Field(default=1, description="LIMS 创建订单 borderNumber默认 1。")]
@@ -291,6 +330,13 @@ class BioyondPeptideStation(BioyondWorkstation):
self.protocol_type = protocol_type
self.bioyond_config = merged_config
super().__init__(bioyond_config=self.bioyond_config, deck=deck)
# 订单完成报送等待机制(多肽场景):
# - last_order_code 记录当前正在等待的 orderCode业务编号用于回调侧多订单隔离
# - last_order_report 缓存最近一次匹配到的 report.data
# - order_finish_event 用于阻塞等待 + 唤醒 wait_for_order_finish 动作
self.order_finish_event = threading.Event()
self.last_order_code: Optional[str] = None
self.last_order_report: Optional[Dict[str, Any]] = None
logger.info("BioyondPeptideStation 初始化完成: %s", self.bioyond_config.get("api_host", ""))
def _debug_call_session(self, action_name: str):
@@ -722,6 +768,9 @@ class BioyondPeptideStation(BioyondWorkstation):
ActionInputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.HANDLE, io_type="source"),
ActionInputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.HANDLE, io_type="source"),
ActionInputHandle(key="resultTable", data_type="table", label="装载确认表", data_key="resultTable", data_source=DataSource.HANDLE, io_type="source"),
ActionOutputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="resultTable", data_type="table", label="装载确认表", data_key="resultTable", data_source=DataSource.EXECUTOR),
],
)
def start_experiment(
@@ -738,66 +787,337 @@ class BioyondPeptideStation(BioyondWorkstation):
if table_rows and not materials_loaded:
raise RuntimeError("多肽物料装载未确认,拒绝启动调度器")
result = self._run_scheduler_action("scheduler_start", "启动")
result["order_id"] = resolved_order_ids[0] if resolved_order_ids else str(order_id or "")
result["order_ids"] = resolved_order_ids
result["materials_loaded"] = bool(materials_loaded)
result["resultTable"] = resultTable or {}
return result
def process_order_finish_report(self, report_request, used_materials: Optional[List[Any]] = None) -> Dict[str, Any]:
"""处理 LIMS /report/order_finish 推送:保留父类语义,并按 orderCode 唤醒等待动作。
说明:
- 工作站 HTTP 服务为进程级单例,所有 wait 节点共用同一条推送通道;
需要按 ``self.last_order_code`` 过滤,避免别的订单 push 错误唤醒当前等待。
"""
materials = used_materials or []
try:
result = super().process_order_finish_report(report_request, materials)
except Exception as exc:
logger.error("基类 process_order_finish_report 失败: %s", exc, exc_info=True)
result = {"processed": False, "error": str(exc)}
try:
data = getattr(report_request, "data", {}) or {}
report_order_code = str(data.get("orderCode") or "")
self.last_order_report = data
expected = self.last_order_code
logger.info(
"[peptide.order_finish] 收到 orderCode=%s 期望=%s status=%s",
report_order_code,
expected,
data.get("status"),
)
if expected and report_order_code and expected == report_order_code:
self.order_finish_event.set()
logger.info("[peptide.order_finish] orderCode 匹配,已触发 order_finish_event")
elif expected and report_order_code and expected != report_order_code:
logger.warning(
"[peptide.order_finish] orderCode 不匹配,忽略本次 push (期望=%s 实际=%s)",
expected,
report_order_code,
)
except Exception as exc: # pragma: no cover - 仅为防御
logger.error("[peptide.order_finish] 触发 event 失败: %s", exc, exc_info=True)
return result
@action(
always_free=True,
description="等待订单完成回调并预生成下料表",
handles=[
ActionInputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.HANDLE, io_type="source"),
ActionInputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.HANDLE, io_type="source"),
ActionOutputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="order_code", data_type="str", label="订单编号", data_key="order_code", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="order_finish_status", data_type="str", label="订单完成状态", data_key="order_finish_status", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="order_finish_report", data_type="json", label="订单完成报文", data_key="order_finish_report", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="used_materials", data_type="json", label="使用物料列表", data_key="used_materials", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="material_ids", data_type="json", label="物料ID列表", data_key="material_ids", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="preintake_ids", data_type="json", label="通量ID列表", data_key="preintake_ids", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="unloadTable", data_type="table", label="下料表", data_key="unloadTable", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="unload_summary", data_type="json", label="下料摘要", data_key="unload_summary", data_source=DataSource.EXECUTOR),
],
)
def wait_for_order_finish(
self,
order_id: str = "",
order_ids: Optional[List[str]] = None,
order_code: str = "",
timeout_seconds: int = 36000,
poll_mode: bool = False,
**kwargs: Any,
) -> Dict[str, Any]:
"""阻塞等待 LIMS 订单完成回调,并基于 usedMaterials 预生成下料表。
- 多订单 ``order_ids`` 时按顺序逐个等;任何一个 ``abnormal_stop`` 立即返回。
- 节点 1 在此就把 ``unloadTable`` 组装好(前端 manual_confirm 弹窗在节点 2
中通过 ``getPreviousNodeResult`` 拿前一个节点 param 渲染)。
"""
with self._debug_call_session("wait_for_order_finish"):
resolved_order_ids = self._extract_order_ids(order_id=order_id, order_ids=order_ids, **kwargs)
order_code_input = str(order_code or "").strip()
if not resolved_order_ids and not order_code_input:
raise PeptideWorkflowError("wait_for_order_finish 至少需要 order_id/order_ids/order_code 之一")
material_info_cache: Dict[str, Dict[str, Any]] = {}
missing_material_info: List[str] = []
unload_rows: List[Dict[str, Any]] = []
used_materials_total: List[Dict[str, Any]] = []
material_ids_total: List[str] = []
preintake_ids_total: List[str] = []
order_codes_seen: List[str] = []
last_status: str = ""
last_report: Dict[str, Any] = {}
multi_order = len(resolved_order_ids) > 1 or (resolved_order_ids and order_code_input)
wait_targets: List[Tuple[str, str]] = []
if resolved_order_ids:
for oid in resolved_order_ids:
code_for_oid = self._resolve_order_code(oid, fallback=order_code_input if len(resolved_order_ids) == 1 else "")
wait_targets.append((oid, code_for_oid))
else:
wait_targets.append(("", order_code_input))
for oid, code_for_oid in wait_targets:
if not code_for_oid:
raise PeptideWorkflowError(
f"wait_for_order_finish 无法解析 orderCode (order_id={oid!r})"
)
order_codes_seen.append(code_for_oid)
wait_result = self._wait_single_order_finish(code_for_oid, timeout_seconds, poll_mode=poll_mode)
last_status = wait_result["status"]
last_report = wait_result["report"] or {}
used_materials = self._extract_used_materials(last_report)
used_materials_total.extend(used_materials)
material_ids_total.extend(self._collect_material_ids(used_materials))
preintake_ids_total.extend(self._collect_preintake_ids(used_materials))
rows = self._build_unload_rows(
used_materials,
material_info_cache=material_info_cache,
missing_material_info=missing_material_info,
order_code=code_for_oid if multi_order else None,
)
unload_rows.extend(rows)
if last_status == "timeout":
break
if last_status == "abnormal_stop":
break
unload_table = self._compose_unload_table(unload_rows, multi_order=multi_order)
unload_summary = {
"order_codes": order_codes_seen,
"total_items": len(unload_rows),
"missing_material_info": list(dict.fromkeys(missing_material_info)),
}
primary_order_id = resolved_order_ids[0] if resolved_order_ids else ""
primary_order_code = order_codes_seen[0] if order_codes_seen else order_code_input
return {
"success": last_status == "success",
"order_id": primary_order_id,
"order_ids": resolved_order_ids,
"order_code": primary_order_code,
"order_codes": order_codes_seen,
"order_finish_status": last_status,
"order_finish_report": last_report,
"used_materials": used_materials_total,
"material_ids": list(dict.fromkeys(material_ids_total)),
"preintake_ids": list(dict.fromkeys(preintake_ids_total)),
"unloadTable": unload_table,
"unload_summary": unload_summary,
}
@action(
always_free=True,
node_type=NodeType.MANUAL_CONFIRM,
placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"},
goal_default={"materials_unloaded": False, "timeout_seconds": 3600, "assignee_user_ids": []},
feedback_interval=300,
description="确认人工下料完成后调用 take-out 通知奔耀同步状态",
handles=[
ActionInputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.HANDLE, io_type="source"),
ActionInputHandle(key="material_ids", data_type="json", label="物料ID列表", data_key="material_ids", data_source=DataSource.HANDLE, io_type="source"),
ActionInputHandle(key="preintake_ids", data_type="json", label="通量ID列表", data_key="preintake_ids", data_source=DataSource.HANDLE, io_type="source"),
ActionInputHandle(key="unloadTable", data_type="table", label="下料表", data_key="unloadTable", data_source=DataSource.HANDLE, io_type="source"),
ActionOutputHandle(key="take_out_result", data_type="json", label="取出接口结果", data_key="take_out_result", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="unloaded_count", data_type="int", label="同步物料数量", data_key="unloaded_count", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="success", data_type="bool", label="同步是否成功", data_key="success", data_source=DataSource.EXECUTOR),
],
)
def unload_materials(
self,
order_id: str = "",
material_ids: Optional[List[str]] = None,
preintake_ids: Optional[List[str]] = None,
unloadTable: Optional[Dict[str, Any]] = None,
materials_unloaded: bool = False,
**kwargs: Any,
) -> Dict[str, Any]:
"""节点 2人工下料 manual_confirm 解锁后调用 take-out 通知奔耀同步状态。
时序:操作员物理下料 → 勾选 ``materials_unloaded=True`` → 批准 →
manual_confirm 解除阻塞 → 此处调用 ``take-out`` 让奔耀清空对应库位状态。
"""
del unloadTable, kwargs # unloadTable 仅供前端弹窗渲染,本节点函数体不消费
with self._debug_call_session("unload_materials"):
if not bool(materials_unloaded):
raise RuntimeError("下料未确认,拒绝结束节点")
resolved_order_id = str(order_id or "").strip()
if not resolved_order_id:
raise PeptideWorkflowError("unload_materials 缺少 order_id")
material_ids_list = [str(item) for item in (material_ids or []) if item]
preintake_ids_list = [str(item) for item in (preintake_ids or []) if item]
rpc = self._require_hardware_interface()
try:
take_out_result = rpc.take_out(
resolved_order_id,
preintake_ids=preintake_ids_list,
material_ids=material_ids_list,
) or {}
except Exception as exc:
logger.warning(
"take_out 调用异常 order_id=%s material_ids=%s: %s",
resolved_order_id,
material_ids_list,
exc,
)
take_out_result = {"code": 0, "message": f"take_out_invoke_failed: {exc}"}
code_value = take_out_result.get("code") if isinstance(take_out_result, dict) else None
success = bool(isinstance(take_out_result, dict) and code_value == 1)
if not success:
logger.warning(
"take_out 业务失败,未阻塞工作流,请人工核对奔耀库位 order_id=%s response=%s",
resolved_order_id,
take_out_result,
)
return {
"success": success,
"order_id": resolved_order_id,
"material_ids": material_ids_list,
"preintake_ids": preintake_ids_list,
"unloaded_count": len(material_ids_list),
"take_out_result": take_out_result,
}
@action(
always_free=True,
goal_default={
"reset_operations": ["scheduler_reset", "reset_order_status", "reset_location"],
"reset_scheduler": True,
"reset_order_status": True,
"reset_location": True,
"reset_devices": False,
},
description="复位调度器/订单/库",
description="自动复位调度器/订单状态/库位,可选仪器复",
)
def reset(
def reset_auto(
self,
reset_operations: Optional[
List[Literal["scheduler_reset", "reset_order_status", "reset_location"]]
] = None,
reset_scheduler: bool = True,
reset_order_status: bool = True,
reset_location: bool = True,
reset_devices: bool = False,
**kwargs: Any,
) -> Dict[str, Any]:
with self._debug_call_session("reset"):
operations = self._normalize_reset_operations(reset_operations)
result: Dict[str, Any] = {
"selected_operations": operations,
"executed_calls": [],
"skipped_operations": [],
}
rpc = self._require_hardware_interface()
for operation in operations:
if operation == "scheduler_reset":
code = rpc.scheduler_reset()
result["executed_calls"].append({"operation": operation, "result": {"code": code}})
elif operation == "reset_order_status":
resolved = str(
kwargs.get("reset_order_id") or kwargs.get("order_id") or ""
).strip()
if not resolved:
result["skipped_operations"].append(
{"operation": operation, "reason": "缺少 order_id/reset_order_id"}
)
continue
code = rpc.reset_order_status(resolved)
result["executed_calls"].append(
{"operation": operation, "order_id": resolved, "result": {"code": code}}
)
elif operation == "reset_location":
resolved = str(
kwargs.get("reset_location_id") or kwargs.get("location_id") or ""
).strip()
if not resolved:
result["skipped_operations"].append(
{"operation": operation, "reason": "缺少 location_id/reset_location_id"}
)
continue
code = rpc.reset_location(resolved)
result["executed_calls"].append(
{"operation": operation, "location_id": resolved, "result": {"code": code}}
)
else:
raise ValueError(f"未知 reset operation: {operation}")
return result
"""自动复位调度器/订单状态/库位,可选仪器复位。
Args:
reset_scheduler[调度器复位]: 调用 /api/lims/scheduler/reset默认勾选。
reset_order_status[订单状态复位]: 调用 /api/lims/order/reset-order-status默认勾选。
reset_location[库位复位]: 调用 /api/lims/storage/reset-location默认勾选。
reset_devices[仪器复位]: 调用 /api/lims/device/reset-devices默认不勾选。
"""
del kwargs
with self._debug_call_session("reset_auto"):
return self._execute_reset_operations(
reset_scheduler=bool(reset_scheduler),
reset_order_status=bool(reset_order_status),
reset_location=bool(reset_location),
reset_devices=bool(reset_devices),
)
@action(
always_free=True,
node_type=NodeType.MANUAL_CONFIRM,
placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"},
goal_default={
"reset_scheduler": True,
"reset_order_status": True,
"reset_location": True,
"reset_devices": False,
"physical_cleanup_confirmed": False,
"timeout_seconds": 3600,
"assignee_user_ids": [],
},
feedback_interval=300,
description=RESET_MANUAL_CONFIRM_MESSAGE,
)
def reset_manual(
self,
reset_scheduler: bool = True,
reset_order_status: bool = True,
reset_location: bool = True,
reset_devices: bool = False,
physical_cleanup_confirmed: bool = False,
timeout_seconds: int = 3600,
assignee_user_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""人工确认物理清理完毕后执行复位。
操作员需先按弹窗提示完成 G3/CEM/Tecan/撕膜机/封膜机/打标机/旋转堆栈/3 个转台
等位置的物料清理,并开门检查冰箱/IDOT/酶标仪/离心机/LCMS 内部无遗留,再勾选
``physical_cleanup_confirmed``,节点才会真正调用复位接口。
Args:
reset_scheduler[调度器复位]: 调用 /api/lims/scheduler/reset默认勾选。
reset_order_status[订单状态复位]: 调用 /api/lims/order/reset-order-status默认勾选。
reset_location[库位复位]: 调用 /api/lims/storage/reset-location默认勾选。
reset_devices[仪器复位]: 调用 /api/lims/device/reset-devices默认不勾选。
physical_cleanup_confirmed[物理清理确认]: 确认弹窗中的物料检查已完成,默认不勾选;未勾选时不会调用任何 RPC。
"""
del kwargs, timeout_seconds, assignee_user_ids
with self._debug_call_session("reset_manual"):
if not bool(physical_cleanup_confirmed):
logger.info("[reset_manual] 物理清理未确认,拒绝执行复位 RPC")
return {
"status": "blocked",
"physical_cleanup_confirmed": False,
"confirmation_message": RESET_MANUAL_CONFIRM_MESSAGE,
"selected_operations": self._build_selected_operations_summary(
reset_scheduler=bool(reset_scheduler),
reset_order_status=bool(reset_order_status),
reset_location=bool(reset_location),
reset_devices=bool(reset_devices),
),
"executed_calls": [],
"skipped_operations": [
{"operation": op, "reason": "physical_cleanup_not_confirmed"}
for op in RESET_OPERATION_KEYS
],
"warnings": ["physical_cleanup_not_confirmed"],
}
payload = self._execute_reset_operations(
reset_scheduler=bool(reset_scheduler),
reset_order_status=bool(reset_order_status),
reset_location=bool(reset_location),
reset_devices=bool(reset_devices),
)
payload["physical_cleanup_confirmed"] = True
payload["confirmation_message"] = RESET_MANUAL_CONFIRM_MESSAGE
return payload
@action(always_free=True, description="启动 Bioyond 调度器")
def scheduler_start(self, **kwargs: Any) -> Dict[str, Any]:
@@ -1352,6 +1672,171 @@ class BioyondPeptideStation(BioyondWorkstation):
"result_list_count": len(self._as_list(raw.get("resultList"))),
}
# ---------- wait_for_order_finish / unload_materials 辅助 ----------
def _wait_single_order_finish(
self,
order_code: str,
timeout_seconds: int,
*,
poll_mode: bool = False,
poll_interval: float = 0.5,
) -> Dict[str, Any]:
"""阻塞等待单个 orderCode 的 LIMS 完成推送,返回 ``{status, report}``.
与 :class:`BioyondCellWorkstation` 保持相同语义:
- 状态映射 ``"30" -> success`` / ``"-11" -> abnormal_stop`` /
``"-12" -> manual_stop`` / 其它 ``unknown_<status>``;超时返回 ``timeout``。
"""
if not order_code:
return {"status": "error", "report": {}, "message": "empty order_code"}
self.last_order_code = order_code
self.last_order_report = None
self.order_finish_event.clear()
timeout_value = max(int(timeout_seconds or 0), 1)
logger.info("[peptide.order_finish] 开始等待 orderCode=%s timeout=%ss poll_mode=%s", order_code, timeout_value, poll_mode)
if poll_mode:
start_time = time.time()
while not self.order_finish_event.is_set():
if time.time() - start_time > timeout_value:
logger.error("[peptide.order_finish] 等待超时 orderCode=%s", order_code)
return {"status": "timeout", "report": {}, "orderCode": order_code}
time.sleep(poll_interval)
else:
if not self.order_finish_event.wait(timeout=timeout_value):
logger.error("[peptide.order_finish] 等待超时 orderCode=%s", order_code)
return {"status": "timeout", "report": {}, "orderCode": order_code}
report = self.last_order_report or {}
report_code = str(report.get("orderCode") or "")
if report_code and report_code != order_code:
logger.warning("[peptide.order_finish] 报送 orderCode 不匹配 期望=%s 实际=%s", order_code, report_code)
return {"status": "mismatch", "report": report}
status_text = str(report.get("status") or "").strip()
status_map = {"30": "success", "-11": "abnormal_stop", "-12": "manual_stop"}
normalized = status_map.get(status_text, f"unknown_{status_text or 'empty'}")
return {"status": normalized, "report": report}
def _resolve_order_code(self, order_id: str, fallback: str = "") -> str:
"""将 order_id (UUID) 反查为 orderCode。fallback 用于 CLI 调试时直接传 orderCode。"""
order_id_clean = str(order_id or "").strip()
if not order_id_clean:
return fallback.strip()
try:
raw = self._require_hardware_interface().order_report(order_id_clean) or {}
except Exception as exc:
logger.warning("反查 orderCode 失败 order_id=%s: %s", order_id_clean, exc)
return fallback.strip()
if isinstance(raw, dict):
for key in ("code", "orderCode", "order_code"):
value = raw.get(key)
if value:
return str(value)
return fallback.strip()
def _extract_used_materials(self, report: Dict[str, Any]) -> List[Dict[str, Any]]:
if not isinstance(report, dict):
return []
result: List[Dict[str, Any]] = []
for item in self._as_list(report.get("usedMaterials")):
if isinstance(item, dict):
result.append(item)
return result
@staticmethod
def _collect_material_ids(used_materials: List[Dict[str, Any]]) -> List[str]:
ids: List[str] = []
for item in used_materials:
material_id = item.get("materialId") or item.get("MaterialId") or ""
if material_id:
ids.append(str(material_id))
return ids
@staticmethod
def _collect_preintake_ids(used_materials: List[Dict[str, Any]]) -> List[str]:
ids: List[str] = []
for item in used_materials:
preintake_id = item.get("preintakeId") or item.get("preIntakeId") or ""
if preintake_id:
ids.append(str(preintake_id))
return ids
def _build_unload_rows(
self,
used_materials: List[Dict[str, Any]],
*,
material_info_cache: Dict[str, Dict[str, Any]],
missing_material_info: List[str],
order_code: Optional[str] = None,
) -> List[Dict[str, Any]]:
rows: List[Dict[str, Any]] = []
for material in used_materials:
material_id = str(material.get("materialId") or material.get("MaterialId") or "")
info = self._fetch_material_info_cached(material_id, material_info_cache, missing_material_info)
location = self._first_location(info)
row = {
"whName": str(location.get("whName") or ""),
"posX": self._stringify_coord(location.get("posX")),
"posY": self._stringify_coord(location.get("posY")),
"posZ": self._stringify_coord(location.get("posZ")),
"unit": str(info.get("unit") or location.get("unit") or ""),
"materialName": str(info.get("name") or ""),
"materialId": material_id,
"typeMode": str(material.get("typeMode") or material.get("typemode") or ""),
}
if order_code is not None:
row["orderCode"] = order_code
rows.append(row)
return rows
def _fetch_material_info_cached(
self,
material_id: str,
cache: Dict[str, Dict[str, Any]],
missing_material_info: List[str],
) -> Dict[str, Any]:
if not material_id:
return {}
if material_id in cache:
return cache[material_id]
try:
info = self._require_hardware_interface().material_info(material_id) or {}
except Exception as exc:
logger.warning("material_info 查询失败 material_id=%s: %s", material_id, exc)
info = {}
if not isinstance(info, dict) or not info:
missing_material_info.append(material_id)
info = {}
cache[material_id] = info
return info
def _first_location(self, info: Dict[str, Any]) -> Dict[str, Any]:
if not isinstance(info, dict):
return {}
for location in self._as_list(info.get("locations")):
if isinstance(location, dict):
return location
return {}
@staticmethod
def _stringify_coord(value: Any) -> str:
if value is None:
return ""
if isinstance(value, float):
if value.is_integer():
return str(int(value))
return str(value)
@staticmethod
def _compose_unload_table(rows: List[Dict[str, Any]], *, multi_order: bool) -> Dict[str, Any]:
columns = UNLOAD_TABLE_COLUMNS_MULTI_ORDER if multi_order else UNLOAD_TABLE_COLUMNS
return {
"data": rows,
"columns": copy.deepcopy(columns),
"tableName": "unloadTable",
}
# ---------- 基础设施 ----------
def _run_scheduler_action(self, method_name: str, label: str) -> Dict[str, Any]:
@@ -1370,32 +1855,105 @@ class BioyondPeptideStation(BioyondWorkstation):
return interface
@staticmethod
def _normalize_reset_operations(reset_operations: Optional[List[str]]) -> List[str]:
alias_map = {
"scheduler": "scheduler_reset",
"scheduler_reset": "scheduler_reset",
"order": "reset_order_status",
"order_status": "reset_order_status",
"reset_order_status": "reset_order_status",
"location": "reset_location",
"reset_location": "reset_location",
def _build_selected_operations_summary(
*,
reset_scheduler: bool,
reset_order_status: bool,
reset_location: bool,
reset_devices: bool,
) -> List[Dict[str, Any]]:
flags: Dict[str, bool] = {
"reset_scheduler": bool(reset_scheduler),
"reset_order_status": bool(reset_order_status),
"reset_location": bool(reset_location),
"reset_devices": bool(reset_devices),
}
normalized: List[str] = []
for operation in list(reset_operations or DEFAULT_RESET_OPERATIONS):
canonical = alias_map.get(str(operation).strip())
if not canonical:
raise ValueError(f"未知 reset operation: {operation}")
if canonical not in normalized:
normalized.append(canonical)
return normalized
return [
{"key": key, "label": RESET_OPERATION_LABELS[key], "selected": flags[key]}
for key in RESET_OPERATION_KEYS
]
@staticmethod
def _reset_operation_endpoint(operation: str) -> str:
return {
"scheduler_reset": "/api/lims/scheduler/reset",
"reset_order_status": "/api/lims/order/reset-order-status",
"reset_location": "/api/lims/storage/reset-location",
}.get(operation, "")
def _execute_reset_operations(
self,
*,
reset_scheduler: bool,
reset_order_status: bool,
reset_location: bool,
reset_devices: bool,
) -> Dict[str, Any]:
"""根据 4 个 checkbox 选择顺序调用对应 RPC。
- 调用顺序固定为 scheduler → order_status → location → devices
- 单步失败(``code != 1`` 或 RPC 抛异常)记 warning 但继续执行后续选中的步骤,
不做 fail-fast便于操作员在遇到部分故障时仍能完成可恢复的复位。
"""
rpc = self._require_hardware_interface()
flags: Dict[str, bool] = {
"reset_scheduler": bool(reset_scheduler),
"reset_order_status": bool(reset_order_status),
"reset_location": bool(reset_location),
"reset_devices": bool(reset_devices),
}
selected_operations = self._build_selected_operations_summary(
reset_scheduler=reset_scheduler,
reset_order_status=reset_order_status,
reset_location=reset_location,
reset_devices=reset_devices,
)
result: Dict[str, Any] = {
"selected_operations": selected_operations,
"executed_calls": [],
"skipped_operations": [],
"warnings": [],
}
rpc_method_map: Dict[str, str] = {
"reset_scheduler": "scheduler_reset",
"reset_order_status": "reset_order_status",
"reset_location": "reset_location",
"reset_devices": "reset_devices",
}
for operation in RESET_OPERATION_KEYS:
if not flags[operation]:
result["skipped_operations"].append(
{"operation": operation, "reason": "checkbox_disabled"}
)
continue
method_name = rpc_method_map[operation]
method = getattr(rpc, method_name, None)
endpoint = RESET_OPERATION_ENDPOINTS[operation]
if not callable(method):
msg = f"RPC 缺少方法: {method_name}"
logger.warning("[reset] %s", msg)
result["executed_calls"].append({
"operation": operation,
"endpoint": endpoint,
"result": {"code": 0},
"error": msg,
})
result["warnings"].append(f"{operation}: {msg}")
continue
try:
code = method()
except Exception as exc: # 单步异常不阻断其余 reset
logger.warning("[reset] %s 调用异常: %s", method_name, exc)
result["executed_calls"].append({
"operation": operation,
"endpoint": endpoint,
"result": {"code": 0},
"error": str(exc),
})
result["warnings"].append(f"{operation}: {exc}")
continue
result["executed_calls"].append({
"operation": operation,
"endpoint": endpoint,
"result": {"code": code},
})
if code != 1:
result["warnings"].append(f"{operation}: rpc_returned_non_one_code={code}")
return result
@staticmethod
def _extract_order_ids(order_id: str = "", order_ids: Optional[List[str]] = None, **kwargs: Any) -> List[str]:

View File

@@ -95,7 +95,8 @@ def test_required_actions_exposed() -> None:
"submit_experiment_day4",
"submit_experiment_day4_LCMS",
"start_experiment",
"reset",
"reset_auto",
"reset_manual",
"scheduler_start",
"scheduler_stop",
"scheduler_pause",
@@ -112,14 +113,14 @@ def test_required_actions_exposed() -> None:
def test_manual_confirm_node_types() -> None:
module = _import_module()
cls = getattr(module, CLASS_NAME)
manual = {"submit_experiment_day1", "start_experiment"}
manual = {"submit_experiment_day1", "start_experiment", "reset_manual"}
normal = {
"submit_experiment",
"submit_experiment_day2",
"submit_experiment_day3",
"submit_experiment_day4",
"submit_experiment_day4_LCMS",
"reset",
"reset_auto",
"scheduler_start",
"list_sample_excels",
"get_step_parameters",
@@ -142,7 +143,7 @@ def test_submit_and_reset_signatures_exclude_legacy_manual_confirm() -> None:
"submit_experiment_day3",
"submit_experiment_day4",
"submit_experiment_day4_LCMS",
"reset",
"reset_auto",
):
params = inspect.signature(getattr(cls, name)).parameters
assert "timeout_seconds" not in params, name
@@ -612,70 +613,579 @@ def test_start_experiment_starts_when_table_empty() -> None:
# ---------------------------------------------------------------------------
# 10. Reset
# 10. Reset (plan 2026-05-21: reset_auto + reset_manual 四勾选)
# ---------------------------------------------------------------------------
def test_reset_signature_drops_legacy_params_and_uses_literal() -> None:
"""plan 调整:删除 dry_run/order_id/location_idreset_operations 用 Literal 注解。"""
RESET_BOOL_PARAMS = (
"reset_scheduler",
"reset_order_status",
"reset_location",
"reset_devices",
)
def _reset_meta(name: str) -> Dict[str, Any]:
cls = getattr(_import_module(), CLASS_NAME)
sig = inspect.signature(cls.reset)
params = sig.parameters
for legacy in ("dry_run", "order_id", "location_id"):
assert legacy not in params, f"reset 不应再有 {legacy} 入参"
assert "reset_operations" in params
assert any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values()), \
"reset 必须保留 **kwargs 以兜底 reset_order_id/reset_location_id"
annotation = params["reset_operations"].annotation
rendered = annotation if isinstance(annotation, str) else repr(annotation)
for op in ("scheduler_reset", "reset_order_status", "reset_location"):
assert op in rendered, f"reset_operations 的 Literal 必须包含 {op}"
return dict(getattr(getattr(cls, name), "_action_registry_meta", {}))
def test_reset_goal_default_contains_all_operations() -> None:
"""像 sirna 一样goal_default 默认勾选全部三个 reset 操作。"""
cls = getattr(_import_module(), CLASS_NAME)
meta = getattr(cls.reset, "_action_registry_meta", {})
# --- plan §Tests 1: reset_auto 不是 MANUAL_CONFIRM ---
def test_reset_auto_is_not_manual_confirm() -> None:
module = _import_module()
meta = _reset_meta("reset_auto")
assert meta.get("node_type") != module.NodeType.MANUAL_CONFIRM
# --- plan §Tests 2: reset_manual 是 MANUAL_CONFIRM ---
def test_reset_manual_is_manual_confirm() -> None:
module = _import_module()
meta = _reset_meta("reset_manual")
assert meta.get("node_type") == module.NodeType.MANUAL_CONFIRM
# --- plan §Tests 3: reset_manual 关键 metadata ---
def test_reset_manual_metadata_shape() -> None:
meta = _reset_meta("reset_manual")
assert meta.get("always_free") is True
assert meta.get("placeholder_keys") == {
"assignee_user_ids": "unilabos_manual_confirm",
}
goal_default = meta.get("goal_default") or {}
assert goal_default.get("reset_operations") == [
"scheduler_reset",
"reset_order_status",
"reset_location",
]
assert goal_default.get("timeout_seconds") == 3600
assert goal_default.get("assignee_user_ids") == []
assert goal_default.get("physical_cleanup_confirmed") is False
def test_reset_executes_typed_rpc_calls() -> None:
# --- plan §Tests 4: 两个 action 都暴露 4 个真实 bool 参数 ---
def _resolved_bool_annotation(param: inspect.Parameter) -> Any:
"""`from __future__ import annotations` 下注解是字符串;统一解析回真实类型。"""
annotation = param.annotation
if annotation is bool:
return bool
if isinstance(annotation, str):
# 既不是 Annotated[...] 也不是 Optional[...] 的纯 "bool" 字符串
return bool if annotation.strip() == "bool" else annotation
return annotation
def test_reset_actions_expose_four_real_bool_params() -> None:
cls = getattr(_import_module(), CLASS_NAME)
for action_name in ("reset_auto", "reset_manual"):
params = inspect.signature(getattr(cls, action_name)).parameters
for flag in RESET_BOOL_PARAMS:
assert flag in params, f"{action_name} 缺少 {flag}"
resolved = _resolved_bool_annotation(params[flag])
assert resolved is bool, (
f"{action_name}.{flag} 必须是裸 bool不能用 Annotated[bool, Field(...)] 包裹),实际: {params[flag].annotation!r}"
)
assert params[flag].kind in (
inspect.Parameter.POSITIONAL_OR_KEYWORD,
inspect.Parameter.KEYWORD_ONLY,
), f"{action_name}.{flag} 必须是真实参数,不能藏在 **kwargs"
# --- plan §Tests 5: registry 生成的 schema 标记 reset 字段为 boolean ---
def test_reset_action_param_annotations_are_bool_for_schema() -> None:
"""plan: 当前 AST registry 不 unwrap Annotated裸 bool 才能映射成 type: boolean。
这里直接检查 type_to_schema 在 Python 类型 ``bool`` 上返回 ``{"type": "boolean"}``
再校验 reset_auto/reset_manual 的真实参数注解就是 ``bool``(裸字符串 "bool" 也算),
从而保证生成的 JSON Schema 一定是 boolean不会被前端当成 object/string。
"""
from unilabos.registry.utils import type_to_schema
assert type_to_schema(bool) == {"type": "boolean"}
cls = getattr(_import_module(), CLASS_NAME)
for action_name in ("reset_auto", "reset_manual"):
params = inspect.signature(getattr(cls, action_name)).parameters
for flag in RESET_BOOL_PARAMS:
resolved = _resolved_bool_annotation(params[flag])
assert type_to_schema(resolved) == {"type": "boolean"}, (
f"{action_name}.{flag} schema 必须是 boolean实际注解: {params[flag].annotation!r}"
)
# --- plan §Tests 6: reset_auto 替换旧 reset未保留旧 id-shaped reset 别名 ---
def test_legacy_reset_action_removed() -> None:
cls = getattr(_import_module(), CLASS_NAME)
have = {name for name, _ in inspect.getmembers(cls, inspect.isfunction)}
assert "reset" not in have, "旧 reset 应被 reset_auto 完全替换,不保留同名别名"
assert "reset_auto" in have
# --- plan §Tests 7: goal_default 中前三项 Truereset_devices=False ---
def test_reset_goal_defaults_first_three_true_devices_false() -> None:
for action_name in ("reset_auto", "reset_manual"):
meta = _reset_meta(action_name)
goal_default = meta.get("goal_default") or {}
assert goal_default.get("reset_scheduler") is True, action_name
assert goal_default.get("reset_order_status") is True, action_name
assert goal_default.get("reset_location") is True, action_name
assert goal_default.get("reset_devices") is False, action_name
# --- plan §Tests 8: reset_manual(physical_cleanup_confirmed=False) 不调任何 RPC ---
def test_reset_manual_blocks_when_not_confirmed() -> None:
station = _make_station()
station.hardware_interface.scheduler_reset.return_value = 1
station.hardware_interface.reset_order_status.return_value = 1
station.hardware_interface.reset_location.return_value = 1
out = station.reset(
reset_operations=["scheduler_reset", "reset_order_status", "reset_location"],
reset_order_id=ORDER_GUID,
reset_location_id="loc-1",
out = station.reset_manual(
reset_scheduler=True,
reset_order_status=True,
reset_location=True,
reset_devices=True,
physical_cleanup_confirmed=False,
)
station.hardware_interface.scheduler_reset.assert_called_once_with()
station.hardware_interface.reset_order_status.assert_called_once_with(ORDER_GUID)
station.hardware_interface.reset_location.assert_called_once_with("loc-1")
assert out["selected_operations"] == [
"scheduler_reset",
rpc = station.hardware_interface
rpc.scheduler_reset.assert_not_called()
rpc.reset_order_status.assert_not_called()
rpc.reset_location.assert_not_called()
rpc.reset_devices.assert_not_called()
assert out["status"] == "blocked"
assert out["physical_cleanup_confirmed"] is False
assert "请确认" in out["confirmation_message"]
# --- plan §Tests 9: reset_auto() 默认调三件、不调 reset_devices ---
def test_reset_auto_defaults_call_three_and_skip_devices() -> None:
station = _make_station()
rpc = station.hardware_interface
rpc.scheduler_reset.return_value = 1
rpc.reset_order_status.return_value = 1
rpc.reset_location.return_value = 1
out = station.reset_auto()
rpc.scheduler_reset.assert_called_once_with()
rpc.reset_order_status.assert_called_once_with()
rpc.reset_location.assert_called_once_with()
rpc.reset_devices.assert_not_called()
executed_ops = [item["operation"] for item in out["executed_calls"]]
assert executed_ops == ["reset_scheduler", "reset_order_status", "reset_location"]
skipped_ops = {item["operation"] for item in out["skipped_operations"]}
assert skipped_ops == {"reset_devices"}
selected = {item["key"]: item["selected"] for item in out["selected_operations"]}
assert selected == {
"reset_scheduler": True,
"reset_order_status": True,
"reset_location": True,
"reset_devices": False,
}
# --- plan §Tests 10: reset_auto(reset_devices=True) 也会调 reset_devices ---
def test_reset_auto_with_devices_true_calls_reset_devices() -> None:
station = _make_station()
rpc = station.hardware_interface
rpc.scheduler_reset.return_value = 1
rpc.reset_order_status.return_value = 1
rpc.reset_location.return_value = 1
rpc.reset_devices.return_value = 1
out = station.reset_auto(reset_devices=True)
rpc.reset_devices.assert_called_once_with()
executed_ops = [item["operation"] for item in out["executed_calls"]]
assert executed_ops == [
"reset_scheduler",
"reset_order_status",
"reset_location",
"reset_devices",
]
assert len(out["executed_calls"]) == 3
assert out["skipped_operations"] == []
def test_reset_skips_when_ids_missing() -> None:
"""没有 order_id / location_id 时应该 skip 而不是抛错"""
def test_reset_auto_individual_checkboxes_drive_calls() -> None:
"""更细粒度:单独勾 reset_scheduler 时只调 scheduler_reset"""
station = _make_station()
station.hardware_interface.scheduler_reset.return_value = 1
out = station.reset(
reset_operations=["scheduler_reset", "reset_order_status", "reset_location"],
rpc = station.hardware_interface
rpc.scheduler_reset.return_value = 1
out = station.reset_auto(
reset_scheduler=True,
reset_order_status=False,
reset_location=False,
reset_devices=False,
)
station.hardware_interface.scheduler_reset.assert_called_once_with()
station.hardware_interface.reset_order_status.assert_not_called()
station.hardware_interface.reset_location.assert_not_called()
skipped_ops = {item["operation"] for item in out["skipped_operations"]}
assert skipped_ops == {"reset_order_status", "reset_location"}
rpc.scheduler_reset.assert_called_once_with()
rpc.reset_order_status.assert_not_called()
rpc.reset_location.assert_not_called()
rpc.reset_devices.assert_not_called()
skipped = {item["operation"] for item in out["skipped_operations"]}
assert skipped == {"reset_order_status", "reset_location", "reset_devices"}
def test_reset_manual_after_confirmation_calls_same_helper() -> None:
"""plan §reset_manual 执行规则:勾选确认后等价于 reset_auto。"""
station = _make_station()
rpc = station.hardware_interface
rpc.scheduler_reset.return_value = 1
rpc.reset_order_status.return_value = 1
rpc.reset_location.return_value = 1
rpc.reset_devices.return_value = 1
out = station.reset_manual(
reset_scheduler=True,
reset_order_status=True,
reset_location=True,
reset_devices=True,
physical_cleanup_confirmed=True,
)
rpc.scheduler_reset.assert_called_once_with()
rpc.reset_order_status.assert_called_once_with()
rpc.reset_location.assert_called_once_with()
rpc.reset_devices.assert_called_once_with()
assert out["physical_cleanup_confirmed"] is True
assert out["confirmation_message"]
assert [item["operation"] for item in out["executed_calls"]] == [
"reset_scheduler",
"reset_order_status",
"reset_location",
"reset_devices",
]
# --- plan §Tests 11: RPC 包装层 reset_order_status / reset_location 不发送 data 键 ---
def test_rpc_reset_order_status_sends_no_data_key() -> None:
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC
rpc = object.__new__(BioyondV1RPC)
rpc.host = "http://test"
rpc.api_key = "k"
rpc._logger = MagicMock()
rpc.post = MagicMock(return_value={"code": 1}) # type: ignore[method-assign]
rpc.get_current_time_iso8601 = MagicMock(return_value="2026-05-21T08:00:00.000Z") # type: ignore[method-assign]
rpc.reset_order_status("ignored-uuid")
args, kwargs = rpc.post.call_args
sent_params = kwargs.get("params") or (args[1] if len(args) > 1 else {})
assert "data" not in sent_params, "reset_order_status 不应再发送 data 字段"
assert set(sent_params.keys()) == {"apiKey", "requestTime"}
def test_rpc_reset_location_sends_no_data_key() -> None:
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC
rpc = object.__new__(BioyondV1RPC)
rpc.host = "http://test"
rpc.api_key = "k"
rpc._logger = MagicMock()
rpc.post = MagicMock(return_value={"code": 1}) # type: ignore[method-assign]
rpc.get_current_time_iso8601 = MagicMock(return_value="2026-05-21T08:00:00.000Z") # type: ignore[method-assign]
rpc.reset_location("ignored-loc-id")
args, kwargs = rpc.post.call_args
sent_params = kwargs.get("params") or (args[1] if len(args) > 1 else {})
assert "data" not in sent_params, "reset_location 不应再发送 data 字段"
assert set(sent_params.keys()) == {"apiKey", "requestTime"}
# --- plan §Tests 12 + 13: 任何 reset 路径都不调用 take_out / refresh_material_cache ---
def test_reset_paths_do_not_call_take_out_or_material_cache() -> None:
station = _make_station()
rpc = station.hardware_interface
rpc.scheduler_reset.return_value = 1
rpc.reset_order_status.return_value = 1
rpc.reset_location.return_value = 1
rpc.reset_devices.return_value = 1
station.reset_auto(reset_devices=True)
station.reset_manual(physical_cleanup_confirmed=True, reset_devices=True)
station.reset_manual(physical_cleanup_confirmed=False)
rpc.take_out.assert_not_called()
refresh = getattr(rpc, "refresh_material_cache", None)
if refresh is not None and hasattr(refresh, "assert_not_called"):
refresh.assert_not_called()
# --- 失败/兜底用例:不 fail-fast单步异常或 code!=1 仅记 warning ---
def test_reset_auto_records_warning_when_rpc_returns_non_one() -> None:
station = _make_station()
rpc = station.hardware_interface
rpc.scheduler_reset.return_value = 0 # 业务失败
rpc.reset_order_status.return_value = 1
rpc.reset_location.return_value = 1
out = station.reset_auto()
rpc.scheduler_reset.assert_called_once_with()
rpc.reset_order_status.assert_called_once_with()
rpc.reset_location.assert_called_once_with()
assert any("reset_scheduler" in w for w in out.get("warnings", []))
def test_reset_auto_continues_after_rpc_exception() -> None:
station = _make_station()
rpc = station.hardware_interface
rpc.scheduler_reset.side_effect = RuntimeError("HTTP 500")
rpc.reset_order_status.return_value = 1
rpc.reset_location.return_value = 1
out = station.reset_auto()
rpc.reset_order_status.assert_called_once_with()
rpc.reset_location.assert_called_once_with()
error_entries = [item for item in out["executed_calls"] if "error" in item]
assert any(item["operation"] == "reset_scheduler" for item in error_entries)
def test_reset_manual_confirmation_message_constant() -> None:
module = _import_module()
msg = module.RESET_MANUAL_CONFIRM_MESSAGE
for keyword in ("G3", "CEM", "Tecan", "撕膜机", "封膜机", "打标机", "旋转堆栈", "转台", "冰箱", "IDOT", "酶标仪", "离心机", "LCMS"):
assert keyword in msg, f"reset_manual 提示文案缺关键字: {keyword}"
meta = _reset_meta("reset_manual")
assert meta.get("description") == msg, "reset_manual 装饰器 description 应等于常量本身"
# ---------------------------------------------------------------------------
# 11. wait_for_order_finish + unload_materialsplan 2026-05-20 新增节点)
# ---------------------------------------------------------------------------
def _make_station_with_finish_state() -> Any:
"""带订单完成事件状态的 station 实例process_order_finish_report 用)。"""
import threading as _threading
station = _make_station()
station.order_finish_event = _threading.Event()
station.last_order_code = None
station.last_order_report = None
return station
def _make_report_request(order_code: str, status: str = "30", used_materials: List[Dict[str, Any]] | None = None) -> Any:
request = MagicMock()
request.data = {
"orderCode": order_code,
"orderName": f"实验{order_code}",
"startTime": "2026-05-20T10:00:00.000Z",
"endTime": "2026-05-20T11:00:00.000Z",
"status": status,
"usedMaterials": used_materials or [],
}
return request
def test_process_order_finish_report_triggers_event_on_match() -> None:
station = _make_station_with_finish_state()
station.last_order_code = "EXP260520-100000"
request = _make_report_request("EXP260520-100000", status="30")
station.process_order_finish_report(request, [])
assert station.order_finish_event.is_set() is True
assert station.last_order_report["orderCode"] == "EXP260520-100000"
def test_process_order_finish_report_ignores_mismatched_order_code() -> None:
station = _make_station_with_finish_state()
station.last_order_code = "EXP260520-100000"
request = _make_report_request("EXP260520-OTHER", status="30")
station.process_order_finish_report(request, [])
assert station.order_finish_event.is_set() is False
assert station.last_order_report["orderCode"] == "EXP260520-OTHER"
def test_wait_for_order_finish_returns_immediately_when_event_set() -> None:
"""事件预先 set + last_order_report 命中时wait 立即返回 success。"""
station = _make_station_with_finish_state()
used_materials = [
{"materialId": "mat-1", "locationId": "loc-1", "typeMode": "1", "usedQuantity": 1.0},
]
station.hardware_interface.order_report.return_value = {"code": "EXP260520-101010"}
station.hardware_interface.material_info.return_value = {
"name": "样品A",
"unit": "mg",
"locations": [{"whName": "自动化堆栈", "posX": 1, "posY": 2, "posZ": 3, "code": "1-01"}],
}
pending: Dict[str, Any] = {}
def _fake_wait(timeout: float = 0.0) -> bool:
pending["report"] = {
"orderCode": station.last_order_code,
"status": "30",
"usedMaterials": used_materials,
}
station.last_order_report = pending["report"]
return True
station.order_finish_event = MagicMock()
station.order_finish_event.wait = MagicMock(side_effect=_fake_wait)
station.order_finish_event.clear = MagicMock()
station.order_finish_event.set = MagicMock()
station.order_finish_event.is_set = MagicMock(return_value=True)
out = station.wait_for_order_finish(
order_id="11111111-1111-1111-1111-111111111111",
timeout_seconds=5,
)
assert out["order_finish_status"] == "success"
assert out["material_ids"] == ["mat-1"]
assert out["preintake_ids"] == []
table = out["unloadTable"]
assert table["tableName"] == "unloadTable"
column_keys = [c["key"] for c in table["columns"]]
assert column_keys == ["whName", "posX", "posY", "posZ", "unit", "materialName"]
row = table["data"][0]
assert row["whName"] == "自动化堆栈"
assert row["posX"] == "1"
assert row["posY"] == "2"
assert row["posZ"] == "3"
assert row["unit"] == "mg"
assert row["materialName"] == "样品A"
def test_wait_for_order_finish_returns_timeout_status() -> None:
station = _make_station_with_finish_state()
station.hardware_interface.order_report.return_value = {"code": "EXP260520-TIMEOUT"}
station.order_finish_event = MagicMock()
station.order_finish_event.wait = MagicMock(return_value=False)
station.order_finish_event.clear = MagicMock()
station.order_finish_event.is_set = MagicMock(return_value=False)
out = station.wait_for_order_finish(
order_id="22222222-2222-2222-2222-222222222222",
timeout_seconds=1,
)
assert out["order_finish_status"] == "timeout"
assert out["unloadTable"]["data"] == []
assert out["unload_summary"]["missing_material_info"] == []
def test_wait_for_order_finish_records_missing_material_info() -> None:
"""material-info 失败的物料行字段为空串,并被列入 missing_material_info。"""
station = _make_station_with_finish_state()
used_materials = [
{"materialId": "mat-good", "typeMode": "1"},
{"materialId": "mat-bad", "typeMode": "1"},
]
def _material_info(material_id: str) -> Dict[str, Any]:
if material_id == "mat-good":
return {
"name": "好物料",
"unit": "mg",
"locations": [{"whName": "WH-A", "posX": 4, "posY": 5, "posZ": 6}],
}
raise RuntimeError("HTTP 500")
station.hardware_interface.order_report.return_value = {"code": "EXP260520-MIX"}
station.hardware_interface.material_info.side_effect = _material_info
def _fake_wait(timeout: float = 0.0) -> bool:
station.last_order_report = {
"orderCode": station.last_order_code,
"status": "30",
"usedMaterials": used_materials,
}
return True
station.order_finish_event = MagicMock()
station.order_finish_event.wait = MagicMock(side_effect=_fake_wait)
station.order_finish_event.clear = MagicMock()
station.order_finish_event.set = MagicMock()
station.order_finish_event.is_set = MagicMock(return_value=True)
out = station.wait_for_order_finish(
order_id="33333333-3333-3333-3333-333333333333",
timeout_seconds=1,
)
rows = out["unloadTable"]["data"]
assert [row["materialId"] for row in rows] == ["mat-good", "mat-bad"]
bad_row = rows[1]
assert bad_row["whName"] == ""
assert bad_row["posX"] == ""
assert bad_row["posY"] == ""
assert bad_row["posZ"] == ""
assert bad_row["unit"] == ""
assert bad_row["materialName"] == ""
assert "mat-bad" in out["unload_summary"]["missing_material_info"]
assert "mat-good" not in out["unload_summary"]["missing_material_info"]
def test_unload_materials_blocks_when_not_confirmed() -> None:
station = _make_station()
with pytest.raises(RuntimeError):
station.unload_materials(
order_id=ORDER_GUID,
material_ids=["mat-1"],
preintake_ids=[],
unloadTable={"data": []},
materials_unloaded=False,
)
def test_unload_materials_calls_take_out_with_resolved_lists() -> None:
station = _make_station()
station.hardware_interface.take_out.return_value = {"code": 1, "message": "ok", "data": {}}
out = station.unload_materials(
order_id=ORDER_GUID,
material_ids=["mat-1", "mat-2"],
preintake_ids=[],
unloadTable={"data": [{"materialId": "mat-1"}, {"materialId": "mat-2"}]},
materials_unloaded=True,
)
station.hardware_interface.take_out.assert_called_once_with(
ORDER_GUID,
preintake_ids=[],
material_ids=["mat-1", "mat-2"],
)
assert out["success"] is True
assert out["unloaded_count"] == 2
assert out["take_out_result"] == {"code": 1, "message": "ok", "data": {}}
def test_unload_materials_does_not_raise_when_take_out_fails() -> None:
"""take_out 业务失败时仅 warn 不抛异常(已物理下料,避免阻塞工作流)。"""
station = _make_station()
station.hardware_interface.take_out.return_value = {"code": 0, "message": "已下过线"}
out = station.unload_materials(
order_id=ORDER_GUID,
material_ids=["mat-1"],
preintake_ids=[],
unloadTable={"data": []},
materials_unloaded=True,
)
assert out["success"] is False
assert out["take_out_result"] == {"code": 0, "message": "已下过线"}
def test_new_actions_registered_on_class() -> None:
cls = getattr(_import_module(), CLASS_NAME)
have = {name for name, _ in inspect.getmembers(cls, inspect.isfunction)}
assert "wait_for_order_finish" in have
assert "unload_materials" in have
module = _import_module()
unload_meta = getattr(cls.unload_materials, "_action_registry_meta", {})
assert unload_meta.get("node_type") == module.NodeType.MANUAL_CONFIRM
wait_meta = getattr(cls.wait_for_order_finish, "_action_registry_meta", {})
assert wait_meta.get("node_type") != module.NodeType.MANUAL_CONFIRM
assert wait_meta.get("always_free") is True
def test_unload_table_columns_constant_layout() -> None:
module = _import_module()
keys = [c["key"] for c in module.UNLOAD_TABLE_COLUMNS]
assert keys == ["whName", "posX", "posY", "posZ", "unit", "materialName"]
multi_keys = [c["key"] for c in module.UNLOAD_TABLE_COLUMNS_MULTI_ORDER]
assert multi_keys[0] == "orderCode"
assert multi_keys[1:] == keys

View File

@@ -56,13 +56,17 @@ class ConnectionMonitor:
def _monitor_loop(self):
while self._running:
try:
# 使用 lightweight API 检查连接
# query_matial_type_list 是比较快的查询
start_time = time.time()
result = self.workstation.hardware_interface.material_type_list()
# 使用轻量级调度状态接口检查连接,避免启动时打印完整物料类型列表。
result = self.workstation.hardware_interface.scheduler_status()
status = "online" if result else "offline"
msg = "Connection established" if status == "online" else "Failed to get material type list"
if status == "online":
msg = (
f"Scheduler status={result.get('status')}, "
f"hasTask={result.get('hasTask')}"
)
else:
msg = "Failed to get scheduler status"
if status != self._last_status:
logger.info(f"Bioyond连接状态变更: {self._last_status} -> {status}")