update reset

This commit is contained in:
hanhua@dp.tech
2026-05-22 11:32:31 +08:00
parent f14e1bc4a0
commit c2c986105d
3 changed files with 1195 additions and 177 deletions

View File

@@ -415,21 +415,25 @@ class BioyondV1RPC(BaseRequest):
return {} return {}
return response.get("data", {}) return response.get("data", {})
def reset_location(self, location_id: str) -> int: def reset_location(self, location_id: Optional[str] = None) -> int:
"""复位库位 """复位库位
现场实测 ``POST /api/lims/storage/reset-location`` 不传 ``data`` 即可成功
(见 ``temp_benyao/peptide/_findings/2026-05-21_1615_remaining_resets_no_data_live.md``
因此默认无 ``data`` 字段;保留 ``location_id`` 仅为兼容旧调用,传入会被忽略。
参数: 参数:
location_id: 库位ID location_id: 兼容入参,已被忽略;新逻辑不再以 location 为粒度复位。
返回值: 返回值:
int: 成功返回1失败返回0 int: 成功返回1失败返回0
""" """
del location_id
response = self.post( response = self.post(
url=f'{self.host}/api/lims/storage/reset-location', url=f'{self.host}/api/lims/storage/reset-location',
params={ params={
"apiKey": self.api_key, "apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(), "requestTime": self.get_current_time_iso8601(),
"data": location_id,
}) })
if not response or response['code'] != 1: if not response or response['code'] != 1:
return 0 return 0
@@ -929,21 +933,25 @@ class BioyondV1RPC(BaseRequest):
return {} return {}
return response.get("data", {}) return response.get("data", {})
def reset_order_status(self, order_id: str) -> int: def reset_order_status(self, order_id: Optional[str] = None) -> int:
"""复位订单状态 """复位订单状态
现场实测 ``POST /api/lims/order/reset-order-status`` 不传 ``data`` 即可成功
(见 ``temp_benyao/peptide/_findings/2026-05-21_1613_reset_order_status_no_data_live.md``
因此默认无 ``data`` 字段;保留 ``order_id`` 仅为兼容旧调用,传入会被忽略。
参数: 参数:
order_id: 订单ID order_id: 兼容入参,已被忽略;新逻辑不再以单订单为粒度复位。
返回值: 返回值:
int: 成功返回1失败返回0 int: 成功返回1失败返回0
""" """
del order_id
response = self.post( response = self.post(
url=f'{self.host}/api/lims/order/reset-order-status', url=f'{self.host}/api/lims/order/reset-order-status',
params={ params={
"apiKey": self.api_key, "apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(), "requestTime": self.get_current_time_iso8601(),
"data": order_id,
}) })
if not response or response['code'] != 1: if not response or response['code'] != 1:
return 0 return 0

View File

@@ -8,10 +8,12 @@ import copy
import json import json
import mimetypes import mimetypes
import sys import sys
import threading
import time
from contextlib import nullcontext from contextlib import nullcontext
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Annotated, Any, Dict, Iterable, List, Literal, Optional, Tuple from typing import Annotated, Any, Dict, Iterable, List, Optional, Tuple
from uuid import UUID from uuid import UUID
import requests import requests
@@ -94,13 +96,46 @@ except Exception as exc: # pragma: no cover
DEBUG_CLI_ENABLED = False DEBUG_CLI_ENABLED = False
DEFAULT_RESET_OPERATIONS = ("scheduler_reset", "reset_order_status", "reset_location") RESET_OPERATION_KEYS: Tuple[str, ...] = (
"reset_scheduler",
"reset_order_status",
"reset_location",
"reset_devices",
)
RESET_OPERATION_LABELS: Dict[str, str] = {
"reset_scheduler": "调度器复位",
"reset_order_status": "订单状态复位",
"reset_location": "库位复位",
"reset_devices": "仪器复位",
}
RESET_OPERATION_ENDPOINTS: Dict[str, str] = {
"reset_scheduler": "/api/lims/scheduler/reset",
"reset_order_status": "/api/lims/order/reset-order-status",
"reset_location": "/api/lims/storage/reset-location",
"reset_devices": "/api/lims/device/reset-devices",
}
RESET_MANUAL_CONFIRM_MESSAGE = (
"请确认G3、CEM、Tecan、撕膜机、封膜机、打标机、旋转堆栈上下料位、3个转台等位置的物料已清理完毕\n"
"请开门检查冰箱、IDOT、酶标仪、离心机、LCMS内部没有遗留物料。"
)
RESULT_TABLE_COLUMNS = [ RESULT_TABLE_COLUMNS = [
{"name": "设备", "key": "whName"}, {"name": "设备", "key": "whName"},
{"name": "位置", "key": "locationCode"}, {"name": "位置", "key": "locationCode"},
{"name": "物料名称", "key": "materialName"}, {"name": "物料名称", "key": "materialName"},
{"name": "数量", "key": "quantity"}, {"name": "数量", "key": "quantity"},
] ]
UNLOAD_TABLE_COLUMNS = [
{"name": "仓库名称", "key": "whName"},
{"name": "坐标 X", "key": "posX"},
{"name": "坐标 Y", "key": "posY"},
{"name": "坐标 Z", "key": "posZ"},
{"name": "单位", "key": "unit"},
{"name": "物料名称", "key": "materialName"},
]
UNLOAD_TABLE_COLUMNS_MULTI_ORDER = [
{"name": "订单编号", "key": "orderCode"},
*UNLOAD_TABLE_COLUMNS,
]
MATERIAL_TYPE_ORDER = ("Sample", "Consumables", "Reagent") MATERIAL_TYPE_ORDER = ("Sample", "Consumables", "Reagent")
PEPTIDE_SAMPLE_FILE_KEY = "SampleFile" PEPTIDE_SAMPLE_FILE_KEY = "SampleFile"
DAY1_CEM_METHOD_KEY = "CEMMethodFileName" DAY1_CEM_METHOD_KEY = "CEMMethodFileName"
@@ -291,6 +326,13 @@ class BioyondPeptideStation(BioyondWorkstation):
self.protocol_type = protocol_type self.protocol_type = protocol_type
self.bioyond_config = merged_config self.bioyond_config = merged_config
super().__init__(bioyond_config=self.bioyond_config, deck=deck) super().__init__(bioyond_config=self.bioyond_config, deck=deck)
# 订单完成报送等待机制(多肽场景):
# - last_order_code 记录当前正在等待的 orderCode业务编号用于回调侧多订单隔离
# - last_order_report 缓存最近一次匹配到的 report.data
# - order_finish_event 用于阻塞等待 + 唤醒 wait_for_order_finish 动作
self.order_finish_event = threading.Event()
self.last_order_code: Optional[str] = None
self.last_order_report: Optional[Dict[str, Any]] = None
logger.info("BioyondPeptideStation 初始化完成: %s", self.bioyond_config.get("api_host", "")) logger.info("BioyondPeptideStation 初始化完成: %s", self.bioyond_config.get("api_host", ""))
def _debug_call_session(self, action_name: str): def _debug_call_session(self, action_name: str):
@@ -722,6 +764,9 @@ class BioyondPeptideStation(BioyondWorkstation):
ActionInputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.HANDLE, io_type="source"), ActionInputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.HANDLE, io_type="source"),
ActionInputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.HANDLE, io_type="source"), ActionInputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.HANDLE, io_type="source"),
ActionInputHandle(key="resultTable", data_type="table", label="装载确认表", data_key="resultTable", data_source=DataSource.HANDLE, io_type="source"), ActionInputHandle(key="resultTable", data_type="table", label="装载确认表", data_key="resultTable", data_source=DataSource.HANDLE, io_type="source"),
ActionOutputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="resultTable", data_type="table", label="装载确认表", data_key="resultTable", data_source=DataSource.EXECUTOR),
], ],
) )
def start_experiment( def start_experiment(
@@ -738,83 +783,337 @@ class BioyondPeptideStation(BioyondWorkstation):
if table_rows and not materials_loaded: if table_rows and not materials_loaded:
raise RuntimeError("多肽物料装载未确认,拒绝启动调度器") raise RuntimeError("多肽物料装载未确认,拒绝启动调度器")
result = self._run_scheduler_action("scheduler_start", "启动") result = self._run_scheduler_action("scheduler_start", "启动")
result["order_id"] = resolved_order_ids[0] if resolved_order_ids else str(order_id or "")
result["order_ids"] = resolved_order_ids result["order_ids"] = resolved_order_ids
result["materials_loaded"] = bool(materials_loaded) result["materials_loaded"] = bool(materials_loaded)
result["resultTable"] = resultTable or {} result["resultTable"] = resultTable or {}
return result return result
def process_order_finish_report(self, report_request, used_materials: Optional[List[Any]] = None) -> Dict[str, Any]:
"""处理 LIMS /report/order_finish 推送:保留父类语义,并按 orderCode 唤醒等待动作。
说明:
- 工作站 HTTP 服务为进程级单例,所有 wait 节点共用同一条推送通道;
需要按 ``self.last_order_code`` 过滤,避免别的订单 push 错误唤醒当前等待。
"""
materials = used_materials or []
try:
result = super().process_order_finish_report(report_request, materials)
except Exception as exc:
logger.error("基类 process_order_finish_report 失败: %s", exc, exc_info=True)
result = {"processed": False, "error": str(exc)}
try:
data = getattr(report_request, "data", {}) or {}
report_order_code = str(data.get("orderCode") or "")
self.last_order_report = data
expected = self.last_order_code
logger.info(
"[peptide.order_finish] 收到 orderCode=%s 期望=%s status=%s",
report_order_code,
expected,
data.get("status"),
)
if expected and report_order_code and expected == report_order_code:
self.order_finish_event.set()
logger.info("[peptide.order_finish] orderCode 匹配,已触发 order_finish_event")
elif expected and report_order_code and expected != report_order_code:
logger.warning(
"[peptide.order_finish] orderCode 不匹配,忽略本次 push (期望=%s 实际=%s)",
expected,
report_order_code,
)
except Exception as exc: # pragma: no cover - 仅为防御
logger.error("[peptide.order_finish] 触发 event 失败: %s", exc, exc_info=True)
return result
@action(
always_free=True,
description="等待订单完成回调并预生成下料表",
handles=[
ActionInputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.HANDLE, io_type="source"),
ActionInputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.HANDLE, io_type="source"),
ActionOutputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="order_code", data_type="str", label="订单编号", data_key="order_code", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="order_finish_status", data_type="str", label="订单完成状态", data_key="order_finish_status", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="order_finish_report", data_type="json", label="订单完成报文", data_key="order_finish_report", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="used_materials", data_type="json", label="使用物料列表", data_key="used_materials", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="material_ids", data_type="json", label="物料ID列表", data_key="material_ids", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="preintake_ids", data_type="json", label="通量ID列表", data_key="preintake_ids", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="unloadTable", data_type="table", label="下料表", data_key="unloadTable", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="unload_summary", data_type="json", label="下料摘要", data_key="unload_summary", data_source=DataSource.EXECUTOR),
],
)
def wait_for_order_finish(
self,
order_id: str = "",
order_ids: Optional[List[str]] = None,
order_code: str = "",
timeout_seconds: int = 36000,
poll_mode: bool = False,
**kwargs: Any,
) -> Dict[str, Any]:
"""阻塞等待 LIMS 订单完成回调,并基于 usedMaterials 预生成下料表。
- 多订单 ``order_ids`` 时按顺序逐个等;任何一个 ``abnormal_stop`` 立即返回。
- 节点 1 在此就把 ``unloadTable`` 组装好(前端 manual_confirm 弹窗在节点 2
中通过 ``getPreviousNodeResult`` 拿前一个节点 param 渲染)。
"""
with self._debug_call_session("wait_for_order_finish"):
resolved_order_ids = self._extract_order_ids(order_id=order_id, order_ids=order_ids, **kwargs)
order_code_input = str(order_code or "").strip()
if not resolved_order_ids and not order_code_input:
raise PeptideWorkflowError("wait_for_order_finish 至少需要 order_id/order_ids/order_code 之一")
material_info_cache: Dict[str, Dict[str, Any]] = {}
missing_material_info: List[str] = []
unload_rows: List[Dict[str, Any]] = []
used_materials_total: List[Dict[str, Any]] = []
material_ids_total: List[str] = []
preintake_ids_total: List[str] = []
order_codes_seen: List[str] = []
last_status: str = ""
last_report: Dict[str, Any] = {}
multi_order = len(resolved_order_ids) > 1 or (resolved_order_ids and order_code_input)
wait_targets: List[Tuple[str, str]] = []
if resolved_order_ids:
for oid in resolved_order_ids:
code_for_oid = self._resolve_order_code(oid, fallback=order_code_input if len(resolved_order_ids) == 1 else "")
wait_targets.append((oid, code_for_oid))
else:
wait_targets.append(("", order_code_input))
for oid, code_for_oid in wait_targets:
if not code_for_oid:
raise PeptideWorkflowError(
f"wait_for_order_finish 无法解析 orderCode (order_id={oid!r})"
)
order_codes_seen.append(code_for_oid)
wait_result = self._wait_single_order_finish(code_for_oid, timeout_seconds, poll_mode=poll_mode)
last_status = wait_result["status"]
last_report = wait_result["report"] or {}
used_materials = self._extract_used_materials(last_report)
used_materials_total.extend(used_materials)
material_ids_total.extend(self._collect_material_ids(used_materials))
preintake_ids_total.extend(self._collect_preintake_ids(used_materials))
rows = self._build_unload_rows(
used_materials,
material_info_cache=material_info_cache,
missing_material_info=missing_material_info,
order_code=code_for_oid if multi_order else None,
)
unload_rows.extend(rows)
if last_status == "timeout":
break
if last_status == "abnormal_stop":
break
unload_table = self._compose_unload_table(unload_rows, multi_order=multi_order)
unload_summary = {
"order_codes": order_codes_seen,
"total_items": len(unload_rows),
"missing_material_info": list(dict.fromkeys(missing_material_info)),
}
primary_order_id = resolved_order_ids[0] if resolved_order_ids else ""
primary_order_code = order_codes_seen[0] if order_codes_seen else order_code_input
return {
"success": last_status == "success",
"order_id": primary_order_id,
"order_ids": resolved_order_ids,
"order_code": primary_order_code,
"order_codes": order_codes_seen,
"order_finish_status": last_status,
"order_finish_report": last_report,
"used_materials": used_materials_total,
"material_ids": list(dict.fromkeys(material_ids_total)),
"preintake_ids": list(dict.fromkeys(preintake_ids_total)),
"unloadTable": unload_table,
"unload_summary": unload_summary,
}
@action(
always_free=True,
node_type=NodeType.MANUAL_CONFIRM,
placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"},
goal_default={"materials_unloaded": False, "timeout_seconds": 3600, "assignee_user_ids": []},
feedback_interval=300,
description="确认人工下料完成后调用 take-out 通知奔耀同步状态",
handles=[
ActionInputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.HANDLE, io_type="source"),
ActionInputHandle(key="material_ids", data_type="json", label="物料ID列表", data_key="material_ids", data_source=DataSource.HANDLE, io_type="source"),
ActionInputHandle(key="preintake_ids", data_type="json", label="通量ID列表", data_key="preintake_ids", data_source=DataSource.HANDLE, io_type="source"),
ActionInputHandle(key="unloadTable", data_type="table", label="下料表", data_key="unloadTable", data_source=DataSource.HANDLE, io_type="source"),
ActionOutputHandle(key="take_out_result", data_type="json", label="取出接口结果", data_key="take_out_result", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="unloaded_count", data_type="int", label="同步物料数量", data_key="unloaded_count", data_source=DataSource.EXECUTOR),
ActionOutputHandle(key="success", data_type="bool", label="同步是否成功", data_key="success", data_source=DataSource.EXECUTOR),
],
)
def unload_materials(
self,
order_id: str = "",
material_ids: Optional[List[str]] = None,
preintake_ids: Optional[List[str]] = None,
unloadTable: Optional[Dict[str, Any]] = None,
materials_unloaded: bool = False,
**kwargs: Any,
) -> Dict[str, Any]:
"""节点 2人工下料 manual_confirm 解锁后调用 take-out 通知奔耀同步状态。
时序:操作员物理下料 → 勾选 ``materials_unloaded=True`` → 批准 →
manual_confirm 解除阻塞 → 此处调用 ``take-out`` 让奔耀清空对应库位状态。
"""
del unloadTable, kwargs # unloadTable 仅供前端弹窗渲染,本节点函数体不消费
with self._debug_call_session("unload_materials"):
if not bool(materials_unloaded):
raise RuntimeError("下料未确认,拒绝结束节点")
resolved_order_id = str(order_id or "").strip()
if not resolved_order_id:
raise PeptideWorkflowError("unload_materials 缺少 order_id")
material_ids_list = [str(item) for item in (material_ids or []) if item]
preintake_ids_list = [str(item) for item in (preintake_ids or []) if item]
rpc = self._require_hardware_interface()
try:
take_out_result = rpc.take_out(
resolved_order_id,
preintake_ids=preintake_ids_list,
material_ids=material_ids_list,
) or {}
except Exception as exc:
logger.warning(
"take_out 调用异常 order_id=%s material_ids=%s: %s",
resolved_order_id,
material_ids_list,
exc,
)
take_out_result = {"code": 0, "message": f"take_out_invoke_failed: {exc}"}
code_value = take_out_result.get("code") if isinstance(take_out_result, dict) else None
success = bool(isinstance(take_out_result, dict) and code_value == 1)
if not success:
logger.warning(
"take_out 业务失败,未阻塞工作流,请人工核对奔耀库位 order_id=%s response=%s",
resolved_order_id,
take_out_result,
)
return {
"success": success,
"order_id": resolved_order_id,
"material_ids": material_ids_list,
"preintake_ids": preintake_ids_list,
"unloaded_count": len(material_ids_list),
"take_out_result": take_out_result,
}
@action( @action(
always_free=True, always_free=True,
goal_default={ goal_default={
"reset_operations": ["scheduler_reset", "reset_order_status", "reset_location"], "reset_scheduler": True,
"reset_order_status": True,
"reset_location": True,
"reset_devices": False,
}, },
description="复位调度器/订单/库", description="自动复位调度器/订单状态/库位,可选仪器复",
) )
def reset( def reset_auto(
self, self,
reset_operations: Optional[ reset_scheduler: bool = True,
List[Literal["scheduler_reset", "reset_order_status", "reset_location"]] reset_order_status: bool = True,
] = None, reset_location: bool = True,
reset_devices: bool = False,
**kwargs: Any, **kwargs: Any,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
with self._debug_call_session("reset"): """自动复位调度器/订单状态/库位,可选仪器复位。
operations = self._normalize_reset_operations(reset_operations)
result: Dict[str, Any] = {
"selected_operations": operations,
"executed_calls": [],
"skipped_operations": [],
}
rpc = self._require_hardware_interface()
for operation in operations:
if operation == "scheduler_reset":
code = rpc.scheduler_reset()
result["executed_calls"].append({"operation": operation, "result": {"code": code}})
elif operation == "reset_order_status":
resolved = str(
kwargs.get("reset_order_id") or kwargs.get("order_id") or ""
).strip()
if not resolved:
result["skipped_operations"].append(
{"operation": operation, "reason": "缺少 order_id/reset_order_id"}
)
continue
code = rpc.reset_order_status(resolved)
result["executed_calls"].append(
{"operation": operation, "order_id": resolved, "result": {"code": code}}
)
elif operation == "reset_location":
resolved = str(
kwargs.get("reset_location_id") or kwargs.get("location_id") or ""
).strip()
if not resolved:
result["skipped_operations"].append(
{"operation": operation, "reason": "缺少 location_id/reset_location_id"}
)
continue
code = rpc.reset_location(resolved)
result["executed_calls"].append(
{"operation": operation, "location_id": resolved, "result": {"code": code}}
)
else:
raise ValueError(f"未知 reset operation: {operation}")
return result
@action(always_free=True, description="从 Bioyond 同步库存物料到本地资源树") Args:
def sync_from_external(self, **kwargs: Any) -> Dict[str, Any]: reset_scheduler[调度器复位]: 调用 /api/lims/scheduler/reset默认勾选。
reset_order_status[订单状态复位]: 调用 /api/lims/order/reset-order-status默认勾选。
reset_location[库位复位]: 调用 /api/lims/storage/reset-location默认勾选。
reset_devices[仪器复位]: 调用 /api/lims/device/reset-devices默认不勾选。
"""
del kwargs del kwargs
with self._debug_call_session("sync_from_external"): with self._debug_call_session("reset_auto"):
self._require_hardware_interface() return self._execute_reset_operations(
if not getattr(self, "resource_synchronizer", None): reset_scheduler=bool(reset_scheduler),
raise RuntimeError("BioyondPeptideStation 未初始化 resource_synchronizer") reset_order_status=bool(reset_order_status),
reset_location=bool(reset_location),
reset_devices=bool(reset_devices),
)
success = bool(self.resource_synchronizer.sync_from_external()) @action(
resource_tree_update_requested = self._publish_resource_tree_update() if success else False always_free=True,
node_type=NodeType.MANUAL_CONFIRM,
placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"},
goal_default={
"reset_scheduler": True,
"reset_order_status": True,
"reset_location": True,
"reset_devices": False,
"physical_cleanup_confirmed": False,
"timeout_seconds": 3600,
"assignee_user_ids": [],
},
feedback_interval=300,
description=RESET_MANUAL_CONFIRM_MESSAGE,
)
def reset_manual(
self,
reset_scheduler: bool = True,
reset_order_status: bool = True,
reset_location: bool = True,
reset_devices: bool = False,
physical_cleanup_confirmed: bool = False,
timeout_seconds: int = 3600,
assignee_user_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""人工确认物理清理完毕后执行复位。
操作员需先按弹窗提示完成 G3/CEM/Tecan/撕膜机/封膜机/打标机/旋转堆栈/3 个转台
等位置的物料清理,并开门检查冰箱/IDOT/酶标仪/离心机/LCMS 内部无遗留,再勾选
``physical_cleanup_confirmed``,节点才会真正调用复位接口。
Args:
reset_scheduler[调度器复位]: 调用 /api/lims/scheduler/reset默认勾选。
reset_order_status[订单状态复位]: 调用 /api/lims/order/reset-order-status默认勾选。
reset_location[库位复位]: 调用 /api/lims/storage/reset-location默认勾选。
reset_devices[仪器复位]: 调用 /api/lims/device/reset-devices默认不勾选。
physical_cleanup_confirmed[物理清理确认]: 确认弹窗中的物料检查已完成,默认不勾选;未勾选时不会调用任何 RPC。
"""
del kwargs, timeout_seconds, assignee_user_ids
with self._debug_call_session("reset_manual"):
if not bool(physical_cleanup_confirmed):
logger.info("[reset_manual] 物理清理未确认,拒绝执行复位 RPC")
return { return {
"success": success, "status": "blocked",
"action": "sync_from_external", "physical_cleanup_confirmed": False,
"resource_tree_update_requested": resource_tree_update_requested, "confirmation_message": RESET_MANUAL_CONFIRM_MESSAGE,
"message": "Bioyond 资源同步成功" if success else "Bioyond 资源同步失败或无库存数据", "selected_operations": self._build_selected_operations_summary(
reset_scheduler=bool(reset_scheduler),
reset_order_status=bool(reset_order_status),
reset_location=bool(reset_location),
reset_devices=bool(reset_devices),
),
"executed_calls": [],
"skipped_operations": [
{"operation": op, "reason": "physical_cleanup_not_confirmed"}
for op in RESET_OPERATION_KEYS
],
"warnings": ["physical_cleanup_not_confirmed"],
} }
payload = self._execute_reset_operations(
reset_scheduler=bool(reset_scheduler),
reset_order_status=bool(reset_order_status),
reset_location=bool(reset_location),
reset_devices=bool(reset_devices),
)
payload["physical_cleanup_confirmed"] = True
payload["confirmation_message"] = RESET_MANUAL_CONFIRM_MESSAGE
return payload
@action(always_free=True, description="启动 Bioyond 调度器") @action(always_free=True, description="启动 Bioyond 调度器")
def scheduler_start(self, **kwargs: Any) -> Dict[str, Any]: def scheduler_start(self, **kwargs: Any) -> Dict[str, Any]:
@@ -1369,44 +1668,172 @@ class BioyondPeptideStation(BioyondWorkstation):
"result_list_count": len(self._as_list(raw.get("resultList"))), "result_list_count": len(self._as_list(raw.get("resultList"))),
} }
# ---------- 基础设施 ---------- # ---------- wait_for_order_finish / unload_materials 辅助 ----------
def _publish_resource_tree_update(self) -> bool: def _wait_single_order_finish(
"""触发当前 deck 的资源树更新,让前端看到最新同步结果。""" self,
ros_node = getattr(self, "_ros_node", None) order_code: str,
if ros_node is None: timeout_seconds: int,
logger.warning("资源树更新跳过: 未绑定 _ros_node") *,
return False poll_mode: bool = False,
poll_interval: float = 0.5,
) -> Dict[str, Any]:
"""阻塞等待单个 orderCode 的 LIMS 完成推送,返回 ``{status, report}``.
deck = getattr(self, "deck", None) 与 :class:`BioyondCellWorkstation` 保持相同语义:
if deck is None: - 状态映射 ``"30" -> success`` / ``"-11" -> abnormal_stop`` /
logger.warning("资源树更新跳过: 未绑定 deck") ``"-12" -> manual_stop`` / 其它 ``unknown_<status>``;超时返回 ``timeout``。
return False """
if not order_code:
return {"status": "error", "report": {}, "message": "empty order_code"}
self.last_order_code = order_code
self.last_order_report = None
self.order_finish_event.clear()
timeout_value = max(int(timeout_seconds or 0), 1)
logger.info("[peptide.order_finish] 开始等待 orderCode=%s timeout=%ss poll_mode=%s", order_code, timeout_value, poll_mode)
update_resource = getattr(ros_node, "update_resource", None) if poll_mode:
if update_resource is None: start_time = time.time()
logger.warning("资源树更新跳过: _ros_node 缺少 update_resource") while not self.order_finish_event.is_set():
return False if time.time() - start_time > timeout_value:
logger.error("[peptide.order_finish] 等待超时 orderCode=%s", order_code)
try: return {"status": "timeout", "report": {}, "orderCode": order_code}
try: time.sleep(poll_interval)
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode # type: ignore
except Exception: # pragma: no cover
ROS2DeviceNode = None # type: ignore[assignment]
if ROS2DeviceNode is not None and hasattr(ROS2DeviceNode, "run_async_func"):
ROS2DeviceNode.run_async_func(update_resource, True, **{"resources": [deck]})
else: else:
update_resource(resources=[deck]) if not self.order_finish_event.wait(timeout=timeout_value):
logger.error("[peptide.order_finish] 等待超时 orderCode=%s", order_code)
return {"status": "timeout", "report": {}, "orderCode": order_code}
logger.info("已调度多肽 deck '%s' 的资源树更新", getattr(deck, "name", "")) report = self.last_order_report or {}
return True report_code = str(report.get("orderCode") or "")
except TypeError as exc: if report_code and report_code != order_code:
logger.error("资源树更新失败: update_resource 调用签名错误: %s", exc) logger.warning("[peptide.order_finish] 报送 orderCode 不匹配 期望=%s 实际=%s", order_code, report_code)
raise return {"status": "mismatch", "report": report}
status_text = str(report.get("status") or "").strip()
status_map = {"30": "success", "-11": "abnormal_stop", "-12": "manual_stop"}
normalized = status_map.get(status_text, f"unknown_{status_text or 'empty'}")
return {"status": normalized, "report": report}
def _resolve_order_code(self, order_id: str, fallback: str = "") -> str:
"""将 order_id (UUID) 反查为 orderCode。fallback 用于 CLI 调试时直接传 orderCode。"""
order_id_clean = str(order_id or "").strip()
if not order_id_clean:
return fallback.strip()
try:
raw = self._require_hardware_interface().order_report(order_id_clean) or {}
except Exception as exc: except Exception as exc:
logger.warning("资源树更新失败: %s", exc) logger.warning("反查 orderCode 失败 order_id=%s: %s", order_id_clean, exc)
return False return fallback.strip()
if isinstance(raw, dict):
for key in ("code", "orderCode", "order_code"):
value = raw.get(key)
if value:
return str(value)
return fallback.strip()
def _extract_used_materials(self, report: Dict[str, Any]) -> List[Dict[str, Any]]:
if not isinstance(report, dict):
return []
result: List[Dict[str, Any]] = []
for item in self._as_list(report.get("usedMaterials")):
if isinstance(item, dict):
result.append(item)
return result
@staticmethod
def _collect_material_ids(used_materials: List[Dict[str, Any]]) -> List[str]:
ids: List[str] = []
for item in used_materials:
material_id = item.get("materialId") or item.get("MaterialId") or ""
if material_id:
ids.append(str(material_id))
return ids
@staticmethod
def _collect_preintake_ids(used_materials: List[Dict[str, Any]]) -> List[str]:
ids: List[str] = []
for item in used_materials:
preintake_id = item.get("preintakeId") or item.get("preIntakeId") or ""
if preintake_id:
ids.append(str(preintake_id))
return ids
def _build_unload_rows(
self,
used_materials: List[Dict[str, Any]],
*,
material_info_cache: Dict[str, Dict[str, Any]],
missing_material_info: List[str],
order_code: Optional[str] = None,
) -> List[Dict[str, Any]]:
rows: List[Dict[str, Any]] = []
for material in used_materials:
material_id = str(material.get("materialId") or material.get("MaterialId") or "")
info = self._fetch_material_info_cached(material_id, material_info_cache, missing_material_info)
location = self._first_location(info)
row = {
"whName": str(location.get("whName") or ""),
"posX": self._stringify_coord(location.get("posX")),
"posY": self._stringify_coord(location.get("posY")),
"posZ": self._stringify_coord(location.get("posZ")),
"unit": str(info.get("unit") or location.get("unit") or ""),
"materialName": str(info.get("name") or ""),
"materialId": material_id,
"typeMode": str(material.get("typeMode") or material.get("typemode") or ""),
}
if order_code is not None:
row["orderCode"] = order_code
rows.append(row)
return rows
def _fetch_material_info_cached(
self,
material_id: str,
cache: Dict[str, Dict[str, Any]],
missing_material_info: List[str],
) -> Dict[str, Any]:
if not material_id:
return {}
if material_id in cache:
return cache[material_id]
try:
info = self._require_hardware_interface().material_info(material_id) or {}
except Exception as exc:
logger.warning("material_info 查询失败 material_id=%s: %s", material_id, exc)
info = {}
if not isinstance(info, dict) or not info:
missing_material_info.append(material_id)
info = {}
cache[material_id] = info
return info
def _first_location(self, info: Dict[str, Any]) -> Dict[str, Any]:
if not isinstance(info, dict):
return {}
for location in self._as_list(info.get("locations")):
if isinstance(location, dict):
return location
return {}
@staticmethod
def _stringify_coord(value: Any) -> str:
if value is None:
return ""
if isinstance(value, float):
if value.is_integer():
return str(int(value))
return str(value)
@staticmethod
def _compose_unload_table(rows: List[Dict[str, Any]], *, multi_order: bool) -> Dict[str, Any]:
columns = UNLOAD_TABLE_COLUMNS_MULTI_ORDER if multi_order else UNLOAD_TABLE_COLUMNS
return {
"data": rows,
"columns": copy.deepcopy(columns),
"tableName": "unloadTable",
}
# ---------- 基础设施 ----------
def _run_scheduler_action(self, method_name: str, label: str) -> Dict[str, Any]: def _run_scheduler_action(self, method_name: str, label: str) -> Dict[str, Any]:
rpc = self._require_hardware_interface() rpc = self._require_hardware_interface()
@@ -1424,32 +1851,105 @@ class BioyondPeptideStation(BioyondWorkstation):
return interface return interface
@staticmethod @staticmethod
def _normalize_reset_operations(reset_operations: Optional[List[str]]) -> List[str]: def _build_selected_operations_summary(
alias_map = { *,
"scheduler": "scheduler_reset", reset_scheduler: bool,
"scheduler_reset": "scheduler_reset", reset_order_status: bool,
"order": "reset_order_status", reset_location: bool,
"order_status": "reset_order_status", reset_devices: bool,
"reset_order_status": "reset_order_status", ) -> List[Dict[str, Any]]:
"location": "reset_location", flags: Dict[str, bool] = {
"reset_location": "reset_location", "reset_scheduler": bool(reset_scheduler),
"reset_order_status": bool(reset_order_status),
"reset_location": bool(reset_location),
"reset_devices": bool(reset_devices),
} }
normalized: List[str] = [] return [
for operation in list(reset_operations or DEFAULT_RESET_OPERATIONS): {"key": key, "label": RESET_OPERATION_LABELS[key], "selected": flags[key]}
canonical = alias_map.get(str(operation).strip()) for key in RESET_OPERATION_KEYS
if not canonical: ]
raise ValueError(f"未知 reset operation: {operation}")
if canonical not in normalized:
normalized.append(canonical)
return normalized
@staticmethod def _execute_reset_operations(
def _reset_operation_endpoint(operation: str) -> str: self,
return { *,
"scheduler_reset": "/api/lims/scheduler/reset", reset_scheduler: bool,
"reset_order_status": "/api/lims/order/reset-order-status", reset_order_status: bool,
"reset_location": "/api/lims/storage/reset-location", reset_location: bool,
}.get(operation, "") reset_devices: bool,
) -> Dict[str, Any]:
"""根据 4 个 checkbox 选择顺序调用对应 RPC。
- 调用顺序固定为 scheduler → order_status → location → devices
- 单步失败(``code != 1`` 或 RPC 抛异常)记 warning 但继续执行后续选中的步骤,
不做 fail-fast便于操作员在遇到部分故障时仍能完成可恢复的复位。
"""
rpc = self._require_hardware_interface()
flags: Dict[str, bool] = {
"reset_scheduler": bool(reset_scheduler),
"reset_order_status": bool(reset_order_status),
"reset_location": bool(reset_location),
"reset_devices": bool(reset_devices),
}
selected_operations = self._build_selected_operations_summary(
reset_scheduler=reset_scheduler,
reset_order_status=reset_order_status,
reset_location=reset_location,
reset_devices=reset_devices,
)
result: Dict[str, Any] = {
"selected_operations": selected_operations,
"executed_calls": [],
"skipped_operations": [],
"warnings": [],
}
rpc_method_map: Dict[str, str] = {
"reset_scheduler": "scheduler_reset",
"reset_order_status": "reset_order_status",
"reset_location": "reset_location",
"reset_devices": "reset_devices",
}
for operation in RESET_OPERATION_KEYS:
if not flags[operation]:
result["skipped_operations"].append(
{"operation": operation, "reason": "checkbox_disabled"}
)
continue
method_name = rpc_method_map[operation]
method = getattr(rpc, method_name, None)
endpoint = RESET_OPERATION_ENDPOINTS[operation]
if not callable(method):
msg = f"RPC 缺少方法: {method_name}"
logger.warning("[reset] %s", msg)
result["executed_calls"].append({
"operation": operation,
"endpoint": endpoint,
"result": {"code": 0},
"error": msg,
})
result["warnings"].append(f"{operation}: {msg}")
continue
try:
code = method()
except Exception as exc: # 单步异常不阻断其余 reset
logger.warning("[reset] %s 调用异常: %s", method_name, exc)
result["executed_calls"].append({
"operation": operation,
"endpoint": endpoint,
"result": {"code": 0},
"error": str(exc),
})
result["warnings"].append(f"{operation}: {exc}")
continue
result["executed_calls"].append({
"operation": operation,
"endpoint": endpoint,
"result": {"code": code},
})
if code != 1:
result["warnings"].append(f"{operation}: rpc_returned_non_one_code={code}")
return result
@staticmethod @staticmethod
def _extract_order_ids(order_id: str = "", order_ids: Optional[List[str]] = None, **kwargs: Any) -> List[str]: def _extract_order_ids(order_id: str = "", order_ids: Optional[List[str]] = None, **kwargs: Any) -> List[str]:

View File

@@ -95,7 +95,8 @@ def test_required_actions_exposed() -> None:
"submit_experiment_day4", "submit_experiment_day4",
"submit_experiment_day4_LCMS", "submit_experiment_day4_LCMS",
"start_experiment", "start_experiment",
"reset", "reset_auto",
"reset_manual",
"scheduler_start", "scheduler_start",
"scheduler_stop", "scheduler_stop",
"scheduler_pause", "scheduler_pause",
@@ -112,14 +113,14 @@ def test_required_actions_exposed() -> None:
def test_manual_confirm_node_types() -> None: def test_manual_confirm_node_types() -> None:
module = _import_module() module = _import_module()
cls = getattr(module, CLASS_NAME) cls = getattr(module, CLASS_NAME)
manual = {"submit_experiment_day1", "start_experiment"} manual = {"submit_experiment_day1", "start_experiment", "reset_manual"}
normal = { normal = {
"submit_experiment", "submit_experiment",
"submit_experiment_day2", "submit_experiment_day2",
"submit_experiment_day3", "submit_experiment_day3",
"submit_experiment_day4", "submit_experiment_day4",
"submit_experiment_day4_LCMS", "submit_experiment_day4_LCMS",
"reset", "reset_auto",
"scheduler_start", "scheduler_start",
"list_sample_excels", "list_sample_excels",
"get_step_parameters", "get_step_parameters",
@@ -142,7 +143,7 @@ def test_submit_and_reset_signatures_exclude_legacy_manual_confirm() -> None:
"submit_experiment_day3", "submit_experiment_day3",
"submit_experiment_day4", "submit_experiment_day4",
"submit_experiment_day4_LCMS", "submit_experiment_day4_LCMS",
"reset", "reset_auto",
): ):
params = inspect.signature(getattr(cls, name)).parameters params = inspect.signature(getattr(cls, name)).parameters
assert "timeout_seconds" not in params, name assert "timeout_seconds" not in params, name
@@ -612,70 +613,579 @@ def test_start_experiment_starts_when_table_empty() -> None:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# 10. Reset # 10. Reset (plan 2026-05-21: reset_auto + reset_manual 四勾选)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def test_reset_signature_drops_legacy_params_and_uses_literal() -> None: RESET_BOOL_PARAMS = (
"""plan 调整:删除 dry_run/order_id/location_idreset_operations 用 Literal 注解。""" "reset_scheduler",
cls = getattr(_import_module(), CLASS_NAME)
sig = inspect.signature(cls.reset)
params = sig.parameters
for legacy in ("dry_run", "order_id", "location_id"):
assert legacy not in params, f"reset 不应再有 {legacy} 入参"
assert "reset_operations" in params
assert any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values()), \
"reset 必须保留 **kwargs 以兜底 reset_order_id/reset_location_id"
annotation = params["reset_operations"].annotation
rendered = annotation if isinstance(annotation, str) else repr(annotation)
for op in ("scheduler_reset", "reset_order_status", "reset_location"):
assert op in rendered, f"reset_operations 的 Literal 必须包含 {op}"
def test_reset_goal_default_contains_all_operations() -> None:
"""像 sirna 一样goal_default 默认勾选全部三个 reset 操作。"""
cls = getattr(_import_module(), CLASS_NAME)
meta = getattr(cls.reset, "_action_registry_meta", {})
goal_default = meta.get("goal_default") or {}
assert goal_default.get("reset_operations") == [
"scheduler_reset",
"reset_order_status", "reset_order_status",
"reset_location", "reset_location",
] "reset_devices",
def test_reset_executes_typed_rpc_calls() -> None:
station = _make_station()
station.hardware_interface.scheduler_reset.return_value = 1
station.hardware_interface.reset_order_status.return_value = 1
station.hardware_interface.reset_location.return_value = 1
out = station.reset(
reset_operations=["scheduler_reset", "reset_order_status", "reset_location"],
reset_order_id=ORDER_GUID,
reset_location_id="loc-1",
) )
station.hardware_interface.scheduler_reset.assert_called_once_with()
station.hardware_interface.reset_order_status.assert_called_once_with(ORDER_GUID)
station.hardware_interface.reset_location.assert_called_once_with("loc-1") def _reset_meta(name: str) -> Dict[str, Any]:
assert out["selected_operations"] == [ cls = getattr(_import_module(), CLASS_NAME)
"scheduler_reset", return dict(getattr(getattr(cls, name), "_action_registry_meta", {}))
# --- plan §Tests 1: reset_auto 不是 MANUAL_CONFIRM ---
def test_reset_auto_is_not_manual_confirm() -> None:
module = _import_module()
meta = _reset_meta("reset_auto")
assert meta.get("node_type") != module.NodeType.MANUAL_CONFIRM
# --- plan §Tests 2: reset_manual 是 MANUAL_CONFIRM ---
def test_reset_manual_is_manual_confirm() -> None:
module = _import_module()
meta = _reset_meta("reset_manual")
assert meta.get("node_type") == module.NodeType.MANUAL_CONFIRM
# --- plan §Tests 3: reset_manual 关键 metadata ---
def test_reset_manual_metadata_shape() -> None:
meta = _reset_meta("reset_manual")
assert meta.get("always_free") is True
assert meta.get("placeholder_keys") == {
"assignee_user_ids": "unilabos_manual_confirm",
}
goal_default = meta.get("goal_default") or {}
assert goal_default.get("timeout_seconds") == 3600
assert goal_default.get("assignee_user_ids") == []
assert goal_default.get("physical_cleanup_confirmed") is False
# --- plan §Tests 4: 两个 action 都暴露 4 个真实 bool 参数 ---
def _resolved_bool_annotation(param: inspect.Parameter) -> Any:
"""`from __future__ import annotations` 下注解是字符串;统一解析回真实类型。"""
annotation = param.annotation
if annotation is bool:
return bool
if isinstance(annotation, str):
# 既不是 Annotated[...] 也不是 Optional[...] 的纯 "bool" 字符串
return bool if annotation.strip() == "bool" else annotation
return annotation
def test_reset_actions_expose_four_real_bool_params() -> None:
cls = getattr(_import_module(), CLASS_NAME)
for action_name in ("reset_auto", "reset_manual"):
params = inspect.signature(getattr(cls, action_name)).parameters
for flag in RESET_BOOL_PARAMS:
assert flag in params, f"{action_name} 缺少 {flag}"
resolved = _resolved_bool_annotation(params[flag])
assert resolved is bool, (
f"{action_name}.{flag} 必须是裸 bool不能用 Annotated[bool, Field(...)] 包裹),实际: {params[flag].annotation!r}"
)
assert params[flag].kind in (
inspect.Parameter.POSITIONAL_OR_KEYWORD,
inspect.Parameter.KEYWORD_ONLY,
), f"{action_name}.{flag} 必须是真实参数,不能藏在 **kwargs"
# --- plan §Tests 5: registry 生成的 schema 标记 reset 字段为 boolean ---
def test_reset_action_param_annotations_are_bool_for_schema() -> None:
"""plan: 当前 AST registry 不 unwrap Annotated裸 bool 才能映射成 type: boolean。
这里直接检查 type_to_schema 在 Python 类型 ``bool`` 上返回 ``{"type": "boolean"}``
再校验 reset_auto/reset_manual 的真实参数注解就是 ``bool``(裸字符串 "bool" 也算),
从而保证生成的 JSON Schema 一定是 boolean不会被前端当成 object/string。
"""
from unilabos.registry.utils import type_to_schema
assert type_to_schema(bool) == {"type": "boolean"}
cls = getattr(_import_module(), CLASS_NAME)
for action_name in ("reset_auto", "reset_manual"):
params = inspect.signature(getattr(cls, action_name)).parameters
for flag in RESET_BOOL_PARAMS:
resolved = _resolved_bool_annotation(params[flag])
assert type_to_schema(resolved) == {"type": "boolean"}, (
f"{action_name}.{flag} schema 必须是 boolean实际注解: {params[flag].annotation!r}"
)
# --- plan §Tests 6: reset_auto 替换旧 reset未保留旧 id-shaped reset 别名 ---
def test_legacy_reset_action_removed() -> None:
cls = getattr(_import_module(), CLASS_NAME)
have = {name for name, _ in inspect.getmembers(cls, inspect.isfunction)}
assert "reset" not in have, "旧 reset 应被 reset_auto 完全替换,不保留同名别名"
assert "reset_auto" in have
# --- plan §Tests 7: goal_default 中前三项 Truereset_devices=False ---
def test_reset_goal_defaults_first_three_true_devices_false() -> None:
for action_name in ("reset_auto", "reset_manual"):
meta = _reset_meta(action_name)
goal_default = meta.get("goal_default") or {}
assert goal_default.get("reset_scheduler") is True, action_name
assert goal_default.get("reset_order_status") is True, action_name
assert goal_default.get("reset_location") is True, action_name
assert goal_default.get("reset_devices") is False, action_name
# --- plan §Tests 8: reset_manual(physical_cleanup_confirmed=False) 不调任何 RPC ---
def test_reset_manual_blocks_when_not_confirmed() -> None:
station = _make_station()
out = station.reset_manual(
reset_scheduler=True,
reset_order_status=True,
reset_location=True,
reset_devices=True,
physical_cleanup_confirmed=False,
)
rpc = station.hardware_interface
rpc.scheduler_reset.assert_not_called()
rpc.reset_order_status.assert_not_called()
rpc.reset_location.assert_not_called()
rpc.reset_devices.assert_not_called()
assert out["status"] == "blocked"
assert out["physical_cleanup_confirmed"] is False
assert "请确认" in out["confirmation_message"]
# --- plan §Tests 9: reset_auto() 默认调三件、不调 reset_devices ---
def test_reset_auto_defaults_call_three_and_skip_devices() -> None:
station = _make_station()
rpc = station.hardware_interface
rpc.scheduler_reset.return_value = 1
rpc.reset_order_status.return_value = 1
rpc.reset_location.return_value = 1
out = station.reset_auto()
rpc.scheduler_reset.assert_called_once_with()
rpc.reset_order_status.assert_called_once_with()
rpc.reset_location.assert_called_once_with()
rpc.reset_devices.assert_not_called()
executed_ops = [item["operation"] for item in out["executed_calls"]]
assert executed_ops == ["reset_scheduler", "reset_order_status", "reset_location"]
skipped_ops = {item["operation"] for item in out["skipped_operations"]}
assert skipped_ops == {"reset_devices"}
selected = {item["key"]: item["selected"] for item in out["selected_operations"]}
assert selected == {
"reset_scheduler": True,
"reset_order_status": True,
"reset_location": True,
"reset_devices": False,
}
# --- plan §Tests 10: reset_auto(reset_devices=True) 也会调 reset_devices ---
def test_reset_auto_with_devices_true_calls_reset_devices() -> None:
station = _make_station()
rpc = station.hardware_interface
rpc.scheduler_reset.return_value = 1
rpc.reset_order_status.return_value = 1
rpc.reset_location.return_value = 1
rpc.reset_devices.return_value = 1
out = station.reset_auto(reset_devices=True)
rpc.reset_devices.assert_called_once_with()
executed_ops = [item["operation"] for item in out["executed_calls"]]
assert executed_ops == [
"reset_scheduler",
"reset_order_status", "reset_order_status",
"reset_location", "reset_location",
"reset_devices",
] ]
assert len(out["executed_calls"]) == 3
assert out["skipped_operations"] == [] assert out["skipped_operations"] == []
def test_reset_skips_when_ids_missing() -> None: def test_reset_auto_individual_checkboxes_drive_calls() -> None:
"""没有 order_id / location_id 时应该 skip 而不是抛错""" """更细粒度:单独勾 reset_scheduler 时只调 scheduler_reset"""
station = _make_station() station = _make_station()
station.hardware_interface.scheduler_reset.return_value = 1 rpc = station.hardware_interface
out = station.reset( rpc.scheduler_reset.return_value = 1
reset_operations=["scheduler_reset", "reset_order_status", "reset_location"], out = station.reset_auto(
reset_scheduler=True,
reset_order_status=False,
reset_location=False,
reset_devices=False,
) )
station.hardware_interface.scheduler_reset.assert_called_once_with() rpc.scheduler_reset.assert_called_once_with()
station.hardware_interface.reset_order_status.assert_not_called() rpc.reset_order_status.assert_not_called()
station.hardware_interface.reset_location.assert_not_called() rpc.reset_location.assert_not_called()
skipped_ops = {item["operation"] for item in out["skipped_operations"]} rpc.reset_devices.assert_not_called()
assert skipped_ops == {"reset_order_status", "reset_location"} skipped = {item["operation"] for item in out["skipped_operations"]}
assert skipped == {"reset_order_status", "reset_location", "reset_devices"}
def test_reset_manual_after_confirmation_calls_same_helper() -> None:
"""plan §reset_manual 执行规则:勾选确认后等价于 reset_auto。"""
station = _make_station()
rpc = station.hardware_interface
rpc.scheduler_reset.return_value = 1
rpc.reset_order_status.return_value = 1
rpc.reset_location.return_value = 1
rpc.reset_devices.return_value = 1
out = station.reset_manual(
reset_scheduler=True,
reset_order_status=True,
reset_location=True,
reset_devices=True,
physical_cleanup_confirmed=True,
)
rpc.scheduler_reset.assert_called_once_with()
rpc.reset_order_status.assert_called_once_with()
rpc.reset_location.assert_called_once_with()
rpc.reset_devices.assert_called_once_with()
assert out["physical_cleanup_confirmed"] is True
assert out["confirmation_message"]
assert [item["operation"] for item in out["executed_calls"]] == [
"reset_scheduler",
"reset_order_status",
"reset_location",
"reset_devices",
]
# --- plan §Tests 11: RPC 包装层 reset_order_status / reset_location 不发送 data 键 ---
def test_rpc_reset_order_status_sends_no_data_key() -> None:
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC
rpc = object.__new__(BioyondV1RPC)
rpc.host = "http://test"
rpc.api_key = "k"
rpc._logger = MagicMock()
rpc.post = MagicMock(return_value={"code": 1}) # type: ignore[method-assign]
rpc.get_current_time_iso8601 = MagicMock(return_value="2026-05-21T08:00:00.000Z") # type: ignore[method-assign]
rpc.reset_order_status("ignored-uuid")
args, kwargs = rpc.post.call_args
sent_params = kwargs.get("params") or (args[1] if len(args) > 1 else {})
assert "data" not in sent_params, "reset_order_status 不应再发送 data 字段"
assert set(sent_params.keys()) == {"apiKey", "requestTime"}
def test_rpc_reset_location_sends_no_data_key() -> None:
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC
rpc = object.__new__(BioyondV1RPC)
rpc.host = "http://test"
rpc.api_key = "k"
rpc._logger = MagicMock()
rpc.post = MagicMock(return_value={"code": 1}) # type: ignore[method-assign]
rpc.get_current_time_iso8601 = MagicMock(return_value="2026-05-21T08:00:00.000Z") # type: ignore[method-assign]
rpc.reset_location("ignored-loc-id")
args, kwargs = rpc.post.call_args
sent_params = kwargs.get("params") or (args[1] if len(args) > 1 else {})
assert "data" not in sent_params, "reset_location 不应再发送 data 字段"
assert set(sent_params.keys()) == {"apiKey", "requestTime"}
# --- plan §Tests 12 + 13: 任何 reset 路径都不调用 take_out / refresh_material_cache ---
def test_reset_paths_do_not_call_take_out_or_material_cache() -> None:
station = _make_station()
rpc = station.hardware_interface
rpc.scheduler_reset.return_value = 1
rpc.reset_order_status.return_value = 1
rpc.reset_location.return_value = 1
rpc.reset_devices.return_value = 1
station.reset_auto(reset_devices=True)
station.reset_manual(physical_cleanup_confirmed=True, reset_devices=True)
station.reset_manual(physical_cleanup_confirmed=False)
rpc.take_out.assert_not_called()
refresh = getattr(rpc, "refresh_material_cache", None)
if refresh is not None and hasattr(refresh, "assert_not_called"):
refresh.assert_not_called()
# --- 失败/兜底用例:不 fail-fast单步异常或 code!=1 仅记 warning ---
def test_reset_auto_records_warning_when_rpc_returns_non_one() -> None:
station = _make_station()
rpc = station.hardware_interface
rpc.scheduler_reset.return_value = 0 # 业务失败
rpc.reset_order_status.return_value = 1
rpc.reset_location.return_value = 1
out = station.reset_auto()
rpc.scheduler_reset.assert_called_once_with()
rpc.reset_order_status.assert_called_once_with()
rpc.reset_location.assert_called_once_with()
assert any("reset_scheduler" in w for w in out.get("warnings", []))
def test_reset_auto_continues_after_rpc_exception() -> None:
station = _make_station()
rpc = station.hardware_interface
rpc.scheduler_reset.side_effect = RuntimeError("HTTP 500")
rpc.reset_order_status.return_value = 1
rpc.reset_location.return_value = 1
out = station.reset_auto()
rpc.reset_order_status.assert_called_once_with()
rpc.reset_location.assert_called_once_with()
error_entries = [item for item in out["executed_calls"] if "error" in item]
assert any(item["operation"] == "reset_scheduler" for item in error_entries)
def test_reset_manual_confirmation_message_constant() -> None:
module = _import_module()
msg = module.RESET_MANUAL_CONFIRM_MESSAGE
for keyword in ("G3", "CEM", "Tecan", "撕膜机", "封膜机", "打标机", "旋转堆栈", "转台", "冰箱", "IDOT", "酶标仪", "离心机", "LCMS"):
assert keyword in msg, f"reset_manual 提示文案缺关键字: {keyword}"
meta = _reset_meta("reset_manual")
assert meta.get("description") == msg, "reset_manual 装饰器 description 应等于常量本身"
# ---------------------------------------------------------------------------
# 11. wait_for_order_finish + unload_materialsplan 2026-05-20 新增节点)
# ---------------------------------------------------------------------------
def _make_station_with_finish_state() -> Any:
"""带订单完成事件状态的 station 实例process_order_finish_report 用)。"""
import threading as _threading
station = _make_station()
station.order_finish_event = _threading.Event()
station.last_order_code = None
station.last_order_report = None
return station
def _make_report_request(order_code: str, status: str = "30", used_materials: List[Dict[str, Any]] | None = None) -> Any:
request = MagicMock()
request.data = {
"orderCode": order_code,
"orderName": f"实验{order_code}",
"startTime": "2026-05-20T10:00:00.000Z",
"endTime": "2026-05-20T11:00:00.000Z",
"status": status,
"usedMaterials": used_materials or [],
}
return request
def test_process_order_finish_report_triggers_event_on_match() -> None:
station = _make_station_with_finish_state()
station.last_order_code = "EXP260520-100000"
request = _make_report_request("EXP260520-100000", status="30")
station.process_order_finish_report(request, [])
assert station.order_finish_event.is_set() is True
assert station.last_order_report["orderCode"] == "EXP260520-100000"
def test_process_order_finish_report_ignores_mismatched_order_code() -> None:
station = _make_station_with_finish_state()
station.last_order_code = "EXP260520-100000"
request = _make_report_request("EXP260520-OTHER", status="30")
station.process_order_finish_report(request, [])
assert station.order_finish_event.is_set() is False
assert station.last_order_report["orderCode"] == "EXP260520-OTHER"
def test_wait_for_order_finish_returns_immediately_when_event_set() -> None:
"""事件预先 set + last_order_report 命中时wait 立即返回 success。"""
station = _make_station_with_finish_state()
used_materials = [
{"materialId": "mat-1", "locationId": "loc-1", "typeMode": "1", "usedQuantity": 1.0},
]
station.hardware_interface.order_report.return_value = {"code": "EXP260520-101010"}
station.hardware_interface.material_info.return_value = {
"name": "样品A",
"unit": "mg",
"locations": [{"whName": "自动化堆栈", "posX": 1, "posY": 2, "posZ": 3, "code": "1-01"}],
}
pending: Dict[str, Any] = {}
def _fake_wait(timeout: float = 0.0) -> bool:
pending["report"] = {
"orderCode": station.last_order_code,
"status": "30",
"usedMaterials": used_materials,
}
station.last_order_report = pending["report"]
return True
station.order_finish_event = MagicMock()
station.order_finish_event.wait = MagicMock(side_effect=_fake_wait)
station.order_finish_event.clear = MagicMock()
station.order_finish_event.set = MagicMock()
station.order_finish_event.is_set = MagicMock(return_value=True)
out = station.wait_for_order_finish(
order_id="11111111-1111-1111-1111-111111111111",
timeout_seconds=5,
)
assert out["order_finish_status"] == "success"
assert out["material_ids"] == ["mat-1"]
assert out["preintake_ids"] == []
table = out["unloadTable"]
assert table["tableName"] == "unloadTable"
column_keys = [c["key"] for c in table["columns"]]
assert column_keys == ["whName", "posX", "posY", "posZ", "unit", "materialName"]
row = table["data"][0]
assert row["whName"] == "自动化堆栈"
assert row["posX"] == "1"
assert row["posY"] == "2"
assert row["posZ"] == "3"
assert row["unit"] == "mg"
assert row["materialName"] == "样品A"
def test_wait_for_order_finish_returns_timeout_status() -> None:
station = _make_station_with_finish_state()
station.hardware_interface.order_report.return_value = {"code": "EXP260520-TIMEOUT"}
station.order_finish_event = MagicMock()
station.order_finish_event.wait = MagicMock(return_value=False)
station.order_finish_event.clear = MagicMock()
station.order_finish_event.is_set = MagicMock(return_value=False)
out = station.wait_for_order_finish(
order_id="22222222-2222-2222-2222-222222222222",
timeout_seconds=1,
)
assert out["order_finish_status"] == "timeout"
assert out["unloadTable"]["data"] == []
assert out["unload_summary"]["missing_material_info"] == []
def test_wait_for_order_finish_records_missing_material_info() -> None:
"""material-info 失败的物料行字段为空串,并被列入 missing_material_info。"""
station = _make_station_with_finish_state()
used_materials = [
{"materialId": "mat-good", "typeMode": "1"},
{"materialId": "mat-bad", "typeMode": "1"},
]
def _material_info(material_id: str) -> Dict[str, Any]:
if material_id == "mat-good":
return {
"name": "好物料",
"unit": "mg",
"locations": [{"whName": "WH-A", "posX": 4, "posY": 5, "posZ": 6}],
}
raise RuntimeError("HTTP 500")
station.hardware_interface.order_report.return_value = {"code": "EXP260520-MIX"}
station.hardware_interface.material_info.side_effect = _material_info
def _fake_wait(timeout: float = 0.0) -> bool:
station.last_order_report = {
"orderCode": station.last_order_code,
"status": "30",
"usedMaterials": used_materials,
}
return True
station.order_finish_event = MagicMock()
station.order_finish_event.wait = MagicMock(side_effect=_fake_wait)
station.order_finish_event.clear = MagicMock()
station.order_finish_event.set = MagicMock()
station.order_finish_event.is_set = MagicMock(return_value=True)
out = station.wait_for_order_finish(
order_id="33333333-3333-3333-3333-333333333333",
timeout_seconds=1,
)
rows = out["unloadTable"]["data"]
assert [row["materialId"] for row in rows] == ["mat-good", "mat-bad"]
bad_row = rows[1]
assert bad_row["whName"] == ""
assert bad_row["posX"] == ""
assert bad_row["posY"] == ""
assert bad_row["posZ"] == ""
assert bad_row["unit"] == ""
assert bad_row["materialName"] == ""
assert "mat-bad" in out["unload_summary"]["missing_material_info"]
assert "mat-good" not in out["unload_summary"]["missing_material_info"]
def test_unload_materials_blocks_when_not_confirmed() -> None:
station = _make_station()
with pytest.raises(RuntimeError):
station.unload_materials(
order_id=ORDER_GUID,
material_ids=["mat-1"],
preintake_ids=[],
unloadTable={"data": []},
materials_unloaded=False,
)
def test_unload_materials_calls_take_out_with_resolved_lists() -> None:
station = _make_station()
station.hardware_interface.take_out.return_value = {"code": 1, "message": "ok", "data": {}}
out = station.unload_materials(
order_id=ORDER_GUID,
material_ids=["mat-1", "mat-2"],
preintake_ids=[],
unloadTable={"data": [{"materialId": "mat-1"}, {"materialId": "mat-2"}]},
materials_unloaded=True,
)
station.hardware_interface.take_out.assert_called_once_with(
ORDER_GUID,
preintake_ids=[],
material_ids=["mat-1", "mat-2"],
)
assert out["success"] is True
assert out["unloaded_count"] == 2
assert out["take_out_result"] == {"code": 1, "message": "ok", "data": {}}
def test_unload_materials_does_not_raise_when_take_out_fails() -> None:
"""take_out 业务失败时仅 warn 不抛异常(已物理下料,避免阻塞工作流)。"""
station = _make_station()
station.hardware_interface.take_out.return_value = {"code": 0, "message": "已下过线"}
out = station.unload_materials(
order_id=ORDER_GUID,
material_ids=["mat-1"],
preintake_ids=[],
unloadTable={"data": []},
materials_unloaded=True,
)
assert out["success"] is False
assert out["take_out_result"] == {"code": 0, "message": "已下过线"}
def test_new_actions_registered_on_class() -> None:
cls = getattr(_import_module(), CLASS_NAME)
have = {name for name, _ in inspect.getmembers(cls, inspect.isfunction)}
assert "wait_for_order_finish" in have
assert "unload_materials" in have
module = _import_module()
unload_meta = getattr(cls.unload_materials, "_action_registry_meta", {})
assert unload_meta.get("node_type") == module.NodeType.MANUAL_CONFIRM
wait_meta = getattr(cls.wait_for_order_finish, "_action_registry_meta", {})
assert wait_meta.get("node_type") != module.NodeType.MANUAL_CONFIRM
assert wait_meta.get("always_free") is True
def test_unload_table_columns_constant_layout() -> None:
module = _import_module()
keys = [c["key"] for c in module.UNLOAD_TABLE_COLUMNS]
assert keys == ["whName", "posX", "posY", "posZ", "unit", "materialName"]
multi_keys = [c["key"] for c in module.UNLOAD_TABLE_COLUMNS_MULTI_ORDER]
assert multi_keys[0] == "orderCode"
assert multi_keys[1:] == keys