From 97ccc38c7f7be3b6c1aa3634cafa47c23b171801 Mon Sep 17 00:00:00 2001 From: "hanhua@dp.tech" <2509856570@qq.com> Date: Mon, 18 May 2026 11:10:20 +0800 Subject: [PATCH] execute plan --- .../peptide_station/peptide_station.py | 1892 +++++++++-------- .../peptide_station/tests/__init__.py | 0 .../tests/test_peptide_station_contracts.py | 642 ++++++ 3 files changed, 1636 insertions(+), 898 deletions(-) create mode 100644 unilabos/devices/workstation/bioyond_studio/peptide_station/tests/__init__.py create mode 100644 unilabos/devices/workstation/bioyond_studio/peptide_station/tests/test_peptide_station_contracts.py diff --git a/unilabos/devices/workstation/bioyond_studio/peptide_station/peptide_station.py b/unilabos/devices/workstation/bioyond_studio/peptide_station/peptide_station.py index dda3b9a5..e6239e4f 100644 --- a/unilabos/devices/workstation/bioyond_studio/peptide_station/peptide_station.py +++ b/unilabos/devices/workstation/bioyond_studio/peptide_station/peptide_station.py @@ -1,4 +1,4 @@ -"""多肽工作站最小脚手架。""" +"""Bioyond 多肽工作站:LIMS 提交/复位/调度与样品 Excel 工作流。""" from __future__ import annotations @@ -11,19 +11,19 @@ import sys from contextlib import nullcontext from datetime import datetime, timezone from pathlib import Path -from typing import Annotated, Any, Dict, Iterable, List, Optional +from typing import Annotated, Any, Dict, Iterable, List, Optional, Tuple from uuid import UUID import requests try: from typing_extensions import TypedDict -except ImportError: # pragma: no cover - 仅用于轻量环境导入 +except ImportError: # pragma: no cover from typing import TypedDict # type: ignore try: from pydantic import Field -except Exception: # pragma: no cover - 仅用于无 pydantic 的轻量环境导入 +except Exception: # pragma: no cover def Field(*args: Any, **kwargs: Any) -> Dict[str, Any]: return kwargs @@ -45,7 +45,7 @@ try: device, ) _REGISTRY_IMPORT_ERROR: Optional[Exception] = None -except Exception as exc: # pragma: no cover - 允许轻量 helper 导入 +except Exception as exc: # pragma: no cover _REGISTRY_IMPORT_ERROR = exc class NodeType: # type: ignore[no-redef] @@ -87,52 +87,129 @@ try: from unilabos.devices.workstation.workstation_base import WorkstationBase from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation _BIOYOND_IMPORT_ERROR: Optional[Exception] = None -except Exception as exc: # pragma: no cover - 允许在轻量探测模式下运行 +except Exception as exc: # pragma: no cover WorkstationBase = object # type: ignore[assignment,misc] BioyondWorkstation = object # type: ignore[assignment,misc] _BIOYOND_IMPORT_ERROR = exc -_PARAMETER_KEY_ALIASES = { - "Type": "type", - "Key": "key", - "Value": "value", - "DisplayValue": "displayValue", - "Name": "name", - "Unit": "unit", - "Options": "options", - "Children": "children", - "Items": "items", -} - +DEBUG_CLI_ENABLED = False DEFAULT_RESET_OPERATIONS = ("scheduler_reset", "reset_order_status", "reset_location") -# Day1 多肽合成工作流已在接口手册中出现,但当前站点不公开 Day1 提交动作。 -DAY1_PEPTIDE_WORKFLOW_NAME = "多肽合成" +RESULT_TABLE_COLUMNS = [ + {"name": "设备", "key": "whName"}, + {"name": "位置", "key": "locationCode"}, + {"name": "物料名称", "key": "materialName"}, + {"name": "数量", "key": "quantity"}, +] +MATERIAL_TYPE_ORDER = ("Sample", "Consumables", "Reagent") +PEPTIDE_SAMPLE_FILE_KEY = "SampleFile" +DAY1_CEM_METHOD_KEY = "CEMMethodFileName" +DAY1_CEM_METHOD_DEFAULT = "5microdouble-20250911.MPM" + +# 绑定信息(最后更新 2026-05-16) +DAY1_PEPTIDE_WORKFLOW_NAME = "Day1线肽合成" DAY2_PEPTIDE_WORKFLOW_NAME = "DAY2多肽定量" DAY3_PEPTIDE_WORKFLOW_NAME = "Day3线肽环化" -DAY4_PEPTIDE_WORKFLOW_NAME = "Day4环肽酰化-酶标+LCMS" -PEPTIDE_SAMPLE_FILE_KEYS = ("SampleFile", "ExcelPath", "excelPath", "sampleFile") +DAY4_PEPTIDE_WORKFLOW_NAME = "Day4环肽酰化-酶标" +DAY4_LCMS_PEPTIDE_WORKFLOW_NAME = "Day4环肽酰化-酶标+LCMS" +DAY4_LCMS_SUB_WORKFLOW_NAME = "Day4环肽酰化-酶标LCMS" + +DAY_WORKFLOW_BINDINGS: Dict[str, Dict[str, str]] = { + "day1": {"root_name": DAY1_PEPTIDE_WORKFLOW_NAME, "sub_name": DAY1_PEPTIDE_WORKFLOW_NAME}, + "day2": {"root_name": DAY2_PEPTIDE_WORKFLOW_NAME, "sub_name": DAY2_PEPTIDE_WORKFLOW_NAME}, + "day3": {"root_name": DAY3_PEPTIDE_WORKFLOW_NAME, "sub_name": DAY3_PEPTIDE_WORKFLOW_NAME}, + "day4": {"root_name": DAY4_PEPTIDE_WORKFLOW_NAME, "sub_name": DAY4_PEPTIDE_WORKFLOW_NAME}, + "day4_lcms": {"root_name": DAY4_LCMS_PEPTIDE_WORKFLOW_NAME, "sub_name": DAY4_LCMS_SUB_WORKFLOW_NAME}, +} class PeptideWorkflowError(RuntimeError): - """多肽工作流可恢复错误:当前动作失败并停止工作流,不退出 UniLabOS edge。""" + """多肽工作流可恢复错误。""" -class PeptideSubmitRequiredParams(TypedDict): - sample_excel_pattern: Annotated[str, Field(description="样品 Excel 文件名匹配模式(必填)。")] - - -class PeptideGenericSubmitRequiredParams(PeptideSubmitRequiredParams): - workflow_name: Annotated[str, Field(description="工作流名称(必填,不填写工作流 ID)")] - - -class PeptideSubmitOptionalParams(TypedDict, total=False): - order_name: Annotated[str, Field(description="订单名称(可选,自动生成)。")] +class PeptideCommonSubmitOptionalParams(TypedDict, total=False): + order_name: Annotated[str, Field(description="订单名称;为空时自动生成,用户可覆盖。")] + auto_register_materials: Annotated[bool, Field(default=True, description="是否自动登记返回的物料信息;默认勾选。本轮仅回传开关,不修改资源树。")] + parameter_overrides: Annotated[ + List[Dict[str, Any]], + Field( + default=[{"m": 0, "n": 0, "Key": "Example", "Value": "example value"}], + description="参数覆盖列表:Key 和 Value 必填,m/n 可选;省略 m/n 时 Key 必须唯一匹配。", + ), + ] border_number: Annotated[int, Field(default=1, description="LIMS 创建订单 borderNumber,默认 1。")] - extend_properties: Annotated[str, Field(description="LIMS extendProperties 字符串。")] - local_excel_path: Annotated[str, Field(description="本地 Excel 文件路径;auto_upload_local_excel=True 时上传。")] - auto_upload_local_excel: Annotated[bool, Field(default=False, description="提交前是否先上传 local_excel_path。")] - parameter_values: Annotated[Dict[str, Any], Field(description="按参数 key 覆盖 TaskDisplayable=1 工作流默认值。")] + extend_properties: Annotated[str, Field(description="LIMS extendProperties 字符串;默认不传或传空。")] + + +class PeptideGenericSubmitRequiredParams(TypedDict): + workflow_name: Annotated[str, Field(description="Bioyond 根工作流名称;用于解析一个非 Day1 子工作流。")] + sample_excel_pattern: Annotated[str, Field(description="样品 Excel 文件名匹配模式。若通过上游句柄提供 sample_excel_relative_path,可留空。")] + + +class PeptideGenericSubmitOptionalParams(PeptideCommonSubmitOptionalParams, total=False): + subworkflow_name: Annotated[str, Field(description="Bioyond 子工作流名称过滤;为空时 workflow_name 下必须只有一个可用子工作流。")] + + +class PeptideDay1RequiredParams(TypedDict): + sample_excel_pattern: Annotated[str, Field(description="样品 Excel 文件名匹配模式。若通过上游句柄提供 sample_excel_relative_path,可留空。")] + cem_method_file_name: Annotated[ + str, + Field(default=DAY1_CEM_METHOD_DEFAULT, description="Day1 CEM 方法文件名称,默认 5microdouble-20250911.MPM。"), + ] + + +class PeptideDay1OptionalParams(PeptideCommonSubmitOptionalParams, total=False): + pass + + +class PeptideDay2RequiredParams(TypedDict): + sample_excel_pattern: Annotated[str, Field(description="样品 Excel 文件名匹配模式。若通过上游句柄提供 sample_excel_relative_path,可留空。")] + + +class PeptideDay2OptionalParams(PeptideCommonSubmitOptionalParams, total=False): + pass + + +class PeptideDay3RequiredParams(PeptideDay2RequiredParams): + pass + + +class PeptideDay3OptionalParams(PeptideCommonSubmitOptionalParams, total=False): + pass + + +class PeptideDay4RequiredParams(PeptideDay2RequiredParams): + pass + + +class PeptideDay4OptionalParams(PeptideCommonSubmitOptionalParams, total=False): + pass + + +class PeptideDay4LCMSRequiredParams(PeptideDay2RequiredParams): + pass + + +class PeptideDay4LCMSOptionalParams(PeptideCommonSubmitOptionalParams, total=False): + pass + + +_SUBMIT_INPUT_HANDLES = [ + ActionInputHandle( + key="sample_excel_relative_path", + data_type="bioyond_sample_file", + label="样品 Excel 相对路径", + data_key="sample_excel_relative_path", + data_source=DataSource.HANDLE, + io_type="source", + ), +] +_SUBMIT_OUTPUT_HANDLES = [ + ActionOutputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="resultTable", data_type="table", label="装载确认表", data_key="resultTable", data_source=DataSource.EXECUTOR), + ActionOutputHandle(key="sample_file", data_type="bioyond_sample_file", label="样品文件", data_key="sample_file", data_source=DataSource.EXECUTOR), +] def _apply_default_peptide_material_type_mappings(config: Dict[str, Any]) -> None: @@ -145,12 +222,10 @@ def _apply_default_peptide_material_type_mappings(config: Dict[str, Any]) -> Non def _utc_now_iso8601_ms() -> str: - """返回与 Bioyond 接口兼容的 UTC 时间戳。""" return datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") def load_peptide_config(config_path: str | Path) -> Dict[str, Any]: - """从 JSON 文件读取多肽站配置。""" path = Path(config_path) with path.open("r", encoding="utf-8") as file: return json.load(file) @@ -163,42 +238,25 @@ def fetch_workflow_list( filter_text: str = "", include_detail: bool = True, ) -> Dict[str, Any]: - """调用工作流列表接口。""" + """调试专用:直接 HTTP 拉取工作流列表。运行时应使用 BioyondPeptideStation.fetch_workflow_list。""" + assert DEBUG_CLI_ENABLED, "模块级 fetch_workflow_list 仅供调试;运行时请调用站点实例方法" resolved_config = dict(config or {}) if config_path is not None: resolved_config.update(load_peptide_config(config_path)) - api_host = str(resolved_config.get("api_host", "")).rstrip("/") api_key = str(resolved_config.get("api_key", "")) timeout = int(resolved_config.get("timeout", 10)) - - if not api_host: - raise ValueError("缺少 api_host 配置") - if not api_key: - raise ValueError("缺少 api_key 配置") - + if not api_host or not api_key: + raise ValueError("缺少 api_host/api_key 配置") url = f"{api_host}/api/lims/workflow/work-flow-list" payload = { "apiKey": api_key, "requestTime": _utc_now_iso8601_ms(), - "data": { - "type": workflow_type, - "filter": filter_text, - "includeDetail": include_detail, - }, + "data": {"type": workflow_type, "filter": filter_text, "includeDetail": include_detail}, } - result: Dict[str, Any] = { - "url": url, - "request_payload": payload, - } - + result: Dict[str, Any] = {"url": url, "request_payload": payload} try: - response = requests.post( - url, - json=payload, - timeout=timeout, - headers={"Content-Type": "application/json"}, - ) + response = requests.post(url, json=payload, timeout=timeout, headers={"Content-Type": "application/json"}) result["http_status"] = response.status_code try: result["response"] = response.json() @@ -218,7 +276,7 @@ def fetch_workflow_list( icon="preparation_station.webp", ) class BioyondPeptideStation(BioyondWorkstation): - """多肽工作站占位实现。""" + """多肽 LIMS 工作站。""" _REQUIRED_CONFIG_KEYS = ("api_key", "api_host", "warehouse_mapping") @@ -233,7 +291,6 @@ class BioyondPeptideStation(BioyondWorkstation): ) -> None: if _BIOYOND_IMPORT_ERROR is not None: raise RuntimeError(f"BioyondPeptideStation 基类导入失败: {_BIOYOND_IMPORT_ERROR}") from _BIOYOND_IMPORT_ERROR - kwargs.pop("children", None) merged_config: Dict[str, Any] = {} if config_path is not None: @@ -246,23 +303,13 @@ class BioyondPeptideStation(BioyondWorkstation): merged_config.update(bioyond_config) merged_config.update(kwargs) _apply_default_peptide_material_type_mappings(merged_config) - missing = [k for k in self._REQUIRED_CONFIG_KEYS if not merged_config.get(k)] if missing: raise ValueError(f"BioyondPeptideStation 缺少必要配置: {', '.join(missing)}") - self.protocol_type = protocol_type self.bioyond_config = merged_config - self._created_order_ids: set[str] = set() - self._created_order_codes: set[str] = set() - - logger.info("BioyondPeptideStation 初始化开始") - logger.info(f" - API Host: {self.bioyond_config.get('api_host', '')}") - logger.info(f" - Workflow 映射数量: {len(self.bioyond_config.get('workflow_mappings', {}))}") - super().__init__(bioyond_config=self.bioyond_config, deck=deck) - - logger.info("BioyondPeptideStation 初始化完成") + logger.info("BioyondPeptideStation 初始化完成: %s", self.bioyond_config.get("api_host", "")) def _debug_call_session(self, action_name: str): parent_debug_session = getattr(super(), "_debug_call_session", None) @@ -270,267 +317,340 @@ class BioyondPeptideStation(BioyondWorkstation): return parent_debug_session(action_name) return nullcontext() - @staticmethod def fetch_workflow_list( - config: Optional[Dict[str, Any]] = None, - config_path: Optional[str | Path] = None, + self, workflow_type: int = 0, filter_text: str = "", include_detail: bool = True, ) -> Dict[str, Any]: - """静态辅助方法,便于直接拉取工作流列表。""" - return fetch_workflow_list( - config=config, - config_path=config_path, - workflow_type=workflow_type, - filter_text=filter_text, - include_detail=include_detail, - ) + """运行时通过 RPC 拉取工作流列表。""" + payload = {"type": workflow_type, "filter": filter_text, "includeDetail": include_detail} + data = self._require_hardware_interface().query_workflow(json.dumps(payload, ensure_ascii=False)) + items = self._as_list(data.get("items") if isinstance(data, dict) else data) + return {"items": items, "totalCount": data.get("totalCount") if isinstance(data, dict) else len(items), "raw": data} @action(auto_prefix=True, description="上传多肽样品 Excel 文件") - def upload_sample_excel( - self, - file_path: str, - content_type: Optional[str] = None, - ) -> Dict[str, Any]: - """上传样品 Excel 到 Bioyond LIMS。 - - Args: - file_path: 本地 Excel 文件路径;建议使用完整路径。如果使用相对路径,必须以 `./` 开头。 - content_type: 文件 MIME 类型;为空时根据文件名自动推断。 - """ + def upload_sample_excel(self, file_path: str, content_type: Optional[str] = None) -> Dict[str, Any]: with self._debug_call_session("upload_sample_excel"): - return self._upload_sample_excel_file(file_path, content_type=content_type) + result = self._upload_sample_excel_file(file_path, content_type=content_type) + file_info = result.get("lims_file_info") if isinstance(result.get("lims_file_info"), dict) else {} + return { + "success": True, + "data": file_info, + "relative_path": str(result.get("relative_path") or ""), + "sample_file_parameter": str(result.get("sample_file_parameter") or ""), + "upload_result": result, + } @action( always_free=True, - description="按工作流名称提交多肽实验到 Bioyond LIMS", + description="查询 LIMS 样品 Excel 列表,可选确定性解析", handles=[ ActionOutputHandle( - key="order_id", - data_type="bioyond_order_id", - label="实验ID", - data_key="order_id", - data_source=DataSource.EXECUTOR, - ), - ActionOutputHandle( - key="order_ids", - data_type="bioyond_order_ids", - label="实验ID列表", - data_key="order_ids", - data_source=DataSource.EXECUTOR, - ), - ActionOutputHandle( - key="resultTable", - data_type="table", - label="装载确认表", - data_key="resultTable", - data_source=DataSource.EXECUTOR, - ), - ActionOutputHandle( - key="sample_file", + key="sample_excel_relative_path", data_type="bioyond_sample_file", - label="样品文件", - data_key="sample_file", + label="样品 Excel 相对路径", + data_key="sample_excel_relative_path", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="sample_excel_data", + data_type="json", + label="样品 Excel 列表", + data_key="sample_excel_data", data_source=DataSource.EXECUTOR, ), ], ) + def list_sample_excels( + self, + begin_date: str = "", + end_date: str = "", + name_filter: str = "", + sample_excel_pattern: str = "", + deterministic_resolve: bool = False, + ) -> Dict[str, Any]: + with self._debug_call_session("list_sample_excels"): + records = self._list_sample_excels( + name_filter=name_filter or sample_excel_pattern.replace("*", ""), + begin_date=begin_date or None, + end_date=end_date or None, + ) + payload: Dict[str, Any] = {"success": True, "sample_excel_data": records} + if deterministic_resolve: + pattern = sample_excel_pattern or name_filter + if not pattern: + raise PeptideWorkflowError("确定性解析模式需要 sample_excel_pattern 或 name_filter") + selected = self._select_sample_excel_record(records, pattern) + payload["sample_excel_relative_path"] = str(selected.get("relativePath") or "").replace("/", "\\") + payload["selected_sample_excel"] = selected + return payload + + @action( + always_free=True, + description="查询子工作流步骤参数(支持必填/可选/隐藏过滤)", + handles=[ + ActionOutputHandle( + key="step_parameters_raw_json", + data_type="json", + label="步骤参数 JSON", + data_key="step_parameters_raw_json", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="filtered_subworkflows", + data_type="json", + label="匹配子工作流", + data_key="filtered_subworkflows", + data_source=DataSource.EXECUTOR, + ), + ], + ) + def get_step_parameters( + self, + sub_workflow_id: str = "", + workflow_name_filter: str = "", + subworkflow_name_filter: str = "", + required_para: bool = True, + optional_parameter: bool = True, + hidden_para: bool = False, + ) -> Dict[str, Any]: + with self._debug_call_session("get_step_parameters"): + if sub_workflow_id.strip(): + step_data = self._query_step_parameters(sub_workflow_id.strip()) + flattened = self._flatten_step_parameters(step_data) + filtered = self._filter_step_parameter_records(flattened, required_para, optional_parameter, hidden_para) + augmented = { + "subworkflowId": sub_workflow_id.strip(), + "code": 1, + "data": {"filteredParameters": filtered, "raw": step_data}, + } + return {"step_parameters_raw_json": augmented, "filtered_subworkflows": []} + + bindings = self._filter_workflow_records( + self._query_workflow_records(workflow_name_filter), + workflow_name_filter=workflow_name_filter, + subworkflow_name_filter=subworkflow_name_filter, + ) + if len(bindings) != 1: + message = f"匹配到 {len(bindings)} 个子工作流,请收窄 workflow_name_filter/subworkflow_name_filter" + status = { + "code": 0 if bindings else -1, + "message": message, + "data": {"matches": bindings}, + "matches": len(bindings), + } + return {"step_parameters_raw_json": status, "filtered_subworkflows": bindings} + + binding = bindings[0] + step_data = self._query_step_parameters(binding["subworkflowId"]) + flattened = self._flatten_step_parameters(step_data) + filtered = self._filter_step_parameter_records(flattened, required_para, optional_parameter, hidden_para) + augmented = { + "workflowId": binding.get("workflowId"), + "workflowName": binding.get("workflowName"), + "subworkflowId": binding.get("subworkflowId"), + "subworkflowName": binding.get("subworkflowName"), + "code": 1, + "data": {"filteredParameters": filtered, "rawData": step_data}, + } + return {"step_parameters_raw_json": augmented, "filtered_subworkflows": bindings} + + @action(always_free=True, description="按工作流名称提交多肽实验(非 Day1)", handles=_SUBMIT_INPUT_HANDLES + _SUBMIT_OUTPUT_HANDLES) def submit_experiment( self, required_params: PeptideGenericSubmitRequiredParams, - optional_params: Optional[PeptideSubmitOptionalParams] = None, - timeout_seconds: int = 3600, - assignee_user_ids: Optional[List[str]] = None, - **kwargs: Any, + optional_params: Optional[PeptideGenericSubmitOptionalParams] = None, + sample_excel_relative_path: str = "", ) -> Dict[str, Any]: - """通用多肽提交入口。 - - Args: - required_params: 必填参数组,包含 workflow_name 和 sample_excel_pattern。 - optional_params: 可选参数组;parameter_values 可按参数 key 覆盖工作流默认值。 - timeout_seconds: 传递给后续手动确认动作的超时时间。 - assignee_user_ids: 传递给后续手动确认动作的用户 ID 列表。 - """ - del kwargs return self._submit_experiment_core( + day_key=None, required_params=required_params, optional_params=optional_params, - default_workflow_name="", - timeout_seconds=timeout_seconds, - assignee_user_ids=assignee_user_ids, + sample_excel_relative_path=sample_excel_relative_path, + generic=True, ) - @action( - always_free=True, - description="提交多肽 Day2 定量实验到 Bioyond LIMS", - handles=[ - ActionOutputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.EXECUTOR), - ActionOutputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.EXECUTOR), - ActionOutputHandle(key="resultTable", data_type="table", label="装载确认表", data_key="resultTable", data_source=DataSource.EXECUTOR), - ActionOutputHandle(key="sample_file", data_type="bioyond_sample_file", label="样品文件", data_key="sample_file", data_source=DataSource.EXECUTOR), - ], - ) + @action(always_free=True, description="提交 Day2 多肽定量实验", handles=_SUBMIT_INPUT_HANDLES + _SUBMIT_OUTPUT_HANDLES) def submit_experiment_day2( self, - required_params: PeptideSubmitRequiredParams, - optional_params: Optional[PeptideSubmitOptionalParams] = None, - timeout_seconds: int = 3600, - assignee_user_ids: Optional[List[str]] = None, - **kwargs: Any, + required_params: PeptideDay2RequiredParams, + optional_params: Optional[PeptideDay2OptionalParams] = None, + sample_excel_relative_path: str = "", ) -> Dict[str, Any]: - """提交 Day2,工作流名称由站点封装。""" - del kwargs - return self._submit_experiment_core( - required_params=required_params, - optional_params=optional_params, - default_workflow_name=DAY2_PEPTIDE_WORKFLOW_NAME, - timeout_seconds=timeout_seconds, - assignee_user_ids=assignee_user_ids, - ) + return self._submit_experiment_core("day2", required_params, optional_params, sample_excel_relative_path) - @action( - always_free=True, - description="提交多肽 Day3 线肽环化实验到 Bioyond LIMS", - handles=[ - ActionOutputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.EXECUTOR), - ActionOutputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.EXECUTOR), - ActionOutputHandle(key="resultTable", data_type="table", label="装载确认表", data_key="resultTable", data_source=DataSource.EXECUTOR), - ActionOutputHandle(key="sample_file", data_type="bioyond_sample_file", label="样品文件", data_key="sample_file", data_source=DataSource.EXECUTOR), - ], - ) + @action(always_free=True, description="提交 Day3 线肽环化实验", handles=_SUBMIT_INPUT_HANDLES + _SUBMIT_OUTPUT_HANDLES) def submit_experiment_day3( self, - required_params: PeptideSubmitRequiredParams, - optional_params: Optional[PeptideSubmitOptionalParams] = None, - timeout_seconds: int = 3600, - assignee_user_ids: Optional[List[str]] = None, - **kwargs: Any, + required_params: PeptideDay3RequiredParams, + optional_params: Optional[PeptideDay3OptionalParams] = None, + sample_excel_relative_path: str = "", ) -> Dict[str, Any]: - """提交 Day3,工作流名称由站点封装。""" - del kwargs - return self._submit_experiment_core( - required_params=required_params, - optional_params=optional_params, - default_workflow_name=DAY3_PEPTIDE_WORKFLOW_NAME, - timeout_seconds=timeout_seconds, - assignee_user_ids=assignee_user_ids, - ) + return self._submit_experiment_core("day3", required_params, optional_params, sample_excel_relative_path) + + @action(always_free=True, description="提交 Day4 环肽酰化(酶标)实验", handles=_SUBMIT_INPUT_HANDLES + _SUBMIT_OUTPUT_HANDLES) + def submit_experiment_day4( + self, + required_params: PeptideDay4RequiredParams, + optional_params: Optional[PeptideDay4OptionalParams] = None, + sample_excel_relative_path: str = "", + ) -> Dict[str, Any]: + return self._submit_experiment_core("day4", required_params, optional_params, sample_excel_relative_path) + + @action(always_free=True, description="提交 Day4 环肽酰化 LCMS 实验", handles=_SUBMIT_INPUT_HANDLES + _SUBMIT_OUTPUT_HANDLES) + def submit_experiment_day4_LCMS( + self, + required_params: PeptideDay4LCMSRequiredParams, + optional_params: Optional[PeptideDay4LCMSOptionalParams] = None, + sample_excel_relative_path: str = "", + ) -> Dict[str, Any]: + return self._submit_experiment_core("day4_lcms", required_params, optional_params, sample_excel_relative_path) @action( always_free=True, - description="提交多肽 Day4 环肽酰化实验到 Bioyond LIMS", + node_type=NodeType.MANUAL_CONFIRM, + placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}, + goal_default={"materials_loaded": False, "timeout_seconds": 3600, "assignee_user_ids": []}, + feedback_interval=300, + description="Day1 线肽合成提交占位(暂不创建订单)", handles=[ - ActionOutputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.EXECUTOR), - ActionOutputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.EXECUTOR), - ActionOutputHandle(key="resultTable", data_type="table", label="装载确认表", data_key="resultTable", data_source=DataSource.EXECUTOR), - ActionOutputHandle(key="sample_file", data_type="bioyond_sample_file", label="样品文件", data_key="sample_file", data_source=DataSource.EXECUTOR), + ActionInputHandle( + key="sample_excel_relative_path", + data_type="bioyond_sample_file", + label="样品 Excel", + data_key="sample_excel_relative_path", + data_source=DataSource.HANDLE, + io_type="source", + ), ], ) - def submit_experiment_day4( + def submit_experiment_day1( self, - required_params: PeptideSubmitRequiredParams, - optional_params: Optional[PeptideSubmitOptionalParams] = None, - timeout_seconds: int = 3600, - assignee_user_ids: Optional[List[str]] = None, + required_params: PeptideDay1RequiredParams, + optional_params: Optional[PeptideDay1OptionalParams] = None, + sample_excel_relative_path: str = "", **kwargs: Any, ) -> Dict[str, Any]: - """提交 Day4 默认酶标+LCMS 工作流;Day4 运行效果仍需现场验证。""" + # TODO: Day1 订单创建待 API 现场验证后再接入 create_order;目前只回显占位结构。 del kwargs - return self._submit_experiment_core( - required_params=required_params, - optional_params=optional_params, - default_workflow_name=DAY4_PEPTIDE_WORKFLOW_NAME, - timeout_seconds=timeout_seconds, - assignee_user_ids=assignee_user_ids, + optional = dict(optional_params or {}) + sample_file, selected = self._resolve_submit_sample_file( + required_params, + optional, + sample_excel_relative_path, ) + cem_method = str(required_params.get("cem_method_file_name") or DAY1_CEM_METHOD_DEFAULT).strip() or DAY1_CEM_METHOD_DEFAULT + partial_entries, override_warnings = self._build_partial_parameter_entries( + sample_excel_relative_path=sample_file, + day_key="day1", + parameter_overrides=optional.get("parameter_overrides"), + extra_autofill=[{"Key": DAY1_CEM_METHOD_KEY, "Value": cem_method}], + ) + binding = self._resolve_workflow_binding("day1") + return { + "success": True, + "status": "manual_confirm_placeholder", + "message": "Day1 订单创建暂未启用,请人工确认样品与方法文件后继续下游节点。", + "workflow": binding, + "sample_file": sample_file, + "selected_sample_excel": selected, + "partial_parameter_entries": partial_entries, + "cem_method_file_name": cem_method, + "auto_register_materials": bool(optional.get("auto_register_materials", True)), + "warnings": override_warnings, + } def _submit_experiment_core( self, - *, + day_key: Optional[str], required_params: Dict[str, Any], - optional_params: Optional[PeptideSubmitOptionalParams] = None, - default_workflow_name: str = "", - timeout_seconds: int = 3600, - assignee_user_ids: Optional[List[str]] = None, + optional_params: Optional[Dict[str, Any]], + sample_excel_relative_path: str = "", + *, + generic: bool = False, ) -> Dict[str, Any]: - """多肽提交共享实现:Excel -> 工作流参数 -> LIMS 创建订单。""" optional = dict(optional_params or {}) - workflow_name = str(required_params.get("workflow_name") or default_workflow_name or "").strip() - if not workflow_name: - raise PeptideWorkflowError("提交实验必须提供 workflow_name(工作流名称)") - - action_name = "submit_experiment" if not default_workflow_name else f"submit_{workflow_name}" + warnings: List[str] = [] + action_name = "submit_experiment" if generic else f"submit_experiment_{day_key}" with self._debug_call_session(action_name): - sample_file, selected_sample_excel = self._resolve_submit_sample_file(required_params, optional) - workflow = self._resolve_workflow_by_name(workflow_name) - sub_workflow_id = workflow["sub_workflow_id"] - step_data = self._workflow_step_data(sub_workflow_id) - raw_parameters = self._extract_workflow_parameters(step_data) - if not self._looks_like_step_parameter_map(raw_parameters): - raise PeptideWorkflowError(f"工作流 {workflow_name} 未返回可用步骤参数,无法创建订单") + if generic: + workflow_name = str(required_params.get("workflow_name") or "").strip() + if not workflow_name: + raise PeptideWorkflowError("submit_experiment 必须提供 workflow_name") + if workflow_name == DAY1_PEPTIDE_WORKFLOW_NAME or "day1" in workflow_name.lower(): + raise PeptideWorkflowError("Day1 请使用 submit_experiment_day1;通用提交暂不支持 Day1 线肽合成") + subworkflow_name = str(optional.get("subworkflow_name") or "").strip() + binding = self._resolve_workflow_binding_from_names(workflow_name, subworkflow_name) + else: + binding = self._resolve_workflow_binding(day_key or "") - param_values = self._build_param_values( - raw_parameters, - sample_file=sample_file, - parameter_overrides=optional.get("parameter_values") or {}, + sample_file, selected = self._resolve_submit_sample_file(required_params, optional, sample_excel_relative_path) + partial_entries, override_warnings = self._build_partial_parameter_entries( + sample_excel_relative_path=sample_file, + day_key=day_key, + parameter_overrides=optional.get("parameter_overrides"), ) - order_code, order_name = self._build_order_identity(workflow_name=workflow_name, order_name=optional.get("order_name")) - order_payload = [ - { - "orderCode": order_code, - "orderName": order_name, - "borderNumber": int(optional.get("border_number") or 1), - "workFlowId": self._require_uuid(sub_workflow_id, "workFlowId"), - "paramValues": self._normalize_param_values(param_values), - } - ] - extend_properties = optional.get("extend_properties") - if extend_properties not in (None, ""): - order_payload[0]["extendProperties"] = str(extend_properties) + warnings.extend(override_warnings) - create_order_result = self._create_order(order_payload) - parsed_result = self._parse_result(create_order_result) - order_ids = self._extract_order_ids_from_result(parsed_result) + step_data = self._query_step_parameters(binding["sub_workflow_id"]) + flattened = self._flatten_step_parameters(step_data) + # 未来校验可能基于 TaskDisplayable/Value/DisplayValue 分类(见 get_step_parameters)。 + resolved_entries = self._resolve_parameter_entries_against_live_steps(partial_entries, flattened) + param_values = self._group_resolved_entries_to_param_values(resolved_entries) + order_code, order_name = self._build_order_identity(day_key or "generic", optional.get("order_name")) + order_payload = self._create_order_payload( + order_code=order_code, + order_name=order_name, + sub_workflow_id=binding["sub_workflow_id"], + param_values=param_values, + border_number=int(optional.get("border_number") or 1), + extend_properties=optional.get("extend_properties"), + ) + create_order_raw = self._create_order(order_payload) + allocation = self._parse_create_order_allocation_map(create_order_raw) + order_ids = allocation["order_ids"] order_id = order_ids[0] if order_ids else "" - self._created_order_ids.update(order_ids) - self._created_order_codes.add(order_code) - result_table = self._build_result_table(parsed_result) - start_experiment_info = { - "order_id": order_id, - "order_ids": order_ids, - "resultTable": result_table, - "materials_loaded": False, - "timeout_seconds": timeout_seconds, - "assignee_user_ids": list(assignee_user_ids or []), - } + if not allocation["allocation_rows"]: + warnings.append("create_order_allocation_unavailable_for_result_table") + result_table = self._build_result_table(allocation["materials_by_type"]) + auto_register = bool(optional.get("auto_register_materials", True)) + material_registration = ( + {"requested": True, "status": "not_implemented"} if auto_register else {"requested": False, "status": "skipped"} + ) return { "success": bool(order_ids), "order_id": order_id, "order_ids": order_ids, "order_code": order_code, "order_name": order_name, - "workflow": workflow, + "workflow": binding, + "sub_workflow_id": binding["sub_workflow_id"], "sample_file": sample_file, - "selected_sample_excel": selected_sample_excel, - "payload": order_payload, - "create_order_result": parsed_result, + "selected_sample_excel": selected, + "payload_summary": {"borderNumber": int(optional.get("border_number") or 1), "orderCode": order_code}, + "create_order_data_raw": create_order_raw, + "allocation_map": allocation["allocation_map"], + "allocation_rows": allocation["allocation_rows"], "resultTable": result_table, - "start_experiment": start_experiment_info, - "confirmation_message": "请按 resultTable 完成多肽物料装载后调用 start_experiment。", + "start_experiment": { + "order_id": order_id, + "order_ids": order_ids, + "resultTable": result_table, + "materials_loaded": False, + }, + "auto_register_materials": auto_register, + "material_registration": material_registration, + "warnings": warnings, } @action( always_free=True, node_type=NodeType.MANUAL_CONFIRM, placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}, - goal_default={ - "materials_loaded": False, - "timeout_seconds": 3600, - "assignee_user_ids": [], - }, + goal_default={"materials_loaded": False, "timeout_seconds": 3600, "assignee_user_ids": []}, feedback_interval=300, - description="请核对并装载多肽物料;确认后启动 Bioyond 调度器", + description="确认物料装载后启动调度器", handles=[ ActionInputHandle(key="order_id", data_type="bioyond_order_id", label="实验ID", data_key="order_id", data_source=DataSource.HANDLE, io_type="source"), ActionInputHandle(key="order_ids", data_type="bioyond_order_ids", label="实验ID列表", data_key="order_ids", data_source=DataSource.HANDLE, io_type="source"), @@ -543,16 +663,12 @@ class BioyondPeptideStation(BioyondWorkstation): order_ids: Optional[List[str]] = None, resultTable: Optional[Dict[str, Any]] = None, materials_loaded: bool = False, - timeout_seconds: int = 3600, - assignee_user_ids: Optional[List[str]] = None, **kwargs: Any, ) -> Dict[str, Any]: - """手动装载确认后启动 LIMS 调度器。""" - del timeout_seconds, assignee_user_ids with self._debug_call_session("start_experiment"): resolved_order_ids = self._extract_order_ids(order_id=order_id, order_ids=order_ids, **kwargs) table_rows = resultTable.get("data") if isinstance(resultTable, dict) else [] - if table_rows and not bool(materials_loaded): + if table_rows and not materials_loaded: raise RuntimeError("多肽物料装载未确认,拒绝启动调度器") result = self._run_scheduler_action("scheduler_start", "启动") result["order_ids"] = resolved_order_ids @@ -560,19 +676,18 @@ class BioyondPeptideStation(BioyondWorkstation): result["resultTable"] = resultTable or {} return result - @action(always_free=True, description="复位多肽实验前状态") + @action(always_free=True, description="复位调度器/订单/库位") def reset( self, reset_operations: Optional[List[str]] = None, - dry_run: bool = True, + dry_run: bool = False, order_id: str = "", location_id: str = "", **kwargs: Any, ) -> Dict[str, Any]: - """按显式操作列表复位调度器、订单状态或库位。""" with self._debug_call_session("reset"): operations = self._normalize_reset_operations(reset_operations) - planned = [{"operation": operation, "endpoint": self._reset_operation_endpoint(operation)} for operation in operations] + planned = [{"operation": op, "endpoint": self._reset_operation_endpoint(op)} for op in operations] result: Dict[str, Any] = {"dry_run": bool(dry_run), "planned_calls": planned, "executed_calls": [], "skipped_operations": []} if dry_run: return result @@ -582,23 +697,593 @@ class BioyondPeptideStation(BioyondWorkstation): code = rpc.scheduler_reset() result["executed_calls"].append({"operation": operation, "result": {"code": code}}) elif operation == "reset_order_status": - resolved_order_id = str(kwargs.get("reset_order_id") or order_id or kwargs.get("order_id") or "").strip() - if not resolved_order_id: + resolved = str(kwargs.get("reset_order_id") or order_id or kwargs.get("order_id") or "").strip() + if not resolved: result["skipped_operations"].append({"operation": operation, "reason": "缺少 order_id/reset_order_id"}) continue - code = rpc.reset_order_status(resolved_order_id) - result["executed_calls"].append({"operation": operation, "order_id": resolved_order_id, "result": {"code": code}}) + code = rpc.reset_order_status(resolved) + result["executed_calls"].append({"operation": operation, "order_id": resolved, "result": {"code": code}}) elif operation == "reset_location": - resolved_location_id = str(kwargs.get("reset_location_id") or location_id or kwargs.get("location_id") or "").strip() - if not resolved_location_id: + resolved = str(kwargs.get("reset_location_id") or location_id or kwargs.get("location_id") or "").strip() + if not resolved: result["skipped_operations"].append({"operation": operation, "reason": "缺少 location_id/reset_location_id"}) continue - code = rpc.reset_location(resolved_location_id) - result["executed_calls"].append({"operation": operation, "location_id": resolved_location_id, "result": {"code": code}}) + code = rpc.reset_location(resolved) + result["executed_calls"].append({"operation": operation, "location_id": resolved, "result": {"code": code}}) else: raise ValueError(f"未知 reset operation: {operation}") return result + @action(always_free=True, description="启动 Bioyond 调度器") + def scheduler_start(self, **kwargs: Any) -> Dict[str, Any]: + del kwargs + return self._run_scheduler_action("scheduler_start", "启动") + + @action(always_free=True, description="停止 Bioyond 调度器") + def scheduler_stop(self, **kwargs: Any) -> Dict[str, Any]: + del kwargs + return self._run_scheduler_action("scheduler_stop", "停止") + + @action(always_free=True, description="暂停 Bioyond 调度器") + def scheduler_pause(self, **kwargs: Any) -> Dict[str, Any]: + del kwargs + return self._run_scheduler_action("scheduler_pause", "暂停") + + @action(always_free=True, description="继续 Bioyond 调度器") + def scheduler_continue(self, **kwargs: Any) -> Dict[str, Any]: + del kwargs + return self._run_scheduler_action("scheduler_continue", "继续") + + @action(always_free=True, description="查询 LIMS 订单列表") + def get_order_list( + self, + time_type: str = "", + begin_time: Any = None, + end_time: Any = None, + status: str = "", + filter_text: str = "", + skip_count: int = 0, + page_count: int = 20, + sorting: str = "", + ) -> Dict[str, Any]: + params = self._normalize_order_list_params( + { + "timeType": time_type, + "beginTime": begin_time, + "endTime": end_time, + "status": status, + "filter": filter_text, + "skipCount": skip_count, + "pageCount": page_count, + "sorting": sorting, + } + ) + with self._debug_call_session("get_order_list"): + raw = self._require_hardware_interface().order_query(json.dumps(params, ensure_ascii=False)) + items = self._as_list(raw.get("items") if isinstance(raw, dict) else raw) + return {"success": True, "raw": raw, "items": items, "total_count": raw.get("totalCount") if isinstance(raw, dict) else len(items)} + + @action(always_free=True, description="查询单订单实验报告") + def get_order_report(self, order_id: str) -> Dict[str, Any]: + resolved = self._require_uuid(order_id, "order_id") + with self._debug_call_session("get_order_report"): + raw = self._require_hardware_interface().order_report(resolved) + return {"success": True, "order_id": resolved, "raw": raw, "summary": self._normalize_order_report(raw)} + + @action(always_free=True, description="聚合订单报告(占位)") + def get_aggregated_order_report(self, order_id: str) -> Dict[str, Any]: + # TODO: 待多肽侧确认聚合需求后再实现。 + # Sirna 风格聚合通常组合以下接口: + # - /api/lims/order/order-report + # - /api/lims/order/order-list (order-query) + # - /api/lims/order/gantt-with-simulation-by-order-id + # - /api/lims/order/gantts-by-order-id + # - /api/lims/storage/material-info + resolved = self._require_uuid(order_id, "order_id") + return { + "success": False, + "status": "not_implemented", + "order_id": resolved, + "message": "聚合报告尚未实现,请使用 get_order_report。", + } + + # ---------- 样品 Excel ---------- + + def _resolve_submit_sample_file( + self, + required_params: Dict[str, Any], + optional_params: Dict[str, Any], + sample_excel_relative_path: str = "", + ) -> Tuple[str, Dict[str, Any]]: + del optional_params # local upload 已在 plan 移除范围内 + direct = str(sample_excel_relative_path or "").strip() + if direct: + return direct.replace("/", "\\"), {"relativePath": direct, "fileName": Path(direct).name} + pattern = str(required_params.get("sample_excel_pattern") or "").strip() + if not pattern: + raise PeptideWorkflowError("必须提供 sample_excel_relative_path 或 sample_excel_pattern") + selected = self._select_sample_excel_record( + self._list_sample_excels(name_filter=pattern.replace("*", "")), + pattern, + ) + relative = str(selected.get("relativePath") or "").replace("/", "\\") + if not relative: + raise PeptideWorkflowError(f"样品 Excel 记录缺少 relativePath: {selected}") + return relative, selected + + def _select_sample_excel_record(self, records: List[Dict[str, Any]], pattern: str) -> Dict[str, Any]: + matched = [ + record + for record in records + if self._filename_matches_pattern(str(record.get("fileName") or ""), pattern) + or self._filename_matches_pattern(str(record.get("relativePath") or ""), pattern) + ] + if not matched: + raise PeptideWorkflowError(f"未找到匹配 {pattern!r} 的样品 Excel") + if len(matched) > 1: + names = ", ".join(str(item.get("fileName") or "") for item in matched) + raise PeptideWorkflowError(f"找到多个匹配 {pattern!r} 的样品 Excel: {names}") + return matched[0] + + def _list_sample_excels(self, name_filter: str = "", begin_date: Any = None, end_date: Any = None) -> List[Dict[str, Any]]: + rpc = self._require_hardware_interface() + payload = {"beginDate": begin_date, "endDate": end_date, "nameFilter": name_filter or None} + response = rpc.post( + url=f"{rpc.host}/api/lims/order/sample-info-excels", + params={"apiKey": rpc.api_key, "requestTime": _utc_now_iso8601_ms(), "data": payload}, + ) + if not response or response.get("code") != 1: + raise RuntimeError(f"样品 Excel 列表查询失败: {response}") + data = response.get("data") + return data if isinstance(data, list) else [] + + def _upload_sample_excel_file(self, local_excel_path: str | Path, content_type: Optional[str] = None) -> Dict[str, Any]: + api_host = str(self.bioyond_config.get("api_host", "")).rstrip("/") + timeout = int(self.bioyond_config.get("timeout", 30) or 30) + local_path = Path(str(local_excel_path).strip()).expanduser() + if not local_path.is_absolute() and not str(local_excel_path).startswith("./"): + raise ValueError("样品 Excel 路径请使用完整路径;相对路径必须以 ./ 开头") + if not local_path.exists() or not local_path.is_file(): + raise FileNotFoundError(f"样品 Excel 不存在: {local_path}") + resolved_content_type = ( + content_type + or mimetypes.guess_type(local_path.name)[0] + or "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + ) + with local_path.open("rb") as file: + response = requests.post( + f"{api_host}/api/lims/order/up-load-sample-file", + files={"file": (local_path.name, file, resolved_content_type)}, + timeout=timeout, + ) + try: + body: Any = response.json() + except ValueError: + body = {"raw_text": response.text} + if response.status_code >= 400 or not isinstance(body, dict) or body.get("code") != 1: + raise RuntimeError(f"样品 Excel 上传失败: status={response.status_code}, body={body}") + file_info = body.get("data") if isinstance(body.get("data"), dict) else {} + remote = str(file_info.get("filePath") or "") + return { + "success": True, + "lims_file_info": file_info, + "relative_path": remote, + "sample_file_parameter": remote.replace("/", "\\") if remote else "", + "response": body, + } + + @staticmethod + def _filename_matches_pattern(file_name: str, pattern: str) -> bool: + if not pattern or pattern == "*": + return True + if "*" in pattern: + from fnmatch import fnmatch + + return fnmatch(file_name, pattern) + # 无通配符 → 子串匹配(plan 中样品 pattern 通常是文件名前缀,如 "DPR019")。 + return pattern in file_name + + # ---------- 工作流 / 步骤参数 ---------- + + def _resolve_workflow_binding(self, day_key: str) -> Dict[str, Any]: + config = DAY_WORKFLOW_BINDINGS.get(day_key) + if not config: + raise PeptideWorkflowError(f"未知 day_key: {day_key}") + return self._resolve_workflow_binding_from_names(config["root_name"], config["sub_name"]) + + def _resolve_workflow_binding_from_names(self, workflow_name: str, subworkflow_name: str = "") -> Dict[str, Any]: + bindings = self._filter_workflow_records( + self._query_workflow_records(workflow_name), + workflow_name_filter=workflow_name, + subworkflow_name_filter=subworkflow_name, + ) + if not bindings: + raise PeptideWorkflowError(f"未找到工作流 {workflow_name!r} 的可用子工作流") + if len(bindings) > 1: + names = ", ".join(item.get("subworkflowName", "") for item in bindings) + raise PeptideWorkflowError(f"工作流 {workflow_name!r} 匹配到多个子工作流: {names}") + item = bindings[0] + return { + "workflow_name": item.get("workflowName"), + "root_workflow_id": item.get("workflowId"), + "sub_workflow_id": item.get("subworkflowId"), + "sub_workflow_name": item.get("subworkflowName"), + "raw": item, + } + + def _query_workflow_records(self, workflow_name_filter: str, include_detail: bool = True) -> List[Dict[str, Any]]: + payload = {"type": 0, "filter": workflow_name_filter, "includeDetail": include_detail} + data = self._require_hardware_interface().query_workflow(json.dumps(payload, ensure_ascii=False)) + records: List[Dict[str, Any]] = [] + for item in self._as_list(data.get("items") if isinstance(data, dict) else data): + if not isinstance(item, dict): + continue + root_id = str(item.get("id") or "") + root_name = str(item.get("name") or "") + for sub in self._as_list(item.get("subWorkflows")): + if not isinstance(sub, dict): + continue + if sub.get("isSaved") is False: + continue + records.append( + { + "workflowId": root_id, + "workflowName": root_name, + "subworkflowId": str(sub.get("id") or ""), + "subworkflowName": str(sub.get("name") or ""), + "sequence": sub.get("sequence"), + "status": sub.get("status"), + } + ) + return records + + def _filter_workflow_records( + self, + workflow_records: List[Dict[str, Any]], + *, + workflow_name_filter: str = "", + subworkflow_name_filter: str = "", + ) -> List[Dict[str, Any]]: + wf_filter = workflow_name_filter.strip() + sub_filter = subworkflow_name_filter.strip() + + def _match_name(value: str, needle: str) -> bool: + if not needle: + return True + return needle in value + + filtered = [ + record + for record in workflow_records + if _match_name(str(record.get("workflowName") or ""), wf_filter) + and _match_name(str(record.get("subworkflowName") or ""), sub_filter) + ] + if wf_filter: + exact = [r for r in filtered if str(r.get("workflowName") or "") == wf_filter] + if exact: + filtered = exact + if sub_filter: + exact_sub = [r for r in filtered if str(r.get("subworkflowName") or "") == sub_filter] + if exact_sub: + filtered = exact_sub + return filtered + + def _query_step_parameters(self, sub_workflow_id: str) -> Any: + data = self._require_hardware_interface().workflow_step_query(self._require_uuid(sub_workflow_id, "sub_workflow_id")) + return data or {} + + def _flatten_step_parameters(self, step_data: Any) -> List[Dict[str, Any]]: + parsed = self._json_loads_if_string(step_data) + if not isinstance(parsed, dict): + return [] + flattened: List[Dict[str, Any]] = [] + for step_id, modules in parsed.items(): + if not self._looks_like_uuid_text(step_id): + continue + for module in self._as_list(modules): + if not isinstance(module, dict): + continue + step_name = str(module.get("name") or "") + module_m = module.get("m") + module_n = module.get("n") + for parameter in self._as_list(module.get("parameterList") or module.get("ParameterList")): + if not isinstance(parameter, dict): + continue + key = parameter.get("Key") or parameter.get("key") + if not key: + continue + flattened.append( + { + "step": str(step_id), + "step_name": step_name, + "Key": str(key), + "display_para_name": ( + parameter.get("display_para_name") + or parameter.get("displayParaName") + or parameter.get("DisplayName") + or parameter.get("name") + or str(key) + ), + "m": parameter.get("m", module_m), + "n": parameter.get("n", module_n), + "TaskDisplayable": parameter.get("TaskDisplayable", parameter.get("task_displayable", parameter.get("taskDisplayable"))), + "Value": parameter.get("Value", parameter.get("value")), + "DisplayValue": parameter.get("DisplayValue", parameter.get("displayValue")), + } + ) + return flattened + + def _filter_step_parameter_records( + self, + records: List[Dict[str, Any]], + required_para: bool, + optional_parameter: bool, + hidden_para: bool, + ) -> List[Dict[str, Any]]: + filtered: List[Dict[str, Any]] = [] + for record in records: + displayable = record.get("TaskDisplayable") + is_hidden = displayable in (0, "0", False) + is_displayable = displayable in (1, "1", True) + has_value = self._parameter_value_present(record.get("Value")) or self._parameter_value_present(record.get("DisplayValue")) + if is_hidden and hidden_para: + filtered.append(record) + elif is_displayable and not has_value and required_para: + filtered.append(record) + elif is_displayable and has_value and optional_parameter: + filtered.append(record) + # TaskDisplayable 既非 0/1 时(None 或其它),保守跳过避免误归类。 + return filtered + + @staticmethod + def _parameter_value_present(value: Any) -> bool: + if value is None: + return False + if isinstance(value, str) and value == "": + return False + return True + + def _build_partial_parameter_entries( + self, + *, + sample_excel_relative_path: str, + day_key: Optional[str], + parameter_overrides: Any = None, + extra_autofill: Optional[List[Dict[str, Any]]] = None, + ) -> Tuple[List[Dict[str, Any]], List[str]]: + warnings: List[str] = [] + entries: List[Dict[str, Any]] = [{"Key": PEPTIDE_SAMPLE_FILE_KEY, "Value": sample_excel_relative_path}] + if day_key == "day1" and extra_autofill: + entries.extend(extra_autofill) + entries.extend(self._normalize_override_list(parameter_overrides, warnings)) + return entries, warnings + + def _normalize_override_list(self, overrides: Any, warnings: List[str]) -> List[Dict[str, Any]]: + if overrides is None or overrides == "" or overrides == []: + return [] + if isinstance(overrides, dict): + # 容错:用户传 dict 时按 {Key: Value} 展开。 + overrides = [{"Key": k, "Value": v} for k, v in overrides.items()] + if not isinstance(overrides, list): + raise PeptideWorkflowError("parameter_overrides 必须是列表或字典") + normalized: List[Dict[str, Any]] = [] + seen: Dict[Tuple[Any, ...], int] = {} + for raw in overrides: + entry = self._normalize_parameter_entry(raw, allow_key_only=True) + if entry is None: + continue + key = (entry.get("Key"), entry.get("m"), entry.get("n")) + if key in seen: + warnings.append(f"parameter_overrides 重复项 {key},采用最后一次覆盖") + normalized[seen[key]] = entry + else: + seen[key] = len(normalized) + normalized.append(entry) + return normalized + + def _normalize_parameter_entry(self, entry: Any, *, allow_key_only: bool = False) -> Optional[Dict[str, Any]]: + if not isinstance(entry, dict): + return None + key = entry.get("Key") or entry.get("key") + value = entry.get("Value", entry.get("value")) + if not key: + return None + if value is None and not allow_key_only: + return None + normalized: Dict[str, Any] = {"Key": str(key), "Value": value} + for axis in ("m", "n"): + if axis in entry and entry[axis] is not None: + try: + normalized[axis] = int(entry[axis]) + except (TypeError, ValueError): + normalized[axis] = entry[axis] + return normalized + + def _resolve_parameter_entries_against_live_steps( + self, + partial_entries: List[Dict[str, Any]], + flattened: List[Dict[str, Any]], + ) -> List[Dict[str, Any]]: + resolved: List[Dict[str, Any]] = [] + for partial in partial_entries: + key = partial.get("Key") + m_filter = partial.get("m") + n_filter = partial.get("n") + matches = [ + live + for live in flattened + if live.get("Key") == key + and (m_filter is None or live.get("m") == m_filter) + and (n_filter is None or live.get("n") == n_filter) + ] + if len(matches) != 1: + raise PeptideWorkflowError( + f"参数 Key={key} m={m_filter if m_filter is not None else ''} " + f"n={n_filter if n_filter is not None else ''} 期望唯一匹配,实际 {len(matches)} 条" + ) + live = matches[0] + resolved.append( + { + "step": live.get("step"), + "Key": key, + "Value": partial.get("Value"), + "m": partial.get("m", live.get("m")), + "n": partial.get("n", live.get("n")), + } + ) + return resolved + + def _group_resolved_entries_to_param_values(self, resolved_entries: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]: + grouped: Dict[str, List[Dict[str, Any]]] = {} + for entry in resolved_entries: + step_id = str(entry.get("step") or "") + if not self._looks_like_uuid_text(step_id): + continue + payload_entry: Dict[str, Any] = { + "key": str(entry.get("Key")), + "value": "" if entry.get("Value") is None else str(entry.get("Value")), + } + for axis in ("m", "n"): + if entry.get(axis) is not None: + try: + payload_entry[axis] = int(entry[axis]) + except (TypeError, ValueError): + payload_entry[axis] = entry[axis] + grouped.setdefault(step_id, []).append(payload_entry) + return grouped + + def _build_order_identity(self, day_key: str, order_name_override: Any = None) -> Tuple[str, str]: + stamp = datetime.now().strftime("%y%m%d-%H%M%S") + order_code = f"EXP{stamp}" + if order_name_override: + return order_code, str(order_name_override) + return order_code, f"实验{stamp}" + + def _create_order_payload( + self, + *, + order_code: str, + order_name: str, + sub_workflow_id: str, + param_values: Dict[str, List[Dict[str, Any]]], + border_number: int, + extend_properties: Any, + ) -> List[Dict[str, Any]]: + item: Dict[str, Any] = { + "orderCode": order_code, + "orderName": order_name, + "borderNumber": border_number, + "workFlowId": self._require_uuid(sub_workflow_id, "workFlowId"), + "paramValues": param_values, + "extendProperties": "" if extend_properties in (None, "") else str(extend_properties), + } + return [item] + + def _create_order(self, order_payload: List[Dict[str, Any]]) -> Any: + return self._require_hardware_interface().create_order(json.dumps(order_payload, ensure_ascii=False)) + + def _parse_create_order_allocation_map(self, create_order_data_raw: Any) -> Dict[str, Any]: + parsed = self._parse_result(create_order_data_raw) + allocation_map: Dict[str, Any] = {} + if isinstance(parsed, dict): + allocation_map = parsed + order_ids = [key for key in allocation_map if self._looks_like_uuid_text(key)] + rows: List[Dict[str, Any]] = [] + for order_key in order_ids: + for row in self._as_list(allocation_map.get(order_key)): + if isinstance(row, dict): + rows.append(row) + materials_by_type: Dict[str, List[Dict[str, Any]]] = {} + for row in rows: + mode = str(row.get("materialTypeMode") or "Unknown") + materials_by_type.setdefault(mode, []).append(row) + return { + "allocation_map": allocation_map, + "allocation_rows": rows, + "order_ids": order_ids, + "materials_by_type": materials_by_type, + } + + def _build_result_table(self, materials_by_type: Dict[str, List[Dict[str, Any]]], table_name: str = "resultTable") -> Dict[str, Any]: + material_info_cache: Dict[str, Dict[str, Any]] = {} + ordered_modes: List[str] = [] + for mode in MATERIAL_TYPE_ORDER: + if mode in materials_by_type: + ordered_modes.append(mode) + for mode in materials_by_type: + if mode not in ordered_modes: + ordered_modes.append(mode) + rows: List[Dict[str, Any]] = [] + for mode in ordered_modes: + for record in materials_by_type.get(mode, []): + material_id = str(record.get("materialId") or "") + location_code = str(record.get("locationShowName") or record.get("locationCode") or "") + rows.append( + { + "whName": self._resolve_wh_name_by_material_id(material_id, material_info_cache), + "locationCode": location_code, + "materialName": str(record.get("materialName") or ""), + "quantity": str(record.get("quantity") or ""), + } + ) + return {"data": rows, "columns": copy.deepcopy(RESULT_TABLE_COLUMNS), "tableName": table_name} + + def _resolve_wh_name_by_material_id(self, material_id: str, cache: Dict[str, Dict[str, Any]]) -> str: + if not material_id: + return "" + if material_id not in cache: + try: + cache[material_id] = self._require_hardware_interface().material_info(material_id) or {} + except Exception as exc: + logger.warning("material_info 查询失败 material_id=%s: %s", material_id, exc) + cache[material_id] = {} + locations = self._as_list(cache[material_id].get("locations")) + location = next((loc for loc in locations if isinstance(loc, dict)), {}) + return str(location.get("whName") or "") + + def _normalize_order_list_params(self, params: Dict[str, Any]) -> Dict[str, Any]: + return { + "timeType": params.get("timeType") or "", + "beginTime": params.get("beginTime"), + "endTime": params.get("endTime"), + "status": params.get("status") or "", + "filter": params.get("filter") or "", + "skipCount": int(params.get("skipCount") or 0), + "pageCount": int(params.get("pageCount") or 20), + "sorting": params.get("sorting") or "", + } + + def _normalize_order_report(self, raw: Any) -> Dict[str, Any]: + if not isinstance(raw, dict): + return {} + return { + "id": raw.get("id"), + "name": raw.get("name"), + "code": raw.get("code"), + "workflow_name": raw.get("workflowName"), + "status": raw.get("status"), + "status_name": raw.get("statusName"), + "pre_intakes_count": len(self._as_list(raw.get("preIntakes"))), + "result_list_count": len(self._as_list(raw.get("resultList"))), + } + + # ---------- 基础设施 ---------- + + def _run_scheduler_action(self, method_name: str, label: str) -> Dict[str, Any]: + rpc = self._require_hardware_interface() + method = getattr(rpc, method_name, None) + if not callable(method): + raise RuntimeError(f"RPC 缺少调度器方法: {method_name}") + code = method() + success = code == 1 + return {"success": success, "code": code, "message": f"调度器{label}{'成功' if success else '失败'}"} + + def _require_hardware_interface(self): + interface = getattr(self, "hardware_interface", None) + if interface is None: + raise RuntimeError("BioyondPeptideStation 未绑定 hardware_interface") + return interface + @staticmethod def _normalize_reset_operations(reset_operations: Optional[List[str]]) -> List[str]: alias_map = { @@ -610,331 +1295,32 @@ class BioyondPeptideStation(BioyondWorkstation): "location": "reset_location", "reset_location": "reset_location", } - operations = list(reset_operations or DEFAULT_RESET_OPERATIONS) normalized: List[str] = [] - for operation in operations: - key = str(operation).strip() - canonical = alias_map.get(key) + for operation in list(reset_operations or DEFAULT_RESET_OPERATIONS): + canonical = alias_map.get(str(operation).strip()) if not canonical: raise ValueError(f"未知 reset operation: {operation}") if canonical not in normalized: normalized.append(canonical) return normalized - @action(always_free=True, description="直接启动 Bioyond 多肽调度器") - def scheduler_start(self, **kwargs: Any) -> Dict[str, Any]: - """直接调用 Bioyond 调度器启动接口。""" - del kwargs - return self._run_scheduler_action("scheduler_start", "启动") - - @action(always_free=True, description="直接停止 Bioyond 多肽调度器") - def scheduler_stop(self, **kwargs: Any) -> Dict[str, Any]: - """直接调用 Bioyond 调度器停止接口。""" - del kwargs - return self._run_scheduler_action("scheduler_stop", "停止") - - @action(always_free=True, description="直接暂停 Bioyond 多肽调度器") - def scheduler_pause(self, **kwargs: Any) -> Dict[str, Any]: - """直接调用 Bioyond 调度器暂停接口。""" - del kwargs - return self._run_scheduler_action("scheduler_pause", "暂停") - - @action(always_free=True, description="直接继续 Bioyond 多肽调度器") - def scheduler_continue(self, **kwargs: Any) -> Dict[str, Any]: - """直接调用 Bioyond 调度器继续接口。""" - del kwargs - return self._run_scheduler_action("scheduler_continue", "继续") - - def _resolve_submit_sample_file(self, required_params: Dict[str, Any], optional_params: Dict[str, Any]) -> tuple[str, Dict[str, Any]]: - pattern = str(required_params.get("sample_excel_pattern") or "").strip() - if not pattern: - raise PeptideWorkflowError("提交实验必须提供 sample_excel_pattern(样品 Excel 文件名匹配模式)") - if bool(optional_params.get("auto_upload_local_excel")): - local_path = self._resolve_local_excel_path(optional_params, pattern) - self._upload_sample_excel_file(local_path) - selected = self._select_available_sample_excel(pattern) - sample_file = str(selected.get("relativePath") or selected.get("filePath") or "").replace("/", "\\") - if not sample_file: - raise PeptideWorkflowError(f"样品 Excel 匹配 {pattern!r},但返回记录缺少 relativePath/filePath") - return sample_file, selected - - def _select_available_sample_excel(self, pattern: str) -> Dict[str, Any]: - return self._find_sample_excel(self._list_sample_excels(name_filter=pattern.replace("*", "")), pattern) - - def _find_sample_excel(self, records: List[Dict[str, Any]], pattern: str) -> Dict[str, Any]: - matched = [record for record in records if self._filename_matches_pattern(str(record.get("fileName") or ""), pattern)] - if not matched: - raise PeptideWorkflowError(f"未找到匹配 {pattern!r} 的样品 Excel,工作流已停止") - if len(matched) > 1: - names = ", ".join(str(item.get("fileName") or "") for item in matched) - raise PeptideWorkflowError(f"找到多个匹配 {pattern!r} 的样品 Excel: {names},请收窄匹配模式") - return matched[0] - - def _list_sample_excels(self, name_filter: str = "", begin_date: Any = None, end_date: Any = None) -> List[Dict[str, Any]]: - api_host = str(self.bioyond_config.get("api_host", "")).rstrip("/") - api_key = str(self.bioyond_config.get("api_key", "")) - timeout = int(self.bioyond_config.get("timeout", 30) or 30) - if not api_host or not api_key: - raise ValueError("缺少 api_host/api_key 配置") - response = requests.post( - f"{api_host}/api/lims/order/sample-info-excels", - json={ - "apiKey": api_key, - "requestTime": _utc_now_iso8601_ms(), - "data": {"beginDate": begin_date, "endDate": end_date, "nameFilter": name_filter or None}, - }, - timeout=timeout, - headers={"Content-Type": "application/json"}, - ) - try: - body: Any = response.json() - except ValueError: - body = {"raw_text": response.text} - if response.status_code >= 400 or not isinstance(body, dict) or body.get("code") != 1: - raise RuntimeError(f"样品 Excel 列表查询失败: status={response.status_code}, response={body}") - data = body.get("data") - return data if isinstance(data, list) else [] - - def _upload_sample_excel_file(self, local_excel_path: str | Path, content_type: Optional[str] = None) -> Dict[str, Any]: - api_host = str(self.bioyond_config.get("api_host", "")).rstrip("/") - timeout = int(self.bioyond_config.get("timeout", 30) or 30) - if not api_host: - raise ValueError("缺少 api_host 配置") - local_path = Path(str(local_excel_path).strip()).expanduser() - if not local_path.is_absolute() and not str(local_excel_path).startswith("./"): - raise ValueError("样品 Excel 文件路径请使用完整路径;相对路径必须以 ./ 开头") - if not local_path.exists() or not local_path.is_file(): - raise FileNotFoundError(f"样品 Excel 文件不存在: {local_path}") - resolved_content_type = content_type or mimetypes.guess_type(local_path.name)[0] or "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" - logger.info(f"上传多肽样品 Excel: {local_path.name}") - with local_path.open("rb") as file: - response = requests.post( - f"{api_host}/api/lims/order/up-load-sample-file", - files={"file": (local_path.name, file, resolved_content_type)}, - timeout=timeout, - ) - try: - body: Any = response.json() - except ValueError: - body = {"raw_text": response.text} - result = { - "endpoint": "/api/lims/order/up-load-sample-file", - "http_status": response.status_code, - "content_type": response.headers.get("content-type"), - "request": { - "file_path": str(local_path), - "file_name": local_path.name, - "field_name": "file", - "file_content_type": resolved_content_type, - "wrapped_envelope": False, - }, - "response": body, - } - if response.status_code >= 400 or not isinstance(body, dict) or body.get("code") != 1: - raise RuntimeError(f"样品 Excel 上传失败: {result}") - file_info = body.get("data") if isinstance(body.get("data"), dict) else {} - remote_file_path = str(file_info.get("filePath") or "") - result.update( - { - "success": True, - "lims_file_info": file_info, - "relative_path": remote_file_path, - "sample_file_parameter": remote_file_path.replace("/", "\\") if remote_file_path else "", - } - ) - return result - - def _resolve_local_excel_path(self, optional_params: Dict[str, Any], pattern: str) -> Path: - explicit = str(optional_params.get("local_excel_path") or self.bioyond_config.get("default_local_excel_path") or "").strip() - if explicit: - return Path(explicit).expanduser() - matches = sorted(Path.cwd().glob(pattern)) - if not matches: - raise FileNotFoundError(f"本地未找到样品 Excel: {Path.cwd() / pattern}") - return matches[-1] + @staticmethod + def _reset_operation_endpoint(operation: str) -> str: + return { + "scheduler_reset": "/api/lims/scheduler/reset", + "reset_order_status": "/api/lims/order/reset-order-status", + "reset_location": "/api/lims/storage/reset-location", + }.get(operation, "") @staticmethod - def _filename_matches_pattern(file_name: str, pattern: str) -> bool: - if pattern == "*" or not pattern: - return True - if pattern.startswith("*") and pattern.endswith("*"): - return pattern.strip("*") in file_name - if pattern.startswith("*"): - return file_name.endswith(pattern[1:]) - if pattern.endswith("*"): - return file_name.startswith(pattern[:-1]) - return file_name == pattern - - def _resolve_workflow_by_name(self, workflow_name: str) -> Dict[str, Any]: - rpc = self._require_hardware_interface() - params = {"type": 0, "filter": workflow_name, "includeDetail": True} - data = rpc.query_workflow(json.dumps(params, ensure_ascii=False)) - records = list(self._iter_dicts(data)) - exact_records = [record for record in records if self._record_name(record) == workflow_name] - root = self._choose_workflow_record(exact_records) or self._choose_workflow_record(records) - root_id = self._record_id(root) or self._workflow_id_from_config(workflow_name) - sub = self._choose_sub_workflow_record(root, workflow_name) or root - sub_id = self._record_id(sub) or root_id - if not root_id or not sub_id: - raise RuntimeError(f"无法解析工作流 {workflow_name}: {data}") - return {"workflow_name": workflow_name, "root_workflow_id": str(root_id), "sub_workflow_id": str(sub_id), "raw": root or data} - - def _workflow_step_data(self, sub_workflow_id: str) -> Any: - data = self._require_hardware_interface().workflow_step_query(self._require_uuid(sub_workflow_id, "sub_workflow_id")) - if not data: - logger.warning(f"LIMS 未返回子工作流参数: {sub_workflow_id}") - return data or {} - - def _build_order_identity(self, *, workflow_name: str, order_name: Any = None) -> tuple[str, str]: - suffix = datetime.now().strftime("%m%d%H%M%S") - order_code = f"UL{suffix}" - if order_name: - return order_code, str(order_name) - if "DAY2" in workflow_name.upper(): - label = "Day2" - elif "DAY3" in workflow_name.upper(): - label = "Day3" - elif "DAY4" in workflow_name.upper(): - label = "Day4" - else: - label = "Peptide" - return order_code, f"UL-{label}-{suffix}" - - def _build_param_values(self, raw_parameters: Dict[str, Any], *, sample_file: str, parameter_overrides: Dict[str, Any]) -> Dict[str, Any]: - param_values = self._filter_raw_parameters(raw_parameters) - if not self._set_peptide_existing_parameter_value(param_values, PEPTIDE_SAMPLE_FILE_KEYS, sample_file): - appended = self._append_peptide_raw_parameter_value(param_values, raw_parameters, PEPTIDE_SAMPLE_FILE_KEYS, sample_file) - if appended is None: - self._append_peptide_parameter_value(param_values, PEPTIDE_SAMPLE_FILE_KEYS, sample_file) - for key, value in dict(parameter_overrides or {}).items(): - if not self._set_peptide_existing_parameter_value(param_values, [str(key)], value): - appended = self._append_peptide_raw_parameter_value(param_values, raw_parameters, [str(key)], value) - if appended is None: - self._append_peptide_parameter_value(param_values, [str(key)], value) - return param_values - - def _filter_raw_parameters(self, raw_parameters: Dict[str, Any]) -> Dict[str, Any]: - filtered: Dict[str, List[Dict[str, Any]]] = {} - for step_id, modules in raw_parameters.items(): - if not self._looks_like_uuid_text(step_id): - continue - entries: List[Dict[str, Any]] = [] - for module in modules if isinstance(modules, list) else []: - if not isinstance(module, dict): - continue - module_m = module.get("m") - module_n = module.get("n") - parameter_list = module.get("parameterList") or module.get("ParameterList") or [] - for parameter in parameter_list if isinstance(parameter_list, list) else []: - if not isinstance(parameter, dict): - continue - if not self._peptide_raw_parameter_matches(parameter, {"TaskDisplayable": [1, "1", True]}): - continue - key = self._case_value(parameter, "key", "Key") - include_value, value = self._peptide_raw_parameter_output_value(parameter) - if not key or not include_value: - continue - entry: Dict[str, Any] = {"key": str(key), "value": self._peptide_raw_parameter_output_text(value)} - m_value = parameter.get("m", module_m) - n_value = parameter.get("n", module_n) - if m_value is not None: - entry["m"] = m_value - if n_value is not None: - entry["n"] = n_value - entries.append(entry) - if entries: - filtered[str(step_id)] = entries - return filtered - - def _append_peptide_raw_parameter_value(self, param_values: Dict[str, Any], raw_parameters: Dict[str, Any], keys: Iterable[str], value: Any) -> Optional[str]: - wanted = set(keys) - for step_id, modules in raw_parameters.items(): - if not self._looks_like_uuid_text(step_id): - continue - for module in modules if isinstance(modules, list) else []: - if not isinstance(module, dict): - continue - parameter_list = module.get("parameterList") or module.get("ParameterList") or [] - for parameter in parameter_list if isinstance(parameter_list, list) else []: - if not isinstance(parameter, dict): - continue - key = self._case_value(parameter, "key", "Key") - if key not in wanted: - continue - entry: Dict[str, Any] = {"key": str(key), "value": self._peptide_raw_parameter_output_text(value)} - for axis in ("m", "n"): - axis_value = parameter.get(axis, module.get(axis)) - if axis_value is not None: - entry[axis] = axis_value - param_values.setdefault(str(step_id), []).append(entry) - return str(key) - return None - - def _append_peptide_parameter_value(self, param_values: Dict[str, Any], keys: Iterable[str], value: Any) -> str: - for step_id, entries in param_values.items(): - if self._looks_like_uuid_text(step_id) and isinstance(entries, list): - key = next(iter(keys)) - entries.append({"m": 0, "n": 0, "key": key, "value": self._peptide_raw_parameter_output_text(value)}) - return str(key) - raise RuntimeError("LIMS 工作流参数未包含可追加的 UUID step bucket") - - def _set_peptide_existing_parameter_value(self, param_values: Any, keys: Iterable[str], value: Any) -> bool: - wanted = set(keys) - updated = False - for entry in self._iter_peptide_parameter_entries(param_values): - key = entry.get("key") if "key" in entry else entry.get("Key") - if key not in wanted: - continue - value_key = "Value" if "Value" in entry else "value" - display_key = "DisplayValue" if "DisplayValue" in entry else "displayValue" - value_text = self._peptide_raw_parameter_output_text(value) - entry[value_key] = value_text - if display_key in entry: - entry[display_key] = value_text - updated = True - return updated - - def _iter_peptide_parameter_entries(self, value: Any) -> Iterable[Dict[str, Any]]: - if isinstance(value, dict): - for child in value.values(): - yield from self._iter_peptide_parameter_entries(child) - elif isinstance(value, list): - for item in value: - if isinstance(item, dict) and ("key" in item or "Key" in item): - yield item - yield from self._iter_peptide_parameter_entries(item) - - @staticmethod - def _case_value(obj: Dict[str, Any], *keys: str, missing: Any = None) -> Any: - for key in keys: - if key in obj: - return obj.get(key) - return missing - - @classmethod - def _peptide_raw_parameter_matches(cls, parameter: Dict[str, Any], field_filters: Dict[str, Any]) -> bool: - for field_name, expected in field_filters.items(): - actual = cls._case_value(parameter, str(field_name), str(field_name)[0].lower() + str(field_name)[1:], missing=None) - if isinstance(expected, (list, tuple, set)): - if actual not in expected: - return False - elif actual != expected: - return False - return True - - @classmethod - def _peptide_raw_parameter_output_value(cls, parameter: Dict[str, Any]) -> tuple[bool, Any]: - missing = object() - for key in ("value", "Value", "displayValue", "DisplayValue", "defaultValue", "DefaultValue"): - value = cls._case_value(parameter, key, missing=missing) - if value is not missing and value not in (None, ""): - return True, value - return False, "" - - def _peptide_raw_parameter_output_text(self, value: Any) -> str: - if isinstance(value, (dict, list)): - return self._json_dumps_stable(value) - return "" if value is None else str(value) + def _extract_order_ids(order_id: str = "", order_ids: Optional[List[str]] = None, **kwargs: Any) -> List[str]: + resolved: List[str] = [] + if order_id: + resolved.append(str(order_id)) + raw = order_ids if order_ids is not None else kwargs.get("order_ids") + if isinstance(raw, list): + resolved.extend(str(value) for value in raw if value) + return list(dict.fromkeys(resolved)) def _parse_result(self, result: Any) -> Any: if not isinstance(result, str): @@ -951,296 +1337,19 @@ class BioyondPeptideStation(BioyondWorkstation): except (ValueError, SyntaxError): return text - def _extract_order_ids_from_result(self, value: Any) -> List[str]: - ids: List[str] = [] - parsed = self._parse_result(value) - - def add(candidate: Any) -> None: - if candidate and self._looks_like_order_id(candidate): - ids.append(str(candidate)) - - def visit(obj: Any) -> None: - obj = self._parse_result(obj) - if isinstance(obj, str): - add(obj) - return - if isinstance(obj, list): - for item in obj: - visit(item) - return - if not isinstance(obj, dict): - return - for key in ("orderId", "orderID", "order_id"): - add(obj.get(key)) - for key in ("orderIds", "order_ids"): - for item in self._as_list(obj.get(key)): - add(item) - if obj.get("id") and any(obj.get(key) for key in ("orderCode", "orderName", "statusName", "status")): - add(obj.get("id")) - if len(obj) == 1: - first_key = next(iter(obj)) - add(first_key) - for item in obj.values(): - if isinstance(item, (dict, list)): - visit(item) - - visit(parsed) - return list(dict.fromkeys(ids)) - - @staticmethod - def _looks_like_order_id(value: Any) -> bool: - text = str(value) - lowered = text.lower() - return len(text) >= 8 and ("-" in text or lowered.startswith(("order", "bso", "3a"))) - @staticmethod def _looks_like_uuid_text(value: Any) -> bool: text = str(value) return len(text) == 36 and text.count("-") == 4 - def _looks_like_uuid(self, value: Any) -> bool: - try: - UUID(str(value)) - return True - except (TypeError, ValueError, AttributeError): - return False - - def _looks_like_step_parameter_map(self, value: Any) -> bool: - parsed = self._json_loads_if_string(value) - return isinstance(parsed, dict) and any(self._looks_like_uuid(key) and isinstance(item, list) for key, item in parsed.items()) - - def _extract_workflow_parameters(self, step_data: Any) -> Any: - parsed = self._json_loads_if_string(step_data) - if self._looks_like_step_parameter_map(parsed): - return parsed - for key in ("paramValues", "stepParameters", "workflowParameter", "parameters", "data"): - value = self._find_first_key(parsed, key) - if value not in (None, {}, []): - loaded = self._json_loads_if_string(value) - if self._looks_like_step_parameter_map(loaded): - return loaded - return parsed - - @staticmethod - def _is_blank_parameter_value(value: Any) -> bool: - return value is None or (isinstance(value, str) and value.strip() == "") - - def _create_order(self, order_payload: List[Dict[str, Any]]) -> Any: - if not order_payload: - raise RuntimeError("缺少 LIMS 订单负载,无法提交多肽实验") - return self._require_hardware_interface().create_order(json.dumps(self._canonicalize_create_payload(order_payload), ensure_ascii=False)) - def _require_uuid(self, value: Any, field_name: str) -> str: try: return str(UUID(str(value))) except (TypeError, ValueError, AttributeError) as exc: - raise ValueError(f"LIMS 创建订单字段 {field_name} 必须是 UUID: {value!r}") from exc - - def _canonicalize_create_payload(self, order_payload: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - canonical_payload: List[Dict[str, Any]] = [] - for index, item in enumerate(order_payload): - if not isinstance(item, dict): - raise ValueError(f"LIMS 创建订单 payload[{index}] 必须是对象") - canonical_item = copy.deepcopy(item) - workflow_id = canonical_item.get("workFlowId") or canonical_item.pop("workflowId", None) - canonical_item.pop("workflowId", None) - canonical_item["workFlowId"] = self._require_uuid(workflow_id, "workFlowId") - if "ExtendProperties" in canonical_item and "extendProperties" not in canonical_item: - canonical_item["extendProperties"] = canonical_item.pop("ExtendProperties") - else: - canonical_item.pop("ExtendProperties", None) - canonical_item["paramValues"] = self._normalize_param_values(canonical_item.get("paramValues")) - canonical_payload.append(canonical_item) - return canonical_payload - - def _normalize_param_values(self, param_values: Any) -> Dict[str, Any]: - parsed = self._json_loads_if_string(param_values) - if not isinstance(parsed, dict): - return {} - normalized: Dict[str, Any] = {} - for step_id, entries in parsed.items(): - if not self._looks_like_uuid(step_id): - continue - normalized_entries = [entry for item in self._as_list(entries) if (entry := self._normalize_parameter_entry(item)) is not None] - if normalized_entries: - normalized[str(step_id)] = normalized_entries - return normalized - - def _normalize_parameter_entry(self, entry: Any) -> Optional[Dict[str, Any]]: - if not isinstance(entry, dict): - return None - normalized = copy.deepcopy(entry) - for source_key, target_key in _PARAMETER_KEY_ALIASES.items(): - if source_key in normalized and target_key not in normalized: - normalized[target_key] = normalized.pop(source_key) - elif source_key in normalized: - normalized.pop(source_key) - key = normalized.get("key") - value = normalized.get("value") - display_value = normalized.get("displayValue") - if self._is_blank_parameter_value(value) and not self._is_blank_parameter_value(display_value): - value = display_value - if self._is_blank_parameter_value(key) or self._is_blank_parameter_value(value): - return None - sanitized: Dict[str, Any] = {"key": str(key), "value": self._peptide_raw_parameter_output_text(value)} - for axis in ("m", "n"): - axis_value = normalized.get(axis) - if self._is_blank_parameter_value(axis_value): - continue - try: - sanitized[axis] = int(axis_value) - except (TypeError, ValueError): - sanitized[axis] = axis_value - return sanitized - - def _build_result_table(self, create_order_result: Any) -> Dict[str, Any]: - rows: List[Dict[str, Any]] = [] - for record in self._iter_dicts(create_order_result): - material_name = record.get("materialName") or record.get("name") or record.get("materialTypeName") - location = record.get("materialLocation") or record.get("locationName") or record.get("targetLocation") or record.get("materialTargetLocation") - material_code = record.get("materialCode") or record.get("code") or record.get("materialBarCode") - quantity = record.get("quantity") or record.get("useQuantity") or record.get("actualQuantity") - if any(value not in (None, "") for value in (material_name, location, material_code, quantity)): - rows.append( - { - "material_name": "" if material_name is None else str(material_name), - "material_code": "" if material_code is None else str(material_code), - "location": "" if location is None else str(location), - "quantity": "" if quantity is None else str(quantity), - } - ) - return { - "tableName": "多肽物料装载确认", - "columns": [ - {"key": "material_name", "title": "物料"}, - {"key": "material_code", "title": "编号"}, - {"key": "location", "title": "库位"}, - {"key": "quantity", "title": "数量"}, - ], - "data": rows, - } - - def _run_scheduler_action(self, method_name: str, label: str) -> Dict[str, Any]: - with self._debug_call_session(method_name): - rpc = self._require_hardware_interface() - before = self._safe_scheduler_status() - code = getattr(rpc, method_name)() - after = self._safe_scheduler_status() - return { - "success": code == 1, - "operation": method_name, - "operation_label": label, - "code": code, - "scheduler_status_before": before, - "scheduler_status_after": after, - } - - def _safe_scheduler_status(self) -> Dict[str, Any]: - try: - status = self._require_hardware_interface().scheduler_status() - return status if isinstance(status, dict) else {} - except Exception as exc: - return {"error": str(exc)} - - def _require_hardware_interface(self): - rpc = getattr(self, "hardware_interface", None) - if rpc is None: - raise RuntimeError("Bioyond RPC 客户端未初始化") - return rpc + raise ValueError(f"{field_name} 必须是 UUID: {value!r}") from exc @staticmethod - def _extract_order_ids(order_id: str = "", order_ids: Optional[List[str]] = None, **kwargs: Any) -> List[str]: - raw_order_ids = order_ids if order_ids is not None else kwargs.get("order_ids") - if isinstance(raw_order_ids, list): - resolved = [str(value) for value in raw_order_ids if value] - elif isinstance(raw_order_ids, str) and raw_order_ids.strip(): - try: - parsed = json.loads(raw_order_ids) - resolved = [str(value) for value in parsed] if isinstance(parsed, list) else [raw_order_ids] - except ValueError: - resolved = [raw_order_ids] - else: - resolved = [] - if order_id: - resolved.insert(0, str(order_id)) - return list(dict.fromkeys(resolved)) - - @staticmethod - def _reset_operation_endpoint(operation: str) -> str: - return { - "scheduler_reset": "/api/lims/scheduler/reset", - "reset_order_status": "/api/lims/order/reset-order-status", - "reset_location": "/api/lims/storage/reset-location", - }.get(operation, "") - - def _iter_dicts(self, obj: Any) -> Iterable[Dict[str, Any]]: - parsed = self._json_loads_if_string(obj) - if isinstance(parsed, dict): - yield parsed - for value in parsed.values(): - yield from self._iter_dicts(value) - elif isinstance(parsed, list): - for item in parsed: - yield from self._iter_dicts(item) - - def _record_name(self, record: Optional[Dict[str, Any]]) -> Optional[str]: - if not isinstance(record, dict): - return None - for key in ("name", "workflowName", "workFlowName", "displayName"): - if record.get(key): - return str(record[key]) - return None - - def _record_id(self, record: Optional[Dict[str, Any]]) -> Optional[str]: - if not isinstance(record, dict): - return None - for key in ("id", "workflowId", "workFlowId", "subWorkflowId", "subWorkFlowId"): - if record.get(key): - return str(record[key]) - return None - - def _choose_workflow_record(self, records: Iterable[Dict[str, Any]]) -> Optional[Dict[str, Any]]: - for record in records: - if isinstance(record, dict) and ("subWorkflows" in record or "workflows" in record or "workflowId" in record or "id" in record): - return record - return None - - def _choose_sub_workflow_record(self, root: Optional[Dict[str, Any]], workflow_name: str) -> Optional[Dict[str, Any]]: - if not isinstance(root, dict): - return None - candidates = [] - for key in ("subWorkflows", "subworkflows", "workflows", "Workflows"): - candidates.extend([item for item in self._as_list(root.get(key)) if isinstance(item, dict)]) - for record in candidates: - if self._record_name(record) == workflow_name: - return record - return candidates[0] if candidates else None - - def _workflow_id_from_config(self, workflow_name: str) -> Optional[str]: - mappings = self.bioyond_config.get("workflow_mappings", {}) or {} - if isinstance(mappings, dict): - value = mappings.get(workflow_name) - if value: - return str(value) - return None - - def _find_first_key(self, obj: Any, key: str) -> Any: - parsed = self._json_loads_if_string(obj) - if isinstance(parsed, dict): - if key in parsed: - return parsed.get(key) - for value in parsed.values(): - found = self._find_first_key(value, key) - if found is not None: - return found - elif isinstance(parsed, list): - for item in parsed: - found = self._find_first_key(item, key) - if found is not None: - return found - return None - - def _json_loads_if_string(self, value: Any) -> Any: + def _json_loads_if_string(value: Any) -> Any: if not isinstance(value, str): return value text = value.strip() @@ -1251,9 +1360,6 @@ class BioyondPeptideStation(BioyondWorkstation): except ValueError: return value - def _json_dumps_stable(self, value: Any) -> str: - return json.dumps(value, ensure_ascii=False, sort_keys=True, separators=(",", ":")) - @staticmethod def _as_list(value: Any) -> List[Any]: if value is None: @@ -1262,27 +1368,17 @@ class BioyondPeptideStation(BioyondWorkstation): def main() -> int: - """命令行入口:读取配置并拉取工作流列表。""" - parser = argparse.ArgumentParser(description="Peptide Station 工作流列表拉取") + assert DEBUG_CLI_ENABLED, "CLI 工作流探测仅在 DEBUG_CLI_ENABLED=True 时可用" + parser = argparse.ArgumentParser(description="Peptide Station 工作流列表拉取(调试)") parser.add_argument("config_path", help="JSON 配置文件路径") - parser.add_argument("--workflow-type", type=int, default=0, help="工作流类型,默认 0") - parser.add_argument("--filter", default="", help="工作流名称过滤字段") + parser.add_argument("--workflow-type", type=int, default=0) + parser.add_argument("--filter", default="") args = parser.parse_args() - - result = fetch_workflow_list( - config_path=args.config_path, - workflow_type=args.workflow_type, - filter_text=args.filter, - ) + result = fetch_workflow_list(config_path=args.config_path, workflow_type=args.workflow_type, filter_text=args.filter) print(json.dumps(result, ensure_ascii=False, indent=2)) - response_body = result.get("response", {}) - is_success = ( - result.get("http_status") == 200 - and isinstance(response_body, dict) - and response_body.get("code") == 1 - ) - return 0 if is_success else 1 + ok = result.get("http_status") == 200 and isinstance(response_body, dict) and response_body.get("code") == 1 + return 0 if ok else 1 if __name__ == "__main__": diff --git a/unilabos/devices/workstation/bioyond_studio/peptide_station/tests/__init__.py b/unilabos/devices/workstation/bioyond_studio/peptide_station/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/unilabos/devices/workstation/bioyond_studio/peptide_station/tests/test_peptide_station_contracts.py b/unilabos/devices/workstation/bioyond_studio/peptide_station/tests/test_peptide_station_contracts.py new file mode 100644 index 00000000..f2a4dfb4 --- /dev/null +++ b/unilabos/devices/workstation/bioyond_studio/peptide_station/tests/test_peptide_station_contracts.py @@ -0,0 +1,642 @@ +"""多肽站 AST/参数/结果表 离线契约测试。""" + +from __future__ import annotations + +import importlib +import inspect +import json +import sys +from pathlib import Path +from typing import Any, Dict, List +from unittest.mock import MagicMock + +import pytest + +REPO_ROOT = Path(__file__).resolve().parents[6] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +MODULE_PATH = "unilabos.devices.workstation.bioyond_studio.peptide_station.peptide_station" +CLASS_NAME = "BioyondPeptideStation" + +ORDER_GUID = "3a20eabe-bad5-ef95-49bd-7ffbd5df189d" +CREATE_ALLOCATION = { + ORDER_GUID: [ + { + "materialId": "mat-tip", + "materialName": "200μL枪头盒", + "materialCode": "0008-00105", + "quantity": "1个", + "materialTypeMode": "Consumables", + "locationCode": "1-01", + "locationShowName": "1-01", + }, + { + "materialId": "mat-plate", + "materialName": "96孔板", + "materialCode": "PLATE-96", + "quantity": "1", + "materialTypeMode": "Sample", + "locationCode": "A1", + "locationShowName": "A1-show", + }, + { + "materialId": "mat-extra", + "materialName": "未知耗材", + "materialCode": "X-1", + "quantity": "2", + "materialTypeMode": "Future", + "locationCode": "Z9", + "locationShowName": "", + }, + ] +} + +FLATTENED_LIVE = [ + {"step": "39c78d4b-b5d3-f721-2001-9d52000084c3", "step_name": "S1", "Key": "SampleFile", "m": 0, "n": 0, "Value": "", "DisplayValue": "", "TaskDisplayable": 1}, + {"step": "39c78d4b-b5d3-f721-2001-9d52000084c3", "step_name": "S1", "Key": "Example", "m": 0, "n": 0, "Value": "x", "DisplayValue": "x", "TaskDisplayable": 1}, + {"step": "39c78d4b-b5d3-f721-2001-9d52000084c4", "step_name": "S2", "Key": "protocol", "m": 14, "n": 28, "Value": "", "DisplayValue": "", "TaskDisplayable": 1}, + {"step": "39c78d4b-b5d3-f721-2001-9d52000084c5", "step_name": "S3", "Key": "CEMMethodFileName", "m": 0, "n": 0, "Value": "", "DisplayValue": "", "TaskDisplayable": 1}, +] + + +def _import_module() -> Any: + return importlib.import_module(MODULE_PATH) + + +def _make_station() -> Any: + module = _import_module() + cls = getattr(module, CLASS_NAME) + station = object.__new__(cls) + station.bioyond_config = {"api_host": "http://test", "api_key": "k", "warehouse_mapping": {}} + rpc = MagicMock() + rpc.host = "http://test" + rpc.api_key = "k" + rpc.material_info.return_value = {"locations": [{"whName": "自动化堆栈", "code": "1-01"}]} + station.hardware_interface = rpc + return station + + +# --------------------------------------------------------------------------- +# 1. AST/导入面 +# --------------------------------------------------------------------------- + + +def test_required_actions_exposed() -> None: + cls = getattr(_import_module(), CLASS_NAME) + required = { + "upload_sample_excel", + "list_sample_excels", + "get_step_parameters", + "submit_experiment", + "submit_experiment_day1", + "submit_experiment_day2", + "submit_experiment_day3", + "submit_experiment_day4", + "submit_experiment_day4_LCMS", + "start_experiment", + "reset", + "scheduler_start", + "scheduler_stop", + "scheduler_pause", + "scheduler_continue", + "get_order_list", + "get_order_report", + "get_aggregated_order_report", + } + have = {name for name, _ in inspect.getmembers(cls, inspect.isfunction)} + missing = sorted(required - have) + assert not missing, f"缺少动作: {missing}" + + +def test_manual_confirm_node_types() -> None: + module = _import_module() + cls = getattr(module, CLASS_NAME) + manual = {"submit_experiment_day1", "start_experiment"} + normal = { + "submit_experiment", + "submit_experiment_day2", + "submit_experiment_day3", + "submit_experiment_day4", + "submit_experiment_day4_LCMS", + "reset", + "scheduler_start", + "list_sample_excels", + "get_step_parameters", + "get_order_list", + "get_order_report", + } + for name in manual: + meta = getattr(getattr(cls, name), "_action_registry_meta", {}) + assert meta.get("node_type") == module.NodeType.MANUAL_CONFIRM, name + for name in normal: + meta = getattr(getattr(cls, name), "_action_registry_meta", {}) + assert meta.get("node_type") != module.NodeType.MANUAL_CONFIRM, name + + +def test_submit_and_reset_signatures_exclude_legacy_manual_confirm() -> None: + cls = getattr(_import_module(), CLASS_NAME) + for name in ( + "submit_experiment", + "submit_experiment_day2", + "submit_experiment_day3", + "submit_experiment_day4", + "submit_experiment_day4_LCMS", + "reset", + ): + params = inspect.signature(getattr(cls, name)).parameters + assert "timeout_seconds" not in params, name + assert "assignee_user_ids" not in params, name + + +def test_day1_submit_accepts_manual_confirm_kwargs() -> None: + """plan: Day1 是 MANUAL_CONFIRM;框架会注入 timeout_seconds/assignee_user_ids,函数必须能接收。""" + cls = getattr(_import_module(), CLASS_NAME) + sig = inspect.signature(cls.submit_experiment_day1) + has_kwargs = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()) + assert has_kwargs, "submit_experiment_day1 必须有 **kwargs 以容纳人工确认框架字段" + + +def test_typed_dicts_present() -> None: + module = _import_module() + for cls_name in ( + "PeptideGenericSubmitRequiredParams", + "PeptideGenericSubmitOptionalParams", + "PeptideDay1RequiredParams", + "PeptideDay1OptionalParams", + "PeptideDay2RequiredParams", + "PeptideDay2OptionalParams", + "PeptideDay3RequiredParams", + "PeptideDay3OptionalParams", + "PeptideDay4RequiredParams", + "PeptideDay4OptionalParams", + "PeptideDay4LCMSRequiredParams", + "PeptideDay4LCMSOptionalParams", + ): + assert hasattr(module, cls_name), cls_name + + +def test_workflow_constants_split() -> None: + module = _import_module() + assert module.DAY4_PEPTIDE_WORKFLOW_NAME == "Day4环肽酰化-酶标" + assert module.DAY4_LCMS_PEPTIDE_WORKFLOW_NAME == "Day4环肽酰化-酶标+LCMS" + assert module.DAY_WORKFLOW_BINDINGS["day4_lcms"]["sub_name"] == "Day4环肽酰化-酶标LCMS" + assert module.DAY1_CEM_METHOD_DEFAULT == "5microdouble-20250911.MPM" + + +# --------------------------------------------------------------------------- +# 2. Sample Excel +# --------------------------------------------------------------------------- + + +def test_list_sample_excels_modes() -> None: + station = _make_station() + records = [ + {"fileName": "DPR019-a.xlsx", "relativePath": "upload\\sample\\DPR019-a.xlsx"}, + {"fileName": "DPR019-b.xlsx", "relativePath": "upload\\sample\\DPR019-b.xlsx"}, + ] + station._list_sample_excels = MagicMock(return_value=records) # type: ignore[method-assign] + + info = station.list_sample_excels(sample_excel_pattern="DPR019-a", deterministic_resolve=False) + assert "sample_excel_data" in info + assert "sample_excel_relative_path" not in info + + resolved = station.list_sample_excels(sample_excel_pattern="DPR019-a", deterministic_resolve=True) + assert resolved["sample_excel_relative_path"] == "upload\\sample\\DPR019-a.xlsx" + + with pytest.raises(Exception): + station.list_sample_excels(sample_excel_pattern="DPR019", deterministic_resolve=True) + + +def test_resolve_submit_sample_file_direct_path() -> None: + station = _make_station() + relative, selected = station._resolve_submit_sample_file({}, {}, "upload/sample/x.xlsx") + assert relative == "upload\\sample\\x.xlsx" + assert selected["fileName"] == "x.xlsx" + + +def test_filename_matches_pattern_substring_and_glob() -> None: + station = _make_station() + assert station._filename_matches_pattern("DPR019-20260421-thrombin-5.xlsx", "DPR019") + assert station._filename_matches_pattern("a.xlsx", "*.xlsx") + assert not station._filename_matches_pattern("a.xlsx", "*.docx") + assert station._filename_matches_pattern("a.xlsx", "") + + +# --------------------------------------------------------------------------- +# 3. Step parameter helper +# --------------------------------------------------------------------------- + + +def test_filter_step_parameters_preserves_zero_and_skips_unknown() -> None: + station = _make_station() + records = [ + {"TaskDisplayable": 1, "Value": 0, "DisplayValue": ""}, + {"TaskDisplayable": 1, "Value": "", "DisplayValue": ""}, + {"TaskDisplayable": 0, "Value": "", "DisplayValue": ""}, + {"TaskDisplayable": None, "Value": "", "DisplayValue": ""}, + ] + filtered = station._filter_step_parameter_records(records, True, True, True) + assert {(r.get("Value"), r.get("TaskDisplayable")) for r in filtered} == {(0, 1), ("", 1), ("", 0)} + + +def test_get_step_parameters_zero_match_returns_status() -> None: + station = _make_station() + station._query_workflow_records = MagicMock(return_value=[]) # type: ignore[method-assign] + out = station.get_step_parameters(workflow_name_filter="不存在") + status = out["step_parameters_raw_json"] + assert status.get("code") == -1 + assert out["filtered_subworkflows"] == [] + + +def test_get_step_parameters_multi_match_returns_status() -> None: + station = _make_station() + station._query_workflow_records = MagicMock(return_value=[ # type: ignore[method-assign] + {"workflowId": "w1", "workflowName": "A", "subworkflowId": "s1", "subworkflowName": "A1"}, + {"workflowId": "w1", "workflowName": "A", "subworkflowId": "s2", "subworkflowName": "A2"}, + ]) + out = station.get_step_parameters(workflow_name_filter="A") + assert out["step_parameters_raw_json"].get("code") == 0 + assert len(out["filtered_subworkflows"]) == 2 + + +def test_get_step_parameters_direct_sub_workflow_id() -> None: + station = _make_station() + station._query_step_parameters = MagicMock(return_value={ # type: ignore[method-assign] + "39c78d4b-b5d3-f721-2001-9d52000084c3": [ + {"name": "S1", "m": 0, "n": 0, "parameterList": [ + {"Key": "SampleFile", "TaskDisplayable": 1, "Value": "", "DisplayValue": ""}, + ]}, + ] + }) + out = station.get_step_parameters(sub_workflow_id="39c78d4b-b5d3-f721-2001-9d52000084c3") + augmented = out["step_parameters_raw_json"] + assert augmented["code"] == 1 + assert any(p["Key"] == "SampleFile" for p in augmented["data"]["filteredParameters"]) + + +# --------------------------------------------------------------------------- +# 4. Partial parameter entries + live resolution +# --------------------------------------------------------------------------- + + +def test_partial_entries_inject_samplefile_and_overrides() -> None: + station = _make_station() + entries, warnings = station._build_partial_parameter_entries( + sample_excel_relative_path="upload\\sample\\f.xlsx", + day_key="day2", + parameter_overrides=[{"Key": "Example", "Value": 0}], + ) + assert entries[0] == {"Key": "SampleFile", "Value": "upload\\sample\\f.xlsx"} + assert any(e["Key"] == "Example" and e["Value"] == 0 for e in entries) + assert warnings == [] + + +def test_day1_partial_entries_inject_cem_default() -> None: + station = _make_station() + entries, _ = station._build_partial_parameter_entries( + sample_excel_relative_path="upload\\sample\\f.xlsx", + day_key="day1", + extra_autofill=[{"Key": "CEMMethodFileName", "Value": "5microdouble-20250911.MPM"}], + ) + assert any(e["Key"] == "CEMMethodFileName" and e["Value"] == "5microdouble-20250911.MPM" for e in entries) + + +def test_overrides_duplicate_last_write_wins_warning() -> None: + station = _make_station() + entries, warnings = station._build_partial_parameter_entries( + sample_excel_relative_path="x", + day_key="day2", + parameter_overrides=[ + {"Key": "Example", "m": 0, "n": 0, "Value": "first"}, + {"Key": "Example", "m": 0, "n": 0, "Value": "second"}, + ], + ) + example_entries = [e for e in entries if e["Key"] == "Example"] + assert len(example_entries) == 1 + assert example_entries[0]["Value"] == "second" + assert any("重复" in w for w in warnings) + + +def test_resolve_against_live_unique_match_and_failure() -> None: + station = _make_station() + resolved = station._resolve_parameter_entries_against_live_steps( + [{"Key": "SampleFile", "Value": "upload\\sample\\f.xlsx"}], FLATTENED_LIVE + ) + assert resolved[0]["step"] == "39c78d4b-b5d3-f721-2001-9d52000084c3" + assert resolved[0]["m"] == 0 and resolved[0]["n"] == 0 + # 没有 protocol 在 m/n=0/0 处 → 0 匹配 + with pytest.raises(Exception) as exc: + station._resolve_parameter_entries_against_live_steps( + [{"Key": "protocol", "m": 0, "n": 0, "Value": "v"}], FLATTENED_LIVE + ) + assert "0 条" in str(exc.value) + + +def test_group_resolved_entries_uses_lowercase_keys() -> None: + station = _make_station() + grouped = station._group_resolved_entries_to_param_values([ + {"step": "39c78d4b-b5d3-f721-2001-9d52000084c3", "Key": "SampleFile", "m": 0, "n": 0, "Value": "x"}, + ]) + step_entries = grouped["39c78d4b-b5d3-f721-2001-9d52000084c3"] + assert step_entries[0] == {"key": "SampleFile", "value": "x", "m": 0, "n": 0} + + +def test_create_order_payload_shape() -> None: + station = _make_station() + payload = station._create_order_payload( + order_code="EXP260518-103000", + order_name="实验260518-103000", + sub_workflow_id="3a1d35f9-63ce-67d6-1784-9f6abcca4eda", + param_values={"39c78d4b-b5d3-f721-2001-9d52000084c3": [{"key": "SampleFile", "value": "x", "m": 0, "n": 0}]}, + border_number=1, + extend_properties=None, + ) + assert isinstance(payload, list) and len(payload) == 1 + item = payload[0] + assert item["workFlowId"] == "3a1d35f9-63ce-67d6-1784-9f6abcca4eda" + assert item["paramValues"] + assert item["extendProperties"] == "" + assert item["borderNumber"] == 1 + + +def test_order_identity_format() -> None: + station = _make_station() + code, name = station._build_order_identity("day2") + assert code.startswith("EXP") and len(code) == 16 # EXP + YYMMDD-HHmmss + assert name.startswith("实验") + code2, name2 = station._build_order_identity("day2", "自定义") + assert name2 == "自定义" + + +# --------------------------------------------------------------------------- +# 5. Generic submit / day wrappers (含会抦住 BUG 1 的用例) +# --------------------------------------------------------------------------- + + +def _wire_submit_pipeline(station: Any) -> None: + station._resolve_workflow_binding_from_names = MagicMock(return_value={ # type: ignore[method-assign] + "workflow_name": "DAY2多肽定量", + "root_workflow_id": "3a1d35f0-9436-895b-2eda-039a5465275e", + "sub_workflow_id": "3a1d35f0-9f7e-c2c1-0bc0-8d94b81d90ca", + "sub_workflow_name": "DAY2多肽定量", + "raw": {}, + }) + station._resolve_workflow_binding = MagicMock(side_effect=lambda day_key: station._resolve_workflow_binding_from_names("DAY2多肽定量")) # type: ignore[method-assign] + station._query_step_parameters = MagicMock(return_value={}) # type: ignore[method-assign] + station._flatten_step_parameters = MagicMock(return_value=FLATTENED_LIVE) # type: ignore[method-assign] + station._create_order = MagicMock(return_value=json.dumps(CREATE_ALLOCATION)) # type: ignore[method-assign] + + +def test_submit_experiment_generic_succeeds() -> None: + """plan §「Generic And Day 1 Submit」line 919-924;这条同时抦住 BUG 1(binding= 关键字)。""" + station = _make_station() + _wire_submit_pipeline(station) + result = station.submit_experiment( + {"workflow_name": "DAY2多肽定量", "sample_excel_pattern": ""}, + {"parameter_overrides": []}, + sample_excel_relative_path="upload/sample/f.xlsx", + ) + assert result["success"] is True + assert result["order_id"] == ORDER_GUID + assert result["resultTable"]["tableName"] == "resultTable" + + +def test_submit_experiment_rejects_day1_alias() -> None: + station = _make_station() + with pytest.raises(Exception): + station.submit_experiment( + {"workflow_name": "Day1线肽合成", "sample_excel_pattern": "x"}, + {}, + sample_excel_relative_path="upload/sample/f.xlsx", + ) + + +def test_submit_experiment_day2_calls_pipeline() -> None: + station = _make_station() + _wire_submit_pipeline(station) + result = station.submit_experiment_day2( + {"sample_excel_pattern": ""}, + {"parameter_overrides": []}, + sample_excel_relative_path="upload/sample/f.xlsx", + ) + assert result["success"] is True + assert result["order_ids"] == [ORDER_GUID] + assert result["auto_register_materials"] is True + assert result["material_registration"]["status"] == "not_implemented" + + +def test_day1_placeholder_does_not_call_create_order() -> None: + station = _make_station() + station._resolve_workflow_binding = MagicMock(return_value={ # type: ignore[method-assign] + "workflow_name": "Day1线肽合成", + "root_workflow_id": "rid", + "sub_workflow_id": "sid", + "sub_workflow_name": "Day1线肽合成", + "raw": {}, + }) + station._create_order = MagicMock(side_effect=AssertionError("Day1 不应触达 create_order")) # type: ignore[method-assign] + out = station.submit_experiment_day1( + {"sample_excel_pattern": "", "cem_method_file_name": ""}, + {}, + sample_excel_relative_path="upload/sample/f.xlsx", + # 模拟人工确认框架注入的字段(这条会抦住 BUG 3) + timeout_seconds=3600, + assignee_user_ids=[], + materials_loaded=False, + ) + assert out["status"] == "manual_confirm_placeholder" + assert out["cem_method_file_name"] == "5microdouble-20250911.MPM" + assert isinstance(out["partial_parameter_entries"], list) + + +# --------------------------------------------------------------------------- +# 6. Allocation map parsing + resultTable +# --------------------------------------------------------------------------- + + +def test_parse_allocation_map_extracts_order_id_and_groups() -> None: + station = _make_station() + parsed = station._parse_create_order_allocation_map(json.dumps(CREATE_ALLOCATION)) + assert parsed["order_ids"] == [ORDER_GUID] + assert len(parsed["allocation_rows"]) == 3 + assert set(parsed["materials_by_type"].keys()) == {"Consumables", "Sample", "Future"} + + +def test_parse_allocation_map_handles_python_str_repr() -> None: + """RPC.create_order 返回的是 str(dict),含单引号。""" + station = _make_station() + parsed = station._parse_create_order_allocation_map(str(CREATE_ALLOCATION)) + assert parsed["order_ids"] == [ORDER_GUID] + + +def test_parse_allocation_map_empty() -> None: + station = _make_station() + parsed = station._parse_create_order_allocation_map("{}") + assert parsed["allocation_rows"] == [] + assert parsed["order_ids"] == [] + + +def test_build_result_table_order_and_columns() -> None: + station = _make_station() + parsed = station._parse_create_order_allocation_map(json.dumps(CREATE_ALLOCATION)) + table = station._build_result_table(parsed["materials_by_type"]) + assert table["tableName"] == "resultTable" + assert [c["key"] for c in table["columns"]] == ["whName", "locationCode", "materialName", "quantity"] + # 顺序:Sample → Consumables → Future(未知 mode 保留在末尾) + names = [row["materialName"] for row in table["data"]] + assert names == ["96孔板", "200μL枪头盒", "未知耗材"] + # locationShowName 优先 locationCode + assert table["data"][0]["locationCode"] == "A1-show" + assert table["data"][1]["locationCode"] == "1-01" + + +def test_build_result_table_empty_returns_empty_data() -> None: + station = _make_station() + table = station._build_result_table({}) + assert table["data"] == [] + assert [c["key"] for c in table["columns"]] == ["whName", "locationCode", "materialName", "quantity"] + + +def test_resolve_wh_name_handles_material_info_failure() -> None: + station = _make_station() + station.hardware_interface.material_info.side_effect = RuntimeError("HTTP 500") + cache: Dict[str, Dict[str, Any]] = {} + assert station._resolve_wh_name_by_material_id("mat-1", cache) == "" + + +def test_submit_returns_warning_when_allocation_empty() -> None: + station = _make_station() + _wire_submit_pipeline(station) + station._create_order = MagicMock(return_value="{}") # type: ignore[method-assign] + result = station.submit_experiment_day2( + {"sample_excel_pattern": ""}, + {}, + sample_excel_relative_path="upload/sample/f.xlsx", + ) + assert "create_order_allocation_unavailable_for_result_table" in result["warnings"] + + +# --------------------------------------------------------------------------- +# 7. Reports + workflow records +# --------------------------------------------------------------------------- + + +def test_get_order_list_passes_json_string() -> None: + station = _make_station() + station.hardware_interface.order_query.return_value = {"items": [], "totalCount": 0} + station.get_order_list(filter_text="abc", page_count=10) + args, kwargs = station.hardware_interface.order_query.call_args + payload = json.loads(args[0]) + assert payload["filter"] == "abc" + assert payload["pageCount"] == 10 + + +def test_get_order_report_calls_typed_rpc() -> None: + station = _make_station() + station.hardware_interface.order_report.return_value = {"id": ORDER_GUID, "name": "x", "preIntakes": [], "resultList": []} + out = station.get_order_report(ORDER_GUID) + station.hardware_interface.order_report.assert_called_once_with(ORDER_GUID) + assert out["success"] is True + assert out["summary"]["id"] == ORDER_GUID + + +def test_get_aggregated_order_report_is_todo_placeholder() -> None: + station = _make_station() + out = station.get_aggregated_order_report(ORDER_GUID) + assert out["status"] == "not_implemented" + + +def test_query_workflow_records_filters_unsaved_subworkflows() -> None: + station = _make_station() + station.hardware_interface.query_workflow.return_value = { + "items": [ + { + "id": "rid", + "name": "Day3线肽环化", + "subWorkflows": [ + {"id": "saved-id", "name": "Day3线肽环化", "isSaved": True}, + {"id": "draft-id", "name": "Day3线肽环化-草稿", "isSaved": False}, + ], + } + ] + } + records = station._query_workflow_records("Day3线肽环化") + assert [r["subworkflowId"] for r in records] == ["saved-id"] + + +# --------------------------------------------------------------------------- +# 8. Debug / fetch_workflow_list 守护 +# --------------------------------------------------------------------------- + + +def test_module_fetch_workflow_list_is_debug_guarded() -> None: + module = _import_module() + assert module.DEBUG_CLI_ENABLED is False + with pytest.raises(AssertionError): + module.fetch_workflow_list(config={"api_host": "http://x", "api_key": "k"}) + + +def test_station_fetch_workflow_list_uses_rpc() -> None: + station = _make_station() + station.hardware_interface.query_workflow.return_value = {"items": [], "totalCount": 0} + station.fetch_workflow_list(filter_text="Day2") + args, _ = station.hardware_interface.query_workflow.call_args + payload = json.loads(args[0]) + assert payload["filter"] == "Day2" + assert payload["includeDetail"] is True + + +# --------------------------------------------------------------------------- +# 9. start_experiment 装载闸门 +# --------------------------------------------------------------------------- + + +def test_start_experiment_blocks_when_materials_not_loaded() -> None: + station = _make_station() + station.hardware_interface.scheduler_start.return_value = 1 + with pytest.raises(RuntimeError): + station.start_experiment( + order_id=ORDER_GUID, + resultTable={"data": [{"materialName": "x"}]}, + materials_loaded=False, + ) + + +def test_start_experiment_starts_when_table_empty() -> None: + station = _make_station() + station.hardware_interface.scheduler_start.return_value = 1 + result = station.start_experiment(order_id=ORDER_GUID, resultTable={"data": []}) + assert result["success"] is True + assert result["order_ids"] == [ORDER_GUID] + + +# --------------------------------------------------------------------------- +# 10. Reset +# --------------------------------------------------------------------------- + + +def test_reset_dry_run_default_false_and_planned_calls() -> None: + station = _make_station() + sig = inspect.signature(station.reset) + assert sig.parameters["dry_run"].default is False + out = station.reset(reset_operations=["scheduler_reset"], dry_run=True) + assert out["dry_run"] is True + assert out["planned_calls"][0]["endpoint"] == "/api/lims/scheduler/reset" + + +def test_reset_executes_typed_rpc_calls() -> None: + station = _make_station() + station.hardware_interface.scheduler_reset.return_value = 1 + station.hardware_interface.reset_order_status.return_value = 1 + station.hardware_interface.reset_location.return_value = 1 + out = station.reset( + reset_operations=["scheduler_reset", "reset_order_status", "reset_location"], + dry_run=False, + order_id=ORDER_GUID, + location_id="loc-1", + ) + station.hardware_interface.scheduler_reset.assert_called_once_with() + station.hardware_interface.reset_order_status.assert_called_once_with(ORDER_GUID) + station.hardware_interface.reset_location.assert_called_once_with("loc-1") + assert len(out["executed_calls"]) == 3