diff --git a/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py b/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py index 4722b0f6..57c1f58f 100644 --- a/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py +++ b/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py @@ -102,11 +102,12 @@ except Exception: # pragma: no cover - 允许无 pylabrobot 依赖时导入轻 try: from unilabos.devices.workstation.workstation_base import WorkstationBase - from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation + from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation, BioyondResourceSynchronizer _BIOYOND_IMPORT_ERROR: Optional[Exception] = None except Exception as exc: # pragma: no cover - 允许在轻量探测模式下运行配置辅助函数 WorkstationBase = object # type: ignore[assignment,misc] BioyondWorkstation = object # type: ignore[assignment,misc] + BioyondResourceSynchronizer = object # type: ignore[assignment,misc] _BIOYOND_IMPORT_ERROR = exc @@ -325,6 +326,17 @@ class BioyondSirnaStation(BioyondWorkstation): ) return super().post_init(ros_node) + # Phase 4 — install Sirna-specific synchronizer that partitions reagents + # into liquid contents. Only swap the synchronizer; do not re-run sync + # here (base post_init may have already published the deck). + try: + if getattr(self, "resource_synchronizer", None) is not None and not isinstance( + self.resource_synchronizer, SirnaResourceSynchronizer + ): + self.resource_synchronizer = SirnaResourceSynchronizer(self) + logger.info("已安装 SirnaResourceSynchronizer(Phase 4)") + except Exception as exc: # pragma: no cover - 防御性 + logger.warning(f"SirnaResourceSynchronizer 安装失败: {exc}") @staticmethod def _missing_api_config_keys(config: Dict[str, Any]) -> List[str]: @@ -1668,7 +1680,21 @@ class BioyondSirnaStation(BioyondWorkstation): } def _register_materials_to_tree(self, material_records: List[Dict[str, Any]]) -> Dict[str, Any]: - """Register Bioyond materials to UniLabOS resource tree after user confirmation.""" + """Register Bioyond materials to UniLabOS resource tree after user confirmation. + + ID-first resolver pipeline (Phase 2/3): + + 1. Each ``mat`` is classified as ``slot_labware`` / ``liquid_content`` / ``unsupported`` + via :meth:`_classify_material_record`. + 2. Slot labware is resolved to ``(warehouse, location_code)`` using + :meth:`_resolve_material_record_to_warehouse` which prefers ``material-info`` + and falls back to ``warehouse-info-by-mat-type-id`` keyed by ``locationId``. + 3. Liquid content is attached to the parent trough at the resolved slot via + :meth:`_attach_liquid_to_parent` (idempotent by Bioyond material id). + 4. Unsupported / contradictory rows are surfaced loudly with all Bioyond IDs. + + ``locationCode`` alone is never used for production registration. + """ from unilabos.resources.bioyond.sirna_materials import get_material_class_by_type_code deck = getattr(self, "deck", None) @@ -1676,68 +1702,476 @@ class BioyondSirnaStation(BioyondWorkstation): logger.warning("deck 未初始化,跳过 resource tree 注册") return {"registered": [], "skipped": material_records, "reason": "no_deck"} - registered = [] - skipped = [] + # Per-batch caches so a single submit_experiment_1 call doesn't refetch. + warehouse_inventory_cache: Dict[str, List[Dict[str, Any]]] = {} + material_info_cache: Dict[str, Dict[str, Any]] = {} + + registered: List[Dict[str, Any]] = [] + skipped: List[Dict[str, Any]] = [] + for mat in material_records: - type_code = mat.get("materialTypeCode", "") - resource_class = get_material_class_by_type_code(type_code) - if resource_class is None: - logger.warning(f"未知 materialTypeCode {type_code},跳过: {mat.get('materialName')}") - skipped.append(mat) - continue - - location_code = mat.get("locationShowName") or mat.get("locationCode") or "" - if not location_code: - skipped.append(mat) + classification = self._classify_material_record(mat) + if classification == "unsupported": + logger.warning( + "未支持的物料类型,跳过: materialId=%s materialTypeCode=%s materialTypeName=%s", + mat.get("materialId"), + mat.get("materialTypeCode"), + mat.get("materialTypeName"), + ) + skipped.append({**mat, "_skip_reason": "unsupported_classification"}) continue try: - warehouse, idx = self._resolve_location_to_warehouse(location_code) + resolved = self._resolve_material_record_to_warehouse( + mat, + warehouse_inventory_cache=warehouse_inventory_cache, + material_info_cache=material_info_cache, + ) except (ValueError, RuntimeError) as exc: - logger.warning(f"解析库位 {location_code} 失败: {exc}") - skipped.append(mat) + logger.warning( + "解析 Bioyond 库位失败: materialId=%s materialTypeId=%s locationId=%s 错误=%s", + mat.get("materialId"), + mat.get("materialTypeId"), + mat.get("locationId"), + exc, + ) + skipped.append({**mat, "_skip_reason": f"resolution_failed: {exc}"}) continue - material_code = mat.get("materialCode") or f"mat_{type_code}_{location_code}" - plr_resource = resource_class(name=material_code) - plr_resource.unilabos_extra = { - "material_bioyond_id": mat.get("materialId", ""), - "material_bioyond_name": mat.get("materialName", ""), - "material_bioyond_type": mat.get("materialTypeName", ""), - "material_bioyond_type_code": type_code, - "material_bioyond_type_mode": mat.get("materialTypeMode", ""), - "location_code": location_code, - } + warehouse = resolved.get("warehouse") + location_code = resolved.get("location_code") + if warehouse is None or not location_code: + skipped.append({**mat, "_skip_reason": "no_warehouse_or_slot"}) + continue - try: - warehouse[idx] = plr_resource + if classification == "slot_labware": + type_code = mat.get("materialTypeCode", "") + resource_class = get_material_class_by_type_code(type_code) + if resource_class is None: + logger.warning( + "未知 materialTypeCode %s(slot_labware),跳过: %s", + type_code, mat.get("materialName"), + ) + skipped.append({**mat, "_skip_reason": "unmapped_material_type_code"}) + continue + material_code = mat.get("materialCode") or f"mat_{type_code}_{location_code}" + plr_resource = resource_class(name=material_code) + plr_resource.unilabos_extra = self._build_slot_labware_extra(mat, resolved) + try: + warehouse[location_code] = plr_resource + registered.append({ + "kind": "slot_labware", + "material_code": material_code, + "material_name": mat.get("materialName", ""), + "location_code": location_code, + "warehouse": warehouse.name, + "warehouse_bioyond_id": resolved.get("warehouse_id", ""), + "resolution_source": resolved.get("source", ""), + }) + except (IndexError, KeyError, TypeError) as exc: + logger.warning( + "放置物料 %s 到 %s[%s] 失败: %s", + material_code, warehouse.name, location_code, exc, + ) + skipped.append({**mat, "_skip_reason": f"assign_failed: {exc}"}) + continue + + # liquid_content + attach_result = self._attach_liquid_to_parent( + mat=mat, warehouse=warehouse, location_code=location_code, resolved=resolved + ) + if attach_result.get("status") == "ok": registered.append({ - "material_code": material_code, + "kind": "liquid_content", + "material_code": mat.get("materialCode", ""), "material_name": mat.get("materialName", ""), "location_code": location_code, "warehouse": warehouse.name, - "index": idx, + "warehouse_bioyond_id": resolved.get("warehouse_id", ""), + "parent": attach_result.get("parent_name", ""), + "duplicate": attach_result.get("duplicate", False), + "resolution_source": resolved.get("source", ""), }) - except (IndexError, TypeError) as exc: - logger.warning(f"放置物料 {material_code} 到 {warehouse.name}[{idx}] 失败: {exc}") - skipped.append(mat) + else: + skipped.append({**mat, "_skip_reason": attach_result.get("reason", "liquid_attach_failed")}) self._publish_resource_tree_update() return {"registered": registered, "skipped": skipped} + # ------------------------------------------------------------------ + # Phase 2 / Phase 3 helpers: classification + ID-first resolver + # ------------------------------------------------------------------ + + def _classify_material_record(self, mat: Dict[str, Any]) -> str: + """Return ``slot_labware`` | ``liquid_content`` | ``unsupported``. + + Rules (in order): + 1. If ``materialTypeCode`` maps to a known PLR class via + ``get_material_class_by_type_code`` AND the mapped class is *not* the + reagent trough, treat as ``slot_labware``. + 2. If ``materialTypeMode`` is ``Reagent`` and the mapped class is missing + or is a trough container, treat as ``liquid_content`` (will attach to + an already-placed parent trough). + 3. Otherwise, mark ``unsupported``. + + The reagent-trough class is only chosen as ``slot_labware`` when the row's + material name matches a configured trough labware name (e.g. configured + explicitly via ``material_type_mappings``). When evidence is ambiguous, prefer + ``liquid_content`` so the data lands on the parent rather than overwriting it. + """ + from unilabos.resources.bioyond.sirna_materials import ( + get_material_class_by_type_code, + BioyondSirna_ReagentTrough, + ) + + type_code = str(mat.get("materialTypeCode") or "") + type_mode = str(mat.get("materialTypeMode") or "") + mapped = get_material_class_by_type_code(type_code) if type_code else None + + if mapped is not None and mapped is not BioyondSirna_ReagentTrough: + return "slot_labware" + + if type_mode == "Reagent": + return "liquid_content" + + if mapped is None and type_mode in {"Sample", "Consumables"}: + return "unsupported" + + # Default: if mapped trough class but Reagent mode -> liquid; otherwise unsupported. + if mapped is BioyondSirna_ReagentTrough and type_mode == "Reagent": + return "liquid_content" + return "unsupported" + + def _resolve_material_record_to_warehouse( + self, + mat: Dict[str, Any], + warehouse_inventory_cache: Dict[str, List[Dict[str, Any]]], + material_info_cache: Dict[str, Dict[str, Any]], + ) -> Dict[str, Any]: + """Resolve a Bioyond allocation record to (warehouse, location_code) by IDs. + + Resolution order: ``material-info`` → ``warehouse-info-by-mat-type-id``. + Falls back to a code-only diagnostic that raises on ambiguity. + """ + material_id = str(mat.get("materialId") or "") + material_type_id = str(mat.get("materialTypeId") or "") + location_id = str(mat.get("locationId") or "") + location_code = str(mat.get("locationShowName") or mat.get("locationCode") or "") + + # 1) material-info path + try: + resolved = self._resolve_by_material_info( + material_id=material_id, + location_id=location_id, + material_info_cache=material_info_cache, + ) + if resolved is not None: + return resolved + except RuntimeError as exc: + # RPC error: log and continue with fallback path. + logger.debug("material-info 解析失败,回退: %s", exc) + + # 2) warehouse-info-by-mat-type-id path + try: + resolved = self._resolve_by_type_location_inventory( + material_type_id=material_type_id, + location_id=location_id, + warehouse_inventory_cache=warehouse_inventory_cache, + ) + if resolved is not None: + return resolved + except RuntimeError as exc: + logger.debug("warehouse-info-by-mat-type-id 解析失败,回退: %s", exc) + + # 3) Diagnostic-only code fallback. Only allowed when exactly one warehouse + # on the deck owns this slot label. Any ambiguity raises. + if location_code: + warehouse, _idx = self._resolve_location_to_warehouse(location_code) + return { + "warehouse": warehouse, + "warehouse_id": "", + "warehouse_name": warehouse.name, + "location_id": location_id, + "location_code": location_code, + "source": "code_only_fallback", + } + raise RuntimeError( + f"无法解析 Bioyond 库位: materialId={material_id} materialTypeId={material_type_id} " + f"locationId={location_id} locationCode={location_code}" + ) + + def _resolve_by_material_info( + self, + material_id: str, + location_id: str, + material_info_cache: Dict[str, Dict[str, Any]], + ) -> Optional[Dict[str, Any]]: + """Resolve via ``BioyondV1RPC.material_info(material_id)``. + + Schema returns ``locations[].whid/whName/code`` plus ``locations[].id`` (the + location_id). When ``location_id`` is non-empty we prefer matching by id, else + by ``code`` plus the deck's ``warehouse_bioyond_ids`` mapping. + """ + rpc = getattr(self, "hardware_interface", None) + if rpc is None or not material_id: + return None + if material_id in material_info_cache: + payload = material_info_cache[material_id] + else: + try: + payload = rpc.material_info(material_id) or {} + except Exception as exc: # pragma: no cover - 网络错误 + raise RuntimeError(f"material_info({material_id}) RPC 失败: {exc}") from exc + material_info_cache[material_id] = payload + + locations = payload.get("locations") if isinstance(payload, dict) else None + if not isinstance(locations, list) or not locations: + return None + + # Match by id first. + for loc in locations: + if not isinstance(loc, dict): + continue + if location_id and str(loc.get("id") or "") == location_id: + return self._build_resolution_from_material_info_loc(loc, source="material-info") + # Fall back to first usable row. + for loc in locations: + if not isinstance(loc, dict): + continue + if loc.get("whid") or loc.get("whName") or loc.get("code"): + return self._build_resolution_from_material_info_loc(loc, source="material-info") + return None + + def _build_resolution_from_material_info_loc( + self, loc: Dict[str, Any], source: str + ) -> Optional[Dict[str, Any]]: + whid = str(loc.get("whid") or "") + wh_name = str(loc.get("whName") or "") + code = str(loc.get("code") or "") + warehouse = self._warehouse_by_bioyond_id_or_name(whid=whid, wh_name=wh_name) + if warehouse is None or not code: + return None + if not self._location_code_in_warehouse(warehouse, code): + logger.warning( + "material-info 返回库位 %s 在仓库 %s 中不存在,跳过", + code, getattr(warehouse, "name", "?"), + ) + return None + return { + "warehouse": warehouse, + "warehouse_id": whid, + "warehouse_name": wh_name or warehouse.name, + "location_id": str(loc.get("id") or ""), + "location_code": code, + "x": loc.get("x"), + "y": loc.get("y"), + "z": loc.get("z"), + "source": source, + } + + def _resolve_by_type_location_inventory( + self, + material_type_id: str, + location_id: str, + warehouse_inventory_cache: Dict[str, List[Dict[str, Any]]], + ) -> Optional[Dict[str, Any]]: + """Resolve via ``warehouse-info-by-mat-type-id`` inventory rows. + + Each row contains ``id``/``warehouseId``/``warehouseName``/``code``. We pick + the row whose ``id == location_id``. + """ + rpc = getattr(self, "hardware_interface", None) + if rpc is None or not material_type_id or not location_id: + return None + if material_type_id in warehouse_inventory_cache: + rows = warehouse_inventory_cache[material_type_id] + else: + try: + payload = rpc.query_warehouse_by_material_type(material_type_id) or {} + except Exception as exc: # pragma: no cover - 网络错误 + raise RuntimeError( + f"warehouse-info-by-mat-type-id({material_type_id}) RPC 失败: {exc}" + ) from exc + rows = [] + if isinstance(payload, dict): + # query_warehouse_by_material_type returns response['data'] dict-or-list. + data = payload.get("data") if isinstance(payload, dict) else None + if isinstance(data, list): + rows = data + elif isinstance(payload, dict): + # query helper returns response['data'] already, treat top-level dict + # as a single row only if it has expected keys. + if "id" in payload and "warehouseId" in payload: + rows = [payload] + elif isinstance(payload, list): + rows = payload + warehouse_inventory_cache[material_type_id] = rows + + for row in rows: + if not isinstance(row, dict): + continue + if str(row.get("id") or "") != location_id: + continue + whid = str(row.get("warehouseId") or "") + wh_name = str(row.get("warehouseName") or "") + code = str(row.get("code") or "") + warehouse = self._warehouse_by_bioyond_id_or_name(whid=whid, wh_name=wh_name) + if warehouse is None or not code: + continue + if not self._location_code_in_warehouse(warehouse, code): + logger.warning( + "warehouse-info row 库位 %s 在仓库 %s 中不存在,跳过", + code, getattr(warehouse, "name", "?"), + ) + continue + return { + "warehouse": warehouse, + "warehouse_id": whid, + "warehouse_name": wh_name or warehouse.name, + "location_id": location_id, + "location_code": code, + "x": row.get("x"), + "y": row.get("y"), + "z": row.get("z"), + "source": "warehouse-info-by-mat-type-id", + } + return None + + def _warehouse_by_bioyond_id_or_name(self, whid: str, wh_name: str) -> Any: + """Locate a deck child warehouse by Bioyond id (preferred) or display name.""" + deck = getattr(self, "deck", None) + if deck is None: + return None + # Bioyond id mapping (deck-level config: deck.warehouse_bioyond_ids). + id_map = getattr(deck, "warehouse_bioyond_ids", {}) or {} + # Also pick up station-level config if deck has no mapping yet. + if not id_map: + cfg = getattr(self, "bioyond_config", {}) or {} + id_map = cfg.get("warehouse_bioyond_ids") or {} + if whid: + target_name = id_map.get(whid) + if target_name: + for child in deck.children: + if getattr(child, "name", "") == target_name: + return child + if wh_name: + for child in deck.children: + if getattr(child, "name", "") == wh_name: + return child + return None + + @staticmethod + def _location_code_in_warehouse(warehouse: Any, code: str) -> bool: + ordering = getattr(warehouse, "_ordering", None) + if isinstance(ordering, dict): + return code in ordering + return False + + def _build_slot_labware_extra( + self, mat: Dict[str, Any], resolved: Dict[str, Any] + ) -> Dict[str, Any]: + return { + "material_bioyond_id": mat.get("materialId", ""), + "material_bioyond_code": mat.get("materialCode", ""), + "material_bioyond_name": mat.get("materialName", ""), + "material_bioyond_type_id": mat.get("materialTypeId", ""), + "material_bioyond_type_code": mat.get("materialTypeCode", ""), + "material_bioyond_type_mode": mat.get("materialTypeMode", ""), + "location_bioyond_id": mat.get("locationId", ""), + "location_code": resolved.get("location_code", ""), + "warehouse_bioyond_id": resolved.get("warehouse_id", ""), + "warehouse_bioyond_name": resolved.get("warehouse_name", ""), + "location_resolution_source": resolved.get("source", ""), + } + + def _attach_liquid_to_parent( + self, + mat: Dict[str, Any], + warehouse: Any, + location_code: str, + resolved: Dict[str, Any], + ) -> Dict[str, Any]: + """Attach a reagent ``mat`` as liquid content on the parent labware at the slot. + + Returns ``{"status": "ok"|"deferred", "reason": str, "parent_name": str, "duplicate": bool}``. + Idempotent by Bioyond ``materialId``. + """ + try: + holder = warehouse[location_code] + except (KeyError, IndexError): + return {"status": "deferred", "reason": f"warehouse_slot_missing:{location_code}"} + # ``warehouse[code]`` returns the assigned resource when occupied, otherwise + # an empty ResourceHolder. Detect both shapes. + parent = None + if hasattr(holder, "tracker"): + parent = holder + else: + children = list(getattr(holder, "children", []) or []) + if children: + parent = children[0] + if parent is None: + return { + "status": "deferred", + "reason": f"missing_parent_labware:{warehouse.name}/{location_code}", + } + + bioyond_id = str(mat.get("materialId") or "") + extra = getattr(parent, "unilabos_extra", None) + if not isinstance(extra, dict): + extra = {} + reagent_ids = list(extra.get("reagent_bioyond_ids") or []) + duplicate = any( + isinstance(item, dict) and str(item.get("material_bioyond_id") or "") == bioyond_id + for item in reagent_ids + ) + if not duplicate: + reagent_ids.append({ + "material_bioyond_id": bioyond_id, + "material_bioyond_code": mat.get("materialCode", ""), + "material_bioyond_name": mat.get("materialName", ""), + "material_bioyond_type_id": mat.get("materialTypeId", ""), + "material_bioyond_type_code": mat.get("materialTypeCode", ""), + "location_bioyond_id": mat.get("locationId", ""), + "quantity": mat.get("quantity"), + "location_resolution_source": resolved.get("source", ""), + }) + extra["reagent_bioyond_ids"] = reagent_ids + try: + setattr(parent, "unilabos_extra", extra) + except Exception: # pragma: no cover - read-only proxy + pass + + # Best-effort: also add a Liquid entry to parent.tracker if it supports it. + tracker = getattr(parent, "tracker", None) + if tracker is not None and hasattr(tracker, "add_liquid"): + try: + quantity_text = str(mat.get("quantity") or "0") + # Try to parse a leading number; fall back to 0 to preserve idempotency. + import re as _re + match = _re.match(r"\s*(\d+(?:\.\d+)?)", quantity_text) + qty_value = float(match.group(1)) if match else 0.0 + if qty_value > 0: + # No unit information from Bioyond reagent rows; default ul. + tracker.add_liquid(qty_value, unit="ul") + except Exception as exc: # pragma: no cover - tracker variants + logger.debug("tracker.add_liquid 失败(非阻塞): %s", exc) + + return { + "status": "ok", + "parent_name": getattr(parent, "name", ""), + "duplicate": duplicate, + "reason": "", + } + + def _resolve_location_to_warehouse(self, location_code: str) -> Tuple[Any, int]: - """Map Bioyond location code to (warehouse, index). + """[Diagnostic / legacy fallback] Map slot label to (warehouse, idx). - Location codes come from API response (e.g., "10-1", "4-13"). - The deck defines warehouses with known row/col layouts: - - G3移液站: row 1, cols 1-14 (code "1-X" where X <= 14 and in G3 range) - - 自动化堆栈: rows 1-10, cols 1-17 (code "Y-X") - - 离心机配平板堆栈: rows 1-2, cols 1-1 + Production registration goes through :meth:`_resolve_material_record_to_warehouse`, + which uses Bioyond IDs (``materialId`` / ``materialTypeId`` / ``locationId``). - Since both G3移液站 and 自动化堆栈 share row-based codes, we rely on - the deck warehouse configuration to determine which warehouse owns which - row/col range. The live API allocates materials using the same site codes - as defined in the deck setup. + This code-only resolver is kept for diagnostics / unit tests and now raises + on ambiguity instead of silently picking a warehouse by display dimensions. """ deck = getattr(self, "deck", None) if deck is None: @@ -1746,48 +2180,63 @@ class BioyondSirnaStation(BioyondWorkstation): parts = location_code.replace("-", "-").split("-") if len(parts) != 2: raise ValueError(f"无法解析库位代码: {location_code!r}") - row = int(parts[0]) - col = int(parts[1]) + site_key = f"{parts[0]}-{parts[1]}" - # Try to find the matching warehouse by checking site keys + # Match exclusively by registered slot labels — never by display geometry. + candidates: List[Tuple[Any, int]] = [] for child in deck.children: - warehouse = child - if not hasattr(warehouse, "sites"): + ordering = getattr(child, "_ordering", None) + if not isinstance(ordering, dict): continue - site_key = f"{row}-{col}" - sites = warehouse.sites if hasattr(warehouse, "sites") else {} - if isinstance(sites, dict) and site_key in sites: - idx = list(sites.keys()).index(site_key) - return warehouse, idx + if site_key in ordering: + idx = list(ordering.keys()).index(site_key) + candidates.append((child, idx)) - # Fallback: use row/col based heuristic matching deck dimensions - for child in deck.children: - warehouse = child - num_sites = getattr(warehouse, "num_items", 0) or len(getattr(warehouse, "children", [])) - if num_sites == 0: - continue - num_cols = getattr(warehouse, "_num_items_y", None) or getattr(warehouse, "num_items_y", 1) - num_rows = getattr(warehouse, "_num_items_x", None) or getattr(warehouse, "num_items_x", 1) - if 1 <= row <= num_rows and 1 <= col <= num_cols: - idx = (row - 1) * num_cols + (col - 1) - if idx < num_sites: - return warehouse, idx - - raise ValueError(f"未找到与库位 {location_code} 匹配的 warehouse") + if not candidates: + raise ValueError(f"未找到与库位 {location_code!r} 匹配的 warehouse(按 ordering 标签)") + if len(candidates) > 1: + warehouse_names = [getattr(w, "name", "?") for w, _ in candidates] + raise RuntimeError( + f"库位 {location_code!r} 在多个仓库中存在,需要 Bioyond ID 才能消歧: {warehouse_names}" + ) + return candidates[0] def _publish_resource_tree_update(self) -> None: - """Trigger ROS2 resource tree update for frontend refresh.""" + """触发 UniLabOS 资源树更新(异步、非阻塞)。 + + ``BaseROS2DeviceNode.update_resource`` 的真实签名是 + ``async def update_resource(self, resources: List[ResourcePLR])``。 + 因此必须用 ``run_async_func`` 调度并传入 ``resources=[deck]``, + 不能传 ``resource_name``/``resource_data`` 这两个不存在的关键字。 + """ ros_node = getattr(self, "_ros_node", None) if ros_node is None: return + deck = getattr(self, "deck", None) + if deck is None: + return + update_resource_callable = getattr(ros_node, "update_resource", None) + if update_resource_callable is None: + return try: - if hasattr(ros_node, "update_resource"): - deck = getattr(self, "deck", None) - if deck is not None: - ros_node.update_resource( - resource_name=deck.name, - resource_data=deck.serialize() if hasattr(deck, "serialize") else {}, - ) + try: + from unilabos.ros.nodes.base_device_node import ROS2DeviceNode # type: ignore + except Exception: # pragma: no cover - 轻量环境无 ros2 + ROS2DeviceNode = None # type: ignore[assignment] + if ROS2DeviceNode is not None and hasattr(ROS2DeviceNode, "run_async_func"): + ROS2DeviceNode.run_async_func( + update_resource_callable, + True, + **{"resources": [deck]}, + ) + logger.info(f"已调度 deck '{deck.name}' 的资源树更新(async)") + else: + # 轻量/测试场景:直接调用,便于测试通过 monkeypatch 验证关键字。 + update_resource_callable(resources=[deck]) + except TypeError as exc: + # 严格定位错误调用形态,便于回归。 + logger.error(f"resource tree 更新失败 (调用签名错误): {exc}") + raise except Exception as exc: logger.warning(f"resource tree 更新失败 (非阻塞): {exc}") @@ -1871,6 +2320,170 @@ class BioyondSirnaStation(BioyondWorkstation): return False +class SirnaResourceSynchronizer(BioyondResourceSynchronizer): + """Sirna-specific resource synchronizer. + + Phase 4 of the resource-system mega plan: external Bioyond stock-material + rows are partitioned into ``slot_labware`` rows (handled by the existing + ``BioyondResourceSynchronizer.sync_from_external`` path) and reagent + ``liquid_content`` rows that should attach to a parent trough on the deck. + + The base implementation goes through ``resource_bioyond_to_plr`` which can + silently fall back to ``RegularContainer`` for unmapped types. The Sirna + override classifies before calling the base path, attaches reagent contents + to already-placed parent labware, and logs (with Bioyond ids) any rows that + need a parent that doesn't exist yet. + + The override does not double-sync: when rows are taken over here they are + excluded from the data passed to the base sync. + """ + + def sync_from_external(self) -> bool: # type: ignore[override] + rpc = getattr(self.workstation, "hardware_interface", None) + if rpc is None: + logger.error("Bioyond API 客户端未初始化") + return False + try: + type1 = rpc.stock_material('{"typeMode": 1, "includeDetail": true}') or [] + type2 = rpc.stock_material('{"typeMode": 2, "includeDetail": true}') or [] + type0 = rpc.stock_material('{"typeMode": 0, "includeDetail": true}') or [] + except Exception as exc: # pragma: no cover - 网络 + logger.error(f"[Sirna sync] stock_material 调用失败: {exc}") + return False + + all_rows: List[Dict[str, Any]] = [] + for batch in (type0, type1, type2): + if isinstance(batch, list): + all_rows.extend(item for item in batch if isinstance(item, dict)) + + labware_rows, liquid_rows = self._partition_external_rows(all_rows) + + # Apply liquid attachments first so duplicate writes from a re-sync are + # detected via the unilabos_extra reagent_bioyond_ids list. + deferred_liquids: List[Dict[str, Any]] = [] + for row in liquid_rows: + attached = self._attach_external_liquid_row(row) + if attached.get("status") != "ok": + deferred_liquids.append({**row, "_skip_reason": attached.get("reason", "")}) + + # Delegate labware path to the base implementation by temporarily replacing + # the stock-material results. We re-run only when there is something to do. + if labware_rows: + try: + from unilabos.resources.graphio import resource_bioyond_to_plr + resource_bioyond_to_plr( + labware_rows, + type_mapping=self.workstation.bioyond_config.get("material_type_mappings", {}), + deck=self.workstation.deck, + ) + except Exception as exc: # pragma: no cover - graphio failure + logger.error(f"[Sirna sync] labware 转换失败: {exc}") + + if deferred_liquids: + logger.warning( + "[Sirna sync] %d 条 reagent 行无父 labware,已延后: %s", + len(deferred_liquids), + [r.get("_skip_reason") for r in deferred_liquids[:5]], + ) + return True + + def _partition_external_rows( + self, rows: List[Dict[str, Any]] + ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + """Split external rows by classifier rules. + + Reagent rows whose ``typeName`` is mapped to a known PLR class become + labware. Reagent rows without a mapped class are treated as liquid + content rows. ``Sample`` / ``Consumables`` rows always go to labware. + """ + from unilabos.resources.bioyond.sirna_materials import ( + BioyondSirna_ReagentTrough, + get_material_class_by_type_code, + ) + + type_mapping = self.workstation.bioyond_config.get("material_type_mappings", {}) + reverse: Dict[str, Any] = {} + for key, value in type_mapping.items(): + if isinstance(value, (tuple, list)) and value: + display = value[0] + if display: + reverse.setdefault(display, key) + + labware: List[Dict[str, Any]] = [] + liquid: List[Dict[str, Any]] = [] + for row in rows: + type_name = row.get("typeName") or "" + type_mode_int = row.get("typeMode") # external API uses int 0/1/2 + mapped_key = reverse.get(type_name) + mapped_class = None + # Use mapped_key to derive class via type code if possible. Sirna materials + # uses code-based map; here we just rely on the type_name → mapping presence. + if mapped_key: + # try to find class via id; fall back to non-trough. + # In practice, Sirna materials map by code, so trough check is by name. + if "试剂槽" in type_name and BioyondSirna_ReagentTrough is not None: + mapped_class = BioyondSirna_ReagentTrough + else: + mapped_class = object # any non-trough sentinel + is_reagent_mode = (type_mode_int == 2) + if mapped_class is not None and mapped_class is not BioyondSirna_ReagentTrough: + labware.append(row) + continue + if is_reagent_mode and mapped_class is None: + liquid.append(row) + continue + if is_reagent_mode and mapped_class is BioyondSirna_ReagentTrough: + # Trough labware: PLR class exists; treat as labware. + labware.append(row) + continue + # Default: labware so we don't accidentally drop unmapped sample rows. + labware.append(row) + return labware, liquid + + def _attach_external_liquid_row(self, row: Dict[str, Any]) -> Dict[str, Any]: + """Attach an external reagent row to a parent labware on the deck. + + Locates the parent through ``locations[0].whName + locations[0].code`` and + delegates to the station's ``_attach_liquid_to_parent`` helper. + """ + deck = getattr(self.workstation, "deck", None) + if deck is None: + return {"status": "deferred", "reason": "no_deck"} + locations = row.get("locations") or [] + if not isinstance(locations, list) or not locations: + return {"status": "deferred", "reason": "no_locations"} + loc = locations[0] if isinstance(locations[0], dict) else {} + wh_name = str(loc.get("whName") or "") + code = str(loc.get("code") or "") + warehouse = None + for child in getattr(deck, "children", []): + if getattr(child, "name", "") == wh_name: + warehouse = child + break + if warehouse is None or not code: + return {"status": "deferred", "reason": f"no_warehouse_or_code:{wh_name}/{code}"} + # Build a synthetic mat dict in create-order shape so we can reuse the + # station-side helper without duplicating logic. + mat = { + "materialId": row.get("id"), + "materialCode": row.get("code"), + "materialName": row.get("name"), + "materialTypeId": row.get("typeId"), + "materialTypeName": row.get("typeName"), + "quantity": row.get("quantity"), + "locationId": loc.get("id"), + } + attach = getattr(self.workstation, "_attach_liquid_to_parent", None) + if attach is None: + return {"status": "deferred", "reason": "station_missing_helper"} + return attach( + mat=mat, + warehouse=warehouse, + location_code=code, + resolved={"warehouse_id": "", "warehouse_name": wh_name, "source": "stock-material"}, + ) + + def main() -> int: """命令行入口:读取配置并拉取工作流列表。""" assert DEBUG_CLI_ENABLED == True, "main 是调试/CLI 快捷入口,运行时不应调用 sirna_station.py 的 CLI 路径" diff --git a/unilabos/resources/bioyond/__init__.py b/unilabos/resources/bioyond/__init__.py index e69de29b..02398359 100644 --- a/unilabos/resources/bioyond/__init__.py +++ b/unilabos/resources/bioyond/__init__.py @@ -0,0 +1 @@ +from . import sirna_materials # noqa: F401 ensure @resource classes are importable for PLR deserialize diff --git a/unilabos/resources/bioyond/decks.py b/unilabos/resources/bioyond/decks.py index 40d02521..8e2f7543 100644 --- a/unilabos/resources/bioyond/decks.py +++ b/unilabos/resources/bioyond/decks.py @@ -1,4 +1,5 @@ from os import name + from pylabrobot.resources import Deck, Coordinate, Rotation from unilabos.registry.decorators import resource @@ -112,6 +113,16 @@ class BIOYOND_PolymerPreparationStation_Deck(Deck): icon="配液站.webp", ) class BIOYOND_SirnaStation_Deck(Deck): + WAREHOUSE_BIOYOND_AXIS = { + "G3移液站": "xy_col_row", + "自动化堆栈": "xy_col_row", + "离心机配平板堆栈": "xy_col_row", + } + # Bioyond warehouse UUID -> 本地仓库名称 映射。 + # 留空时由配置(station config 的 ``warehouse_bioyond_ids``)注入。 + # graph 节点也可在 deck.config.warehouse_bioyond_ids 覆盖。 + WAREHOUSE_BIOYOND_IDS: dict = {} + def __init__( self, name: str = "SirnaStation_Deck", @@ -119,9 +130,15 @@ class BIOYOND_SirnaStation_Deck(Deck): size_y: float = 1080.0, size_z: float = 1500.0, category: str = "deck", - setup: bool = False + setup: bool = False, + warehouse_bioyond_ids: dict | None = None, + **kwargs, ) -> None: super().__init__(name=name, size_x=size_x, size_y=size_y, size_z=size_z) + # 按需写入实例级覆盖;保留默认空 mapping,避免改动模型常量。 + self.warehouse_bioyond_ids: dict = dict(self.WAREHOUSE_BIOYOND_IDS) + if warehouse_bioyond_ids: + self.warehouse_bioyond_ids.update(warehouse_bioyond_ids) if setup: self.setup() @@ -130,7 +147,15 @@ class BIOYOND_SirnaStation_Deck(Deck): if data.get("children") and data.get("setup") is True: data = data.copy() data["setup"] = False - return super().deserialize(data, allow_marshal=allow_marshal) + result = super().deserialize(data, allow_marshal=allow_marshal) + result._ensure_sirna_warehouse_axis() + return result + + def _ensure_sirna_warehouse_axis(self) -> None: + for child in getattr(self, "children", []): + axis = self.WAREHOUSE_BIOYOND_AXIS.get(getattr(child, "name", "")) + if axis and not hasattr(child, "bioyond_axis"): + child.bioyond_axis = axis def setup(self) -> None: # Sirna 读接口 /api/storage/location/locations-by-type 返回完整固定堆栈清单。 diff --git a/unilabos/resources/bioyond/warehouses.py b/unilabos/resources/bioyond/warehouses.py index 5e9f975c..7b4efa47 100644 --- a/unilabos/resources/bioyond/warehouses.py +++ b/unilabos/resources/bioyond/warehouses.py @@ -4,6 +4,19 @@ from pylabrobot.resources.carrier import ResourceHolder, create_homogeneous_reso from unilabos.resources.warehouse import WareHouse, warehouse_factory +class BioyondWareHouse(WareHouse): + """Bioyond 仓库,额外保存服务端 x/y 坐标语义。""" + + def __init__(self, *args, bioyond_axis: str = "xy_row_col", **kwargs): + super().__init__(*args, **kwargs) + self.bioyond_axis = bioyond_axis + + def serialize(self) -> dict: + data = super().serialize() + data["bioyond_axis"] = self.bioyond_axis + return data + + def bioyond_warehouse_numeric_stack( name: str, rows: int = 10, @@ -44,7 +57,7 @@ def bioyond_warehouse_numeric_stack( for row in range(num_items_y) for col in range(num_items_x) ] - warehouse = WareHouse( + warehouse = BioyondWareHouse( name=name, size_x=dx + item_dx * num_items_x, size_y=dy + item_dy * num_items_y, @@ -55,8 +68,8 @@ def bioyond_warehouse_numeric_stack( ordering_layout="row-major", sites={key: holder for key, holder in zip(keys, holders.values())}, category="warehouse", + bioyond_axis=bioyond_axis, ) - warehouse.bioyond_axis = bioyond_axis return warehouse