From c2c986105d22a3d708369fb65590900e4c190813 Mon Sep 17 00:00:00 2001 From: "hanhua@dp.tech" <2509856570@qq.com> Date: Fri, 22 May 2026 11:32:31 +0800 Subject: [PATCH] update reset --- .../workstation/bioyond_studio/bioyond_rpc.py | 20 +- .../peptide_station/peptide_station.py | 738 +++++++++++++++--- .../tests/test_peptide_station_contracts.py | 614 +++++++++++++-- 3 files changed, 1195 insertions(+), 177 deletions(-) diff --git a/unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py b/unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py index edb27c0c..a3faea62 100644 --- a/unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py +++ b/unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py @@ -415,21 +415,25 @@ class BioyondV1RPC(BaseRequest): return {} return response.get("data", {}) - def reset_location(self, location_id: str) -> int: + def reset_location(self, location_id: Optional[str] = None) -> int: """复位库位 + 现场实测 ``POST /api/lims/storage/reset-location`` 不传 ``data`` 即可成功 + (见 ``temp_benyao/peptide/_findings/2026-05-21_1615_remaining_resets_no_data_live.md``), + 因此默认无 ``data`` 字段;保留 ``location_id`` 仅为兼容旧调用,传入会被忽略。 + 参数: - location_id: 库位ID + location_id: 兼容入参,已被忽略;新逻辑不再以 location 为粒度复位。 返回值: int: 成功返回1,失败返回0 """ + del location_id response = self.post( url=f'{self.host}/api/lims/storage/reset-location', params={ "apiKey": self.api_key, "requestTime": self.get_current_time_iso8601(), - "data": location_id, }) if not response or response['code'] != 1: return 0 @@ -929,21 +933,25 @@ class BioyondV1RPC(BaseRequest): return {} return response.get("data", {}) - def reset_order_status(self, order_id: str) -> int: + def reset_order_status(self, order_id: Optional[str] = None) -> int: """复位订单状态 + 现场实测 ``POST /api/lims/order/reset-order-status`` 不传 ``data`` 即可成功 + (见 ``temp_benyao/peptide/_findings/2026-05-21_1613_reset_order_status_no_data_live.md``), + 因此默认无 ``data`` 字段;保留 ``order_id`` 仅为兼容旧调用,传入会被忽略。 + 参数: - order_id: 订单ID + order_id: 兼容入参,已被忽略;新逻辑不再以单订单为粒度复位。 返回值: int: 成功返回1,失败返回0 """ + del order_id response = self.post( url=f'{self.host}/api/lims/order/reset-order-status', params={ "apiKey": self.api_key, "requestTime": self.get_current_time_iso8601(), - "data": order_id, }) if not response or response['code'] != 1: return 0 diff --git a/unilabos/devices/workstation/bioyond_studio/peptide_station/peptide_station.py b/unilabos/devices/workstation/bioyond_studio/peptide_station/peptide_station.py index dbfd0344..5de5b9ad 100644 --- a/unilabos/devices/workstation/bioyond_studio/peptide_station/peptide_station.py +++ b/unilabos/devices/workstation/bioyond_studio/peptide_station/peptide_station.py @@ -8,10 +8,12 @@ import copy import json import mimetypes import sys +import threading +import time from contextlib import nullcontext from datetime import datetime, timezone from pathlib import Path -from typing import Annotated, Any, Dict, Iterable, List, Literal, Optional, Tuple +from typing import Annotated, Any, Dict, Iterable, List, Optional, Tuple from uuid import UUID import requests @@ -94,13 +96,46 @@ except Exception as exc: # pragma: no cover DEBUG_CLI_ENABLED = False -DEFAULT_RESET_OPERATIONS = ("scheduler_reset", "reset_order_status", "reset_location") +RESET_OPERATION_KEYS: Tuple[str, ...] = ( + "reset_scheduler", + "reset_order_status", + "reset_location", + "reset_devices", +) +RESET_OPERATION_LABELS: Dict[str, str] = { + "reset_scheduler": "调度器复位", + "reset_order_status": "订单状态复位", + "reset_location": "库位复位", + "reset_devices": "仪器复位", +} +RESET_OPERATION_ENDPOINTS: Dict[str, str] = { + "reset_scheduler": "/api/lims/scheduler/reset", + "reset_order_status": "/api/lims/order/reset-order-status", + "reset_location": "/api/lims/storage/reset-location", + "reset_devices": "/api/lims/device/reset-devices", +} +RESET_MANUAL_CONFIRM_MESSAGE = ( + "请确认G3、CEM、Tecan、撕膜机、封膜机、打标机、旋转堆栈上下料位、3个转台等位置的物料已清理完毕;\n" + "请开门检查冰箱、IDOT、酶标仪、离心机、LCMS内部没有遗留物料。" +) RESULT_TABLE_COLUMNS = [ {"name": "设备", "key": "whName"}, {"name": "位置", "key": "locationCode"}, {"name": "物料名称", "key": "materialName"}, {"name": "数量", "key": "quantity"}, ] +UNLOAD_TABLE_COLUMNS = [ + {"name": "仓库名称", "key": "whName"}, + {"name": "坐标 X", "key": "posX"}, + {"name": "坐标 Y", "key": "posY"}, + {"name": "坐标 Z", "key": "posZ"}, + {"name": "单位", "key": "unit"}, + {"name": "物料名称", "key": "materialName"}, +] +UNLOAD_TABLE_COLUMNS_MULTI_ORDER = [ + {"name": "订单编号", "key": "orderCode"}, + *UNLOAD_TABLE_COLUMNS, +] MATERIAL_TYPE_ORDER = ("Sample", "Consumables", "Reagent") PEPTIDE_SAMPLE_FILE_KEY = "SampleFile" DAY1_CEM_METHOD_KEY = "CEMMethodFileName" @@ -291,6 +326,13 @@ class BioyondPeptideStation(BioyondWorkstation): self.protocol_type = protocol_type self.bioyond_config = merged_config super().__init__(bioyond_config=self.bioyond_config, deck=deck) + # 订单完成报送等待机制(多肽场景): + # - last_order_code 记录当前正在等待的 orderCode(业务编号),用于回调侧多订单隔离 + # - last_order_report 缓存最近一次匹配到的 report.data + # - order_finish_event 用于阻塞等待 + 唤醒 wait_for_order_finish 动作 + self.order_finish_event = threading.Event() + self.last_order_code: Optional[str] = None + self.last_order_report: Optional[Dict[str, Any]] = None logger.info("BioyondPeptideStation 初始化完成: %s", self.bioyond_config.get("api_host", "")) def _debug_call_session(self, action_name: str): @@ -722,6 +764,9 @@ class BioyondPeptideStation(BioyondWorkstation): ActionInputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.HANDLE, io_type="source"), ActionInputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.HANDLE, io_type="source"), ActionInputHandle(key="resultTable", data_type="table", label="装载确认表", data_key="resultTable", data_source=DataSource.HANDLE, io_type="source"), + ActionOutputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="resultTable", data_type="table", label="装载确认表", data_key="resultTable", data_source=DataSource.EXECUTOR), ], ) def start_experiment( @@ -738,83 +783,337 @@ class BioyondPeptideStation(BioyondWorkstation): if table_rows and not materials_loaded: raise RuntimeError("多肽物料装载未确认,拒绝启动调度器") result = self._run_scheduler_action("scheduler_start", "启动") + result["order_id"] = resolved_order_ids[0] if resolved_order_ids else str(order_id or "") result["order_ids"] = resolved_order_ids result["materials_loaded"] = bool(materials_loaded) result["resultTable"] = resultTable or {} return result + def process_order_finish_report(self, report_request, used_materials: Optional[List[Any]] = None) -> Dict[str, Any]: + """处理 LIMS /report/order_finish 推送:保留父类语义,并按 orderCode 唤醒等待动作。 + + 说明: + - 工作站 HTTP 服务为进程级单例,所有 wait 节点共用同一条推送通道; + 需要按 ``self.last_order_code`` 过滤,避免别的订单 push 错误唤醒当前等待。 + """ + materials = used_materials or [] + try: + result = super().process_order_finish_report(report_request, materials) + except Exception as exc: + logger.error("基类 process_order_finish_report 失败: %s", exc, exc_info=True) + result = {"processed": False, "error": str(exc)} + + try: + data = getattr(report_request, "data", {}) or {} + report_order_code = str(data.get("orderCode") or "") + self.last_order_report = data + expected = self.last_order_code + logger.info( + "[peptide.order_finish] 收到 orderCode=%s 期望=%s status=%s", + report_order_code, + expected, + data.get("status"), + ) + if expected and report_order_code and expected == report_order_code: + self.order_finish_event.set() + logger.info("[peptide.order_finish] orderCode 匹配,已触发 order_finish_event") + elif expected and report_order_code and expected != report_order_code: + logger.warning( + "[peptide.order_finish] orderCode 不匹配,忽略本次 push (期望=%s 实际=%s)", + expected, + report_order_code, + ) + except Exception as exc: # pragma: no cover - 仅为防御 + logger.error("[peptide.order_finish] 触发 event 失败: %s", exc, exc_info=True) + + return result + + @action( + always_free=True, + description="等待订单完成回调并预生成下料表", + handles=[ + ActionInputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.HANDLE, io_type="source"), + ActionInputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.HANDLE, io_type="source"), + ActionOutputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="order_code", data_type="str", label="订单编号", data_key="order_code", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="order_finish_status", data_type="str", label="订单完成状态", data_key="order_finish_status", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="order_finish_report", data_type="json", label="订单完成报文", data_key="order_finish_report", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="used_materials", data_type="json", label="使用物料列表", data_key="used_materials", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="material_ids", data_type="json", label="物料ID列表", data_key="material_ids", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="preintake_ids", data_type="json", label="通量ID列表", data_key="preintake_ids", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="unloadTable", data_type="table", label="下料表", data_key="unloadTable", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="unload_summary", data_type="json", label="下料摘要", data_key="unload_summary", data_source=DataSource.EXECUTOR), + ], + ) + def wait_for_order_finish( + self, + order_id: str = "", + order_ids: Optional[List[str]] = None, + order_code: str = "", + timeout_seconds: int = 36000, + poll_mode: bool = False, + **kwargs: Any, + ) -> Dict[str, Any]: + """阻塞等待 LIMS 订单完成回调,并基于 usedMaterials 预生成下料表。 + + - 多订单 ``order_ids`` 时按顺序逐个等;任何一个 ``abnormal_stop`` 立即返回。 + - 节点 1 在此就把 ``unloadTable`` 组装好(前端 manual_confirm 弹窗在节点 2 + 中通过 ``getPreviousNodeResult`` 拿前一个节点 param 渲染)。 + """ + with self._debug_call_session("wait_for_order_finish"): + resolved_order_ids = self._extract_order_ids(order_id=order_id, order_ids=order_ids, **kwargs) + order_code_input = str(order_code or "").strip() + if not resolved_order_ids and not order_code_input: + raise PeptideWorkflowError("wait_for_order_finish 至少需要 order_id/order_ids/order_code 之一") + + material_info_cache: Dict[str, Dict[str, Any]] = {} + missing_material_info: List[str] = [] + unload_rows: List[Dict[str, Any]] = [] + used_materials_total: List[Dict[str, Any]] = [] + material_ids_total: List[str] = [] + preintake_ids_total: List[str] = [] + order_codes_seen: List[str] = [] + last_status: str = "" + last_report: Dict[str, Any] = {} + multi_order = len(resolved_order_ids) > 1 or (resolved_order_ids and order_code_input) + + wait_targets: List[Tuple[str, str]] = [] + if resolved_order_ids: + for oid in resolved_order_ids: + code_for_oid = self._resolve_order_code(oid, fallback=order_code_input if len(resolved_order_ids) == 1 else "") + wait_targets.append((oid, code_for_oid)) + else: + wait_targets.append(("", order_code_input)) + + for oid, code_for_oid in wait_targets: + if not code_for_oid: + raise PeptideWorkflowError( + f"wait_for_order_finish 无法解析 orderCode (order_id={oid!r})" + ) + order_codes_seen.append(code_for_oid) + wait_result = self._wait_single_order_finish(code_for_oid, timeout_seconds, poll_mode=poll_mode) + last_status = wait_result["status"] + last_report = wait_result["report"] or {} + used_materials = self._extract_used_materials(last_report) + used_materials_total.extend(used_materials) + material_ids_total.extend(self._collect_material_ids(used_materials)) + preintake_ids_total.extend(self._collect_preintake_ids(used_materials)) + + rows = self._build_unload_rows( + used_materials, + material_info_cache=material_info_cache, + missing_material_info=missing_material_info, + order_code=code_for_oid if multi_order else None, + ) + unload_rows.extend(rows) + + if last_status == "timeout": + break + if last_status == "abnormal_stop": + break + + unload_table = self._compose_unload_table(unload_rows, multi_order=multi_order) + unload_summary = { + "order_codes": order_codes_seen, + "total_items": len(unload_rows), + "missing_material_info": list(dict.fromkeys(missing_material_info)), + } + primary_order_id = resolved_order_ids[0] if resolved_order_ids else "" + primary_order_code = order_codes_seen[0] if order_codes_seen else order_code_input + + return { + "success": last_status == "success", + "order_id": primary_order_id, + "order_ids": resolved_order_ids, + "order_code": primary_order_code, + "order_codes": order_codes_seen, + "order_finish_status": last_status, + "order_finish_report": last_report, + "used_materials": used_materials_total, + "material_ids": list(dict.fromkeys(material_ids_total)), + "preintake_ids": list(dict.fromkeys(preintake_ids_total)), + "unloadTable": unload_table, + "unload_summary": unload_summary, + } + + @action( + always_free=True, + node_type=NodeType.MANUAL_CONFIRM, + placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}, + goal_default={"materials_unloaded": False, "timeout_seconds": 3600, "assignee_user_ids": []}, + feedback_interval=300, + description="确认人工下料完成后调用 take-out 通知奔耀同步状态", + handles=[ + ActionInputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.HANDLE, io_type="source"), + ActionInputHandle(key="material_ids", data_type="json", label="物料ID列表", data_key="material_ids", data_source=DataSource.HANDLE, io_type="source"), + ActionInputHandle(key="preintake_ids", data_type="json", label="通量ID列表", data_key="preintake_ids", data_source=DataSource.HANDLE, io_type="source"), + ActionInputHandle(key="unloadTable", data_type="table", label="下料表", data_key="unloadTable", data_source=DataSource.HANDLE, io_type="source"), + ActionOutputHandle(key="take_out_result", data_type="json", label="取出接口结果", data_key="take_out_result", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="unloaded_count", data_type="int", label="同步物料数量", data_key="unloaded_count", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="success", data_type="bool", label="同步是否成功", data_key="success", data_source=DataSource.EXECUTOR), + ], + ) + def unload_materials( + self, + order_id: str = "", + material_ids: Optional[List[str]] = None, + preintake_ids: Optional[List[str]] = None, + unloadTable: Optional[Dict[str, Any]] = None, + materials_unloaded: bool = False, + **kwargs: Any, + ) -> Dict[str, Any]: + """节点 2:人工下料 manual_confirm 解锁后调用 take-out 通知奔耀同步状态。 + + 时序:操作员物理下料 → 勾选 ``materials_unloaded=True`` → 批准 → + manual_confirm 解除阻塞 → 此处调用 ``take-out`` 让奔耀清空对应库位状态。 + """ + del unloadTable, kwargs # unloadTable 仅供前端弹窗渲染,本节点函数体不消费 + with self._debug_call_session("unload_materials"): + if not bool(materials_unloaded): + raise RuntimeError("下料未确认,拒绝结束节点") + resolved_order_id = str(order_id or "").strip() + if not resolved_order_id: + raise PeptideWorkflowError("unload_materials 缺少 order_id") + material_ids_list = [str(item) for item in (material_ids or []) if item] + preintake_ids_list = [str(item) for item in (preintake_ids or []) if item] + rpc = self._require_hardware_interface() + try: + take_out_result = rpc.take_out( + resolved_order_id, + preintake_ids=preintake_ids_list, + material_ids=material_ids_list, + ) or {} + except Exception as exc: + logger.warning( + "take_out 调用异常 order_id=%s material_ids=%s: %s", + resolved_order_id, + material_ids_list, + exc, + ) + take_out_result = {"code": 0, "message": f"take_out_invoke_failed: {exc}"} + + code_value = take_out_result.get("code") if isinstance(take_out_result, dict) else None + success = bool(isinstance(take_out_result, dict) and code_value == 1) + if not success: + logger.warning( + "take_out 业务失败,未阻塞工作流,请人工核对奔耀库位 order_id=%s response=%s", + resolved_order_id, + take_out_result, + ) + return { + "success": success, + "order_id": resolved_order_id, + "material_ids": material_ids_list, + "preintake_ids": preintake_ids_list, + "unloaded_count": len(material_ids_list), + "take_out_result": take_out_result, + } + @action( always_free=True, goal_default={ - "reset_operations": ["scheduler_reset", "reset_order_status", "reset_location"], + "reset_scheduler": True, + "reset_order_status": True, + "reset_location": True, + "reset_devices": False, }, - description="复位调度器/订单/库位", + description="自动复位调度器/订单状态/库位,可选仪器复位", ) - def reset( + def reset_auto( self, - reset_operations: Optional[ - List[Literal["scheduler_reset", "reset_order_status", "reset_location"]] - ] = None, + reset_scheduler: bool = True, + reset_order_status: bool = True, + reset_location: bool = True, + reset_devices: bool = False, **kwargs: Any, ) -> Dict[str, Any]: - with self._debug_call_session("reset"): - operations = self._normalize_reset_operations(reset_operations) - result: Dict[str, Any] = { - "selected_operations": operations, - "executed_calls": [], - "skipped_operations": [], - } - rpc = self._require_hardware_interface() - for operation in operations: - if operation == "scheduler_reset": - code = rpc.scheduler_reset() - result["executed_calls"].append({"operation": operation, "result": {"code": code}}) - elif operation == "reset_order_status": - resolved = str( - kwargs.get("reset_order_id") or kwargs.get("order_id") or "" - ).strip() - if not resolved: - result["skipped_operations"].append( - {"operation": operation, "reason": "缺少 order_id/reset_order_id"} - ) - continue - code = rpc.reset_order_status(resolved) - result["executed_calls"].append( - {"operation": operation, "order_id": resolved, "result": {"code": code}} - ) - elif operation == "reset_location": - resolved = str( - kwargs.get("reset_location_id") or kwargs.get("location_id") or "" - ).strip() - if not resolved: - result["skipped_operations"].append( - {"operation": operation, "reason": "缺少 location_id/reset_location_id"} - ) - continue - code = rpc.reset_location(resolved) - result["executed_calls"].append( - {"operation": operation, "location_id": resolved, "result": {"code": code}} - ) - else: - raise ValueError(f"未知 reset operation: {operation}") - return result + """自动复位调度器/订单状态/库位,可选仪器复位。 - @action(always_free=True, description="从 Bioyond 同步库存物料到本地资源树") - def sync_from_external(self, **kwargs: Any) -> Dict[str, Any]: + Args: + reset_scheduler[调度器复位]: 调用 /api/lims/scheduler/reset,默认勾选。 + reset_order_status[订单状态复位]: 调用 /api/lims/order/reset-order-status,默认勾选。 + reset_location[库位复位]: 调用 /api/lims/storage/reset-location,默认勾选。 + reset_devices[仪器复位]: 调用 /api/lims/device/reset-devices,默认不勾选。 + """ del kwargs - with self._debug_call_session("sync_from_external"): - self._require_hardware_interface() - if not getattr(self, "resource_synchronizer", None): - raise RuntimeError("BioyondPeptideStation 未初始化 resource_synchronizer") + with self._debug_call_session("reset_auto"): + return self._execute_reset_operations( + reset_scheduler=bool(reset_scheduler), + reset_order_status=bool(reset_order_status), + reset_location=bool(reset_location), + reset_devices=bool(reset_devices), + ) - success = bool(self.resource_synchronizer.sync_from_external()) - resource_tree_update_requested = self._publish_resource_tree_update() if success else False - return { - "success": success, - "action": "sync_from_external", - "resource_tree_update_requested": resource_tree_update_requested, - "message": "Bioyond 资源同步成功" if success else "Bioyond 资源同步失败或无库存数据", - } + @action( + always_free=True, + node_type=NodeType.MANUAL_CONFIRM, + placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}, + goal_default={ + "reset_scheduler": True, + "reset_order_status": True, + "reset_location": True, + "reset_devices": False, + "physical_cleanup_confirmed": False, + "timeout_seconds": 3600, + "assignee_user_ids": [], + }, + feedback_interval=300, + description=RESET_MANUAL_CONFIRM_MESSAGE, + ) + def reset_manual( + self, + reset_scheduler: bool = True, + reset_order_status: bool = True, + reset_location: bool = True, + reset_devices: bool = False, + physical_cleanup_confirmed: bool = False, + timeout_seconds: int = 3600, + assignee_user_ids: Optional[List[str]] = None, + **kwargs: Any, + ) -> Dict[str, Any]: + """人工确认物理清理完毕后执行复位。 + + 操作员需先按弹窗提示完成 G3/CEM/Tecan/撕膜机/封膜机/打标机/旋转堆栈/3 个转台 + 等位置的物料清理,并开门检查冰箱/IDOT/酶标仪/离心机/LCMS 内部无遗留,再勾选 + ``physical_cleanup_confirmed``,节点才会真正调用复位接口。 + + Args: + reset_scheduler[调度器复位]: 调用 /api/lims/scheduler/reset,默认勾选。 + reset_order_status[订单状态复位]: 调用 /api/lims/order/reset-order-status,默认勾选。 + reset_location[库位复位]: 调用 /api/lims/storage/reset-location,默认勾选。 + reset_devices[仪器复位]: 调用 /api/lims/device/reset-devices,默认不勾选。 + physical_cleanup_confirmed[物理清理确认]: 确认弹窗中的物料检查已完成,默认不勾选;未勾选时不会调用任何 RPC。 + """ + del kwargs, timeout_seconds, assignee_user_ids + with self._debug_call_session("reset_manual"): + if not bool(physical_cleanup_confirmed): + logger.info("[reset_manual] 物理清理未确认,拒绝执行复位 RPC") + return { + "status": "blocked", + "physical_cleanup_confirmed": False, + "confirmation_message": RESET_MANUAL_CONFIRM_MESSAGE, + "selected_operations": self._build_selected_operations_summary( + reset_scheduler=bool(reset_scheduler), + reset_order_status=bool(reset_order_status), + reset_location=bool(reset_location), + reset_devices=bool(reset_devices), + ), + "executed_calls": [], + "skipped_operations": [ + {"operation": op, "reason": "physical_cleanup_not_confirmed"} + for op in RESET_OPERATION_KEYS + ], + "warnings": ["physical_cleanup_not_confirmed"], + } + payload = self._execute_reset_operations( + reset_scheduler=bool(reset_scheduler), + reset_order_status=bool(reset_order_status), + reset_location=bool(reset_location), + reset_devices=bool(reset_devices), + ) + payload["physical_cleanup_confirmed"] = True + payload["confirmation_message"] = RESET_MANUAL_CONFIRM_MESSAGE + return payload @action(always_free=True, description="启动 Bioyond 调度器") def scheduler_start(self, **kwargs: Any) -> Dict[str, Any]: @@ -1369,44 +1668,172 @@ class BioyondPeptideStation(BioyondWorkstation): "result_list_count": len(self._as_list(raw.get("resultList"))), } - # ---------- 基础设施 ---------- + # ---------- wait_for_order_finish / unload_materials 辅助 ---------- - def _publish_resource_tree_update(self) -> bool: - """触发当前 deck 的资源树更新,让前端看到最新同步结果。""" - ros_node = getattr(self, "_ros_node", None) - if ros_node is None: - logger.warning("资源树更新跳过: 未绑定 _ros_node") - return False + def _wait_single_order_finish( + self, + order_code: str, + timeout_seconds: int, + *, + poll_mode: bool = False, + poll_interval: float = 0.5, + ) -> Dict[str, Any]: + """阻塞等待单个 orderCode 的 LIMS 完成推送,返回 ``{status, report}``. - deck = getattr(self, "deck", None) - if deck is None: - logger.warning("资源树更新跳过: 未绑定 deck") - return False + 与 :class:`BioyondCellWorkstation` 保持相同语义: + - 状态映射 ``"30" -> success`` / ``"-11" -> abnormal_stop`` / + ``"-12" -> manual_stop`` / 其它 ``unknown_``;超时返回 ``timeout``。 + """ + if not order_code: + return {"status": "error", "report": {}, "message": "empty order_code"} + self.last_order_code = order_code + self.last_order_report = None + self.order_finish_event.clear() + timeout_value = max(int(timeout_seconds or 0), 1) + logger.info("[peptide.order_finish] 开始等待 orderCode=%s timeout=%ss poll_mode=%s", order_code, timeout_value, poll_mode) - update_resource = getattr(ros_node, "update_resource", None) - if update_resource is None: - logger.warning("资源树更新跳过: _ros_node 缺少 update_resource") - return False + if poll_mode: + start_time = time.time() + while not self.order_finish_event.is_set(): + if time.time() - start_time > timeout_value: + logger.error("[peptide.order_finish] 等待超时 orderCode=%s", order_code) + return {"status": "timeout", "report": {}, "orderCode": order_code} + time.sleep(poll_interval) + else: + if not self.order_finish_event.wait(timeout=timeout_value): + logger.error("[peptide.order_finish] 等待超时 orderCode=%s", order_code) + return {"status": "timeout", "report": {}, "orderCode": order_code} + report = self.last_order_report or {} + report_code = str(report.get("orderCode") or "") + if report_code and report_code != order_code: + logger.warning("[peptide.order_finish] 报送 orderCode 不匹配 期望=%s 实际=%s", order_code, report_code) + return {"status": "mismatch", "report": report} + status_text = str(report.get("status") or "").strip() + status_map = {"30": "success", "-11": "abnormal_stop", "-12": "manual_stop"} + normalized = status_map.get(status_text, f"unknown_{status_text or 'empty'}") + return {"status": normalized, "report": report} + + def _resolve_order_code(self, order_id: str, fallback: str = "") -> str: + """将 order_id (UUID) 反查为 orderCode。fallback 用于 CLI 调试时直接传 orderCode。""" + order_id_clean = str(order_id or "").strip() + if not order_id_clean: + return fallback.strip() try: - try: - from unilabos.ros.nodes.base_device_node import ROS2DeviceNode # type: ignore - except Exception: # pragma: no cover - ROS2DeviceNode = None # type: ignore[assignment] - - if ROS2DeviceNode is not None and hasattr(ROS2DeviceNode, "run_async_func"): - ROS2DeviceNode.run_async_func(update_resource, True, **{"resources": [deck]}) - else: - update_resource(resources=[deck]) - - logger.info("已调度多肽 deck '%s' 的资源树更新", getattr(deck, "name", "")) - return True - except TypeError as exc: - logger.error("资源树更新失败: update_resource 调用签名错误: %s", exc) - raise + raw = self._require_hardware_interface().order_report(order_id_clean) or {} except Exception as exc: - logger.warning("资源树更新失败: %s", exc) - return False + logger.warning("反查 orderCode 失败 order_id=%s: %s", order_id_clean, exc) + return fallback.strip() + if isinstance(raw, dict): + for key in ("code", "orderCode", "order_code"): + value = raw.get(key) + if value: + return str(value) + return fallback.strip() + + def _extract_used_materials(self, report: Dict[str, Any]) -> List[Dict[str, Any]]: + if not isinstance(report, dict): + return [] + result: List[Dict[str, Any]] = [] + for item in self._as_list(report.get("usedMaterials")): + if isinstance(item, dict): + result.append(item) + return result + + @staticmethod + def _collect_material_ids(used_materials: List[Dict[str, Any]]) -> List[str]: + ids: List[str] = [] + for item in used_materials: + material_id = item.get("materialId") or item.get("MaterialId") or "" + if material_id: + ids.append(str(material_id)) + return ids + + @staticmethod + def _collect_preintake_ids(used_materials: List[Dict[str, Any]]) -> List[str]: + ids: List[str] = [] + for item in used_materials: + preintake_id = item.get("preintakeId") or item.get("preIntakeId") or "" + if preintake_id: + ids.append(str(preintake_id)) + return ids + + def _build_unload_rows( + self, + used_materials: List[Dict[str, Any]], + *, + material_info_cache: Dict[str, Dict[str, Any]], + missing_material_info: List[str], + order_code: Optional[str] = None, + ) -> List[Dict[str, Any]]: + rows: List[Dict[str, Any]] = [] + for material in used_materials: + material_id = str(material.get("materialId") or material.get("MaterialId") or "") + info = self._fetch_material_info_cached(material_id, material_info_cache, missing_material_info) + location = self._first_location(info) + row = { + "whName": str(location.get("whName") or ""), + "posX": self._stringify_coord(location.get("posX")), + "posY": self._stringify_coord(location.get("posY")), + "posZ": self._stringify_coord(location.get("posZ")), + "unit": str(info.get("unit") or location.get("unit") or ""), + "materialName": str(info.get("name") or ""), + "materialId": material_id, + "typeMode": str(material.get("typeMode") or material.get("typemode") or ""), + } + if order_code is not None: + row["orderCode"] = order_code + rows.append(row) + return rows + + def _fetch_material_info_cached( + self, + material_id: str, + cache: Dict[str, Dict[str, Any]], + missing_material_info: List[str], + ) -> Dict[str, Any]: + if not material_id: + return {} + if material_id in cache: + return cache[material_id] + try: + info = self._require_hardware_interface().material_info(material_id) or {} + except Exception as exc: + logger.warning("material_info 查询失败 material_id=%s: %s", material_id, exc) + info = {} + if not isinstance(info, dict) or not info: + missing_material_info.append(material_id) + info = {} + cache[material_id] = info + return info + + def _first_location(self, info: Dict[str, Any]) -> Dict[str, Any]: + if not isinstance(info, dict): + return {} + for location in self._as_list(info.get("locations")): + if isinstance(location, dict): + return location + return {} + + @staticmethod + def _stringify_coord(value: Any) -> str: + if value is None: + return "" + if isinstance(value, float): + if value.is_integer(): + return str(int(value)) + return str(value) + + @staticmethod + def _compose_unload_table(rows: List[Dict[str, Any]], *, multi_order: bool) -> Dict[str, Any]: + columns = UNLOAD_TABLE_COLUMNS_MULTI_ORDER if multi_order else UNLOAD_TABLE_COLUMNS + return { + "data": rows, + "columns": copy.deepcopy(columns), + "tableName": "unloadTable", + } + + # ---------- 基础设施 ---------- def _run_scheduler_action(self, method_name: str, label: str) -> Dict[str, Any]: rpc = self._require_hardware_interface() @@ -1424,32 +1851,105 @@ class BioyondPeptideStation(BioyondWorkstation): return interface @staticmethod - def _normalize_reset_operations(reset_operations: Optional[List[str]]) -> List[str]: - alias_map = { - "scheduler": "scheduler_reset", - "scheduler_reset": "scheduler_reset", - "order": "reset_order_status", - "order_status": "reset_order_status", - "reset_order_status": "reset_order_status", - "location": "reset_location", - "reset_location": "reset_location", + def _build_selected_operations_summary( + *, + reset_scheduler: bool, + reset_order_status: bool, + reset_location: bool, + reset_devices: bool, + ) -> List[Dict[str, Any]]: + flags: Dict[str, bool] = { + "reset_scheduler": bool(reset_scheduler), + "reset_order_status": bool(reset_order_status), + "reset_location": bool(reset_location), + "reset_devices": bool(reset_devices), } - normalized: List[str] = [] - for operation in list(reset_operations or DEFAULT_RESET_OPERATIONS): - canonical = alias_map.get(str(operation).strip()) - if not canonical: - raise ValueError(f"未知 reset operation: {operation}") - if canonical not in normalized: - normalized.append(canonical) - return normalized + return [ + {"key": key, "label": RESET_OPERATION_LABELS[key], "selected": flags[key]} + for key in RESET_OPERATION_KEYS + ] - @staticmethod - def _reset_operation_endpoint(operation: str) -> str: - return { - "scheduler_reset": "/api/lims/scheduler/reset", - "reset_order_status": "/api/lims/order/reset-order-status", - "reset_location": "/api/lims/storage/reset-location", - }.get(operation, "") + def _execute_reset_operations( + self, + *, + reset_scheduler: bool, + reset_order_status: bool, + reset_location: bool, + reset_devices: bool, + ) -> Dict[str, Any]: + """根据 4 个 checkbox 选择顺序调用对应 RPC。 + + - 调用顺序固定为 scheduler → order_status → location → devices; + - 单步失败(``code != 1`` 或 RPC 抛异常)记 warning 但继续执行后续选中的步骤, + 不做 fail-fast,便于操作员在遇到部分故障时仍能完成可恢复的复位。 + """ + rpc = self._require_hardware_interface() + flags: Dict[str, bool] = { + "reset_scheduler": bool(reset_scheduler), + "reset_order_status": bool(reset_order_status), + "reset_location": bool(reset_location), + "reset_devices": bool(reset_devices), + } + selected_operations = self._build_selected_operations_summary( + reset_scheduler=reset_scheduler, + reset_order_status=reset_order_status, + reset_location=reset_location, + reset_devices=reset_devices, + ) + result: Dict[str, Any] = { + "selected_operations": selected_operations, + "executed_calls": [], + "skipped_operations": [], + "warnings": [], + } + + rpc_method_map: Dict[str, str] = { + "reset_scheduler": "scheduler_reset", + "reset_order_status": "reset_order_status", + "reset_location": "reset_location", + "reset_devices": "reset_devices", + } + + for operation in RESET_OPERATION_KEYS: + if not flags[operation]: + result["skipped_operations"].append( + {"operation": operation, "reason": "checkbox_disabled"} + ) + continue + method_name = rpc_method_map[operation] + method = getattr(rpc, method_name, None) + endpoint = RESET_OPERATION_ENDPOINTS[operation] + if not callable(method): + msg = f"RPC 缺少方法: {method_name}" + logger.warning("[reset] %s", msg) + result["executed_calls"].append({ + "operation": operation, + "endpoint": endpoint, + "result": {"code": 0}, + "error": msg, + }) + result["warnings"].append(f"{operation}: {msg}") + continue + try: + code = method() + except Exception as exc: # 单步异常不阻断其余 reset + logger.warning("[reset] %s 调用异常: %s", method_name, exc) + result["executed_calls"].append({ + "operation": operation, + "endpoint": endpoint, + "result": {"code": 0}, + "error": str(exc), + }) + result["warnings"].append(f"{operation}: {exc}") + continue + result["executed_calls"].append({ + "operation": operation, + "endpoint": endpoint, + "result": {"code": code}, + }) + if code != 1: + result["warnings"].append(f"{operation}: rpc_returned_non_one_code={code}") + return result @staticmethod def _extract_order_ids(order_id: str = "", order_ids: Optional[List[str]] = None, **kwargs: Any) -> List[str]: diff --git a/unilabos/devices/workstation/bioyond_studio/peptide_station/tests/test_peptide_station_contracts.py b/unilabos/devices/workstation/bioyond_studio/peptide_station/tests/test_peptide_station_contracts.py index 12d14038..20a63b46 100644 --- a/unilabos/devices/workstation/bioyond_studio/peptide_station/tests/test_peptide_station_contracts.py +++ b/unilabos/devices/workstation/bioyond_studio/peptide_station/tests/test_peptide_station_contracts.py @@ -95,7 +95,8 @@ def test_required_actions_exposed() -> None: "submit_experiment_day4", "submit_experiment_day4_LCMS", "start_experiment", - "reset", + "reset_auto", + "reset_manual", "scheduler_start", "scheduler_stop", "scheduler_pause", @@ -112,14 +113,14 @@ def test_required_actions_exposed() -> None: def test_manual_confirm_node_types() -> None: module = _import_module() cls = getattr(module, CLASS_NAME) - manual = {"submit_experiment_day1", "start_experiment"} + manual = {"submit_experiment_day1", "start_experiment", "reset_manual"} normal = { "submit_experiment", "submit_experiment_day2", "submit_experiment_day3", "submit_experiment_day4", "submit_experiment_day4_LCMS", - "reset", + "reset_auto", "scheduler_start", "list_sample_excels", "get_step_parameters", @@ -142,7 +143,7 @@ def test_submit_and_reset_signatures_exclude_legacy_manual_confirm() -> None: "submit_experiment_day3", "submit_experiment_day4", "submit_experiment_day4_LCMS", - "reset", + "reset_auto", ): params = inspect.signature(getattr(cls, name)).parameters assert "timeout_seconds" not in params, name @@ -612,70 +613,579 @@ def test_start_experiment_starts_when_table_empty() -> None: # --------------------------------------------------------------------------- -# 10. Reset +# 10. Reset (plan 2026-05-21: reset_auto + reset_manual 四勾选) # --------------------------------------------------------------------------- -def test_reset_signature_drops_legacy_params_and_uses_literal() -> None: - """plan 调整:删除 dry_run/order_id/location_id;reset_operations 用 Literal 注解。""" +RESET_BOOL_PARAMS = ( + "reset_scheduler", + "reset_order_status", + "reset_location", + "reset_devices", +) + + +def _reset_meta(name: str) -> Dict[str, Any]: cls = getattr(_import_module(), CLASS_NAME) - sig = inspect.signature(cls.reset) - params = sig.parameters - for legacy in ("dry_run", "order_id", "location_id"): - assert legacy not in params, f"reset 不应再有 {legacy} 入参" - assert "reset_operations" in params - assert any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values()), \ - "reset 必须保留 **kwargs 以兜底 reset_order_id/reset_location_id" - - annotation = params["reset_operations"].annotation - rendered = annotation if isinstance(annotation, str) else repr(annotation) - for op in ("scheduler_reset", "reset_order_status", "reset_location"): - assert op in rendered, f"reset_operations 的 Literal 必须包含 {op}" + return dict(getattr(getattr(cls, name), "_action_registry_meta", {})) -def test_reset_goal_default_contains_all_operations() -> None: - """像 sirna 一样,goal_default 默认勾选全部三个 reset 操作。""" - cls = getattr(_import_module(), CLASS_NAME) - meta = getattr(cls.reset, "_action_registry_meta", {}) +# --- plan §Tests 1: reset_auto 不是 MANUAL_CONFIRM --- + + +def test_reset_auto_is_not_manual_confirm() -> None: + module = _import_module() + meta = _reset_meta("reset_auto") + assert meta.get("node_type") != module.NodeType.MANUAL_CONFIRM + + +# --- plan §Tests 2: reset_manual 是 MANUAL_CONFIRM --- + + +def test_reset_manual_is_manual_confirm() -> None: + module = _import_module() + meta = _reset_meta("reset_manual") + assert meta.get("node_type") == module.NodeType.MANUAL_CONFIRM + + +# --- plan §Tests 3: reset_manual 关键 metadata --- + + +def test_reset_manual_metadata_shape() -> None: + meta = _reset_meta("reset_manual") + assert meta.get("always_free") is True + assert meta.get("placeholder_keys") == { + "assignee_user_ids": "unilabos_manual_confirm", + } goal_default = meta.get("goal_default") or {} - assert goal_default.get("reset_operations") == [ - "scheduler_reset", - "reset_order_status", - "reset_location", - ] + assert goal_default.get("timeout_seconds") == 3600 + assert goal_default.get("assignee_user_ids") == [] + assert goal_default.get("physical_cleanup_confirmed") is False -def test_reset_executes_typed_rpc_calls() -> None: +# --- plan §Tests 4: 两个 action 都暴露 4 个真实 bool 参数 --- + + +def _resolved_bool_annotation(param: inspect.Parameter) -> Any: + """`from __future__ import annotations` 下注解是字符串;统一解析回真实类型。""" + annotation = param.annotation + if annotation is bool: + return bool + if isinstance(annotation, str): + # 既不是 Annotated[...] 也不是 Optional[...] 的纯 "bool" 字符串 + return bool if annotation.strip() == "bool" else annotation + return annotation + + +def test_reset_actions_expose_four_real_bool_params() -> None: + cls = getattr(_import_module(), CLASS_NAME) + for action_name in ("reset_auto", "reset_manual"): + params = inspect.signature(getattr(cls, action_name)).parameters + for flag in RESET_BOOL_PARAMS: + assert flag in params, f"{action_name} 缺少 {flag}" + resolved = _resolved_bool_annotation(params[flag]) + assert resolved is bool, ( + f"{action_name}.{flag} 必须是裸 bool(不能用 Annotated[bool, Field(...)] 包裹),实际: {params[flag].annotation!r}" + ) + assert params[flag].kind in ( + inspect.Parameter.POSITIONAL_OR_KEYWORD, + inspect.Parameter.KEYWORD_ONLY, + ), f"{action_name}.{flag} 必须是真实参数,不能藏在 **kwargs" + + +# --- plan §Tests 5: registry 生成的 schema 标记 reset 字段为 boolean --- + + +def test_reset_action_param_annotations_are_bool_for_schema() -> None: + """plan: 当前 AST registry 不 unwrap Annotated;裸 bool 才能映射成 type: boolean。 + + 这里直接检查 type_to_schema 在 Python 类型 ``bool`` 上返回 ``{"type": "boolean"}``, + 再校验 reset_auto/reset_manual 的真实参数注解就是 ``bool``(裸字符串 "bool" 也算), + 从而保证生成的 JSON Schema 一定是 boolean,不会被前端当成 object/string。 + """ + from unilabos.registry.utils import type_to_schema + + assert type_to_schema(bool) == {"type": "boolean"} + + cls = getattr(_import_module(), CLASS_NAME) + for action_name in ("reset_auto", "reset_manual"): + params = inspect.signature(getattr(cls, action_name)).parameters + for flag in RESET_BOOL_PARAMS: + resolved = _resolved_bool_annotation(params[flag]) + assert type_to_schema(resolved) == {"type": "boolean"}, ( + f"{action_name}.{flag} schema 必须是 boolean,实际注解: {params[flag].annotation!r}" + ) + + +# --- plan §Tests 6: reset_auto 替换旧 reset,未保留旧 id-shaped reset 别名 --- + + +def test_legacy_reset_action_removed() -> None: + cls = getattr(_import_module(), CLASS_NAME) + have = {name for name, _ in inspect.getmembers(cls, inspect.isfunction)} + assert "reset" not in have, "旧 reset 应被 reset_auto 完全替换,不保留同名别名" + assert "reset_auto" in have + + +# --- plan §Tests 7: goal_default 中前三项 True,reset_devices=False --- + + +def test_reset_goal_defaults_first_three_true_devices_false() -> None: + for action_name in ("reset_auto", "reset_manual"): + meta = _reset_meta(action_name) + goal_default = meta.get("goal_default") or {} + assert goal_default.get("reset_scheduler") is True, action_name + assert goal_default.get("reset_order_status") is True, action_name + assert goal_default.get("reset_location") is True, action_name + assert goal_default.get("reset_devices") is False, action_name + + +# --- plan §Tests 8: reset_manual(physical_cleanup_confirmed=False) 不调任何 RPC --- + + +def test_reset_manual_blocks_when_not_confirmed() -> None: station = _make_station() - station.hardware_interface.scheduler_reset.return_value = 1 - station.hardware_interface.reset_order_status.return_value = 1 - station.hardware_interface.reset_location.return_value = 1 - out = station.reset( - reset_operations=["scheduler_reset", "reset_order_status", "reset_location"], - reset_order_id=ORDER_GUID, - reset_location_id="loc-1", + out = station.reset_manual( + reset_scheduler=True, + reset_order_status=True, + reset_location=True, + reset_devices=True, + physical_cleanup_confirmed=False, ) - station.hardware_interface.scheduler_reset.assert_called_once_with() - station.hardware_interface.reset_order_status.assert_called_once_with(ORDER_GUID) - station.hardware_interface.reset_location.assert_called_once_with("loc-1") - assert out["selected_operations"] == [ - "scheduler_reset", + rpc = station.hardware_interface + rpc.scheduler_reset.assert_not_called() + rpc.reset_order_status.assert_not_called() + rpc.reset_location.assert_not_called() + rpc.reset_devices.assert_not_called() + assert out["status"] == "blocked" + assert out["physical_cleanup_confirmed"] is False + assert "请确认" in out["confirmation_message"] + + +# --- plan §Tests 9: reset_auto() 默认调三件、不调 reset_devices --- + + +def test_reset_auto_defaults_call_three_and_skip_devices() -> None: + station = _make_station() + rpc = station.hardware_interface + rpc.scheduler_reset.return_value = 1 + rpc.reset_order_status.return_value = 1 + rpc.reset_location.return_value = 1 + out = station.reset_auto() + rpc.scheduler_reset.assert_called_once_with() + rpc.reset_order_status.assert_called_once_with() + rpc.reset_location.assert_called_once_with() + rpc.reset_devices.assert_not_called() + + executed_ops = [item["operation"] for item in out["executed_calls"]] + assert executed_ops == ["reset_scheduler", "reset_order_status", "reset_location"] + skipped_ops = {item["operation"] for item in out["skipped_operations"]} + assert skipped_ops == {"reset_devices"} + selected = {item["key"]: item["selected"] for item in out["selected_operations"]} + assert selected == { + "reset_scheduler": True, + "reset_order_status": True, + "reset_location": True, + "reset_devices": False, + } + + +# --- plan §Tests 10: reset_auto(reset_devices=True) 也会调 reset_devices --- + + +def test_reset_auto_with_devices_true_calls_reset_devices() -> None: + station = _make_station() + rpc = station.hardware_interface + rpc.scheduler_reset.return_value = 1 + rpc.reset_order_status.return_value = 1 + rpc.reset_location.return_value = 1 + rpc.reset_devices.return_value = 1 + out = station.reset_auto(reset_devices=True) + rpc.reset_devices.assert_called_once_with() + executed_ops = [item["operation"] for item in out["executed_calls"]] + assert executed_ops == [ + "reset_scheduler", "reset_order_status", "reset_location", + "reset_devices", ] - assert len(out["executed_calls"]) == 3 assert out["skipped_operations"] == [] -def test_reset_skips_when_ids_missing() -> None: - """没有 order_id / location_id 时应该 skip 而不是抛错。""" +def test_reset_auto_individual_checkboxes_drive_calls() -> None: + """更细粒度:单独勾 reset_scheduler 时只调 scheduler_reset。""" station = _make_station() - station.hardware_interface.scheduler_reset.return_value = 1 - out = station.reset( - reset_operations=["scheduler_reset", "reset_order_status", "reset_location"], + rpc = station.hardware_interface + rpc.scheduler_reset.return_value = 1 + out = station.reset_auto( + reset_scheduler=True, + reset_order_status=False, + reset_location=False, + reset_devices=False, ) - station.hardware_interface.scheduler_reset.assert_called_once_with() - station.hardware_interface.reset_order_status.assert_not_called() - station.hardware_interface.reset_location.assert_not_called() - skipped_ops = {item["operation"] for item in out["skipped_operations"]} - assert skipped_ops == {"reset_order_status", "reset_location"} + rpc.scheduler_reset.assert_called_once_with() + rpc.reset_order_status.assert_not_called() + rpc.reset_location.assert_not_called() + rpc.reset_devices.assert_not_called() + skipped = {item["operation"] for item in out["skipped_operations"]} + assert skipped == {"reset_order_status", "reset_location", "reset_devices"} + + +def test_reset_manual_after_confirmation_calls_same_helper() -> None: + """plan §reset_manual 执行规则:勾选确认后等价于 reset_auto。""" + station = _make_station() + rpc = station.hardware_interface + rpc.scheduler_reset.return_value = 1 + rpc.reset_order_status.return_value = 1 + rpc.reset_location.return_value = 1 + rpc.reset_devices.return_value = 1 + out = station.reset_manual( + reset_scheduler=True, + reset_order_status=True, + reset_location=True, + reset_devices=True, + physical_cleanup_confirmed=True, + ) + rpc.scheduler_reset.assert_called_once_with() + rpc.reset_order_status.assert_called_once_with() + rpc.reset_location.assert_called_once_with() + rpc.reset_devices.assert_called_once_with() + assert out["physical_cleanup_confirmed"] is True + assert out["confirmation_message"] + assert [item["operation"] for item in out["executed_calls"]] == [ + "reset_scheduler", + "reset_order_status", + "reset_location", + "reset_devices", + ] + + +# --- plan §Tests 11: RPC 包装层 reset_order_status / reset_location 不发送 data 键 --- + + +def test_rpc_reset_order_status_sends_no_data_key() -> None: + from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC + + rpc = object.__new__(BioyondV1RPC) + rpc.host = "http://test" + rpc.api_key = "k" + rpc._logger = MagicMock() + rpc.post = MagicMock(return_value={"code": 1}) # type: ignore[method-assign] + rpc.get_current_time_iso8601 = MagicMock(return_value="2026-05-21T08:00:00.000Z") # type: ignore[method-assign] + + rpc.reset_order_status("ignored-uuid") + args, kwargs = rpc.post.call_args + sent_params = kwargs.get("params") or (args[1] if len(args) > 1 else {}) + assert "data" not in sent_params, "reset_order_status 不应再发送 data 字段" + assert set(sent_params.keys()) == {"apiKey", "requestTime"} + + +def test_rpc_reset_location_sends_no_data_key() -> None: + from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC + + rpc = object.__new__(BioyondV1RPC) + rpc.host = "http://test" + rpc.api_key = "k" + rpc._logger = MagicMock() + rpc.post = MagicMock(return_value={"code": 1}) # type: ignore[method-assign] + rpc.get_current_time_iso8601 = MagicMock(return_value="2026-05-21T08:00:00.000Z") # type: ignore[method-assign] + + rpc.reset_location("ignored-loc-id") + args, kwargs = rpc.post.call_args + sent_params = kwargs.get("params") or (args[1] if len(args) > 1 else {}) + assert "data" not in sent_params, "reset_location 不应再发送 data 字段" + assert set(sent_params.keys()) == {"apiKey", "requestTime"} + + +# --- plan §Tests 12 + 13: 任何 reset 路径都不调用 take_out / refresh_material_cache --- + + +def test_reset_paths_do_not_call_take_out_or_material_cache() -> None: + station = _make_station() + rpc = station.hardware_interface + rpc.scheduler_reset.return_value = 1 + rpc.reset_order_status.return_value = 1 + rpc.reset_location.return_value = 1 + rpc.reset_devices.return_value = 1 + + station.reset_auto(reset_devices=True) + station.reset_manual(physical_cleanup_confirmed=True, reset_devices=True) + station.reset_manual(physical_cleanup_confirmed=False) + + rpc.take_out.assert_not_called() + refresh = getattr(rpc, "refresh_material_cache", None) + if refresh is not None and hasattr(refresh, "assert_not_called"): + refresh.assert_not_called() + + +# --- 失败/兜底用例:不 fail-fast,单步异常或 code!=1 仅记 warning --- + + +def test_reset_auto_records_warning_when_rpc_returns_non_one() -> None: + station = _make_station() + rpc = station.hardware_interface + rpc.scheduler_reset.return_value = 0 # 业务失败 + rpc.reset_order_status.return_value = 1 + rpc.reset_location.return_value = 1 + out = station.reset_auto() + rpc.scheduler_reset.assert_called_once_with() + rpc.reset_order_status.assert_called_once_with() + rpc.reset_location.assert_called_once_with() + assert any("reset_scheduler" in w for w in out.get("warnings", [])) + + +def test_reset_auto_continues_after_rpc_exception() -> None: + station = _make_station() + rpc = station.hardware_interface + rpc.scheduler_reset.side_effect = RuntimeError("HTTP 500") + rpc.reset_order_status.return_value = 1 + rpc.reset_location.return_value = 1 + out = station.reset_auto() + rpc.reset_order_status.assert_called_once_with() + rpc.reset_location.assert_called_once_with() + error_entries = [item for item in out["executed_calls"] if "error" in item] + assert any(item["operation"] == "reset_scheduler" for item in error_entries) + + +def test_reset_manual_confirmation_message_constant() -> None: + module = _import_module() + msg = module.RESET_MANUAL_CONFIRM_MESSAGE + for keyword in ("G3", "CEM", "Tecan", "撕膜机", "封膜机", "打标机", "旋转堆栈", "转台", "冰箱", "IDOT", "酶标仪", "离心机", "LCMS"): + assert keyword in msg, f"reset_manual 提示文案缺关键字: {keyword}" + meta = _reset_meta("reset_manual") + assert meta.get("description") == msg, "reset_manual 装饰器 description 应等于常量本身" + + +# --------------------------------------------------------------------------- +# 11. wait_for_order_finish + unload_materials(plan 2026-05-20 新增节点) +# --------------------------------------------------------------------------- + + +def _make_station_with_finish_state() -> Any: + """带订单完成事件状态的 station 实例(process_order_finish_report 用)。""" + import threading as _threading + + station = _make_station() + station.order_finish_event = _threading.Event() + station.last_order_code = None + station.last_order_report = None + return station + + +def _make_report_request(order_code: str, status: str = "30", used_materials: List[Dict[str, Any]] | None = None) -> Any: + request = MagicMock() + request.data = { + "orderCode": order_code, + "orderName": f"实验{order_code}", + "startTime": "2026-05-20T10:00:00.000Z", + "endTime": "2026-05-20T11:00:00.000Z", + "status": status, + "usedMaterials": used_materials or [], + } + return request + + +def test_process_order_finish_report_triggers_event_on_match() -> None: + station = _make_station_with_finish_state() + station.last_order_code = "EXP260520-100000" + request = _make_report_request("EXP260520-100000", status="30") + station.process_order_finish_report(request, []) + assert station.order_finish_event.is_set() is True + assert station.last_order_report["orderCode"] == "EXP260520-100000" + + +def test_process_order_finish_report_ignores_mismatched_order_code() -> None: + station = _make_station_with_finish_state() + station.last_order_code = "EXP260520-100000" + request = _make_report_request("EXP260520-OTHER", status="30") + station.process_order_finish_report(request, []) + assert station.order_finish_event.is_set() is False + assert station.last_order_report["orderCode"] == "EXP260520-OTHER" + + +def test_wait_for_order_finish_returns_immediately_when_event_set() -> None: + """事件预先 set + last_order_report 命中时,wait 立即返回 success。""" + station = _make_station_with_finish_state() + used_materials = [ + {"materialId": "mat-1", "locationId": "loc-1", "typeMode": "1", "usedQuantity": 1.0}, + ] + station.hardware_interface.order_report.return_value = {"code": "EXP260520-101010"} + station.hardware_interface.material_info.return_value = { + "name": "样品A", + "unit": "mg", + "locations": [{"whName": "自动化堆栈", "posX": 1, "posY": 2, "posZ": 3, "code": "1-01"}], + } + + pending: Dict[str, Any] = {} + + def _fake_wait(timeout: float = 0.0) -> bool: + pending["report"] = { + "orderCode": station.last_order_code, + "status": "30", + "usedMaterials": used_materials, + } + station.last_order_report = pending["report"] + return True + + station.order_finish_event = MagicMock() + station.order_finish_event.wait = MagicMock(side_effect=_fake_wait) + station.order_finish_event.clear = MagicMock() + station.order_finish_event.set = MagicMock() + station.order_finish_event.is_set = MagicMock(return_value=True) + + out = station.wait_for_order_finish( + order_id="11111111-1111-1111-1111-111111111111", + timeout_seconds=5, + ) + assert out["order_finish_status"] == "success" + assert out["material_ids"] == ["mat-1"] + assert out["preintake_ids"] == [] + + table = out["unloadTable"] + assert table["tableName"] == "unloadTable" + column_keys = [c["key"] for c in table["columns"]] + assert column_keys == ["whName", "posX", "posY", "posZ", "unit", "materialName"] + row = table["data"][0] + assert row["whName"] == "自动化堆栈" + assert row["posX"] == "1" + assert row["posY"] == "2" + assert row["posZ"] == "3" + assert row["unit"] == "mg" + assert row["materialName"] == "样品A" + + +def test_wait_for_order_finish_returns_timeout_status() -> None: + station = _make_station_with_finish_state() + station.hardware_interface.order_report.return_value = {"code": "EXP260520-TIMEOUT"} + station.order_finish_event = MagicMock() + station.order_finish_event.wait = MagicMock(return_value=False) + station.order_finish_event.clear = MagicMock() + station.order_finish_event.is_set = MagicMock(return_value=False) + out = station.wait_for_order_finish( + order_id="22222222-2222-2222-2222-222222222222", + timeout_seconds=1, + ) + assert out["order_finish_status"] == "timeout" + assert out["unloadTable"]["data"] == [] + assert out["unload_summary"]["missing_material_info"] == [] + + +def test_wait_for_order_finish_records_missing_material_info() -> None: + """material-info 失败的物料行字段为空串,并被列入 missing_material_info。""" + station = _make_station_with_finish_state() + used_materials = [ + {"materialId": "mat-good", "typeMode": "1"}, + {"materialId": "mat-bad", "typeMode": "1"}, + ] + + def _material_info(material_id: str) -> Dict[str, Any]: + if material_id == "mat-good": + return { + "name": "好物料", + "unit": "mg", + "locations": [{"whName": "WH-A", "posX": 4, "posY": 5, "posZ": 6}], + } + raise RuntimeError("HTTP 500") + + station.hardware_interface.order_report.return_value = {"code": "EXP260520-MIX"} + station.hardware_interface.material_info.side_effect = _material_info + + def _fake_wait(timeout: float = 0.0) -> bool: + station.last_order_report = { + "orderCode": station.last_order_code, + "status": "30", + "usedMaterials": used_materials, + } + return True + + station.order_finish_event = MagicMock() + station.order_finish_event.wait = MagicMock(side_effect=_fake_wait) + station.order_finish_event.clear = MagicMock() + station.order_finish_event.set = MagicMock() + station.order_finish_event.is_set = MagicMock(return_value=True) + + out = station.wait_for_order_finish( + order_id="33333333-3333-3333-3333-333333333333", + timeout_seconds=1, + ) + rows = out["unloadTable"]["data"] + assert [row["materialId"] for row in rows] == ["mat-good", "mat-bad"] + bad_row = rows[1] + assert bad_row["whName"] == "" + assert bad_row["posX"] == "" + assert bad_row["posY"] == "" + assert bad_row["posZ"] == "" + assert bad_row["unit"] == "" + assert bad_row["materialName"] == "" + assert "mat-bad" in out["unload_summary"]["missing_material_info"] + assert "mat-good" not in out["unload_summary"]["missing_material_info"] + + +def test_unload_materials_blocks_when_not_confirmed() -> None: + station = _make_station() + with pytest.raises(RuntimeError): + station.unload_materials( + order_id=ORDER_GUID, + material_ids=["mat-1"], + preintake_ids=[], + unloadTable={"data": []}, + materials_unloaded=False, + ) + + +def test_unload_materials_calls_take_out_with_resolved_lists() -> None: + station = _make_station() + station.hardware_interface.take_out.return_value = {"code": 1, "message": "ok", "data": {}} + out = station.unload_materials( + order_id=ORDER_GUID, + material_ids=["mat-1", "mat-2"], + preintake_ids=[], + unloadTable={"data": [{"materialId": "mat-1"}, {"materialId": "mat-2"}]}, + materials_unloaded=True, + ) + station.hardware_interface.take_out.assert_called_once_with( + ORDER_GUID, + preintake_ids=[], + material_ids=["mat-1", "mat-2"], + ) + assert out["success"] is True + assert out["unloaded_count"] == 2 + assert out["take_out_result"] == {"code": 1, "message": "ok", "data": {}} + + +def test_unload_materials_does_not_raise_when_take_out_fails() -> None: + """take_out 业务失败时仅 warn 不抛异常(已物理下料,避免阻塞工作流)。""" + station = _make_station() + station.hardware_interface.take_out.return_value = {"code": 0, "message": "已下过线"} + out = station.unload_materials( + order_id=ORDER_GUID, + material_ids=["mat-1"], + preintake_ids=[], + unloadTable={"data": []}, + materials_unloaded=True, + ) + assert out["success"] is False + assert out["take_out_result"] == {"code": 0, "message": "已下过线"} + + +def test_new_actions_registered_on_class() -> None: + cls = getattr(_import_module(), CLASS_NAME) + have = {name for name, _ in inspect.getmembers(cls, inspect.isfunction)} + assert "wait_for_order_finish" in have + assert "unload_materials" in have + + module = _import_module() + unload_meta = getattr(cls.unload_materials, "_action_registry_meta", {}) + assert unload_meta.get("node_type") == module.NodeType.MANUAL_CONFIRM + wait_meta = getattr(cls.wait_for_order_finish, "_action_registry_meta", {}) + assert wait_meta.get("node_type") != module.NodeType.MANUAL_CONFIRM + assert wait_meta.get("always_free") is True + + +def test_unload_table_columns_constant_layout() -> None: + module = _import_module() + keys = [c["key"] for c in module.UNLOAD_TABLE_COLUMNS] + assert keys == ["whName", "posX", "posY", "posZ", "unit", "materialName"] + multi_keys = [c["key"] for c in module.UNLOAD_TABLE_COLUMNS_MULTI_ORDER] + assert multi_keys[0] == "orderCode" + assert multi_keys[1:] == keys