feat: RNA add Sirna experiment controls and workflow bindings

- Add bound Sirna workflow names for experiment 1 and experiment 2 submissions.
- Route named workflow submission through a shared create-order core with minimal param payloads.
- Add direct scheduler controls and cancel/take-out manual confirmation handling.
This commit is contained in:
yxz321
2026-05-12 15:42:14 +08:00
parent e6ee6fc964
commit 633c8b3d2c

View File

@@ -123,13 +123,53 @@ DEFAULT_SIRNA_MATERIAL_TYPE_MAPPINGS = {
"bioyond_sirna_cell_culture_plate": ["细胞培养板", ""], "bioyond_sirna_cell_culture_plate": ["细胞培养板", ""],
"bioyond_sirna_reagent_trough": ["试剂槽RiboGreen", ""], "bioyond_sirna_reagent_trough": ["试剂槽RiboGreen", ""],
} }
SIRNA_EXPERIMENT_1_WORKFLOW_NAME = "场景一:报告基因检测流程"
SIRNA_EXPERIMENT_1_SUB_WORKFLOW_NAME = "报告基因检测流程"
SIRNA_EXPERIMENT_1_WORKFLOW_ID = "3a1fc8e9-f807-3f9e-6f48-7132f594141a"
SIRNA_EXPERIMENT_1_SUB_WORKFLOW_ID = "3a1fc8ea-35b0-ce0c-1a46-ab506b647e4e"
SIRNA_EXPERIMENT_2_WORKFLOW_NAME = "场景二:基因表达检测"
SIRNA_EXPERIMENT_2_SUB_WORKFLOW_NAME = "基因编辑检测"
SIRNA_EXPERIMENT_2_WORKFLOW_ID = "3a1fcdbd-316c-a4b8-a7ee-a262099552fa"
SIRNA_EXPERIMENT_2_SUB_WORKFLOW_ID = "3a1fd2d4-5d3f-fae1-8b3d-ec6d0abb6646"
SIRNA_WORKFLOW_BINDING_LAST_UPDATED = "2026-05-12"
class SubmitExperimentRequiredParams(TypedDict):
workflow_name: Annotated[str, Field(description="工作流名称(必填,不填写工作流 ID")]
sample_throughput: Annotated[int, Field(description="样品通量1-96必填表示一次实验处理的样品数量。")]
class SubmitExperimentOptionalParams(TypedDict, total=False):
sub_workflow_name: Annotated[str, Field(description="子工作流名称(可选;为空时选中根工作流下的可用子工作流)")]
order_code: Annotated[str, Field(description="订单编号(可选,自动生成)")]
order_name: Annotated[str, Field(description="订单名称(可选,自动生成)")]
parameter_overrides: Annotated[str, Field(description="参数覆盖(文本格式)")]
auto_register_materials: Annotated[bool, Field(default=True, description="是否自动注册物料默认True")]
# 绑定信息(最后更新 2026-05-12
# 工作流「场景一:报告基因检测流程」= 3a1fc8e9-f807-3f9e-6f48-7132f594141a
# 子工作流「报告基因检测流程」= 3a1fc8ea-35b0-ce0c-1a46-ab506b647e4e
class Experiment1RequiredParams(TypedDict): class Experiment1RequiredParams(TypedDict):
sample_throughput: Annotated[int, Field(description="样品通量1-96必填表示一次实验处理的样品数量。")] sample_throughput: Annotated[int, Field(description="样品通量1-96必填表示一次实验处理的样品数量。")]
class Experiment1OptionalParams(TypedDict, total=False): class Experiment1OptionalParams(TypedDict, total=False):
order_code: Annotated[str, Field(description="订单编号(可选,自动生成)")]
order_name: Annotated[str, Field(description="订单名称(可选,自动生成)")]
parameter_overrides: Annotated[str, Field(description="参数覆盖(文本格式)")]
auto_register_materials: Annotated[bool, Field(default=True, description="是否自动注册物料默认True")]
# 绑定信息(最后更新 2026-05-12
# 工作流「场景二:基因表达检测」= 3a1fcdbd-316c-a4b8-a7ee-a262099552fa
# 子工作流「基因编辑检测」= 3a1fd2d4-5d3f-fae1-8b3d-ec6d0abb6646
class Experiment2RequiredParams(TypedDict):
sample_throughput: Annotated[int, Field(description="样品通量1-96必填表示一次实验处理的样品数量。")]
class Experiment2OptionalParams(TypedDict, total=False):
order_code: Annotated[str, Field(description="订单编号(可选,自动生成)")]
order_name: Annotated[str, Field(description="订单名称(可选,自动生成)")] order_name: Annotated[str, Field(description="订单名称(可选,自动生成)")]
parameter_overrides: Annotated[str, Field(description="参数覆盖(文本格式)")] parameter_overrides: Annotated[str, Field(description="参数覆盖(文本格式)")]
auto_register_materials: Annotated[bool, Field(default=True, description="是否自动注册物料默认True")] auto_register_materials: Annotated[bool, Field(default=True, description="是否自动注册物料默认True")]
@@ -448,6 +488,132 @@ class BioyondSirnaStation(BioyondWorkstation):
refresh_material_cache=refresh_material_cache, refresh_material_cache=refresh_material_cache,
) )
@action(
always_free=True,
description="手动确认后直接启动 Bioyond 小核酸调度器,不执行装载确认门禁",
)
def scheduler_start(
self,
**kwargs: Any,
) -> Dict[str, Any]:
"""直接调用 Bioyond 调度器启动接口。"""
return self._run_scheduler_action("scheduler_start", "启动", **kwargs)
@action(
always_free=True,
description="手动确认后直接停止 Bioyond 小核酸调度器",
)
def scheduler_stop(
self,
**kwargs: Any,
) -> Dict[str, Any]:
"""直接调用 Bioyond 调度器停止接口。"""
return self._run_scheduler_action("scheduler_stop", "停止", **kwargs)
@action(
always_free=True,
description="手动确认后直接暂停 Bioyond 小核酸调度器",
)
def scheduler_pause(
self,
**kwargs: Any,
) -> Dict[str, Any]:
"""直接调用 Bioyond 调度器暂停接口。"""
return self._run_scheduler_action("scheduler_pause", "暂停", **kwargs)
@action(
always_free=True,
description="手动确认后直接继续 Bioyond 小核酸调度器",
)
def scheduler_continue(
self,
**kwargs: Any,
) -> Dict[str, Any]:
"""直接调用 Bioyond 调度器继续接口。"""
return self._run_scheduler_action("scheduler_continue", "继续", **kwargs)
@action(
always_free=True,
node_type=NodeType.MANUAL_CONFIRM,
placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"},
goal_default={
"order_id": "",
"order_code": "",
"cancel_experiment": True,
"take_out_remaining": True,
"confirmed": False,
"timeout_seconds": 3600,
"assignee_user_ids": [],
},
feedback_interval=300,
description="手动确认后取消 Bioyond 实验,并按订单快照释放残留物料",
)
def cancel_experiment_and_take_out(
self,
order_id: str = "",
order_code: str = "",
cancel_experiment: bool = True,
take_out_remaining: bool = True,
confirmed: bool = False,
timeout_seconds: int = 3600,
assignee_user_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""取消实验并释放订单残留物料。"""
with self._debug_call_session("cancel_experiment_and_take_out"):
del timeout_seconds, assignee_user_ids
api_host = self._kwarg_text(kwargs, "api_host")
api_key = self._kwarg_text(kwargs, "api_key")
self._update_runtime_api_config(api_host=api_host, api_key=api_key)
if not self._as_manual_gate(confirmed):
raise RuntimeError("取消实验/释放物料需要操作员确认")
normalized_order_id = str(order_id or "").strip()
filter_text = str(order_code or normalized_order_id or "").strip()
if not filter_text:
raise RuntimeError("取消实验/释放物料需要提供 order_id 或 order_code")
rpc = self._require_hardware_interface("order_query")
snapshot = self._query_order_snapshot(rpc, filter_text)
targets = self._extract_takeout_targets(snapshot, normalized_order_id)
resolved_order_id = str(targets.get("order_id") or normalized_order_id or "").strip()
if not resolved_order_id and cancel_experiment:
raise RuntimeError("未能解析可取消的 Bioyond order_id")
result: Dict[str, Any] = {
"success": True,
"order_id": resolved_order_id,
"order_code": str(order_code or ""),
"cleanup_targets": targets,
"selected_operations": {
"cancel_experiment": bool(cancel_experiment),
"take_out_remaining": bool(take_out_remaining),
},
}
if cancel_experiment:
self._require_rpc_method(rpc, "cancel_experiment")
cancel_result = rpc.cancel_experiment(resolved_order_id)
result["cancel_experiment"] = cancel_result
result["success"] = bool(cancel_result == 1)
logger.info(
"小核酸取消实验返回: order_id=%s result=%s",
resolved_order_id,
cancel_result,
)
if take_out_remaining:
if targets.get("requires_take_out"):
take_out_result = self._take_out_remaining_materials(rpc, targets)
result["take_out"] = take_out_result
logger.info("小核酸取消后 take_out 返回: %s", take_out_result)
else:
result["take_out"] = {
"skipped": True,
"reason": "订单快照未发现 preIntake/material 释放目标",
}
logger.info("小核酸取消后跳过 take_out: 未发现释放目标")
result["confirmation_message"] = "取消/释放操作已提交" if result["success"] else "取消实验失败,请检查 LIMS 状态"
return result
@action( @action(
always_free=True, always_free=True,
node_type=NodeType.MANUAL_CONFIRM, node_type=NodeType.MANUAL_CONFIRM,
@@ -541,26 +707,175 @@ class BioyondSirnaStation(BioyondWorkstation):
- confirmation_message (str): 确认消息 - confirmation_message (str): 确认消息
- registration_result (Dict): 物料注册结果 - registration_result (Dict): 物料注册结果
""" """
with self._debug_call_session("submit_experiment_1"): optional_params = optional_params or {}
if isinstance(required_params, dict): if isinstance(required_params, dict):
sample_throughput = required_params.get("sample_throughput") sample_throughput = required_params.get("sample_throughput")
else: else:
sample_throughput = required_params sample_throughput = required_params
sample_throughput = int(sample_throughput) return self._submit_experiment_core(
action_name="submit_experiment_1",
experiment_number=1,
workflow_name=SIRNA_EXPERIMENT_1_WORKFLOW_NAME,
sub_workflow_name=SIRNA_EXPERIMENT_1_SUB_WORKFLOW_NAME,
sample_throughput=int(sample_throughput),
order_code=str(optional_params.get("order_code", "") or ""),
order_name=str(optional_params.get("order_name", "") or ""),
parameter_overrides=optional_params.get("parameter_overrides", ""),
auto_register_materials=bool(optional_params.get("auto_register_materials", True)),
timeout_seconds=timeout_seconds,
assignee_user_ids=assignee_user_ids,
**kwargs,
)
if optional_params is None: @action(
optional_params = {} always_free=True,
order_name = optional_params.get("order_name", "") description="按工作流名称提交小核酸实验(通用入口,不暴露工作流 ID",
parameter_overrides = optional_params.get("parameter_overrides", "") )
auto_register_materials = optional_params.get("auto_register_materials", True) def submit_experiment(
self,
required_params: SubmitExperimentRequiredParams,
optional_params: Optional[SubmitExperimentOptionalParams] = None,
timeout_seconds: int = 3600,
assignee_user_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""按工作流名称提交小核酸实验到 Bioyond LIMS。"""
optional_params = optional_params or {}
workflow_name = ""
sample_throughput: Any = None
if isinstance(required_params, dict):
workflow_name = str(required_params.get("workflow_name", "") or "")
sample_throughput = required_params.get("sample_throughput")
else:
sample_throughput = required_params
return self._submit_experiment_core(
action_name="submit_experiment",
experiment_number=None,
workflow_name=workflow_name,
sub_workflow_name=str(optional_params.get("sub_workflow_name", "") or ""),
sample_throughput=int(sample_throughput),
order_code=str(optional_params.get("order_code", "") or ""),
order_name=str(optional_params.get("order_name", "") or ""),
parameter_overrides=optional_params.get("parameter_overrides", ""),
auto_register_materials=bool(optional_params.get("auto_register_materials", True)),
timeout_seconds=timeout_seconds,
assignee_user_ids=assignee_user_ids,
**kwargs,
)
@action(
always_free=True,
node_type=NodeType.MANUAL_CONFIRM,
placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"},
goal_default={
"optional_params": {"auto_register_materials": True},
"timeout_seconds": 3600,
"assignee_user_ids": [],
},
feedback_interval=300,
description="提交小核酸实验2基因表达检测",
handles=[
ActionOutputHandle(
key="order_id",
data_type="bioyond_order_id",
label="实验ID",
data_key="order_id",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="order_ids",
data_type="bioyond_order_ids",
label="实验ID列表",
data_key="order_ids",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="target_device",
data_type="device_id",
label="目标设备",
data_key="target_device",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="resource",
data_type="resource",
label="待装载物料",
data_key="resource",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="coin_cell_code", data_type="array",
label="物料名称",
data_key="coin_cell_code",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="mount_resource", data_type="resource",
label="库位",
data_key="mount_resource",
data_source=DataSource.EXECUTOR,
),
],
)
def submit_experiment_2(
self,
required_params: Experiment2RequiredParams,
optional_params: Optional[Experiment2OptionalParams] = None,
timeout_seconds: int = 3600,
assignee_user_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""提交小核酸实验2基因表达检测到 Bioyond LIMS。"""
optional_params = optional_params or {}
if isinstance(required_params, dict):
sample_throughput = required_params.get("sample_throughput")
else:
sample_throughput = required_params
return self._submit_experiment_core(
action_name="submit_experiment_2",
experiment_number=2,
workflow_name=SIRNA_EXPERIMENT_2_WORKFLOW_NAME,
sub_workflow_name=SIRNA_EXPERIMENT_2_SUB_WORKFLOW_NAME,
sample_throughput=int(sample_throughput),
order_code=str(optional_params.get("order_code", "") or ""),
order_name=str(optional_params.get("order_name", "") or ""),
parameter_overrides=optional_params.get("parameter_overrides", ""),
auto_register_materials=bool(optional_params.get("auto_register_materials", True)),
timeout_seconds=timeout_seconds,
assignee_user_ids=assignee_user_ids,
**kwargs,
)
def _submit_experiment_core(
self,
action_name: str,
experiment_number: Optional[int],
workflow_name: str,
sub_workflow_name: str,
sample_throughput: int,
order_code: str,
order_name: str,
parameter_overrides: Any,
auto_register_materials: bool,
timeout_seconds: int = 3600,
assignee_user_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
with self._debug_call_session(action_name):
if self._is_blank(workflow_name):
raise ValueError("提交实验必须提供 workflow_name工作流名称不能提供或依赖 workflow id")
target_device = ( target_device = (
self._kwarg_text(kwargs, "unilabos_device_id") self._kwarg_text(kwargs, "unilabos_device_id")
or self._kwarg_text(kwargs, "device_id") or self._kwarg_text(kwargs, "device_id")
or "bioyond_sirna_station" or "bioyond_sirna_station"
) )
logger.info( logger.info(
"小核酸实验1提交开始: sample_throughput=%s target_device=%s " "小核酸实验提交开始: action=%s experiment=%s workflow=%s sub_workflow=%s "
"auto_register_materials=%s overrides=%s", "sample_throughput=%s target_device=%s auto_register_materials=%s overrides=%s",
action_name,
experiment_number,
workflow_name,
sub_workflow_name,
sample_throughput, sample_throughput,
target_device, target_device,
bool(auto_register_materials), bool(auto_register_materials),
@@ -569,11 +884,13 @@ class BioyondSirnaStation(BioyondWorkstation):
del timeout_seconds, assignee_user_ids, kwargs del timeout_seconds, assignee_user_ids, kwargs
rpc = self._require_hardware_interface("create_order") rpc = self._require_hardware_interface("create_order")
workflow = self._resolve_experiment_workflow(
# 自动解析实验1工作流无需用户指定workflow_name rpc,
workflow = self._resolve_experiment_1_workflow(rpc) workflow_name=workflow_name,
sub_workflow_name=sub_workflow_name,
)
logger.info( logger.info(
"小核酸实验1工作流已解析: root=%s(%s) sub=%s(%s)", "小核酸实验工作流已解析: root=%s(%s) sub=%s(%s)",
workflow.get("workflow_name", ""), workflow.get("workflow_name", ""),
workflow.get("root_workflow_id", ""), workflow.get("root_workflow_id", ""),
workflow.get("sub_workflow_name", ""), workflow.get("sub_workflow_name", ""),
@@ -581,28 +898,29 @@ class BioyondSirnaStation(BioyondWorkstation):
) )
step_data = rpc.workflow_step_query(workflow["sub_workflow_id"]) step_data = rpc.workflow_step_query(workflow["sub_workflow_id"])
parsed_overrides = self._parse_parameter_overrides_text(str(parameter_overrides or ""))
# Parse parameter_overrides from text format
parsed_overrides = self._parse_parameter_overrides_text(parameter_overrides)
param_values, parameter_template = self._build_param_values_from_step_data( param_values, parameter_template = self._build_param_values_from_step_data(
step_data, step_data,
parameter_overrides=parsed_overrides, parameter_overrides=parsed_overrides,
include_all_task_displayable=True, include_all_task_displayable=False,
) )
if not param_values: if not param_values:
logger.error("小核酸实验1参数构建失败: LIMS 子工作流未返回可用 paramValues") logger.error("小核酸实验参数构建失败: LIMS 子工作流未返回可用 paramValues")
raise RuntimeError("未从 LIMS 子工作流参数中提取到 create_order paramValues") raise RuntimeError("未从 LIMS 子工作流参数中提取到 create_order paramValues")
param_entry_count = sum(len(entries) for entries in param_values.values()) param_entry_count = sum(len(entries) for entries in param_values.values())
logger.info( logger.info(
"小核酸实验1参数已构建: steps=%s entries=%s template_items=%s overrides=%s", "小核酸实验参数已构建: steps=%s entries=%s template_items=%s overrides=%s",
len(param_values), len(param_values),
param_entry_count, param_entry_count,
len(parameter_template), len(parameter_template),
len(parsed_overrides), len(parsed_overrides),
) )
resolved_order_code, resolved_order_name = self._build_bioyond_order_identity("", order_name) resolved_order_code, resolved_order_name = self._build_bioyond_order_identity(
experiment_number=experiment_number,
order_code=order_code,
order_name=order_name,
)
order_payload = [ order_payload = [
{ {
"orderCode": resolved_order_code, "orderCode": resolved_order_code,
@@ -614,7 +932,7 @@ class BioyondSirnaStation(BioyondWorkstation):
} }
] ]
logger.info(f"正在提交小核酸实验1: {resolved_order_name} ({resolved_order_code})") logger.info("正在提交小核酸实验: %s (%s)", resolved_order_name, resolved_order_code)
raw_result = rpc.create_order(json.dumps(copy.deepcopy(order_payload), ensure_ascii=False)) raw_result = rpc.create_order(json.dumps(copy.deepcopy(order_payload), ensure_ascii=False))
parsed_result = self._parse_lims_result(raw_result) parsed_result = self._parse_lims_result(raw_result)
material_records = self._extract_create_order_materials(parsed_result) material_records = self._extract_create_order_materials(parsed_result)
@@ -642,7 +960,7 @@ class BioyondSirnaStation(BioyondWorkstation):
} }
create_success = self._create_result_success(parsed_result, order_ids, material_records) create_success = self._create_result_success(parsed_result, order_ids, material_records)
logger.info( logger.info(
"小核酸实验1创建结果: success=%s order_ids=%s materials=%s " "小核酸实验创建结果: success=%s order_ids=%s materials=%s "
"suggested_locations=%s material_types=%s", "suggested_locations=%s material_types=%s",
create_success, create_success,
order_ids, order_ids,
@@ -652,29 +970,22 @@ class BioyondSirnaStation(BioyondWorkstation):
) )
if not create_success: if not create_success:
logger.error( logger.error(
"小核酸实验1创建未返回成功结果: order_code=%s order_ids=%s materials=%s", "小核酸实验创建未返回成功结果: order_code=%s order_ids=%s materials=%s",
resolved_order_code, resolved_order_code,
order_ids, order_ids,
len(material_records), len(material_records),
) )
elif not material_records: elif not material_records:
logger.warning("小核酸实验1创建成功但未返回物料分配记录: order_code=%s", resolved_order_code) logger.warning("小核酸实验创建成功但未返回物料分配记录: order_code=%s", resolved_order_code)
elif not suggested_locations: elif not suggested_locations:
logger.warning("小核酸实验1创建成功但未解析到推荐库位: order_code=%s", resolved_order_code) logger.warning("小核酸实验创建成功但未解析到推荐库位: order_code=%s", resolved_order_code)
registration_result = { registration_result = {
"registered": [], "registered": [],
"skipped": material_records, "skipped": material_records,
"reason": "submit_experiment_1_resource_sync_disabled", "reason": f"{action_name}_resource_sync_disabled",
"auto_register_materials": bool(auto_register_materials), "auto_register_materials": bool(auto_register_materials),
} }
# if auto_register_materials and material_records:
# try:
# registration_result = self._register_materials_to_tree(material_records)
# logger.info(f"物料注册完成: {len(registration_result.get('registered', []))} 个物料已添加到资源树")
# except Exception as e:
# logger.error(f"物料注册失败: {e}")
# registration_result = {"error": str(e)}
result = { result = {
"success": create_success, "success": create_success,
@@ -703,7 +1014,8 @@ class BioyondSirnaStation(BioyondWorkstation):
} }
result.update(result["manual_load_probe"]) result.update(result["manual_load_probe"])
logger.info( logger.info(
"小核酸实验1提交完成: order_code=%s success=%s manual_load_rows=%s", "小核酸实验提交完成: action=%s order_code=%s success=%s manual_load_rows=%s",
action_name,
resolved_order_code, resolved_order_code,
result["success"], result["success"],
{ {
@@ -1007,6 +1319,9 @@ class BioyondSirnaStation(BioyondWorkstation):
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Guided manual-unload checkpoint that clears the deck after confirmation. """Guided manual-unload checkpoint that clears the deck after confirmation.
TODO: 未来应改成监听 Bioyond order_finish/finish signal 后触发的卸载向导,
而不是由工作流显式调用的本地 deck 清理节点。
Mirrors :meth:`start_experiment` in shape: three boolean gates default Mirrors :meth:`start_experiment` in shape: three boolean gates default
to ``False``, three sibling-array display tables (Sample / Consumables to ``False``, three sibling-array display tables (Sample / Consumables
/ Reagent × material_name / material_code / location / quantity) / Reagent × material_name / material_code / location / quantity)
@@ -1627,12 +1942,8 @@ class BioyondSirnaStation(BioyondWorkstation):
workflow_name: str = "", workflow_name: str = "",
sub_workflow_name: str = "", sub_workflow_name: str = "",
) -> Dict[str, str]: ) -> Dict[str, str]:
workflow_name = workflow_name or self._config_value("experiment_1_workflow_name", "sirna_exp1_workflow_name") or "" workflow_name = workflow_name or ""
sub_workflow_name = ( sub_workflow_name = sub_workflow_name or ""
sub_workflow_name
or self._config_value("experiment_1_sub_workflow_name", "sirna_exp1_sub_workflow_name")
or ""
)
workflow_query = {"type": 0, "filter": workflow_name or sub_workflow_name, "includeDetail": True} workflow_query = {"type": 0, "filter": workflow_name or sub_workflow_name, "includeDetail": True}
logger.info("正在解析小核酸工作流: filter=%r", workflow_query["filter"]) logger.info("正在解析小核酸工作流: filter=%r", workflow_query["filter"])
workflow_data = rpc.query_workflow(json.dumps(workflow_query, ensure_ascii=False)) workflow_data = rpc.query_workflow(json.dumps(workflow_query, ensure_ascii=False))
@@ -1671,9 +1982,6 @@ class BioyondSirnaStation(BioyondWorkstation):
"sub_workflow_id": sub_id, "sub_workflow_id": sub_id,
} }
def _resolve_experiment_1_workflow(self, rpc: Any) -> Dict[str, str]:
return self._resolve_experiment_workflow(rpc)
def _build_param_values_from_step_data( def _build_param_values_from_step_data(
self, self,
step_data: Any, step_data: Any,
@@ -1686,6 +1994,8 @@ class BioyondSirnaStation(BioyondWorkstation):
raise RuntimeError("workflow_step_query 未返回可解析的步骤参数") raise RuntimeError("workflow_step_query 未返回可解析的步骤参数")
param_values: Dict[str, List[Dict[str, Any]]] = {} param_values: Dict[str, List[Dict[str, Any]]] = {}
parameter_template: List[Dict[str, Any]] = [] parameter_template: List[Dict[str, Any]] = []
override_items = self._parameter_override_items(parameter_overrides)
override_keys = {key for key, _ in override_items}
for step_id, value in parameter_map.items(): for step_id, value in parameter_map.items():
if not self._looks_like_uuid(step_id): if not self._looks_like_uuid(step_id):
continue continue
@@ -1716,10 +2026,12 @@ class BioyondSirnaStation(BioyondWorkstation):
parameter_template.append(template_item) parameter_template.append(template_item)
if parameter_type.lower() == "hidden" or task_displayable == 0: if parameter_type.lower() == "hidden" or task_displayable == 0:
continue continue
if not include_all_task_displayable and key != "protocolName": is_required_default = key == "protocolName"
is_explicit_override = key in override_keys
if not include_all_task_displayable and not is_required_default and not is_explicit_override:
continue continue
value_for_create_order = self._value_for_create_order(parameter) value_for_create_order = self._value_for_create_order(parameter)
if self._is_blank(value_for_create_order): if self._is_blank(value_for_create_order) and not is_explicit_override:
continue continue
entry: Dict[str, Any] = {"key": key, "value": "" if self._is_blank(value_for_create_order) else str(value_for_create_order)} entry: Dict[str, Any] = {"key": key, "value": "" if self._is_blank(value_for_create_order) else str(value_for_create_order)}
m_value = parameter.get("m", module_m) m_value = parameter.get("m", module_m)
@@ -1731,7 +2043,7 @@ class BioyondSirnaStation(BioyondWorkstation):
entries.append(entry) entries.append(entry)
if entries: if entries:
param_values[str(step_id)] = entries param_values[str(step_id)] = entries
self._apply_parameter_overrides(param_values, parameter_overrides) self._apply_parameter_overrides(param_values, override_items)
return param_values, parameter_template return param_values, parameter_template
def _value_for_create_order(self, parameter: Dict[str, Any]) -> Any: def _value_for_create_order(self, parameter: Dict[str, Any]) -> Any:
@@ -1746,24 +2058,9 @@ class BioyondSirnaStation(BioyondWorkstation):
param_values: Dict[str, List[Dict[str, Any]]], param_values: Dict[str, List[Dict[str, Any]]],
overrides: Any, overrides: Any,
) -> None: ) -> None:
if not overrides: override_items = self._parameter_override_items(overrides)
if not override_items:
return return
if isinstance(overrides, dict):
override_items = [(str(key), value) for key, value in overrides.items()]
else:
override_items = []
for override in self._as_list(overrides):
if not override:
continue
if isinstance(override, dict):
override_items.extend((str(key), value) for key, value in override.items())
continue
if "=" not in str(override):
logger.error("参数覆盖格式错误: override=%r", override)
raise ValueError(f"参数覆盖必须使用 key=value 格式: {override!r}")
key, value = str(override).split("=", 1)
override_items.append((key, value))
for key, value in override_items: for key, value in override_items:
matched = False matched = False
for entries in param_values.values(): for entries in param_values.values():
@@ -1775,25 +2072,63 @@ class BioyondSirnaStation(BioyondWorkstation):
logger.error("参数覆盖未命中 paramValues: key=%s", key) logger.error("参数覆盖未命中 paramValues: key=%s", key)
raise ValueError(f"paramValues 中找不到可覆盖参数: {key}") raise ValueError(f"paramValues 中找不到可覆盖参数: {key}")
def _parameter_override_items(self, overrides: Any) -> List[Tuple[str, Any]]:
if not overrides:
return []
if isinstance(overrides, dict):
return [(str(key), value) for key, value in overrides.items() if not self._is_blank(value)]
override_items: List[Tuple[str, Any]] = []
for override in self._as_list(overrides):
if not override:
continue
if isinstance(override, dict):
override_items.extend(
(str(key), value) for key, value in override.items() if not self._is_blank(value)
)
continue
if isinstance(override, (list, tuple)) and len(override) == 2:
if self._is_blank(override[1]):
continue
override_items.append((str(override[0]), override[1]))
continue
if "=" not in str(override):
logger.error("参数覆盖格式错误: override=%r", override)
raise ValueError(f"参数覆盖必须使用 key=value 格式: {override!r}")
key, value = str(override).split("=", 1)
if self._is_blank(value):
continue
override_items.append((key, value))
return override_items
def _build_bioyond_order_identity( def _build_bioyond_order_identity(
self, self,
order_code: str = "", order_code: str = "",
order_name: str = "", order_name: str = "",
experiment_number: Optional[int] = None,
) -> Tuple[str, str]: ) -> Tuple[str, str]:
if order_code and order_name: if order_code and order_name:
return order_code, order_name return order_code, order_name
prefix = self._config_value( prefix_keys: List[str] = []
"experiment_1_order_prefix", if experiment_number is not None:
"sirna_exp1_order_prefix", prefix_keys.extend(
"sirna_exp1_order_code_prefix", [
) or "test" f"experiment_{experiment_number}_order_prefix",
f"sirna_exp{experiment_number}_order_prefix",
f"sirna_exp{experiment_number}_order_code_prefix",
]
)
prefix_keys.extend(
[
"experiment_1_order_prefix",
"sirna_exp1_order_prefix",
"sirna_exp1_order_code_prefix",
]
)
prefix = self._config_value(*prefix_keys) or "test"
suffix = datetime.now().strftime("%m%d%H%M%S") suffix = datetime.now().strftime("%m%d%H%M%S")
value = f"{prefix}{suffix}" value = f"{prefix}{suffix}"
return order_code or value, order_name or value return order_code or value, order_name or value
def _build_experiment1_order_identity(self) -> Tuple[str, str]:
return self._build_bioyond_order_identity()
def _parse_experiment1_create_result(self, result: Any, order_code: Optional[str] = None) -> Dict[str, Any]: def _parse_experiment1_create_result(self, result: Any, order_code: Optional[str] = None) -> Dict[str, Any]:
parsed_result = self._parse_lims_result(result) parsed_result = self._parse_lims_result(result)
material_records = self._extract_create_order_materials(parsed_result) material_records = self._extract_create_order_materials(parsed_result)
@@ -1858,6 +2193,37 @@ class BioyondSirnaStation(BioyondWorkstation):
result["error"] = str(exc) result["error"] = str(exc)
return result return result
def _run_scheduler_action(
self,
method_name: str,
action_label: str,
**kwargs: Any,
) -> Dict[str, Any]:
"""统一封装直接调度器动作,保持 station action 只是薄包装。"""
with self._debug_call_session(method_name):
api_host = self._kwarg_text(kwargs, "api_host")
api_key = self._kwarg_text(kwargs, "api_key")
self._update_runtime_api_config(api_host=api_host, api_key=api_key)
rpc = self._require_hardware_interface(method_name)
logger.info("正在%s小核酸调度器: method=%s", action_label, method_name)
result = getattr(rpc, method_name)()
success = result == 1
if success:
logger.info("小核酸调度器%s成功: result=%s", action_label, result)
else:
logger.error("小核酸调度器%s失败或返回非成功码: result=%s", action_label, result)
return {
"success": success,
"return_info": result,
f"{method_name}_result": result,
"scheduler_action": method_name,
"confirmation_message": (
f"调度器{action_label}成功"
if success
else f"调度器{action_label}失败,请检查 LIMS 状态"
),
}
def _run_reset_operations( def _run_reset_operations(
self, self,
rpc: Any, rpc: Any,