From 633c8b3d2ccd66ebe6ef341942a7859c97631dff Mon Sep 17 00:00:00 2001 From: yxz321 Date: Tue, 12 May 2026 15:42:14 +0800 Subject: [PATCH] feat: RNA add Sirna experiment controls and workflow bindings - Add bound Sirna workflow names for experiment 1 and experiment 2 submissions. - Route named workflow submission through a shared create-order core with minimal param payloads. - Add direct scheduler controls and cancel/take-out manual confirmation handling. --- .../sirna_station/sirna_station.py | 518 +++++++++++++++--- 1 file changed, 442 insertions(+), 76 deletions(-) diff --git a/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py b/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py index c6a34a32..8a7dc3ed 100644 --- a/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py +++ b/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py @@ -123,13 +123,53 @@ DEFAULT_SIRNA_MATERIAL_TYPE_MAPPINGS = { "bioyond_sirna_cell_culture_plate": ["细胞培养板", ""], "bioyond_sirna_reagent_trough": ["试剂槽RiboGreen", ""], } +SIRNA_EXPERIMENT_1_WORKFLOW_NAME = "场景一:报告基因检测流程" +SIRNA_EXPERIMENT_1_SUB_WORKFLOW_NAME = "报告基因检测流程" +SIRNA_EXPERIMENT_1_WORKFLOW_ID = "3a1fc8e9-f807-3f9e-6f48-7132f594141a" +SIRNA_EXPERIMENT_1_SUB_WORKFLOW_ID = "3a1fc8ea-35b0-ce0c-1a46-ab506b647e4e" +SIRNA_EXPERIMENT_2_WORKFLOW_NAME = "场景二:基因表达检测" +SIRNA_EXPERIMENT_2_SUB_WORKFLOW_NAME = "基因编辑检测" +SIRNA_EXPERIMENT_2_WORKFLOW_ID = "3a1fcdbd-316c-a4b8-a7ee-a262099552fa" +SIRNA_EXPERIMENT_2_SUB_WORKFLOW_ID = "3a1fd2d4-5d3f-fae1-8b3d-ec6d0abb6646" +SIRNA_WORKFLOW_BINDING_LAST_UPDATED = "2026-05-12" +class SubmitExperimentRequiredParams(TypedDict): + workflow_name: Annotated[str, Field(description="工作流名称(必填,不填写工作流 ID)")] + sample_throughput: Annotated[int, Field(description="样品通量(1-96,必填),表示一次实验处理的样品数量。")] + + +class SubmitExperimentOptionalParams(TypedDict, total=False): + sub_workflow_name: Annotated[str, Field(description="子工作流名称(可选;为空时选中根工作流下的可用子工作流)")] + order_code: Annotated[str, Field(description="订单编号(可选,自动生成)")] + order_name: Annotated[str, Field(description="订单名称(可选,自动生成)")] + parameter_overrides: Annotated[str, Field(description="参数覆盖(文本格式)")] + auto_register_materials: Annotated[bool, Field(default=True, description="是否自动注册物料(默认True)")] + + +# 绑定信息(最后更新 2026-05-12): +# 工作流「场景一:报告基因检测流程」= 3a1fc8e9-f807-3f9e-6f48-7132f594141a +# 子工作流「报告基因检测流程」= 3a1fc8ea-35b0-ce0c-1a46-ab506b647e4e class Experiment1RequiredParams(TypedDict): sample_throughput: Annotated[int, Field(description="样品通量(1-96,必填),表示一次实验处理的样品数量。")] class Experiment1OptionalParams(TypedDict, total=False): + order_code: Annotated[str, Field(description="订单编号(可选,自动生成)")] + order_name: Annotated[str, Field(description="订单名称(可选,自动生成)")] + parameter_overrides: Annotated[str, Field(description="参数覆盖(文本格式)")] + auto_register_materials: Annotated[bool, Field(default=True, description="是否自动注册物料(默认True)")] + + +# 绑定信息(最后更新 2026-05-12): +# 工作流「场景二:基因表达检测」= 3a1fcdbd-316c-a4b8-a7ee-a262099552fa +# 子工作流「基因编辑检测」= 3a1fd2d4-5d3f-fae1-8b3d-ec6d0abb6646 +class Experiment2RequiredParams(TypedDict): + sample_throughput: Annotated[int, Field(description="样品通量(1-96,必填),表示一次实验处理的样品数量。")] + + +class Experiment2OptionalParams(TypedDict, total=False): + order_code: Annotated[str, Field(description="订单编号(可选,自动生成)")] order_name: Annotated[str, Field(description="订单名称(可选,自动生成)")] parameter_overrides: Annotated[str, Field(description="参数覆盖(文本格式)")] auto_register_materials: Annotated[bool, Field(default=True, description="是否自动注册物料(默认True)")] @@ -448,6 +488,132 @@ class BioyondSirnaStation(BioyondWorkstation): refresh_material_cache=refresh_material_cache, ) + @action( + always_free=True, + description="手动确认后直接启动 Bioyond 小核酸调度器,不执行装载确认门禁", + ) + def scheduler_start( + self, + **kwargs: Any, + ) -> Dict[str, Any]: + """直接调用 Bioyond 调度器启动接口。""" + return self._run_scheduler_action("scheduler_start", "启动", **kwargs) + + @action( + always_free=True, + description="手动确认后直接停止 Bioyond 小核酸调度器", + ) + def scheduler_stop( + self, + **kwargs: Any, + ) -> Dict[str, Any]: + """直接调用 Bioyond 调度器停止接口。""" + return self._run_scheduler_action("scheduler_stop", "停止", **kwargs) + + @action( + always_free=True, + description="手动确认后直接暂停 Bioyond 小核酸调度器", + ) + def scheduler_pause( + self, + **kwargs: Any, + ) -> Dict[str, Any]: + """直接调用 Bioyond 调度器暂停接口。""" + return self._run_scheduler_action("scheduler_pause", "暂停", **kwargs) + + @action( + always_free=True, + description="手动确认后直接继续 Bioyond 小核酸调度器", + ) + def scheduler_continue( + self, + **kwargs: Any, + ) -> Dict[str, Any]: + """直接调用 Bioyond 调度器继续接口。""" + return self._run_scheduler_action("scheduler_continue", "继续", **kwargs) + + @action( + always_free=True, + node_type=NodeType.MANUAL_CONFIRM, + placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}, + goal_default={ + "order_id": "", + "order_code": "", + "cancel_experiment": True, + "take_out_remaining": True, + "confirmed": False, + "timeout_seconds": 3600, + "assignee_user_ids": [], + }, + feedback_interval=300, + description="手动确认后取消 Bioyond 实验,并按订单快照释放残留物料", + ) + def cancel_experiment_and_take_out( + self, + order_id: str = "", + order_code: str = "", + cancel_experiment: bool = True, + take_out_remaining: bool = True, + confirmed: bool = False, + timeout_seconds: int = 3600, + assignee_user_ids: Optional[List[str]] = None, + **kwargs: Any, + ) -> Dict[str, Any]: + """取消实验并释放订单残留物料。""" + with self._debug_call_session("cancel_experiment_and_take_out"): + del timeout_seconds, assignee_user_ids + api_host = self._kwarg_text(kwargs, "api_host") + api_key = self._kwarg_text(kwargs, "api_key") + self._update_runtime_api_config(api_host=api_host, api_key=api_key) + if not self._as_manual_gate(confirmed): + raise RuntimeError("取消实验/释放物料需要操作员确认") + + normalized_order_id = str(order_id or "").strip() + filter_text = str(order_code or normalized_order_id or "").strip() + if not filter_text: + raise RuntimeError("取消实验/释放物料需要提供 order_id 或 order_code") + + rpc = self._require_hardware_interface("order_query") + snapshot = self._query_order_snapshot(rpc, filter_text) + targets = self._extract_takeout_targets(snapshot, normalized_order_id) + resolved_order_id = str(targets.get("order_id") or normalized_order_id or "").strip() + if not resolved_order_id and cancel_experiment: + raise RuntimeError("未能解析可取消的 Bioyond order_id") + + result: Dict[str, Any] = { + "success": True, + "order_id": resolved_order_id, + "order_code": str(order_code or ""), + "cleanup_targets": targets, + "selected_operations": { + "cancel_experiment": bool(cancel_experiment), + "take_out_remaining": bool(take_out_remaining), + }, + } + if cancel_experiment: + self._require_rpc_method(rpc, "cancel_experiment") + cancel_result = rpc.cancel_experiment(resolved_order_id) + result["cancel_experiment"] = cancel_result + result["success"] = bool(cancel_result == 1) + logger.info( + "小核酸取消实验返回: order_id=%s result=%s", + resolved_order_id, + cancel_result, + ) + if take_out_remaining: + if targets.get("requires_take_out"): + take_out_result = self._take_out_remaining_materials(rpc, targets) + result["take_out"] = take_out_result + logger.info("小核酸取消后 take_out 返回: %s", take_out_result) + else: + result["take_out"] = { + "skipped": True, + "reason": "订单快照未发现 preIntake/material 释放目标", + } + logger.info("小核酸取消后跳过 take_out: 未发现释放目标") + result["confirmation_message"] = "取消/释放操作已提交" if result["success"] else "取消实验失败,请检查 LIMS 状态" + return result + @action( always_free=True, node_type=NodeType.MANUAL_CONFIRM, @@ -541,26 +707,175 @@ class BioyondSirnaStation(BioyondWorkstation): - confirmation_message (str): 确认消息 - registration_result (Dict): 物料注册结果 """ - with self._debug_call_session("submit_experiment_1"): - if isinstance(required_params, dict): - sample_throughput = required_params.get("sample_throughput") - else: - sample_throughput = required_params - sample_throughput = int(sample_throughput) + optional_params = optional_params or {} + if isinstance(required_params, dict): + sample_throughput = required_params.get("sample_throughput") + else: + sample_throughput = required_params + return self._submit_experiment_core( + action_name="submit_experiment_1", + experiment_number=1, + workflow_name=SIRNA_EXPERIMENT_1_WORKFLOW_NAME, + sub_workflow_name=SIRNA_EXPERIMENT_1_SUB_WORKFLOW_NAME, + sample_throughput=int(sample_throughput), + order_code=str(optional_params.get("order_code", "") or ""), + order_name=str(optional_params.get("order_name", "") or ""), + parameter_overrides=optional_params.get("parameter_overrides", ""), + auto_register_materials=bool(optional_params.get("auto_register_materials", True)), + timeout_seconds=timeout_seconds, + assignee_user_ids=assignee_user_ids, + **kwargs, + ) - if optional_params is None: - optional_params = {} - order_name = optional_params.get("order_name", "") - parameter_overrides = optional_params.get("parameter_overrides", "") - auto_register_materials = optional_params.get("auto_register_materials", True) + @action( + always_free=True, + description="按工作流名称提交小核酸实验(通用入口,不暴露工作流 ID)", + ) + def submit_experiment( + self, + required_params: SubmitExperimentRequiredParams, + optional_params: Optional[SubmitExperimentOptionalParams] = None, + timeout_seconds: int = 3600, + assignee_user_ids: Optional[List[str]] = None, + **kwargs: Any, + ) -> Dict[str, Any]: + """按工作流名称提交小核酸实验到 Bioyond LIMS。""" + optional_params = optional_params or {} + workflow_name = "" + sample_throughput: Any = None + if isinstance(required_params, dict): + workflow_name = str(required_params.get("workflow_name", "") or "") + sample_throughput = required_params.get("sample_throughput") + else: + sample_throughput = required_params + return self._submit_experiment_core( + action_name="submit_experiment", + experiment_number=None, + workflow_name=workflow_name, + sub_workflow_name=str(optional_params.get("sub_workflow_name", "") or ""), + sample_throughput=int(sample_throughput), + order_code=str(optional_params.get("order_code", "") or ""), + order_name=str(optional_params.get("order_name", "") or ""), + parameter_overrides=optional_params.get("parameter_overrides", ""), + auto_register_materials=bool(optional_params.get("auto_register_materials", True)), + timeout_seconds=timeout_seconds, + assignee_user_ids=assignee_user_ids, + **kwargs, + ) + + @action( + always_free=True, + node_type=NodeType.MANUAL_CONFIRM, + placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}, + goal_default={ + "optional_params": {"auto_register_materials": True}, + "timeout_seconds": 3600, + "assignee_user_ids": [], + }, + feedback_interval=300, + description="提交小核酸实验2(基因表达检测)", + handles=[ + ActionOutputHandle( + key="order_id", + data_type="bioyond_order_id", + label="实验ID", + data_key="order_id", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="order_ids", + data_type="bioyond_order_ids", + label="实验ID列表", + data_key="order_ids", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="target_device", + data_type="device_id", + label="目标设备", + data_key="target_device", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="resource", + data_type="resource", + label="待装载物料", + data_key="resource", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="coin_cell_code", data_type="array", + label="物料名称", + data_key="coin_cell_code", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="mount_resource", data_type="resource", + label="库位", + data_key="mount_resource", + data_source=DataSource.EXECUTOR, + ), + ], + ) + def submit_experiment_2( + self, + required_params: Experiment2RequiredParams, + optional_params: Optional[Experiment2OptionalParams] = None, + timeout_seconds: int = 3600, + assignee_user_ids: Optional[List[str]] = None, + **kwargs: Any, + ) -> Dict[str, Any]: + """提交小核酸实验2(基因表达检测)到 Bioyond LIMS。""" + optional_params = optional_params or {} + if isinstance(required_params, dict): + sample_throughput = required_params.get("sample_throughput") + else: + sample_throughput = required_params + return self._submit_experiment_core( + action_name="submit_experiment_2", + experiment_number=2, + workflow_name=SIRNA_EXPERIMENT_2_WORKFLOW_NAME, + sub_workflow_name=SIRNA_EXPERIMENT_2_SUB_WORKFLOW_NAME, + sample_throughput=int(sample_throughput), + order_code=str(optional_params.get("order_code", "") or ""), + order_name=str(optional_params.get("order_name", "") or ""), + parameter_overrides=optional_params.get("parameter_overrides", ""), + auto_register_materials=bool(optional_params.get("auto_register_materials", True)), + timeout_seconds=timeout_seconds, + assignee_user_ids=assignee_user_ids, + **kwargs, + ) + + def _submit_experiment_core( + self, + action_name: str, + experiment_number: Optional[int], + workflow_name: str, + sub_workflow_name: str, + sample_throughput: int, + order_code: str, + order_name: str, + parameter_overrides: Any, + auto_register_materials: bool, + timeout_seconds: int = 3600, + assignee_user_ids: Optional[List[str]] = None, + **kwargs: Any, + ) -> Dict[str, Any]: + with self._debug_call_session(action_name): + if self._is_blank(workflow_name): + raise ValueError("提交实验必须提供 workflow_name(工作流名称),不能提供或依赖 workflow id") target_device = ( self._kwarg_text(kwargs, "unilabos_device_id") or self._kwarg_text(kwargs, "device_id") or "bioyond_sirna_station" ) logger.info( - "小核酸实验1提交开始: sample_throughput=%s target_device=%s " - "auto_register_materials=%s overrides=%s", + "小核酸实验提交开始: action=%s experiment=%s workflow=%s sub_workflow=%s " + "sample_throughput=%s target_device=%s auto_register_materials=%s overrides=%s", + action_name, + experiment_number, + workflow_name, + sub_workflow_name, sample_throughput, target_device, bool(auto_register_materials), @@ -569,11 +884,13 @@ class BioyondSirnaStation(BioyondWorkstation): del timeout_seconds, assignee_user_ids, kwargs rpc = self._require_hardware_interface("create_order") - - # 自动解析实验1工作流(无需用户指定workflow_name) - workflow = self._resolve_experiment_1_workflow(rpc) + workflow = self._resolve_experiment_workflow( + rpc, + workflow_name=workflow_name, + sub_workflow_name=sub_workflow_name, + ) logger.info( - "小核酸实验1工作流已解析: root=%s(%s) sub=%s(%s)", + "小核酸实验工作流已解析: root=%s(%s) sub=%s(%s)", workflow.get("workflow_name", ""), workflow.get("root_workflow_id", ""), workflow.get("sub_workflow_name", ""), @@ -581,28 +898,29 @@ class BioyondSirnaStation(BioyondWorkstation): ) step_data = rpc.workflow_step_query(workflow["sub_workflow_id"]) - - # Parse parameter_overrides from text format - parsed_overrides = self._parse_parameter_overrides_text(parameter_overrides) - + parsed_overrides = self._parse_parameter_overrides_text(str(parameter_overrides or "")) param_values, parameter_template = self._build_param_values_from_step_data( step_data, parameter_overrides=parsed_overrides, - include_all_task_displayable=True, + include_all_task_displayable=False, ) if not param_values: - logger.error("小核酸实验1参数构建失败: LIMS 子工作流未返回可用 paramValues") + logger.error("小核酸实验参数构建失败: LIMS 子工作流未返回可用 paramValues") raise RuntimeError("未从 LIMS 子工作流参数中提取到 create_order paramValues") param_entry_count = sum(len(entries) for entries in param_values.values()) logger.info( - "小核酸实验1参数已构建: steps=%s entries=%s template_items=%s overrides=%s", + "小核酸实验参数已构建: steps=%s entries=%s template_items=%s overrides=%s", len(param_values), param_entry_count, len(parameter_template), len(parsed_overrides), ) - resolved_order_code, resolved_order_name = self._build_bioyond_order_identity("", order_name) + resolved_order_code, resolved_order_name = self._build_bioyond_order_identity( + experiment_number=experiment_number, + order_code=order_code, + order_name=order_name, + ) order_payload = [ { "orderCode": resolved_order_code, @@ -614,7 +932,7 @@ class BioyondSirnaStation(BioyondWorkstation): } ] - logger.info(f"正在提交小核酸实验1: {resolved_order_name} ({resolved_order_code})") + logger.info("正在提交小核酸实验: %s (%s)", resolved_order_name, resolved_order_code) raw_result = rpc.create_order(json.dumps(copy.deepcopy(order_payload), ensure_ascii=False)) parsed_result = self._parse_lims_result(raw_result) material_records = self._extract_create_order_materials(parsed_result) @@ -642,7 +960,7 @@ class BioyondSirnaStation(BioyondWorkstation): } create_success = self._create_result_success(parsed_result, order_ids, material_records) logger.info( - "小核酸实验1创建结果: success=%s order_ids=%s materials=%s " + "小核酸实验创建结果: success=%s order_ids=%s materials=%s " "suggested_locations=%s material_types=%s", create_success, order_ids, @@ -652,29 +970,22 @@ class BioyondSirnaStation(BioyondWorkstation): ) if not create_success: logger.error( - "小核酸实验1创建未返回成功结果: order_code=%s order_ids=%s materials=%s", + "小核酸实验创建未返回成功结果: order_code=%s order_ids=%s materials=%s", resolved_order_code, order_ids, len(material_records), ) elif not material_records: - logger.warning("小核酸实验1创建成功但未返回物料分配记录: order_code=%s", resolved_order_code) + logger.warning("小核酸实验创建成功但未返回物料分配记录: order_code=%s", resolved_order_code) elif not suggested_locations: - logger.warning("小核酸实验1创建成功但未解析到推荐库位: order_code=%s", resolved_order_code) + logger.warning("小核酸实验创建成功但未解析到推荐库位: order_code=%s", resolved_order_code) registration_result = { "registered": [], "skipped": material_records, - "reason": "submit_experiment_1_resource_sync_disabled", + "reason": f"{action_name}_resource_sync_disabled", "auto_register_materials": bool(auto_register_materials), } - # if auto_register_materials and material_records: - # try: - # registration_result = self._register_materials_to_tree(material_records) - # logger.info(f"物料注册完成: {len(registration_result.get('registered', []))} 个物料已添加到资源树") - # except Exception as e: - # logger.error(f"物料注册失败: {e}") - # registration_result = {"error": str(e)} result = { "success": create_success, @@ -703,7 +1014,8 @@ class BioyondSirnaStation(BioyondWorkstation): } result.update(result["manual_load_probe"]) logger.info( - "小核酸实验1提交完成: order_code=%s success=%s manual_load_rows=%s", + "小核酸实验提交完成: action=%s order_code=%s success=%s manual_load_rows=%s", + action_name, resolved_order_code, result["success"], { @@ -1007,6 +1319,9 @@ class BioyondSirnaStation(BioyondWorkstation): ) -> Dict[str, Any]: """Guided manual-unload checkpoint that clears the deck after confirmation. + TODO: 未来应改成监听 Bioyond order_finish/finish signal 后触发的卸载向导, + 而不是由工作流显式调用的本地 deck 清理节点。 + Mirrors :meth:`start_experiment` in shape: three boolean gates default to ``False``, three sibling-array display tables (Sample / Consumables / Reagent × material_name / material_code / location / quantity) @@ -1627,12 +1942,8 @@ class BioyondSirnaStation(BioyondWorkstation): workflow_name: str = "", sub_workflow_name: str = "", ) -> Dict[str, str]: - workflow_name = workflow_name or self._config_value("experiment_1_workflow_name", "sirna_exp1_workflow_name") or "" - sub_workflow_name = ( - sub_workflow_name - or self._config_value("experiment_1_sub_workflow_name", "sirna_exp1_sub_workflow_name") - or "" - ) + workflow_name = workflow_name or "" + sub_workflow_name = sub_workflow_name or "" workflow_query = {"type": 0, "filter": workflow_name or sub_workflow_name, "includeDetail": True} logger.info("正在解析小核酸工作流: filter=%r", workflow_query["filter"]) workflow_data = rpc.query_workflow(json.dumps(workflow_query, ensure_ascii=False)) @@ -1671,9 +1982,6 @@ class BioyondSirnaStation(BioyondWorkstation): "sub_workflow_id": sub_id, } - def _resolve_experiment_1_workflow(self, rpc: Any) -> Dict[str, str]: - return self._resolve_experiment_workflow(rpc) - def _build_param_values_from_step_data( self, step_data: Any, @@ -1686,6 +1994,8 @@ class BioyondSirnaStation(BioyondWorkstation): raise RuntimeError("workflow_step_query 未返回可解析的步骤参数") param_values: Dict[str, List[Dict[str, Any]]] = {} parameter_template: List[Dict[str, Any]] = [] + override_items = self._parameter_override_items(parameter_overrides) + override_keys = {key for key, _ in override_items} for step_id, value in parameter_map.items(): if not self._looks_like_uuid(step_id): continue @@ -1716,10 +2026,12 @@ class BioyondSirnaStation(BioyondWorkstation): parameter_template.append(template_item) if parameter_type.lower() == "hidden" or task_displayable == 0: continue - if not include_all_task_displayable and key != "protocolName": + is_required_default = key == "protocolName" + is_explicit_override = key in override_keys + if not include_all_task_displayable and not is_required_default and not is_explicit_override: continue value_for_create_order = self._value_for_create_order(parameter) - if self._is_blank(value_for_create_order): + if self._is_blank(value_for_create_order) and not is_explicit_override: continue entry: Dict[str, Any] = {"key": key, "value": "" if self._is_blank(value_for_create_order) else str(value_for_create_order)} m_value = parameter.get("m", module_m) @@ -1731,7 +2043,7 @@ class BioyondSirnaStation(BioyondWorkstation): entries.append(entry) if entries: param_values[str(step_id)] = entries - self._apply_parameter_overrides(param_values, parameter_overrides) + self._apply_parameter_overrides(param_values, override_items) return param_values, parameter_template def _value_for_create_order(self, parameter: Dict[str, Any]) -> Any: @@ -1746,24 +2058,9 @@ class BioyondSirnaStation(BioyondWorkstation): param_values: Dict[str, List[Dict[str, Any]]], overrides: Any, ) -> None: - if not overrides: + override_items = self._parameter_override_items(overrides) + if not override_items: return - if isinstance(overrides, dict): - override_items = [(str(key), value) for key, value in overrides.items()] - else: - override_items = [] - for override in self._as_list(overrides): - if not override: - continue - if isinstance(override, dict): - override_items.extend((str(key), value) for key, value in override.items()) - continue - if "=" not in str(override): - logger.error("参数覆盖格式错误: override=%r", override) - raise ValueError(f"参数覆盖必须使用 key=value 格式: {override!r}") - key, value = str(override).split("=", 1) - override_items.append((key, value)) - for key, value in override_items: matched = False for entries in param_values.values(): @@ -1775,25 +2072,63 @@ class BioyondSirnaStation(BioyondWorkstation): logger.error("参数覆盖未命中 paramValues: key=%s", key) raise ValueError(f"paramValues 中找不到可覆盖参数: {key}") + def _parameter_override_items(self, overrides: Any) -> List[Tuple[str, Any]]: + if not overrides: + return [] + if isinstance(overrides, dict): + return [(str(key), value) for key, value in overrides.items() if not self._is_blank(value)] + override_items: List[Tuple[str, Any]] = [] + for override in self._as_list(overrides): + if not override: + continue + if isinstance(override, dict): + override_items.extend( + (str(key), value) for key, value in override.items() if not self._is_blank(value) + ) + continue + if isinstance(override, (list, tuple)) and len(override) == 2: + if self._is_blank(override[1]): + continue + override_items.append((str(override[0]), override[1])) + continue + if "=" not in str(override): + logger.error("参数覆盖格式错误: override=%r", override) + raise ValueError(f"参数覆盖必须使用 key=value 格式: {override!r}") + key, value = str(override).split("=", 1) + if self._is_blank(value): + continue + override_items.append((key, value)) + return override_items + def _build_bioyond_order_identity( self, order_code: str = "", order_name: str = "", + experiment_number: Optional[int] = None, ) -> Tuple[str, str]: if order_code and order_name: return order_code, order_name - prefix = self._config_value( - "experiment_1_order_prefix", - "sirna_exp1_order_prefix", - "sirna_exp1_order_code_prefix", - ) or "test" + prefix_keys: List[str] = [] + if experiment_number is not None: + prefix_keys.extend( + [ + f"experiment_{experiment_number}_order_prefix", + f"sirna_exp{experiment_number}_order_prefix", + f"sirna_exp{experiment_number}_order_code_prefix", + ] + ) + prefix_keys.extend( + [ + "experiment_1_order_prefix", + "sirna_exp1_order_prefix", + "sirna_exp1_order_code_prefix", + ] + ) + prefix = self._config_value(*prefix_keys) or "test" suffix = datetime.now().strftime("%m%d%H%M%S") value = f"{prefix}{suffix}" return order_code or value, order_name or value - def _build_experiment1_order_identity(self) -> Tuple[str, str]: - return self._build_bioyond_order_identity() - def _parse_experiment1_create_result(self, result: Any, order_code: Optional[str] = None) -> Dict[str, Any]: parsed_result = self._parse_lims_result(result) material_records = self._extract_create_order_materials(parsed_result) @@ -1858,6 +2193,37 @@ class BioyondSirnaStation(BioyondWorkstation): result["error"] = str(exc) return result + def _run_scheduler_action( + self, + method_name: str, + action_label: str, + **kwargs: Any, + ) -> Dict[str, Any]: + """统一封装直接调度器动作,保持 station action 只是薄包装。""" + with self._debug_call_session(method_name): + api_host = self._kwarg_text(kwargs, "api_host") + api_key = self._kwarg_text(kwargs, "api_key") + self._update_runtime_api_config(api_host=api_host, api_key=api_key) + rpc = self._require_hardware_interface(method_name) + logger.info("正在%s小核酸调度器: method=%s", action_label, method_name) + result = getattr(rpc, method_name)() + success = result == 1 + if success: + logger.info("小核酸调度器%s成功: result=%s", action_label, result) + else: + logger.error("小核酸调度器%s失败或返回非成功码: result=%s", action_label, result) + return { + "success": success, + "return_info": result, + f"{method_name}_result": result, + "scheduler_action": method_name, + "confirmation_message": ( + f"调度器{action_label}成功" + if success + else f"调度器{action_label}失败,请检查 LIMS 状态" + ), + } + def _run_reset_operations( self, rpc: Any,