From d009863c8c491758ea2fca021188ba16ebffaa42 Mon Sep 17 00:00:00 2001 From: yxz321 Date: Fri, 8 May 2026 23:44:21 +0800 Subject: [PATCH] feat: RNA add guided manual unload end_experiment action - Add end_experiment manual_confirm action mirroring start_experiment, with three boolean operator gates and twelve EXECUTOR sibling-array output handles for unloaded material manifests. - Add helpers _build_unload_materials_by_type, _classify_labware_mode, _iter_reagent_liquids, and _clear_unloaded_materials. - Clear unloaded slots and zero reagent liquid contents on confirmation while preserving trough labware; publish single resource tree update after mutations. - Wrap action body in _debug_call_session("end_experiment") for opt-in raw call capture. --- .../sirna_station/sirna_station.py | 406 ++++++++++++++++++ 1 file changed, 406 insertions(+) diff --git a/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py b/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py index d3c6e2f5..6e71ae8e 100644 --- a/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py +++ b/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py @@ -755,6 +755,157 @@ class BioyondSirnaStation(BioyondWorkstation): "confirmation_message": "调度器启动成功" if result == 1 else "调度器启动失败,请检查 LIMS 状态", }) + @action( + always_free=True, + node_type=NodeType.MANUAL_CONFIRM, + placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}, + goal_default={ + "order_id": "", + "samples_unloaded": False, + "consumables_unloaded": False, + "reagents_unloaded": False, + "timeout_seconds": 3600, + "assignee_user_ids": [], + }, + feedback_interval=300, + description=( + "请按当前 deck 状态卸载实验物料;勾选三类卸载确认后才会清空资源树。" + " 若上游 reagent-as-liquid 同步未启用,试剂列可能为空,仅展示 labware。" + ), + handles=[ + ActionOutputHandle( + key="unload_samples_name", data_type="array", + label="样本-物料名称", data_key="unload_tables.Sample.material_name", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="unload_samples_code", data_type="array", + label="样本-物料编号", data_key="unload_tables.Sample.material_code", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="unload_samples_location", data_type="array", + label="样本-库位", data_key="unload_tables.Sample.location", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="unload_samples_quantity", data_type="array", + label="样本-数量", data_key="unload_tables.Sample.quantity", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="unload_consumables_name", data_type="array", + label="耗材-物料名称", data_key="unload_tables.Consumables.material_name", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="unload_consumables_code", data_type="array", + label="耗材-物料编号", data_key="unload_tables.Consumables.material_code", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="unload_consumables_location", data_type="array", + label="耗材-库位", data_key="unload_tables.Consumables.location", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="unload_consumables_quantity", data_type="array", + label="耗材-数量", data_key="unload_tables.Consumables.quantity", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="unload_reagents_name", data_type="array", + label="试剂-物料名称", data_key="unload_tables.Reagent.material_name", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="unload_reagents_code", data_type="array", + label="试剂-物料编号", data_key="unload_tables.Reagent.material_code", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="unload_reagents_location", data_type="array", + label="试剂-库位", data_key="unload_tables.Reagent.location", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="unload_reagents_quantity", data_type="array", + label="试剂-数量", data_key="unload_tables.Reagent.quantity", + data_source=DataSource.EXECUTOR, + ), + ], + ) + def end_experiment( + self, + order_id: str = "", + samples_unloaded: bool = False, + consumables_unloaded: bool = False, + reagents_unloaded: bool = False, + timeout_seconds: int = 3600, + assignee_user_ids: Optional[List[str]] = None, + **kwargs: Any, + ) -> Dict[str, Any]: + """Guided manual-unload checkpoint that clears the deck after confirmation. + + Mirrors :meth:`start_experiment` in shape: three boolean gates default + to ``False``, three sibling-array display tables (Sample / Consumables + / Reagent × material_name / material_code / location / quantity) + populated from current deck state, and a raise-on-missing-gate path. + + Args: + order_id: 可选;只展示 ``unilabos_extra["order_id"]`` 等于该值的物料。 + 未指定则覆盖整个 deck。 + samples_unloaded: 操作员勾选确认 ``Sample`` 类已卸载。 + consumables_unloaded: 操作员勾选确认 ``Consumables`` 类已卸载。 + reagents_unloaded: 操作员勾选确认 ``Reagent`` 类已卸载(含 trough 试剂内容)。 + timeout_seconds: 框架超时时间(秒)。 + assignee_user_ids: 框架分配用户列表。 + """ + with self._debug_call_session("end_experiment"): + del timeout_seconds, assignee_user_ids, kwargs + + order_filter = order_id.strip() if isinstance(order_id, str) else "" + manifest = self._build_unload_materials_by_type( + order_id_filter=order_filter or None, + ) + unload_tables = self._build_manual_load_tables(manifest) + + category_specs = [ + ("Sample", self._as_manual_gate(samples_unloaded), "样本", "samples_unloaded"), + ("Consumables", self._as_manual_gate(consumables_unloaded), "耗材", "consumables_unloaded"), + ("Reagent", self._as_manual_gate(reagents_unloaded), "试剂", "reagents_unloaded"), + ] + gates: Dict[str, Dict[str, Any]] = {} + missing_labels: List[str] = [] + for mode_key, ticked, label, gate_key in category_specs: + required = bool(manifest.get(mode_key)) + gates[gate_key] = {"label": label, "required": required, "ticked": bool(ticked)} + if required and not ticked: + missing_labels.append(label) + if missing_labels: + raise RuntimeError( + f"以下分类卸载尚未确认,无法清空资源树: {', '.join(missing_labels)}" + ) + + cleared = self._clear_unloaded_materials(manifest) + + # Refresh the table snapshot so consumers see the post-mutation state. + post_manifest = self._build_unload_materials_by_type( + order_id_filter=order_filter or None, + ) + + return { + "success": True, + "order_id": order_filter, + "unloaded": cleared, + "gates": gates, + "unload_tables": unload_tables, + "post_unload_tables": self._build_manual_load_tables(post_manifest), + "confirmation_message": ( + "deck 已按操作员确认清空" if any(cleared.values()) else "deck 当前为空,无需卸载" + ), + } + @action( always_free=True, node_type=NodeType.MANUAL_CONFIRM, @@ -1747,6 +1898,261 @@ class BioyondSirnaStation(BioyondWorkstation): "machine_name": "", } + def _classify_labware_mode(self, child: Any) -> str: + """Map a placed labware to ``Sample`` / ``Consumables`` / ``Reagent``. + + Resolution order: + + 1. ``unilabos_extra["material_bioyond_type_mode"]`` if previously set + by the load path (``_register_materials_to_tree``) — most reliable. + 2. PLR class identity: troughs map to ``Reagent``; tip racks map to + ``Consumables``; everything else (plates, tubes, etc.) defaults + to ``Sample``. + + The classifier is intentionally permissive: an unrecognized class + falls back to ``Sample`` so the operator still sees the row instead + of the unload silently dropping it. + """ + extra = getattr(child, "unilabos_extra", None) or {} + if isinstance(extra, dict): + mode = str(extra.get("material_bioyond_type_mode") or "") + if mode in {"Sample", "Consumables", "Reagent"}: + return mode + try: + from unilabos.resources.bioyond.sirna_materials import ( + BioyondSirna_ReagentTrough, + ) + except Exception: # pragma: no cover - lightweight env + BioyondSirna_ReagentTrough = None # type: ignore[assignment] + if BioyondSirna_ReagentTrough is not None and isinstance(child, BioyondSirna_ReagentTrough): + return "Reagent" + category_attr = getattr(child, "category", "") or "" + category = str(category_attr).lower() + if "tip" in category: + return "Consumables" + if "trough" in category or "reagent" in category: + return "Reagent" + # Class-name heuristic for environments without category metadata. + cls_name = type(child).__name__.lower() + if "tip" in cls_name: + return "Consumables" + if "trough" in cls_name or "reagent" in cls_name: + return "Reagent" + return "Sample" + + def _iter_reagent_liquids(self, parent: Any) -> Iterable[Dict[str, Any]]: + """Yield reagent-content rows attached to ``parent`` via ``_attach_liquid_to_parent``. + + Cross-references ``parent.unilabos_extra["reagent_bioyond_ids"]`` (the + structured Bioyond metadata source-of-truth) with ``parent.tracker.liquids`` + when present. Tolerates missing metadata so this helper is usable even + on legacy decks that predate the reagent-as-liquid plan. + """ + extra = getattr(parent, "unilabos_extra", None) or {} + reagent_ids: List[Any] = [] + if isinstance(extra, dict): + reagent_ids = list(extra.get("reagent_bioyond_ids") or []) + + parent_name = getattr(parent, "name", "") or "" + warehouse = getattr(parent, "parent", None) + warehouse_name = getattr(warehouse, "name", "") or "" + location_label = f"{warehouse_name}/{parent_name}" if warehouse_name else parent_name + + for entry in reagent_ids: + if not isinstance(entry, dict): + continue + yield { + "materialName": str(entry.get("material_bioyond_name") or ""), + "materialCode": str(entry.get("material_bioyond_code") or ""), + "materialTypeCode": str(entry.get("material_bioyond_type_code") or ""), + "materialTypeMode": "Reagent", + "locationCode": "", + "locationShowName": location_label, + "quantity": entry.get("quantity") or "", + "trough": parent, + "material_bioyond_id": str(entry.get("material_bioyond_id") or ""), + } + + def _build_unload_materials_by_type( + self, + *, + order_id_filter: Optional[str] = None, + ) -> Dict[str, List[Dict[str, Any]]]: + """Walk ``self.deck`` and emit a Bioyond-shaped material list grouped by mode. + + Output rows expose ``materialName / materialCode / locationCode / + locationShowName / quantity / materialTypeMode`` keys, mirroring the + fields :meth:`_build_manual_load_tables` consumes. No side effects. + + Args: + order_id_filter: When provided, slot-bound labware whose + ``unilabos_extra["order_id"]`` does not equal this value is + skipped. Reagent contents are not order-scoped. + """ + grouped: Dict[str, List[Dict[str, Any]]] = { + "Sample": [], + "Consumables": [], + "Reagent": [], + } + deck = getattr(self, "deck", None) + if deck is None: + return grouped + for warehouse in getattr(deck, "children", []) or []: + ordering = getattr(warehouse, "_ordering", None) + if not isinstance(ordering, dict): + continue + for slot_label in ordering.keys(): + try: + child = warehouse[slot_label] + except (KeyError, IndexError, TypeError): + continue + # Distinguish placed labware from unoccupied ResourceHolder + # variants. Empty slots show as None on this carrier. + if child is None: + continue + if not hasattr(child, "tracker") and not hasattr(child, "children"): + continue + # Plain ResourceHolder placeholders for empty sites are skipped. + if type(child).__name__ == "ResourceHolder" and not list(getattr(child, "children", []) or []): + continue + + extra = getattr(child, "unilabos_extra", None) or {} + if not isinstance(extra, dict): + extra = {} + if order_id_filter and extra.get("order_id") and extra.get("order_id") != order_id_filter: + continue + + mode = self._classify_labware_mode(child) + row = { + "materialName": str(extra.get("material_bioyond_name") or getattr(child, "name", "") or ""), + "materialCode": str(extra.get("material_bioyond_code") or getattr(child, "name", "") or ""), + "materialTypeCode": str(extra.get("material_bioyond_type_code") or ""), + "materialTypeMode": mode, + "locationCode": str(extra.get("location_code") or slot_label or ""), + "locationShowName": f"{getattr(warehouse, 'name', '')}/{slot_label}", + "quantity": getattr(child, "quantity", 1), + "_warehouse": warehouse, + "_slot_label": str(slot_label), + "_child": child, + } + grouped.setdefault(mode, []).append(row) + + # Reagent contents on this trough/labware (from reagent_bioyond_ids). + for liquid_row in self._iter_reagent_liquids(child): + grouped["Reagent"].append(liquid_row) + return grouped + + def _clear_unloaded_materials( + self, + manifest: Dict[str, List[Dict[str, Any]]], + ) -> Dict[str, List[Dict[str, Any]]]: + """Apply the unload mutation: clear slot-bound labware, zero reagent liquids. + + Returns a dict shaped like the manifest but with each row reduced to + the minimal fields downstream consumers need (material code/name, the + warehouse name, and the slot label or trough name). + """ + cleared: Dict[str, List[Dict[str, Any]]] = { + "Sample": [], + "Consumables": [], + "Reagent": [], + } + affected_warehouses: List[Any] = [] + + def _remember(warehouse: Any) -> None: + if warehouse is None: + return + if any(warehouse is existing for existing in affected_warehouses): + return + affected_warehouses.append(warehouse) + + # 1) Slot-bound labware (Sample / Consumables). + for mode in ("Sample", "Consumables"): + for row in manifest.get(mode, []): + warehouse = row.get("_warehouse") + slot_label = row.get("_slot_label") or row.get("locationCode") or "" + if warehouse is None and slot_label: + try: + warehouse, _idx = self._resolve_location_to_warehouse(slot_label) + except (ValueError, RuntimeError) as exc: + logger.warning( + "卸载物料 %s 时无法解析库位 %s: %s", + row.get("materialCode"), slot_label, exc, + ) + continue + if warehouse is None or not slot_label: + continue + try: + if warehouse[slot_label] is not None: + warehouse[slot_label] = None + cleared[mode].append({ + "material_code": str(row.get("materialCode") or ""), + "material_name": str(row.get("materialName") or ""), + "location": slot_label, + "warehouse": getattr(warehouse, "name", ""), + }) + _remember(warehouse) + except (IndexError, KeyError, TypeError) as exc: + logger.warning( + "卸载物料 %s 时清空库位 %s[%s] 失败: %s", + row.get("materialCode"), getattr(warehouse, "name", "?"), + slot_label, exc, + ) + + # 2) Reagent rows: contents on a parent trough vs slot-bound trough labware. + for row in manifest.get("Reagent", []): + trough = row.get("trough") + if trough is not None: + tracker = getattr(trough, "tracker", None) + if tracker is not None and hasattr(tracker, "set_liquids"): + try: + tracker.set_liquids([]) + except Exception as exc: # pragma: no cover - tracker variants + logger.debug("tracker.set_liquids 失败(非阻塞): %s", exc) + extra = getattr(trough, "unilabos_extra", None) + if isinstance(extra, dict): + extra["reagent_bioyond_ids"] = [] + try: + setattr(trough, "unilabos_extra", extra) + except Exception: # pragma: no cover - read-only proxy + pass + cleared["Reagent"].append({ + "material_name": str(row.get("materialName") or ""), + "material_code": str(row.get("materialCode") or ""), + "trough": getattr(trough, "name", ""), + "warehouse": getattr(getattr(trough, "parent", None), "name", ""), + }) + _remember(getattr(trough, "parent", None)) + continue + + # Slot-bound Reagent labware (e.g. trough labware). Preserve the + # labware on the deck — only its contents (liquids + reagent ids) + # are zeroed. ``reset`` is the LIMS-side path for full removal. + child = row.get("_child") + if child is not None: + tracker = getattr(child, "tracker", None) + if tracker is not None and hasattr(tracker, "set_liquids"): + try: + tracker.set_liquids([]) + except Exception as exc: # pragma: no cover - tracker variants + logger.debug("tracker.set_liquids 失败(非阻塞): %s", exc) + extra = getattr(child, "unilabos_extra", None) + if isinstance(extra, dict) and extra.get("reagent_bioyond_ids"): + extra["reagent_bioyond_ids"] = [] + try: + setattr(child, "unilabos_extra", extra) + except Exception: # pragma: no cover - read-only proxy + pass + _remember(row.get("_warehouse") or getattr(child, "parent", None)) + continue + + # 3) Publish the deck after mutations. ``_publish_resource_tree_update`` + # publishes the whole deck, so a single call covers all warehouses we + # touched. ``affected_warehouses`` is kept for telemetry / tests. + if affected_warehouses: + self._publish_resource_tree_update() + return cleared + def _build_manual_load_probe( self, materials_by_type: Dict[str, List[Dict[str, Any]]],