feat: RNA add guided manual unload end_experiment action

- Add end_experiment manual_confirm action mirroring start_experiment, with three boolean operator gates and twelve EXECUTOR sibling-array output handles for unloaded material manifests.
- Add helpers _build_unload_materials_by_type, _classify_labware_mode, _iter_reagent_liquids, and _clear_unloaded_materials.
- Clear unloaded slots and zero reagent liquid contents on confirmation while preserving trough labware; publish single resource tree update after mutations.
- Wrap action body in _debug_call_session("end_experiment") for opt-in raw call capture.
This commit is contained in:
yxz321
2026-05-08 23:44:21 +08:00
parent cae828ce74
commit d009863c8c

View File

@@ -755,6 +755,157 @@ class BioyondSirnaStation(BioyondWorkstation):
"confirmation_message": "调度器启动成功" if result == 1 else "调度器启动失败,请检查 LIMS 状态",
})
@action(
always_free=True,
node_type=NodeType.MANUAL_CONFIRM,
placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"},
goal_default={
"order_id": "",
"samples_unloaded": False,
"consumables_unloaded": False,
"reagents_unloaded": False,
"timeout_seconds": 3600,
"assignee_user_ids": [],
},
feedback_interval=300,
description=(
"请按当前 deck 状态卸载实验物料;勾选三类卸载确认后才会清空资源树。"
" 若上游 reagent-as-liquid 同步未启用,试剂列可能为空,仅展示 labware。"
),
handles=[
ActionOutputHandle(
key="unload_samples_name", data_type="array",
label="样本-物料名称", data_key="unload_tables.Sample.material_name",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="unload_samples_code", data_type="array",
label="样本-物料编号", data_key="unload_tables.Sample.material_code",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="unload_samples_location", data_type="array",
label="样本-库位", data_key="unload_tables.Sample.location",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="unload_samples_quantity", data_type="array",
label="样本-数量", data_key="unload_tables.Sample.quantity",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="unload_consumables_name", data_type="array",
label="耗材-物料名称", data_key="unload_tables.Consumables.material_name",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="unload_consumables_code", data_type="array",
label="耗材-物料编号", data_key="unload_tables.Consumables.material_code",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="unload_consumables_location", data_type="array",
label="耗材-库位", data_key="unload_tables.Consumables.location",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="unload_consumables_quantity", data_type="array",
label="耗材-数量", data_key="unload_tables.Consumables.quantity",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="unload_reagents_name", data_type="array",
label="试剂-物料名称", data_key="unload_tables.Reagent.material_name",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="unload_reagents_code", data_type="array",
label="试剂-物料编号", data_key="unload_tables.Reagent.material_code",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="unload_reagents_location", data_type="array",
label="试剂-库位", data_key="unload_tables.Reagent.location",
data_source=DataSource.EXECUTOR,
),
ActionOutputHandle(
key="unload_reagents_quantity", data_type="array",
label="试剂-数量", data_key="unload_tables.Reagent.quantity",
data_source=DataSource.EXECUTOR,
),
],
)
def end_experiment(
self,
order_id: str = "",
samples_unloaded: bool = False,
consumables_unloaded: bool = False,
reagents_unloaded: bool = False,
timeout_seconds: int = 3600,
assignee_user_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""Guided manual-unload checkpoint that clears the deck after confirmation.
Mirrors :meth:`start_experiment` in shape: three boolean gates default
to ``False``, three sibling-array display tables (Sample / Consumables
/ Reagent × material_name / material_code / location / quantity)
populated from current deck state, and a raise-on-missing-gate path.
Args:
order_id: 可选;只展示 ``unilabos_extra["order_id"]`` 等于该值的物料。
未指定则覆盖整个 deck。
samples_unloaded: 操作员勾选确认 ``Sample`` 类已卸载。
consumables_unloaded: 操作员勾选确认 ``Consumables`` 类已卸载。
reagents_unloaded: 操作员勾选确认 ``Reagent`` 类已卸载(含 trough 试剂内容)。
timeout_seconds: 框架超时时间(秒)。
assignee_user_ids: 框架分配用户列表。
"""
with self._debug_call_session("end_experiment"):
del timeout_seconds, assignee_user_ids, kwargs
order_filter = order_id.strip() if isinstance(order_id, str) else ""
manifest = self._build_unload_materials_by_type(
order_id_filter=order_filter or None,
)
unload_tables = self._build_manual_load_tables(manifest)
category_specs = [
("Sample", self._as_manual_gate(samples_unloaded), "样本", "samples_unloaded"),
("Consumables", self._as_manual_gate(consumables_unloaded), "耗材", "consumables_unloaded"),
("Reagent", self._as_manual_gate(reagents_unloaded), "试剂", "reagents_unloaded"),
]
gates: Dict[str, Dict[str, Any]] = {}
missing_labels: List[str] = []
for mode_key, ticked, label, gate_key in category_specs:
required = bool(manifest.get(mode_key))
gates[gate_key] = {"label": label, "required": required, "ticked": bool(ticked)}
if required and not ticked:
missing_labels.append(label)
if missing_labels:
raise RuntimeError(
f"以下分类卸载尚未确认,无法清空资源树: {', '.join(missing_labels)}"
)
cleared = self._clear_unloaded_materials(manifest)
# Refresh the table snapshot so consumers see the post-mutation state.
post_manifest = self._build_unload_materials_by_type(
order_id_filter=order_filter or None,
)
return {
"success": True,
"order_id": order_filter,
"unloaded": cleared,
"gates": gates,
"unload_tables": unload_tables,
"post_unload_tables": self._build_manual_load_tables(post_manifest),
"confirmation_message": (
"deck 已按操作员确认清空" if any(cleared.values()) else "deck 当前为空,无需卸载"
),
}
@action(
always_free=True,
node_type=NodeType.MANUAL_CONFIRM,
@@ -1747,6 +1898,261 @@ class BioyondSirnaStation(BioyondWorkstation):
"machine_name": "",
}
def _classify_labware_mode(self, child: Any) -> str:
"""Map a placed labware to ``Sample`` / ``Consumables`` / ``Reagent``.
Resolution order:
1. ``unilabos_extra["material_bioyond_type_mode"]`` if previously set
by the load path (``_register_materials_to_tree``) — most reliable.
2. PLR class identity: troughs map to ``Reagent``; tip racks map to
``Consumables``; everything else (plates, tubes, etc.) defaults
to ``Sample``.
The classifier is intentionally permissive: an unrecognized class
falls back to ``Sample`` so the operator still sees the row instead
of the unload silently dropping it.
"""
extra = getattr(child, "unilabos_extra", None) or {}
if isinstance(extra, dict):
mode = str(extra.get("material_bioyond_type_mode") or "")
if mode in {"Sample", "Consumables", "Reagent"}:
return mode
try:
from unilabos.resources.bioyond.sirna_materials import (
BioyondSirna_ReagentTrough,
)
except Exception: # pragma: no cover - lightweight env
BioyondSirna_ReagentTrough = None # type: ignore[assignment]
if BioyondSirna_ReagentTrough is not None and isinstance(child, BioyondSirna_ReagentTrough):
return "Reagent"
category_attr = getattr(child, "category", "") or ""
category = str(category_attr).lower()
if "tip" in category:
return "Consumables"
if "trough" in category or "reagent" in category:
return "Reagent"
# Class-name heuristic for environments without category metadata.
cls_name = type(child).__name__.lower()
if "tip" in cls_name:
return "Consumables"
if "trough" in cls_name or "reagent" in cls_name:
return "Reagent"
return "Sample"
def _iter_reagent_liquids(self, parent: Any) -> Iterable[Dict[str, Any]]:
"""Yield reagent-content rows attached to ``parent`` via ``_attach_liquid_to_parent``.
Cross-references ``parent.unilabos_extra["reagent_bioyond_ids"]`` (the
structured Bioyond metadata source-of-truth) with ``parent.tracker.liquids``
when present. Tolerates missing metadata so this helper is usable even
on legacy decks that predate the reagent-as-liquid plan.
"""
extra = getattr(parent, "unilabos_extra", None) or {}
reagent_ids: List[Any] = []
if isinstance(extra, dict):
reagent_ids = list(extra.get("reagent_bioyond_ids") or [])
parent_name = getattr(parent, "name", "") or ""
warehouse = getattr(parent, "parent", None)
warehouse_name = getattr(warehouse, "name", "") or ""
location_label = f"{warehouse_name}/{parent_name}" if warehouse_name else parent_name
for entry in reagent_ids:
if not isinstance(entry, dict):
continue
yield {
"materialName": str(entry.get("material_bioyond_name") or ""),
"materialCode": str(entry.get("material_bioyond_code") or ""),
"materialTypeCode": str(entry.get("material_bioyond_type_code") or ""),
"materialTypeMode": "Reagent",
"locationCode": "",
"locationShowName": location_label,
"quantity": entry.get("quantity") or "",
"trough": parent,
"material_bioyond_id": str(entry.get("material_bioyond_id") or ""),
}
def _build_unload_materials_by_type(
self,
*,
order_id_filter: Optional[str] = None,
) -> Dict[str, List[Dict[str, Any]]]:
"""Walk ``self.deck`` and emit a Bioyond-shaped material list grouped by mode.
Output rows expose ``materialName / materialCode / locationCode /
locationShowName / quantity / materialTypeMode`` keys, mirroring the
fields :meth:`_build_manual_load_tables` consumes. No side effects.
Args:
order_id_filter: When provided, slot-bound labware whose
``unilabos_extra["order_id"]`` does not equal this value is
skipped. Reagent contents are not order-scoped.
"""
grouped: Dict[str, List[Dict[str, Any]]] = {
"Sample": [],
"Consumables": [],
"Reagent": [],
}
deck = getattr(self, "deck", None)
if deck is None:
return grouped
for warehouse in getattr(deck, "children", []) or []:
ordering = getattr(warehouse, "_ordering", None)
if not isinstance(ordering, dict):
continue
for slot_label in ordering.keys():
try:
child = warehouse[slot_label]
except (KeyError, IndexError, TypeError):
continue
# Distinguish placed labware from unoccupied ResourceHolder
# variants. Empty slots show as None on this carrier.
if child is None:
continue
if not hasattr(child, "tracker") and not hasattr(child, "children"):
continue
# Plain ResourceHolder placeholders for empty sites are skipped.
if type(child).__name__ == "ResourceHolder" and not list(getattr(child, "children", []) or []):
continue
extra = getattr(child, "unilabos_extra", None) or {}
if not isinstance(extra, dict):
extra = {}
if order_id_filter and extra.get("order_id") and extra.get("order_id") != order_id_filter:
continue
mode = self._classify_labware_mode(child)
row = {
"materialName": str(extra.get("material_bioyond_name") or getattr(child, "name", "") or ""),
"materialCode": str(extra.get("material_bioyond_code") or getattr(child, "name", "") or ""),
"materialTypeCode": str(extra.get("material_bioyond_type_code") or ""),
"materialTypeMode": mode,
"locationCode": str(extra.get("location_code") or slot_label or ""),
"locationShowName": f"{getattr(warehouse, 'name', '')}/{slot_label}",
"quantity": getattr(child, "quantity", 1),
"_warehouse": warehouse,
"_slot_label": str(slot_label),
"_child": child,
}
grouped.setdefault(mode, []).append(row)
# Reagent contents on this trough/labware (from reagent_bioyond_ids).
for liquid_row in self._iter_reagent_liquids(child):
grouped["Reagent"].append(liquid_row)
return grouped
def _clear_unloaded_materials(
self,
manifest: Dict[str, List[Dict[str, Any]]],
) -> Dict[str, List[Dict[str, Any]]]:
"""Apply the unload mutation: clear slot-bound labware, zero reagent liquids.
Returns a dict shaped like the manifest but with each row reduced to
the minimal fields downstream consumers need (material code/name, the
warehouse name, and the slot label or trough name).
"""
cleared: Dict[str, List[Dict[str, Any]]] = {
"Sample": [],
"Consumables": [],
"Reagent": [],
}
affected_warehouses: List[Any] = []
def _remember(warehouse: Any) -> None:
if warehouse is None:
return
if any(warehouse is existing for existing in affected_warehouses):
return
affected_warehouses.append(warehouse)
# 1) Slot-bound labware (Sample / Consumables).
for mode in ("Sample", "Consumables"):
for row in manifest.get(mode, []):
warehouse = row.get("_warehouse")
slot_label = row.get("_slot_label") or row.get("locationCode") or ""
if warehouse is None and slot_label:
try:
warehouse, _idx = self._resolve_location_to_warehouse(slot_label)
except (ValueError, RuntimeError) as exc:
logger.warning(
"卸载物料 %s 时无法解析库位 %s: %s",
row.get("materialCode"), slot_label, exc,
)
continue
if warehouse is None or not slot_label:
continue
try:
if warehouse[slot_label] is not None:
warehouse[slot_label] = None
cleared[mode].append({
"material_code": str(row.get("materialCode") or ""),
"material_name": str(row.get("materialName") or ""),
"location": slot_label,
"warehouse": getattr(warehouse, "name", ""),
})
_remember(warehouse)
except (IndexError, KeyError, TypeError) as exc:
logger.warning(
"卸载物料 %s 时清空库位 %s[%s] 失败: %s",
row.get("materialCode"), getattr(warehouse, "name", "?"),
slot_label, exc,
)
# 2) Reagent rows: contents on a parent trough vs slot-bound trough labware.
for row in manifest.get("Reagent", []):
trough = row.get("trough")
if trough is not None:
tracker = getattr(trough, "tracker", None)
if tracker is not None and hasattr(tracker, "set_liquids"):
try:
tracker.set_liquids([])
except Exception as exc: # pragma: no cover - tracker variants
logger.debug("tracker.set_liquids 失败(非阻塞): %s", exc)
extra = getattr(trough, "unilabos_extra", None)
if isinstance(extra, dict):
extra["reagent_bioyond_ids"] = []
try:
setattr(trough, "unilabos_extra", extra)
except Exception: # pragma: no cover - read-only proxy
pass
cleared["Reagent"].append({
"material_name": str(row.get("materialName") or ""),
"material_code": str(row.get("materialCode") or ""),
"trough": getattr(trough, "name", ""),
"warehouse": getattr(getattr(trough, "parent", None), "name", ""),
})
_remember(getattr(trough, "parent", None))
continue
# Slot-bound Reagent labware (e.g. trough labware). Preserve the
# labware on the deck — only its contents (liquids + reagent ids)
# are zeroed. ``reset`` is the LIMS-side path for full removal.
child = row.get("_child")
if child is not None:
tracker = getattr(child, "tracker", None)
if tracker is not None and hasattr(tracker, "set_liquids"):
try:
tracker.set_liquids([])
except Exception as exc: # pragma: no cover - tracker variants
logger.debug("tracker.set_liquids 失败(非阻塞): %s", exc)
extra = getattr(child, "unilabos_extra", None)
if isinstance(extra, dict) and extra.get("reagent_bioyond_ids"):
extra["reagent_bioyond_ids"] = []
try:
setattr(child, "unilabos_extra", extra)
except Exception: # pragma: no cover - read-only proxy
pass
_remember(row.get("_warehouse") or getattr(child, "parent", None))
continue
# 3) Publish the deck after mutations. ``_publish_resource_tree_update``
# publishes the whole deck, so a single call covers all warehouses we
# touched. ``affected_warehouses`` is kept for telemetry / tests.
if affected_warehouses:
self._publish_resource_tree_update()
return cleared
def _build_manual_load_probe(
self,
materials_by_type: Dict[str, List[Dict[str, Any]]],