From 1519a7d9854cbd0be4735201b2bd97e122855a41 Mon Sep 17 00:00:00 2001 From: yxz321 Date: Fri, 1 May 2026 01:50:09 +0800 Subject: [PATCH] feat: RNA add Bioyond siRNA station resources and Experiment 1 submission - Add siRNA station runtime, decorator metadata, and lazy init - Implement Experiment 1 submit, start, and reset flows - Add siRNA deck and numeric warehouse stack resources - Move siRNA example config to temp_benyao --- .../sirna_station/sirna_station.py | 1103 ++++++++++++++++- unilabos/resources/bioyond/decks.py | 53 +- unilabos/resources/bioyond/warehouses.py | 64 + 3 files changed, 1211 insertions(+), 9 deletions(-) diff --git a/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py b/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py index f055c2c1..8be5e576 100644 --- a/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py +++ b/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py @@ -3,12 +3,16 @@ from __future__ import annotations import argparse +import ast +import copy import json +import os import sys from datetime import datetime, timezone from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple from urllib import error, request +from uuid import UUID if __package__ in {None, ""}: repo_root = Path(__file__).resolve().parents[5] @@ -18,15 +22,51 @@ if __package__ in {None, ""}: from unilabos.utils.log import logger try: + from unilabos.resources.bioyond.decks import BIOYOND_SirnaStation_Deck as _SIRNA_DECK_CLASS +except Exception: # pragma: no cover - 允许无 pylabrobot 依赖时导入轻量 helper + _SIRNA_DECK_CLASS = None + +try: + from unilabos.registry.decorators import NodeType, action, device, not_action + _REGISTRY_IMPORT_ERROR: Optional[Exception] = None +except Exception as exc: # pragma: no cover - 允许无完整依赖时导入轻量 helper + _REGISTRY_IMPORT_ERROR = exc + + class NodeType: # type: ignore[no-redef] + MANUAL_CONFIRM = "manual_confirm" + + def device(*args: Any, **kwargs: Any): + def decorator(cls): + return cls + + return decorator + + def action(*args: Any, **kwargs: Any): + if len(args) == 1 and callable(args[0]): + return args[0] + + def decorator(func): + return func + + return decorator + + def not_action(func): + return func + +try: + from unilabos.devices.workstation.workstation_base import WorkstationBase from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation _BIOYOND_IMPORT_ERROR: Optional[Exception] = None except Exception as exc: # pragma: no cover - 允许在轻量探测模式下运行配置辅助函数 + WorkstationBase = object # type: ignore[assignment,misc] BioyondWorkstation = object # type: ignore[assignment,misc] _BIOYOND_IMPORT_ERROR = exc WORKFLOW_LIST_ENDPOINT = "/api/lims/workflow/work-flow-list" SUPPORTED_WORKFLOW_TYPES = {0, 1, 2} +DEFAULT_READY_SIGNAL = "READY" +DEFAULT_RESET_OPERATIONS = ("scheduler_reset", "reset_order_status", "reset_location") def _utc_now_iso8601_ms() -> str: @@ -124,13 +164,21 @@ def fetch_workflow_list( return result +@device( + id="bioyond_sirna_station", + category=["workstation", "bioyond", "bioyond_sirna_station"], + description="Bioyond 小核酸工作站", + display_name="Bioyond Sirna Station", + icon="preparation_station.webp", +) class BioyondSirnaStation(BioyondWorkstation): """小核酸工作站最小运行时实现。""" def __init__( self, bioyond_config: Optional[Dict[str, Any]] = None, - config: Optional[Dict[str, Any]] = None, + config: Optional[Any] = None, + config_path: Optional[str | Path] = None, deck: Optional[Any] = None, protocol_type: Optional[Any] = None, **kwargs: Any, @@ -138,12 +186,18 @@ class BioyondSirnaStation(BioyondWorkstation): if _BIOYOND_IMPORT_ERROR is not None: raise RuntimeError(f"BioyondSirnaStation 基类导入失败: {_BIOYOND_IMPORT_ERROR}") from _BIOYOND_IMPORT_ERROR + kwargs.pop("children", None) merged_config: Dict[str, Any] = {} - if config: + if config_path: + merged_config.update(load_sirna_config(config_path)) + if isinstance(config, (str, Path)): + merged_config.update(load_sirna_config(config)) + elif config: merged_config.update(config) if bioyond_config: merged_config.update(bioyond_config) merged_config.update(kwargs) + self._apply_env_api_config(merged_config) self.protocol_type = protocol_type self.bioyond_config = merged_config @@ -152,10 +206,35 @@ class BioyondSirnaStation(BioyondWorkstation): logger.info(f" - API Host: {self.bioyond_config.get('api_host', '')}") logger.info(f" - Workflow 映射数量: {len(self.bioyond_config.get('workflow_mappings', {}))}") - super().__init__(bioyond_config=self.bioyond_config, deck=deck) + self._lazy_frontend_init = deck is None or not self._has_required_api_config(self.bioyond_config) + if self._lazy_frontend_init: + WorkstationBase.__init__(self, deck=deck) + self.is_running = False + self.workflow_mappings = {} + self.workflow_sequence = [] + self.pending_task_params = [] + self.http_service = None + self.connection_monitor = None + self._http_service_config = self.bioyond_config.get("http_service_config", {}) + if "workflow_mappings" in self.bioyond_config: + self.workflow_mappings = dict(self.bioyond_config.get("workflow_mappings") or {}) + logger.warning( + "BioyondSirnaStation 以延迟模式初始化:缺少 deck 或 api_host/api_key," + "设备会先注册动作,首次调用动作时再检查 Bioyond API 配置。" + ) + else: + super().__init__(bioyond_config=self.bioyond_config, deck=deck) logger.info("BioyondSirnaStation 初始化完成") + @not_action + def post_init(self, ros_node: Any) -> None: + if getattr(self, "_lazy_frontend_init", False): + WorkstationBase.post_init(self, ros_node) + logger.warning("BioyondSirnaStation 延迟模式跳过 Bioyond 后台同步和 HTTP 服务启动") + return + super().post_init(ros_node) + def fetch_workflow_list( self, workflow_type: int = 0, @@ -174,6 +253,1022 @@ class BioyondSirnaStation(BioyondWorkstation): logger.info("正在通过 Bioyond RPC 查询小核酸工作流列表") return self.hardware_interface.query_workflow(json.dumps(payload_data, ensure_ascii=False)) + @action( + always_free=True, + node_type=NodeType.MANUAL_CONFIRM, + placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}, + goal_default={"timeout_seconds": 3600, "assignee_user_ids": []}, + feedback_interval=300, + description="复位小核酸实验前状态", + ) + def reset( + self, + reset_operations: Optional[ + List[Literal["scheduler_reset", "reset_order_status", "reset_location"]] + ] = None, + timeout_seconds: int = 3600, + assignee_user_ids: Optional[List[str]] = None, + **kwargs: Any, + ) -> Dict[str, Any]: + """复位调度器、订单状态和库位,并按清理读回结果决定是否 take-out。""" + reset_order_id = self._kwarg_text(kwargs, "reset_order_id") + reset_location_id = self._kwarg_text(kwargs, "reset_location_id") + cleanup_order_code = self._kwarg_text(kwargs, "cleanup_order_code") + api_host = self._kwarg_text(kwargs, "api_host") + api_key = self._kwarg_text(kwargs, "api_key") + ready_signal = self._kwarg_text(kwargs, "ready_signal") or DEFAULT_READY_SIGNAL + del timeout_seconds, assignee_user_ids + self._update_runtime_api_config(api_host=api_host, api_key=api_key) + self._require_ready_signal(ready_signal) + rpc = self._require_hardware_interface_for_reset() + result = self._run_reset_operations( + rpc, + reset_operations=reset_operations, + reset_order_id=reset_order_id, + reset_location_id=reset_location_id, + cleanup_order_code=cleanup_order_code, + ) + return self._with_ready_signal(result) + + @action( + always_free=True, + node_type=NodeType.MANUAL_CONFIRM, + placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}, + goal_default={"timeout_seconds": 3600, "assignee_user_ids": []}, + feedback_interval=300, + description="提交小核酸实验", + ) + def submit_experiment( + self, + workflow_name: str = "", + sub_workflow_name: str = "", + order_code: str = "", + order_name: str = "", + sample_throughput: int = 4, + parameter_overrides: Optional[Dict[str, Any]] = None, + include_all_task_displayable: bool = False, + reset_before_create: bool = True, + reset_operations: Optional[ + List[Literal["scheduler_reset", "reset_order_status", "reset_location"]] + ] = None, + reset_order_id: str = "", + reset_location_id: str = "", + cleanup_order_code: str = "", + api_host: str = "", + api_key: str = "", + ready_signal: str = "READY", + timeout_seconds: int = 3600, + assignee_user_ids: Optional[List[str]] = None, + **kwargs: Any, + ) -> Dict[str, Any]: + """实时查询 LIMS 工作流和步骤参数,构造并提交 create-order。""" + del timeout_seconds, assignee_user_ids, kwargs + self._update_runtime_api_config(api_host=api_host, api_key=api_key) + self._require_ready_signal(ready_signal) + rpc = self._require_hardware_interface("create_order") + workflow = self._resolve_experiment_workflow( + rpc, + workflow_name=workflow_name, + sub_workflow_name=sub_workflow_name, + ) + step_data = rpc.workflow_step_query(workflow["sub_workflow_id"]) + param_values, parameter_template = self._build_param_values_from_step_data( + step_data, + parameter_overrides=parameter_overrides or [], + include_all_task_displayable=include_all_task_displayable, + ) + if not param_values: + raise RuntimeError("未从 LIMS 子工作流参数中提取到 create_order paramValues") + + resolved_order_code, resolved_order_name = self._build_bioyond_order_identity(order_code, order_name) + order_payload = [ + { + "orderCode": resolved_order_code, + "orderName": resolved_order_name, + "borderNumber": int(sample_throughput), + "workFlowId": workflow["sub_workflow_id"], + "paramValues": param_values, + "extendProperties": "", + } + ] + + reset_result = None + if reset_before_create: + reset_result = self._run_reset_operations( + rpc, + reset_operations=reset_operations, + reset_order_id=reset_order_id, + reset_location_id=reset_location_id, + cleanup_order_code=cleanup_order_code, + ) + + logger.info(f"正在提交小核酸实验: {resolved_order_name} ({resolved_order_code})") + raw_result = rpc.create_order(json.dumps(copy.deepcopy(order_payload), ensure_ascii=False)) + parsed_result = self._parse_lims_result(raw_result) + material_records = self._extract_create_order_materials(parsed_result) + suggested_locations = self._extract_suggested_locations(material_records) + order_ids = self._extract_created_order_ids(parsed_result) + self._last_submitted_order_ids = list(order_ids) + self._last_submitted_order_code = resolved_order_code + start_experiment_info = { + "order_ids": order_ids, + "order_code": resolved_order_code, + "order_name": resolved_order_name, + "workflow": workflow, + } + result = { + "success": self._create_result_success(parsed_result, order_ids, material_records), + "order_code": resolved_order_code, + "order_name": resolved_order_name, + "order_ids": order_ids, + "workflow": workflow, + "sample_throughput": int(sample_throughput), + "reset_result": reset_result, + "payload": order_payload, + "parameter_template": parameter_template, + "create_order_result": parsed_result, + "materials": material_records, + "suggested_locations": suggested_locations, + "start_experiment": start_experiment_info, + "confirmation_message": self._format_create_order_confirmation( + order_code=resolved_order_code, + order_name=resolved_order_name, + workflow=workflow, + order_ids=order_ids, + material_records=material_records, + suggested_locations=suggested_locations, + ), + } + return self._with_ready_signal(result) + + @action( + always_free=True, + node_type=NodeType.MANUAL_CONFIRM, + placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}, + goal_default={"timeout_seconds": 3600, "assignee_user_ids": []}, + feedback_interval=300, + description="启动小核酸实验调度", + ) + def start_experiment( + self, + submit_experiment_result: Optional[Dict[str, Any]] = None, + order_id: str = "", + order_ids: Optional[List[str]] = None, + api_host: str = "", + api_key: str = "", + ready_signal: str = "READY", + timeout_seconds: int = 3600, + assignee_user_ids: Optional[List[str]] = None, + **kwargs: Any, + ) -> Dict[str, Any]: + """启动 Bioyond 调度器。""" + del timeout_seconds, assignee_user_ids, kwargs + self._update_runtime_api_config(api_host=api_host, api_key=api_key) + self._require_ready_signal(ready_signal) + start_info = self._resolve_start_experiment_info(submit_experiment_result, order_id, order_ids) + rpc = self._require_hardware_interface("scheduler_start") + logger.info("正在启动小核酸调度器") + result = rpc.scheduler_start() + return self._with_ready_signal({ + "success": result == 1, + "return_info": result, + "scheduler_start_result": result, + "start_experiment": start_info, + "confirmation_message": "调度器启动成功" if result == 1 else "调度器启动失败,请检查 LIMS 状态", + }) + + def _require_hardware_interface(self, method_name: str) -> Any: + rpc = getattr(self, "hardware_interface", None) + if rpc is None: + rpc = self._initialize_hardware_interface_from_config() + if not hasattr(rpc, method_name): + raise RuntimeError(f"Bioyond RPC 客户端缺少 {method_name} 方法") + return rpc + + def _require_hardware_interface_for_reset(self) -> Any: + rpc = getattr(self, "hardware_interface", None) + if rpc is None: + rpc = self._initialize_hardware_interface_from_config() + return rpc + + def _initialize_hardware_interface_from_config(self) -> Any: + self._apply_env_api_config(self.bioyond_config) + if not self._has_required_api_config(self.bioyond_config): + raise RuntimeError( + "Bioyond RPC 客户端未初始化,且缺少 api_host/api_key。" + "请在前端节点 config 中配置 api_host、api_key,或在动作参数中传入 api_host、api_key。" + ) + from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC + + self.hardware_interface = BioyondV1RPC(self.bioyond_config) + return self.hardware_interface + + def _has_required_api_config(self, config: Dict[str, Any]) -> bool: + return not self._is_blank(config.get("api_host")) and not self._is_blank(config.get("api_key")) + + def _apply_env_api_config(self, config: Dict[str, Any]) -> None: + env_pairs = { + "api_host": ("BIOYOND_SIRNA_API_HOST", "BIOYOND_SIRNA_EXP1_API_HOST"), + "api_key": ("BIOYOND_SIRNA_API_KEY", "BIOYOND_SIRNA_EXP1_API_KEY"), + } + for key, env_names in env_pairs.items(): + if not self._is_blank(config.get(key)): + continue + for env_name in env_names: + value = os.environ.get(env_name) + if not self._is_blank(value): + config[key] = value + break + + def _update_runtime_api_config(self, api_host: str = "", api_key: str = "") -> None: + changed = False + if not self._is_blank(api_host) and self.bioyond_config.get("api_host") != api_host: + self.bioyond_config["api_host"] = api_host + changed = True + if not self._is_blank(api_key) and self.bioyond_config.get("api_key") != api_key: + self.bioyond_config["api_key"] = api_key + changed = True + if changed: + self.hardware_interface = None + + def _config_value(self, *keys: str) -> Optional[str]: + config = getattr(self, "bioyond_config", {}) or {} + for key in keys: + value = config.get(key) + if not self._is_blank(value): + return str(value) + return None + + def _resolve_experiment_workflow( + self, + rpc: Any, + workflow_name: str = "", + sub_workflow_name: str = "", + ) -> Dict[str, str]: + workflow_name = workflow_name or self._config_value("experiment_1_workflow_name", "sirna_exp1_workflow_name") or "" + sub_workflow_name = ( + sub_workflow_name + or self._config_value("experiment_1_sub_workflow_name", "sirna_exp1_sub_workflow_name") + or "" + ) + workflow_query = {"type": 0, "filter": workflow_name or sub_workflow_name, "includeDetail": True} + workflow_data = rpc.query_workflow(json.dumps(workflow_query, ensure_ascii=False)) + workflow_items = self._workflow_items(workflow_data) + roots_with_children = [item for item in workflow_items if self._sub_workflow_records(item)] + root_candidates = roots_with_children or workflow_items + if not workflow_name and sub_workflow_name: + roots_matching_sub = [ + item + for item in root_candidates + if self._select_workflow_record(self._sub_workflow_records(item), sub_workflow_name) + ] + if roots_matching_sub: + root_candidates = roots_matching_sub + root = self._select_workflow_record(root_candidates, workflow_name) + if not root: + raise RuntimeError("未从 LIMS 查询到可用的小核酸工作流") + sub = self._select_workflow_record(self._sub_workflow_records(root), sub_workflow_name) + if not sub: + label = self._record_name(root) or workflow_name or self._record_id(root) + raise RuntimeError(f"工作流 {label} 缺少可用子工作流") + sub_id = self._record_id(sub) + self._require_uuid(sub_id, "workFlowId") + return { + "workflow_name": self._record_name(root) or workflow_name, + "root_workflow_id": self._record_id(root), + "sub_workflow_name": self._record_name(sub) or sub_workflow_name, + "sub_workflow_id": sub_id, + } + + def _resolve_experiment_1_workflow(self, rpc: Any) -> Dict[str, str]: + return self._resolve_experiment_workflow(rpc) + + def _build_param_values_from_step_data( + self, + step_data: Any, + parameter_overrides: Any, + include_all_task_displayable: bool, + ) -> Tuple[Dict[str, List[Dict[str, Any]]], List[Dict[str, Any]]]: + parameter_map = self._extract_workflow_parameter_map(step_data) + if not isinstance(parameter_map, dict): + raise RuntimeError("workflow_step_query 未返回可解析的步骤参数") + param_values: Dict[str, List[Dict[str, Any]]] = {} + parameter_template: List[Dict[str, Any]] = [] + for step_id, value in parameter_map.items(): + if not self._looks_like_uuid(step_id): + continue + entries: List[Dict[str, Any]] = [] + for module in self._as_list(value): + if not isinstance(module, dict): + continue + module_m = module.get("m") + module_n = module.get("n") + for parameter in self._as_list(module.get("parameterList") or module.get("ParameterList")): + if not isinstance(parameter, dict): + continue + key = self._parameter_key(parameter) + if not key: + continue + task_displayable = parameter.get("TaskDisplayable", parameter.get("taskDisplayable", 1)) + parameter_type = str(parameter.get("Type") or parameter.get("type") or "") + template_item = { + "step_id": str(step_id), + "module": module.get("name") or module.get("moduleName") or "", + "m": module_m, + "n": module_n, + "key": key, + "type": parameter_type, + "task_displayable": task_displayable, + "value": self._value_for_create_order(parameter), + } + parameter_template.append(template_item) + if parameter_type.lower() == "hidden" or task_displayable == 0: + continue + if not include_all_task_displayable and key != "protocolName": + continue + value_for_create_order = self._value_for_create_order(parameter) + if self._is_blank(value_for_create_order): + continue + entry: Dict[str, Any] = {"key": key, "value": "" if self._is_blank(value_for_create_order) else str(value_for_create_order)} + m_value = parameter.get("m", module_m) + n_value = parameter.get("n", module_n) + if not self._is_blank(m_value): + entry["m"] = int(m_value) + if not self._is_blank(n_value): + entry["n"] = int(n_value) + entries.append(entry) + if entries: + param_values[str(step_id)] = entries + self._apply_parameter_overrides(param_values, parameter_overrides) + return param_values, parameter_template + + def _value_for_create_order(self, parameter: Dict[str, Any]) -> Any: + value = parameter.get("Value") if "Value" in parameter else parameter.get("value") + display_value = parameter.get("DisplayValue") if "DisplayValue" in parameter else parameter.get("displayValue") + if self._is_blank(value) and not self._is_blank(display_value): + return display_value + return value + + def _apply_parameter_overrides( + self, + param_values: Dict[str, List[Dict[str, Any]]], + overrides: Any, + ) -> None: + if not overrides: + return + if isinstance(overrides, dict): + override_items = [(str(key), value) for key, value in overrides.items()] + else: + override_items = [] + for override in self._as_list(overrides): + if not override: + continue + if isinstance(override, dict): + override_items.extend((str(key), value) for key, value in override.items()) + continue + if "=" not in str(override): + raise ValueError(f"参数覆盖必须使用 key=value 格式: {override!r}") + key, value = str(override).split("=", 1) + override_items.append((key, value)) + + for key, value in override_items: + matched = False + for entries in param_values.values(): + for entry in entries: + if entry.get("key") == key: + entry["value"] = value + matched = True + if not matched: + raise ValueError(f"paramValues 中找不到可覆盖参数: {key}") + + def _build_bioyond_order_identity( + self, + order_code: str = "", + order_name: str = "", + ) -> Tuple[str, str]: + if order_code and order_name: + return order_code, order_name + prefix = self._config_value( + "experiment_1_order_prefix", + "sirna_exp1_order_prefix", + "sirna_exp1_order_code_prefix", + ) or "test" + suffix = datetime.now().strftime("%m%d%H%M%S") + value = f"{prefix}{suffix}" + return order_code or value, order_name or value + + def _build_experiment1_order_identity(self) -> Tuple[str, str]: + return self._build_bioyond_order_identity() + + def _parse_experiment1_create_result(self, result: Any, order_code: Optional[str] = None) -> Dict[str, Any]: + parsed_result = self._parse_lims_result(result) + material_records = self._extract_create_order_materials(parsed_result) + suggested_locations = self._extract_suggested_locations(material_records) + order_ids = self._extract_created_order_ids(parsed_result) + return { + "order_id": order_ids[0] if order_ids else None, + "order_ids": order_ids, + "order_code": order_code, + "materials": material_records, + "suggested_locations": suggested_locations, + "raw_result": parsed_result, + } + + def _reset_before_experiment_create(self, rpc: Any) -> Dict[str, Any]: + return self._run_reset_operations(rpc) + + def _run_reset_operations( + self, + rpc: Any, + reset_operations: Optional[List[str]] = None, + reset_order_id: str = "", + reset_location_id: str = "", + cleanup_order_code: str = "", + ) -> Dict[str, Any]: + operations = self._normalize_reset_operations(reset_operations) + cleanup_order_code = cleanup_order_code or self._config_value( + "experiment_1_cleanup_order_code", + "sirna_exp1_cleanup_order_code", + "experiment_1_reset_order_filter", + "sirna_exp1_reset_order_filter", + ) or "" + skipped_operations: List[Dict[str, str]] = [] + if "reset_order_status" in operations: + reset_order_id = self._resolve_reset_order_id(rpc, reset_order_id, cleanup_order_code) + if not reset_order_id: + skipped_operations.append({ + "operation": "reset_order_status", + "reason": "未查询到可复位订单,跳过订单状态复位", + }) + if "reset_location" in operations: + reset_location_id = self._resolve_reset_location_id(rpc, reset_location_id) + + result: Dict[str, Any] = { + "selected_operations": operations, + "reset_order_id": reset_order_id, + "reset_location_id": reset_location_id, + } + if skipped_operations: + result["skipped_operations"] = skipped_operations + if "scheduler_reset" in operations: + self._require_rpc_method(rpc, "scheduler_reset") + result["scheduler_reset"] = rpc.scheduler_reset() + if "reset_order_status" in operations and reset_order_id: + self._require_rpc_method(rpc, "reset_order_status") + result["reset_order_status"] = rpc.reset_order_status(reset_order_id) + if "reset_location" in operations: + self._require_rpc_method(rpc, "reset_location") + result["reset_location"] = rpc.reset_location(reset_location_id) + + if "reset_order_status" in operations and reset_order_id: + snapshot = self._query_order_snapshot(rpc, cleanup_order_code or reset_order_id) + targets = self._extract_takeout_targets(snapshot, reset_order_id) + result["cleanup_targets"] = targets + if targets["requires_take_out"]: + result["take_out"] = self._take_out_remaining_materials(rpc, targets) + return result + + def _resolve_reset_order_id(self, rpc: Any, reset_order_id: str = "", cleanup_order_code: str = "") -> str: + explicit_order_id = reset_order_id or self._config_value( + "experiment_1_reset_order_id", + "sirna_exp1_reset_order_id", + ) or "" + if explicit_order_id: + return explicit_order_id + + filter_text = cleanup_order_code or self._config_value( + "experiment_1_cleanup_order_code", + "sirna_exp1_cleanup_order_code", + "experiment_1_reset_order_filter", + "sirna_exp1_reset_order_filter", + "experiment_1_reset_order_code", + "sirna_exp1_reset_order_code", + "experiment_1_reset_order_name", + "sirna_exp1_reset_order_name", + ) or "" + if filter_text: + snapshot = self._query_order_snapshot(rpc, filter_text) + order_item = self._select_reset_order_item(snapshot, filter_text) + if order_item: + return str(order_item["id"]) + logger.warning(f"未能通过订单筛选条件 {filter_text!r} 查询到可复位订单,跳过 reset_order_status") + return "" + + last_order_ids = [ + str(order_id) + for order_id in self._as_list(getattr(self, "_last_submitted_order_ids", [])) + if not self._is_blank(order_id) + ] + if last_order_ids: + return last_order_ids[0] + + logger.warning( + "不能自动确定 reset_order_status 的订单ID。" + "请在站点配置中提供 experiment_1_reset_order_filter / experiment_1_cleanup_order_code," + "或先通过 submit_experiment 产生上游订单信息。本次跳过订单状态复位。" + ) + return "" + + def _resolve_reset_location_id(self, rpc: Any, reset_location_id: str = "") -> str: + explicit_location_id = reset_location_id or self._config_value( + "experiment_1_reset_location_id", + "sirna_exp1_reset_location_id", + ) or "" + if explicit_location_id: + return explicit_location_id + + location_code = self._config_value( + "experiment_1_reset_location_code", + "sirna_exp1_reset_location_code", + "reset_location_code", + ) or "" + location_name = self._config_value( + "experiment_1_reset_location_name", + "sirna_exp1_reset_location_name", + "reset_location_name", + ) or "" + warehouse_name = self._config_value( + "experiment_1_reset_location_warehouse_name", + "sirna_exp1_reset_location_warehouse_name", + "reset_location_warehouse_name", + ) or "" + + selector = location_code or location_name + if not selector: + raise RuntimeError( + "不能自动确定 reset_location 的库位ID。" + "请在站点配置中提供 experiment_1_reset_location_code/name " + "以及可选 experiment_1_reset_location_warehouse_name。" + ) + + mapped_location_id = self._location_id_from_mapping(rpc, selector) + if mapped_location_id: + return mapped_location_id + + inventory = self._query_location_inventory(rpc) + location = self._select_location_from_inventory( + inventory, + location_code=location_code, + location_name=location_name, + warehouse_name=warehouse_name, + ) + if location and location.get("id"): + return str(location["id"]) + + label = f"{warehouse_name}/{selector}" if warehouse_name else selector + raise RuntimeError(f"未能通过库位选择条件 {label!r} 查询到 reset-location 库位ID") + + def _select_reset_order_item(self, order_snapshot: Any, filter_text: str) -> Optional[Dict[str, Any]]: + items = self._order_items(order_snapshot) + if not items: + return None + exact_candidates = [ + item + for item in items + if any(str(item.get(key) or "") == filter_text for key in ("id", "orderCode", "code", "name")) + ] + candidates = exact_candidates or items + return self._latest_order_item(candidates) + + def _latest_order_item(self, items: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]: + if not items: + return None + timestamp_keys = ("creationTime", "requestTime", "startPreparationTime", "completeTime") + + def sort_key(item: Dict[str, Any]) -> str: + for key in timestamp_keys: + value = item.get(key) + if value: + return str(value) + return "" + + return sorted(items, key=sort_key, reverse=True)[0] + + def _location_id_from_mapping(self, rpc: Any, selector: str) -> str: + mapping = getattr(rpc, "location_mapping", None) + if isinstance(mapping, dict): + value = mapping.get(selector) + if value: + return str(value) + return "" + + def _query_location_inventory(self, rpc: Any) -> List[Dict[str, Any]]: + for method_name in ("locations_by_type", "query_locations_by_type"): + method = getattr(rpc, method_name, None) + if callable(method): + response = method(type=0, typeMode=0, materialType=0) + return self._location_inventory_items(response) + + if all(hasattr(rpc, attr) for attr in ("get", "host")): + response = rpc.get( + url=f"{rpc.host}/api/storage/location/locations-by-type", + params={"type": 0, "typeMode": 0, "materialType": 0}, + headers={"Accept": "application/json"}, + ) + return self._location_inventory_items(response) + + raise RuntimeError("Bioyond RPC 客户端缺少库位库存查询能力,不能自动解析 reset_location_id") + + def _location_inventory_items(self, response: Any) -> List[Dict[str, Any]]: + parsed = self._parse_lims_result(response) + if isinstance(parsed, dict) and isinstance(parsed.get("data"), list): + return [item for item in parsed["data"] if isinstance(item, dict)] + if isinstance(parsed, list): + return [item for item in parsed if isinstance(item, dict)] + return [] + + def _select_location_from_inventory( + self, + warehouses: List[Dict[str, Any]], + location_code: str = "", + location_name: str = "", + warehouse_name: str = "", + ) -> Optional[Dict[str, Any]]: + selector = location_code or location_name + matches: List[Dict[str, Any]] = [] + for warehouse in warehouses: + current_warehouse_name = str(warehouse.get("name") or warehouse.get("warehouseName") or "") + if warehouse_name and current_warehouse_name != warehouse_name: + continue + for location in self._as_list(warehouse.get("locations")): + if not isinstance(location, dict): + continue + values = { + str(location.get("id") or ""), + str(location.get("code") or ""), + str(location.get("name") or ""), + str(location.get("locationShowName") or ""), + str(location.get("locationCode") or ""), + } + if selector in values: + item = dict(location) + item.setdefault("warehouseName", current_warehouse_name) + matches.append(item) + if not matches: + return None + if len(matches) > 1 and not warehouse_name: + raise RuntimeError(f"库位选择条件 {selector!r} 匹配到多个堆栈,请配置 reset_location_warehouse_name") + return matches[0] + + def _normalize_reset_operations(self, reset_operations: Optional[List[str]]) -> List[str]: + values = reset_operations or list(DEFAULT_RESET_OPERATIONS) + aliases = { + "scheduler": "scheduler_reset", + "order": "reset_order_status", + "order_status": "reset_order_status", + "location": "reset_location", + "storage": "reset_location", + } + normalized: List[str] = [] + for operation in values: + key = str(operation).strip() + if not key: + continue + key = aliases.get(key, key) + if key not in DEFAULT_RESET_OPERATIONS: + raise ValueError(f"未知复位操作: {operation!r}; 支持 {list(DEFAULT_RESET_OPERATIONS)}") + if key not in normalized: + normalized.append(key) + return normalized + + def _require_rpc_method(self, rpc: Any, method_name: str) -> None: + if not hasattr(rpc, method_name): + raise RuntimeError(f"Bioyond RPC 客户端缺少 {method_name} 方法") + + def _require_ready_signal(self, ready_signal: str) -> None: + if str(ready_signal).strip().upper() != DEFAULT_READY_SIGNAL: + raise RuntimeError(f"小核酸工作流需要收到 {DEFAULT_READY_SIGNAL} 信号,当前为: {ready_signal!r}") + + def _with_ready_signal(self, result: Dict[str, Any]) -> Dict[str, Any]: + payload = dict(result) + payload["ready"] = DEFAULT_READY_SIGNAL + payload["signal"] = DEFAULT_READY_SIGNAL + payload["ready_signal"] = DEFAULT_READY_SIGNAL + payload["received_ready_signal"] = DEFAULT_READY_SIGNAL + return payload + + def _resolve_start_experiment_info( + self, + submit_experiment_result: Optional[Dict[str, Any]], + order_id: str, + order_ids: Optional[List[str]], + ) -> Dict[str, Any]: + payload = submit_experiment_result or {} + start_info = payload.get("start_experiment") if isinstance(payload, dict) else None + if isinstance(start_info, dict): + resolved_order_ids = [str(candidate) for candidate in self._as_list(start_info.get("order_ids")) if candidate] + if start_info.get("order_id"): + resolved_order_ids.append(str(start_info["order_id"])) + resolved_order_ids = list(dict.fromkeys(resolved_order_ids)) + if resolved_order_ids: + resolved = dict(start_info) + resolved["order_ids"] = resolved_order_ids + return resolved + resolved_order_ids = list(order_ids or []) + if order_id: + resolved_order_ids.insert(0, order_id) + if isinstance(payload, dict): + for candidate in self._as_list(payload.get("order_ids")): + if candidate: + resolved_order_ids.append(str(candidate)) + if payload.get("order_id"): + resolved_order_ids.append(str(payload["order_id"])) + resolved_order_ids = list(dict.fromkeys(resolved_order_ids)) + if not resolved_order_ids: + raise RuntimeError("启动实验需要 submit_experiment 上游结果,或显式提供 order_id/order_ids") + return {"order_ids": resolved_order_ids} + + def _query_order_snapshot(self, rpc: Any, filter_text: str) -> Any: + self._require_rpc_method(rpc, "order_query") + query = { + "timeType": "", + "beginTime": None, + "endTime": None, + "status": "", + "filter": filter_text, + "skipCount": 0, + "pageCount": 20, + "sorting": "", + } + return rpc.order_query(json.dumps(query, ensure_ascii=False)) + + def _extract_takeout_targets(self, order_snapshot: Any, fallback_order_id: str) -> Dict[str, Any]: + order_id = fallback_order_id + preintake_ids = set() + material_ids = set() + for item in self._order_items(order_snapshot): + order_id = str(item.get("id") or order_id) + for preintake in self._as_list(item.get("preIntakes")): + if not isinstance(preintake, dict): + continue + if preintake.get("id"): + preintake_ids.add(str(preintake["id"])) + if preintake.get("materialId"): + material_ids.add(str(preintake["materialId"])) + material_ids_text = str(preintake.get("materialIds") or "") + for material_id in material_ids_text.replace(";", "|").replace(",", "|").split("|"): + if material_id.strip(): + material_ids.add(material_id.strip()) + for sample in self._as_list(preintake.get("sampleMaterials")): + if isinstance(sample, dict) and sample.get("materialId"): + material_ids.add(str(sample["materialId"])) + return { + "order_id": order_id, + "preintake_ids": sorted(preintake_ids), + "material_ids": sorted(material_ids), + "requires_take_out": bool(preintake_ids or material_ids), + } + + def _take_out_remaining_materials(self, rpc: Any, targets: Dict[str, Any]) -> Any: + payload = { + "orderId": targets["order_id"], + "preintakeIds": targets["preintake_ids"], + "materialIds": targets["material_ids"], + } + if hasattr(rpc, "take_out"): + return rpc.take_out(payload["orderId"], payload["preintakeIds"], payload["materialIds"]) + if not all(hasattr(rpc, attr) for attr in ("post", "host", "api_key", "get_current_time_iso8601")): + raise RuntimeError("Bioyond RPC 客户端缺少 take-out 调用能力") + response = rpc.post( + url=f"{rpc.host}/api/lims/order/take-out", + params={ + "apiKey": rpc.api_key, + "requestTime": rpc.get_current_time_iso8601(), + "data": payload, + }, + ) + return response or {} + + def _extract_workflow_parameter_map(self, step_data: Any) -> Any: + parsed = self._json_loads_if_string(step_data) + if isinstance(parsed, dict) and self._looks_like_step_parameter_map(parsed): + return parsed + if isinstance(parsed, dict) and isinstance(parsed.get("data"), dict): + data = self._json_loads_if_string(parsed["data"]) + if isinstance(data, dict) and self._looks_like_step_parameter_map(data): + return data + return parsed + + def _workflow_items(self, workflow_data: Any) -> List[Dict[str, Any]]: + parsed = self._json_loads_if_string(workflow_data) + if isinstance(parsed, dict): + items = parsed.get("items") + if isinstance(items, list): + return [item for item in items if isinstance(item, dict)] + data = parsed.get("data") + if isinstance(data, dict) and isinstance(data.get("items"), list): + return [item for item in data["items"] if isinstance(item, dict)] + if isinstance(parsed, list): + return [item for item in parsed if isinstance(item, dict)] + return [] + + def _select_workflow_record( + self, + records: Iterable[Dict[str, Any]], + workflow_name: Optional[str], + ) -> Optional[Dict[str, Any]]: + candidates = [record for record in records if self._record_id(record)] + if not candidates: + return None + if workflow_name: + exact = [record for record in candidates if self._record_name(record) == workflow_name] + if exact: + return exact[0] + contains = [record for record in candidates if workflow_name in (self._record_name(record) or "")] + if contains: + return contains[0] + return candidates[0] + + def _sub_workflow_records(self, root_workflow: Dict[str, Any]) -> List[Dict[str, Any]]: + records: List[Dict[str, Any]] = [] + for key in ("subWorkflows", "subWorkflowList", "workflows", "children"): + value = root_workflow.get(key) + if isinstance(value, list): + records.extend(item for item in value if isinstance(item, dict)) + return records + + def _order_items(self, order_snapshot: Any) -> List[Dict[str, Any]]: + parsed = self._json_loads_if_string(order_snapshot) + if isinstance(parsed, dict): + data = parsed.get("data") + if isinstance(data, dict) and isinstance(data.get("items"), list): + return [item for item in data["items"] if isinstance(item, dict)] + if isinstance(parsed.get("items"), list): + return [item for item in parsed["items"] if isinstance(item, dict)] + return [] + + def _extract_create_order_materials(self, result: Any) -> List[Dict[str, Any]]: + parsed = self._parse_lims_result(result) + if isinstance(parsed, dict) and "data" in parsed: + parsed = self._parse_lims_result(parsed.get("data")) + records: List[Dict[str, Any]] = [] + if isinstance(parsed, dict): + for order_id, value in parsed.items(): + for item in self._as_list(value): + if not isinstance(item, dict): + continue + record = dict(item) + record.setdefault("orderId", order_id) + records.append(record) + elif isinstance(parsed, list): + records.extend(item for item in parsed if isinstance(item, dict)) + return records + + def _extract_suggested_locations(self, material_records: List[Dict[str, Any]]) -> List[Dict[str, str]]: + seen = set() + locations: List[Dict[str, str]] = [] + for record in material_records: + location_id = str(record.get("locationId") or "") + location_code = str(record.get("locationShowName") or record.get("locationCode") or "") + if not location_id and not location_code: + continue + key = (location_id, location_code) + if key in seen: + continue + seen.add(key) + locations.append( + { + "locationId": location_id, + "locationCode": str(record.get("locationCode") or ""), + "locationShowName": str(record.get("locationShowName") or ""), + "materialName": str(record.get("materialName") or ""), + "materialCode": str(record.get("materialCode") or ""), + "location_id": location_id, + "location_code": location_code, + "material_name": str(record.get("materialName") or ""), + "material_code": str(record.get("materialCode") or ""), + } + ) + return locations + + def _extract_created_order_ids(self, result: Any) -> List[str]: + parsed = self._parse_lims_result(result) + if isinstance(parsed, dict) and "data" in parsed: + parsed = self._parse_lims_result(parsed.get("data")) + if isinstance(parsed, dict): + return [str(key) for key in parsed.keys() if self._looks_like_uuid(key)] + if isinstance(parsed, str) and self._looks_like_uuid(parsed): + return [parsed] + return [] + + def _create_result_success(self, parsed_result: Any, order_ids: List[str], material_records: List[Dict[str, Any]]) -> bool: + if isinstance(parsed_result, dict) and "code" in parsed_result: + return parsed_result.get("code") == 1 + return bool(order_ids or material_records) + + def _format_create_order_confirmation( + self, + order_code: str, + order_name: str, + workflow: Dict[str, str], + order_ids: List[str], + material_records: List[Dict[str, Any]], + suggested_locations: List[Dict[str, str]], + ) -> str: + lines = [ + f"实验已提交: {order_name} ({order_code})", + f"工作流: {workflow.get('workflow_name', '')} / {workflow.get('sub_workflow_name', '')}", + ] + if order_ids: + lines.append(f"实验ID: {', '.join(order_ids)}") + if material_records: + lines.append("所需物料与建议库位:") + for index, record in enumerate(material_records, 1): + name = record.get("materialName") or "未命名物料" + code = record.get("materialCode") or "-" + quantity = record.get("quantity") or "-" + material_type = record.get("materialTypeName") or record.get("materialTypeMode") or "-" + location = record.get("locationShowName") or record.get("locationCode") or "-" + lines.append(f"{index}. {name} [{code}], {quantity}, {material_type}, 建议库位 {location}") + else: + lines.append("所需物料与建议库位: LIMS 未返回预分配记录") + if suggested_locations: + lines.append("库位汇总:") + for index, location in enumerate(suggested_locations, 1): + material = location.get("material_name") or "未命名物料" + code = location.get("location_code") or location.get("location_id") or "-" + lines.append(f"{index}. {material} -> {code}") + return "\n".join(lines) + + def _record_name(self, record: Optional[Dict[str, Any]]) -> str: + if not isinstance(record, dict): + return "" + for key in ("name", "workflowName", "workFlowName", "displayName"): + if record.get(key): + return str(record[key]) + return "" + + def _record_id(self, record: Optional[Dict[str, Any]]) -> str: + if not isinstance(record, dict): + return "" + for key in ("id", "workflowId", "workFlowId", "subWorkflowId", "subWorkFlowId"): + if record.get(key): + return str(record[key]) + return "" + + def _parameter_key(self, parameter: Dict[str, Any]) -> str: + value = parameter.get("Key") or parameter.get("key") + return "" if self._is_blank(value) else str(value) + + def _looks_like_step_parameter_map(self, value: Any) -> bool: + return isinstance(value, dict) and any(self._looks_like_uuid(key) and isinstance(item, list) for key, item in value.items()) + + def _parse_lims_result(self, result: Any) -> Any: + if not isinstance(result, str): + return result + text = result.strip() + if not text: + return text + try: + return json.loads(text) + except ValueError: + pass + try: + return ast.literal_eval(text) + except (ValueError, SyntaxError): + return text + + def _json_loads_if_string(self, value: Any) -> Any: + if isinstance(value, str): + try: + return json.loads(value) + except ValueError: + return value + return value + + def _require_uuid(self, value: Any, field_name: str) -> str: + try: + return str(UUID(str(value))) + except (TypeError, ValueError, AttributeError) as exc: + raise ValueError(f"{field_name} 必须是 UUID: {value!r}") from exc + + def _looks_like_uuid(self, value: Any) -> bool: + try: + UUID(str(value)) + except (TypeError, ValueError, AttributeError): + return False + return True + + def _as_list(self, value: Any) -> List[Any]: + if value is None: + return [] + return value if isinstance(value, list) else [value] + + def _kwarg_text(self, kwargs: Dict[str, Any], key: str) -> str: + value = kwargs.get(key) + return "" if self._is_blank(value) else str(value) + + def _is_blank(self, value: Any) -> bool: + if value is None: + return True + if isinstance(value, str): + return value.strip() == "" + if isinstance(value, list): + return all(self._is_blank(item) for item in value) + if isinstance(value, dict): + return not value + return False + def main() -> int: """命令行入口:读取配置并拉取工作流列表。""" diff --git a/unilabos/resources/bioyond/decks.py b/unilabos/resources/bioyond/decks.py index 5f3b2c4e..40d02521 100644 --- a/unilabos/resources/bioyond/decks.py +++ b/unilabos/resources/bioyond/decks.py @@ -1,6 +1,7 @@ from os import name from pylabrobot.resources import Deck, Coordinate, Rotation +from unilabos.registry.decorators import resource from unilabos.resources.bioyond.YB_warehouses import ( bioyond_warehouse_1x4x4, bioyond_warehouse_1x4x4_right, # 新增:右侧仓库 (A05~D08) @@ -23,6 +24,9 @@ from unilabos.resources.bioyond.YB_warehouses import ( from unilabos.resources.bioyond.warehouses import ( bioyond_warehouse_tipbox_storage_left, # 新增:Tip盒堆栈(左) bioyond_warehouse_tipbox_storage_right, # 新增:Tip盒堆栈(右) + bioyond_warehouse_sirna_automation_stack, + bioyond_warehouse_sirna_centrifuge_balance_plate_stack, + bioyond_warehouse_sirna_g3_liquid_handler, ) @@ -101,6 +105,50 @@ class BIOYOND_PolymerPreparationStation_Deck(Deck): for warehouse_name, warehouse in self.warehouses.items(): self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name]) +@resource( + id="BIOYOND_SirnaStation_Deck", + category=["deck"], + description="BIOYOND 小核酸工作站 Deck", + icon="配液站.webp", +) +class BIOYOND_SirnaStation_Deck(Deck): + def __init__( + self, + name: str = "SirnaStation_Deck", + size_x: float = 2700.0, + size_y: float = 1080.0, + size_z: float = 1500.0, + category: str = "deck", + setup: bool = False + ) -> None: + super().__init__(name=name, size_x=size_x, size_y=size_y, size_z=size_z) + if setup: + self.setup() + + @classmethod + def deserialize(cls, data: dict, allow_marshal: bool = False): + if data.get("children") and data.get("setup") is True: + data = data.copy() + data["setup"] = False + return super().deserialize(data, allow_marshal=allow_marshal) + + def setup(self) -> None: + # Sirna 读接口 /api/storage/location/locations-by-type 返回完整固定堆栈清单。 + # LIMS 在库物料接口仍使用相同的 自动化堆栈 名称和数字库位编码。 + self.warehouses = { + "G3移液站": bioyond_warehouse_sirna_g3_liquid_handler(), + "自动化堆栈": bioyond_warehouse_sirna_automation_stack(), + "离心机配平板堆栈": bioyond_warehouse_sirna_centrifuge_balance_plate_stack(), + } + self.warehouse_locations = { + "G3移液站": Coordinate(0.0, 0.0, 0.0), + "自动化堆栈": Coordinate(0.0, 180.0, 0.0), + "离心机配平板堆栈": Coordinate(0.0, 1300.0, 0.0), + } + + for warehouse_name, warehouse in self.warehouses.items(): + self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name]) + class BIOYOND_YB_Deck(Deck): def __init__( self, @@ -154,8 +202,3 @@ def YB_Deck(name: str) -> Deck: by=BIOYOND_YB_Deck(name=name) by.setup() return by - - - - - diff --git a/unilabos/resources/bioyond/warehouses.py b/unilabos/resources/bioyond/warehouses.py index 547af1e3..98b405f4 100644 --- a/unilabos/resources/bioyond/warehouses.py +++ b/unilabos/resources/bioyond/warehouses.py @@ -1,5 +1,69 @@ +from pylabrobot.resources import Coordinate +from pylabrobot.resources.carrier import ResourceHolder, create_homogeneous_resources + from unilabos.resources.warehouse import WareHouse, warehouse_factory + +def bioyond_warehouse_numeric_stack(name: str, rows: int = 10, columns: int = 17) -> WareHouse: + """创建 Bioyond 数字库位堆栈,库位名使用服务端返回的 行-列 格式。""" + num_items_x = columns + num_items_y = rows + num_items_z = 1 + dx = 10.0 + dy = 10.0 + dz = 10.0 + item_dx = 147.0 + item_dy = 106.0 + item_dz = 130.0 + locations = [ + Coordinate(dx + col * item_dx, dy + row * item_dy, dz) + for row in range(num_items_y) + for col in range(num_items_x) + ] + holders = create_homogeneous_resources( + klass=ResourceHolder, + locations=locations, + resource_size_x=127.0, + resource_size_y=86.0, + resource_size_z=25.0, + name_prefix=name, + ) + keys = [ + f"{row + 1}-{col + 1}" + for row in range(num_items_y) + for col in range(num_items_x) + ] + return WareHouse( + name=name, + size_x=dx + item_dx * num_items_x, + size_y=dy + item_dy * num_items_y, + size_z=dz + item_dz * num_items_z, + num_items_x=num_items_x, + num_items_y=num_items_y, + num_items_z=num_items_z, + ordering_layout="row-major", + sites={key: holder for key, holder in zip(keys, holders.values())}, + category="warehouse", + ) + + +# ================ 小核酸工作站相关堆栈 ================ + +def bioyond_warehouse_sirna_g3_liquid_handler(name: str = "G3移液站") -> WareHouse: + """创建小核酸 G3 移液站库位堆栈:1 行 x 14 列。""" + return bioyond_warehouse_numeric_stack(name, rows=1, columns=14) + + +def bioyond_warehouse_sirna_automation_stack(name: str = "自动化堆栈") -> WareHouse: + """创建小核酸自动化堆栈:10 行 x 17 列。""" + return bioyond_warehouse_numeric_stack(name, rows=10, columns=17) + + +def bioyond_warehouse_sirna_centrifuge_balance_plate_stack(name: str = "离心机配平板堆栈") -> WareHouse: + """创建小核酸离心机配平板堆栈:2 行 x 1 列。""" + return bioyond_warehouse_numeric_stack(name, rows=2, columns=1) + + # ================ 反应站相关堆栈 ================ def bioyond_warehouse_1x4x4(name: str) -> WareHouse: