feat: RNA land resource-system mega plan Phases 1-4 plus Phase 5 stack orientation

- Phase 1: fix _publish_resource_tree_update to call update_resource via run_async_func with deck resource list.
- Phase 2: add ID-first material placement resolver chain with material_info and warehouse_inventory caches.
- Phase 3: classify stock-material rows as slot_labware vs liquid_content; idempotent reagent attachment by Bioyond materialId.
- Phase 4: introduce SirnaResourceSynchronizer over BioyondResourceSynchronizer; install once in post_init without double-sync.
- Phase 5: numeric stack orientation and bioyond_axis support in bioyond warehouses/__init__/decks (carries forward prior in-progress edits).
- _resolve_location_to_warehouse now raises on ambiguity; deck constructor accepts warehouse_bioyond_ids kwarg.
This commit is contained in:
yxz321
2026-05-08 21:41:03 +08:00
parent 1f93740580
commit 7c83e1bd51
4 changed files with 732 additions and 80 deletions

View File

@@ -102,11 +102,12 @@ except Exception: # pragma: no cover - 允许无 pylabrobot 依赖时导入轻
try:
from unilabos.devices.workstation.workstation_base import WorkstationBase
from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation
from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation, BioyondResourceSynchronizer
_BIOYOND_IMPORT_ERROR: Optional[Exception] = None
except Exception as exc: # pragma: no cover - 允许在轻量探测模式下运行配置辅助函数
WorkstationBase = object # type: ignore[assignment,misc]
BioyondWorkstation = object # type: ignore[assignment,misc]
BioyondResourceSynchronizer = object # type: ignore[assignment,misc]
_BIOYOND_IMPORT_ERROR = exc
@@ -325,6 +326,17 @@ class BioyondSirnaStation(BioyondWorkstation):
)
return
super().post_init(ros_node)
# Phase 4 — install Sirna-specific synchronizer that partitions reagents
# into liquid contents. Only swap the synchronizer; do not re-run sync
# here (base post_init may have already published the deck).
try:
if getattr(self, "resource_synchronizer", None) is not None and not isinstance(
self.resource_synchronizer, SirnaResourceSynchronizer
):
self.resource_synchronizer = SirnaResourceSynchronizer(self)
logger.info("已安装 SirnaResourceSynchronizerPhase 4")
except Exception as exc: # pragma: no cover - 防御性
logger.warning(f"SirnaResourceSynchronizer 安装失败: {exc}")
@staticmethod
def _missing_api_config_keys(config: Dict[str, Any]) -> List[str]:
@@ -1668,7 +1680,21 @@ class BioyondSirnaStation(BioyondWorkstation):
}
def _register_materials_to_tree(self, material_records: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Register Bioyond materials to UniLabOS resource tree after user confirmation."""
"""Register Bioyond materials to UniLabOS resource tree after user confirmation.
ID-first resolver pipeline (Phase 2/3):
1. Each ``mat`` is classified as ``slot_labware`` / ``liquid_content`` / ``unsupported``
via :meth:`_classify_material_record`.
2. Slot labware is resolved to ``(warehouse, location_code)`` using
:meth:`_resolve_material_record_to_warehouse` which prefers ``material-info``
and falls back to ``warehouse-info-by-mat-type-id`` keyed by ``locationId``.
3. Liquid content is attached to the parent trough at the resolved slot via
:meth:`_attach_liquid_to_parent` (idempotent by Bioyond material id).
4. Unsupported / contradictory rows are surfaced loudly with all Bioyond IDs.
``locationCode`` alone is never used for production registration.
"""
from unilabos.resources.bioyond.sirna_materials import get_material_class_by_type_code
deck = getattr(self, "deck", None)
@@ -1676,68 +1702,476 @@ class BioyondSirnaStation(BioyondWorkstation):
logger.warning("deck 未初始化,跳过 resource tree 注册")
return {"registered": [], "skipped": material_records, "reason": "no_deck"}
registered = []
skipped = []
# Per-batch caches so a single submit_experiment_1 call doesn't refetch.
warehouse_inventory_cache: Dict[str, List[Dict[str, Any]]] = {}
material_info_cache: Dict[str, Dict[str, Any]] = {}
registered: List[Dict[str, Any]] = []
skipped: List[Dict[str, Any]] = []
for mat in material_records:
type_code = mat.get("materialTypeCode", "")
resource_class = get_material_class_by_type_code(type_code)
if resource_class is None:
logger.warning(f"未知 materialTypeCode {type_code},跳过: {mat.get('materialName')}")
skipped.append(mat)
continue
location_code = mat.get("locationShowName") or mat.get("locationCode") or ""
if not location_code:
skipped.append(mat)
classification = self._classify_material_record(mat)
if classification == "unsupported":
logger.warning(
"未支持的物料类型,跳过: materialId=%s materialTypeCode=%s materialTypeName=%s",
mat.get("materialId"),
mat.get("materialTypeCode"),
mat.get("materialTypeName"),
)
skipped.append({**mat, "_skip_reason": "unsupported_classification"})
continue
try:
warehouse, idx = self._resolve_location_to_warehouse(location_code)
resolved = self._resolve_material_record_to_warehouse(
mat,
warehouse_inventory_cache=warehouse_inventory_cache,
material_info_cache=material_info_cache,
)
except (ValueError, RuntimeError) as exc:
logger.warning(f"解析库位 {location_code} 失败: {exc}")
skipped.append(mat)
logger.warning(
"解析 Bioyond 库位失败: materialId=%s materialTypeId=%s locationId=%s 错误=%s",
mat.get("materialId"),
mat.get("materialTypeId"),
mat.get("locationId"),
exc,
)
skipped.append({**mat, "_skip_reason": f"resolution_failed: {exc}"})
continue
material_code = mat.get("materialCode") or f"mat_{type_code}_{location_code}"
plr_resource = resource_class(name=material_code)
plr_resource.unilabos_extra = {
"material_bioyond_id": mat.get("materialId", ""),
"material_bioyond_name": mat.get("materialName", ""),
"material_bioyond_type": mat.get("materialTypeName", ""),
"material_bioyond_type_code": type_code,
"material_bioyond_type_mode": mat.get("materialTypeMode", ""),
"location_code": location_code,
}
warehouse = resolved.get("warehouse")
location_code = resolved.get("location_code")
if warehouse is None or not location_code:
skipped.append({**mat, "_skip_reason": "no_warehouse_or_slot"})
continue
try:
warehouse[idx] = plr_resource
if classification == "slot_labware":
type_code = mat.get("materialTypeCode", "")
resource_class = get_material_class_by_type_code(type_code)
if resource_class is None:
logger.warning(
"未知 materialTypeCode %sslot_labware跳过: %s",
type_code, mat.get("materialName"),
)
skipped.append({**mat, "_skip_reason": "unmapped_material_type_code"})
continue
material_code = mat.get("materialCode") or f"mat_{type_code}_{location_code}"
plr_resource = resource_class(name=material_code)
plr_resource.unilabos_extra = self._build_slot_labware_extra(mat, resolved)
try:
warehouse[location_code] = plr_resource
registered.append({
"kind": "slot_labware",
"material_code": material_code,
"material_name": mat.get("materialName", ""),
"location_code": location_code,
"warehouse": warehouse.name,
"warehouse_bioyond_id": resolved.get("warehouse_id", ""),
"resolution_source": resolved.get("source", ""),
})
except (IndexError, KeyError, TypeError) as exc:
logger.warning(
"放置物料 %s%s[%s] 失败: %s",
material_code, warehouse.name, location_code, exc,
)
skipped.append({**mat, "_skip_reason": f"assign_failed: {exc}"})
continue
# liquid_content
attach_result = self._attach_liquid_to_parent(
mat=mat, warehouse=warehouse, location_code=location_code, resolved=resolved
)
if attach_result.get("status") == "ok":
registered.append({
"material_code": material_code,
"kind": "liquid_content",
"material_code": mat.get("materialCode", ""),
"material_name": mat.get("materialName", ""),
"location_code": location_code,
"warehouse": warehouse.name,
"index": idx,
"warehouse_bioyond_id": resolved.get("warehouse_id", ""),
"parent": attach_result.get("parent_name", ""),
"duplicate": attach_result.get("duplicate", False),
"resolution_source": resolved.get("source", ""),
})
except (IndexError, TypeError) as exc:
logger.warning(f"放置物料 {material_code}{warehouse.name}[{idx}] 失败: {exc}")
skipped.append(mat)
else:
skipped.append({**mat, "_skip_reason": attach_result.get("reason", "liquid_attach_failed")})
self._publish_resource_tree_update()
return {"registered": registered, "skipped": skipped}
# ------------------------------------------------------------------
# Phase 2 / Phase 3 helpers: classification + ID-first resolver
# ------------------------------------------------------------------
def _classify_material_record(self, mat: Dict[str, Any]) -> str:
"""Return ``slot_labware`` | ``liquid_content`` | ``unsupported``.
Rules (in order):
1. If ``materialTypeCode`` maps to a known PLR class via
``get_material_class_by_type_code`` AND the mapped class is *not* the
reagent trough, treat as ``slot_labware``.
2. If ``materialTypeMode`` is ``Reagent`` and the mapped class is missing
or is a trough container, treat as ``liquid_content`` (will attach to
an already-placed parent trough).
3. Otherwise, mark ``unsupported``.
The reagent-trough class is only chosen as ``slot_labware`` when the row's
material name matches a configured trough labware name (e.g. configured
explicitly via ``material_type_mappings``). When evidence is ambiguous, prefer
``liquid_content`` so the data lands on the parent rather than overwriting it.
"""
from unilabos.resources.bioyond.sirna_materials import (
get_material_class_by_type_code,
BioyondSirna_ReagentTrough,
)
type_code = str(mat.get("materialTypeCode") or "")
type_mode = str(mat.get("materialTypeMode") or "")
mapped = get_material_class_by_type_code(type_code) if type_code else None
if mapped is not None and mapped is not BioyondSirna_ReagentTrough:
return "slot_labware"
if type_mode == "Reagent":
return "liquid_content"
if mapped is None and type_mode in {"Sample", "Consumables"}:
return "unsupported"
# Default: if mapped trough class but Reagent mode -> liquid; otherwise unsupported.
if mapped is BioyondSirna_ReagentTrough and type_mode == "Reagent":
return "liquid_content"
return "unsupported"
def _resolve_material_record_to_warehouse(
self,
mat: Dict[str, Any],
warehouse_inventory_cache: Dict[str, List[Dict[str, Any]]],
material_info_cache: Dict[str, Dict[str, Any]],
) -> Dict[str, Any]:
"""Resolve a Bioyond allocation record to (warehouse, location_code) by IDs.
Resolution order: ``material-info`` → ``warehouse-info-by-mat-type-id``.
Falls back to a code-only diagnostic that raises on ambiguity.
"""
material_id = str(mat.get("materialId") or "")
material_type_id = str(mat.get("materialTypeId") or "")
location_id = str(mat.get("locationId") or "")
location_code = str(mat.get("locationShowName") or mat.get("locationCode") or "")
# 1) material-info path
try:
resolved = self._resolve_by_material_info(
material_id=material_id,
location_id=location_id,
material_info_cache=material_info_cache,
)
if resolved is not None:
return resolved
except RuntimeError as exc:
# RPC error: log and continue with fallback path.
logger.debug("material-info 解析失败,回退: %s", exc)
# 2) warehouse-info-by-mat-type-id path
try:
resolved = self._resolve_by_type_location_inventory(
material_type_id=material_type_id,
location_id=location_id,
warehouse_inventory_cache=warehouse_inventory_cache,
)
if resolved is not None:
return resolved
except RuntimeError as exc:
logger.debug("warehouse-info-by-mat-type-id 解析失败,回退: %s", exc)
# 3) Diagnostic-only code fallback. Only allowed when exactly one warehouse
# on the deck owns this slot label. Any ambiguity raises.
if location_code:
warehouse, _idx = self._resolve_location_to_warehouse(location_code)
return {
"warehouse": warehouse,
"warehouse_id": "",
"warehouse_name": warehouse.name,
"location_id": location_id,
"location_code": location_code,
"source": "code_only_fallback",
}
raise RuntimeError(
f"无法解析 Bioyond 库位: materialId={material_id} materialTypeId={material_type_id} "
f"locationId={location_id} locationCode={location_code}"
)
def _resolve_by_material_info(
self,
material_id: str,
location_id: str,
material_info_cache: Dict[str, Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
"""Resolve via ``BioyondV1RPC.material_info(material_id)``.
Schema returns ``locations[].whid/whName/code`` plus ``locations[].id`` (the
location_id). When ``location_id`` is non-empty we prefer matching by id, else
by ``code`` plus the deck's ``warehouse_bioyond_ids`` mapping.
"""
rpc = getattr(self, "hardware_interface", None)
if rpc is None or not material_id:
return None
if material_id in material_info_cache:
payload = material_info_cache[material_id]
else:
try:
payload = rpc.material_info(material_id) or {}
except Exception as exc: # pragma: no cover - 网络错误
raise RuntimeError(f"material_info({material_id}) RPC 失败: {exc}") from exc
material_info_cache[material_id] = payload
locations = payload.get("locations") if isinstance(payload, dict) else None
if not isinstance(locations, list) or not locations:
return None
# Match by id first.
for loc in locations:
if not isinstance(loc, dict):
continue
if location_id and str(loc.get("id") or "") == location_id:
return self._build_resolution_from_material_info_loc(loc, source="material-info")
# Fall back to first usable row.
for loc in locations:
if not isinstance(loc, dict):
continue
if loc.get("whid") or loc.get("whName") or loc.get("code"):
return self._build_resolution_from_material_info_loc(loc, source="material-info")
return None
def _build_resolution_from_material_info_loc(
self, loc: Dict[str, Any], source: str
) -> Optional[Dict[str, Any]]:
whid = str(loc.get("whid") or "")
wh_name = str(loc.get("whName") or "")
code = str(loc.get("code") or "")
warehouse = self._warehouse_by_bioyond_id_or_name(whid=whid, wh_name=wh_name)
if warehouse is None or not code:
return None
if not self._location_code_in_warehouse(warehouse, code):
logger.warning(
"material-info 返回库位 %s 在仓库 %s 中不存在,跳过",
code, getattr(warehouse, "name", "?"),
)
return None
return {
"warehouse": warehouse,
"warehouse_id": whid,
"warehouse_name": wh_name or warehouse.name,
"location_id": str(loc.get("id") or ""),
"location_code": code,
"x": loc.get("x"),
"y": loc.get("y"),
"z": loc.get("z"),
"source": source,
}
def _resolve_by_type_location_inventory(
self,
material_type_id: str,
location_id: str,
warehouse_inventory_cache: Dict[str, List[Dict[str, Any]]],
) -> Optional[Dict[str, Any]]:
"""Resolve via ``warehouse-info-by-mat-type-id`` inventory rows.
Each row contains ``id``/``warehouseId``/``warehouseName``/``code``. We pick
the row whose ``id == location_id``.
"""
rpc = getattr(self, "hardware_interface", None)
if rpc is None or not material_type_id or not location_id:
return None
if material_type_id in warehouse_inventory_cache:
rows = warehouse_inventory_cache[material_type_id]
else:
try:
payload = rpc.query_warehouse_by_material_type(material_type_id) or {}
except Exception as exc: # pragma: no cover - 网络错误
raise RuntimeError(
f"warehouse-info-by-mat-type-id({material_type_id}) RPC 失败: {exc}"
) from exc
rows = []
if isinstance(payload, dict):
# query_warehouse_by_material_type returns response['data'] dict-or-list.
data = payload.get("data") if isinstance(payload, dict) else None
if isinstance(data, list):
rows = data
elif isinstance(payload, dict):
# query helper returns response['data'] already, treat top-level dict
# as a single row only if it has expected keys.
if "id" in payload and "warehouseId" in payload:
rows = [payload]
elif isinstance(payload, list):
rows = payload
warehouse_inventory_cache[material_type_id] = rows
for row in rows:
if not isinstance(row, dict):
continue
if str(row.get("id") or "") != location_id:
continue
whid = str(row.get("warehouseId") or "")
wh_name = str(row.get("warehouseName") or "")
code = str(row.get("code") or "")
warehouse = self._warehouse_by_bioyond_id_or_name(whid=whid, wh_name=wh_name)
if warehouse is None or not code:
continue
if not self._location_code_in_warehouse(warehouse, code):
logger.warning(
"warehouse-info row 库位 %s 在仓库 %s 中不存在,跳过",
code, getattr(warehouse, "name", "?"),
)
continue
return {
"warehouse": warehouse,
"warehouse_id": whid,
"warehouse_name": wh_name or warehouse.name,
"location_id": location_id,
"location_code": code,
"x": row.get("x"),
"y": row.get("y"),
"z": row.get("z"),
"source": "warehouse-info-by-mat-type-id",
}
return None
def _warehouse_by_bioyond_id_or_name(self, whid: str, wh_name: str) -> Any:
"""Locate a deck child warehouse by Bioyond id (preferred) or display name."""
deck = getattr(self, "deck", None)
if deck is None:
return None
# Bioyond id mapping (deck-level config: deck.warehouse_bioyond_ids).
id_map = getattr(deck, "warehouse_bioyond_ids", {}) or {}
# Also pick up station-level config if deck has no mapping yet.
if not id_map:
cfg = getattr(self, "bioyond_config", {}) or {}
id_map = cfg.get("warehouse_bioyond_ids") or {}
if whid:
target_name = id_map.get(whid)
if target_name:
for child in deck.children:
if getattr(child, "name", "") == target_name:
return child
if wh_name:
for child in deck.children:
if getattr(child, "name", "") == wh_name:
return child
return None
@staticmethod
def _location_code_in_warehouse(warehouse: Any, code: str) -> bool:
ordering = getattr(warehouse, "_ordering", None)
if isinstance(ordering, dict):
return code in ordering
return False
def _build_slot_labware_extra(
self, mat: Dict[str, Any], resolved: Dict[str, Any]
) -> Dict[str, Any]:
return {
"material_bioyond_id": mat.get("materialId", ""),
"material_bioyond_code": mat.get("materialCode", ""),
"material_bioyond_name": mat.get("materialName", ""),
"material_bioyond_type_id": mat.get("materialTypeId", ""),
"material_bioyond_type_code": mat.get("materialTypeCode", ""),
"material_bioyond_type_mode": mat.get("materialTypeMode", ""),
"location_bioyond_id": mat.get("locationId", ""),
"location_code": resolved.get("location_code", ""),
"warehouse_bioyond_id": resolved.get("warehouse_id", ""),
"warehouse_bioyond_name": resolved.get("warehouse_name", ""),
"location_resolution_source": resolved.get("source", ""),
}
def _attach_liquid_to_parent(
self,
mat: Dict[str, Any],
warehouse: Any,
location_code: str,
resolved: Dict[str, Any],
) -> Dict[str, Any]:
"""Attach a reagent ``mat`` as liquid content on the parent labware at the slot.
Returns ``{"status": "ok"|"deferred", "reason": str, "parent_name": str, "duplicate": bool}``.
Idempotent by Bioyond ``materialId``.
"""
try:
holder = warehouse[location_code]
except (KeyError, IndexError):
return {"status": "deferred", "reason": f"warehouse_slot_missing:{location_code}"}
# ``warehouse[code]`` returns the assigned resource when occupied, otherwise
# an empty ResourceHolder. Detect both shapes.
parent = None
if hasattr(holder, "tracker"):
parent = holder
else:
children = list(getattr(holder, "children", []) or [])
if children:
parent = children[0]
if parent is None:
return {
"status": "deferred",
"reason": f"missing_parent_labware:{warehouse.name}/{location_code}",
}
bioyond_id = str(mat.get("materialId") or "")
extra = getattr(parent, "unilabos_extra", None)
if not isinstance(extra, dict):
extra = {}
reagent_ids = list(extra.get("reagent_bioyond_ids") or [])
duplicate = any(
isinstance(item, dict) and str(item.get("material_bioyond_id") or "") == bioyond_id
for item in reagent_ids
)
if not duplicate:
reagent_ids.append({
"material_bioyond_id": bioyond_id,
"material_bioyond_code": mat.get("materialCode", ""),
"material_bioyond_name": mat.get("materialName", ""),
"material_bioyond_type_id": mat.get("materialTypeId", ""),
"material_bioyond_type_code": mat.get("materialTypeCode", ""),
"location_bioyond_id": mat.get("locationId", ""),
"quantity": mat.get("quantity"),
"location_resolution_source": resolved.get("source", ""),
})
extra["reagent_bioyond_ids"] = reagent_ids
try:
setattr(parent, "unilabos_extra", extra)
except Exception: # pragma: no cover - read-only proxy
pass
# Best-effort: also add a Liquid entry to parent.tracker if it supports it.
tracker = getattr(parent, "tracker", None)
if tracker is not None and hasattr(tracker, "add_liquid"):
try:
quantity_text = str(mat.get("quantity") or "0")
# Try to parse a leading number; fall back to 0 to preserve idempotency.
import re as _re
match = _re.match(r"\s*(\d+(?:\.\d+)?)", quantity_text)
qty_value = float(match.group(1)) if match else 0.0
if qty_value > 0:
# No unit information from Bioyond reagent rows; default ul.
tracker.add_liquid(qty_value, unit="ul")
except Exception as exc: # pragma: no cover - tracker variants
logger.debug("tracker.add_liquid 失败(非阻塞): %s", exc)
return {
"status": "ok",
"parent_name": getattr(parent, "name", ""),
"duplicate": duplicate,
"reason": "",
}
def _resolve_location_to_warehouse(self, location_code: str) -> Tuple[Any, int]:
"""Map Bioyond location code to (warehouse, index).
"""[Diagnostic / legacy fallback] Map slot label to (warehouse, idx).
Location codes come from API response (e.g., "10-1", "4-13").
The deck defines warehouses with known row/col layouts:
- G3移液站: row 1, cols 1-14 (code "1-X" where X <= 14 and in G3 range)
- 自动化堆栈: rows 1-10, cols 1-17 (code "Y-X")
- 离心机配平板堆栈: rows 1-2, cols 1-1
Production registration goes through :meth:`_resolve_material_record_to_warehouse`,
which uses Bioyond IDs (``materialId`` / ``materialTypeId`` / ``locationId``).
Since both G3移液站 and 自动化堆栈 share row-based codes, we rely on
the deck warehouse configuration to determine which warehouse owns which
row/col range. The live API allocates materials using the same site codes
as defined in the deck setup.
This code-only resolver is kept for diagnostics / unit tests and now raises
on ambiguity instead of silently picking a warehouse by display dimensions.
"""
deck = getattr(self, "deck", None)
if deck is None:
@@ -1746,48 +2180,63 @@ class BioyondSirnaStation(BioyondWorkstation):
parts = location_code.replace("-", "-").split("-")
if len(parts) != 2:
raise ValueError(f"无法解析库位代码: {location_code!r}")
row = int(parts[0])
col = int(parts[1])
site_key = f"{parts[0]}-{parts[1]}"
# Try to find the matching warehouse by checking site keys
# Match exclusively by registered slot labels — never by display geometry.
candidates: List[Tuple[Any, int]] = []
for child in deck.children:
warehouse = child
if not hasattr(warehouse, "sites"):
ordering = getattr(child, "_ordering", None)
if not isinstance(ordering, dict):
continue
site_key = f"{row}-{col}"
sites = warehouse.sites if hasattr(warehouse, "sites") else {}
if isinstance(sites, dict) and site_key in sites:
idx = list(sites.keys()).index(site_key)
return warehouse, idx
if site_key in ordering:
idx = list(ordering.keys()).index(site_key)
candidates.append((child, idx))
# Fallback: use row/col based heuristic matching deck dimensions
for child in deck.children:
warehouse = child
num_sites = getattr(warehouse, "num_items", 0) or len(getattr(warehouse, "children", []))
if num_sites == 0:
continue
num_cols = getattr(warehouse, "_num_items_y", None) or getattr(warehouse, "num_items_y", 1)
num_rows = getattr(warehouse, "_num_items_x", None) or getattr(warehouse, "num_items_x", 1)
if 1 <= row <= num_rows and 1 <= col <= num_cols:
idx = (row - 1) * num_cols + (col - 1)
if idx < num_sites:
return warehouse, idx
raise ValueError(f"未找到与库位 {location_code} 匹配的 warehouse")
if not candidates:
raise ValueError(f"未找到与库位 {location_code!r} 匹配的 warehouse按 ordering 标签)")
if len(candidates) > 1:
warehouse_names = [getattr(w, "name", "?") for w, _ in candidates]
raise RuntimeError(
f"库位 {location_code!r} 在多个仓库中存在,需要 Bioyond ID 才能消歧: {warehouse_names}"
)
return candidates[0]
def _publish_resource_tree_update(self) -> None:
"""Trigger ROS2 resource tree update for frontend refresh."""
"""触发 UniLabOS 资源树更新(异步、非阻塞)。
``BaseROS2DeviceNode.update_resource`` 的真实签名是
``async def update_resource(self, resources: List[ResourcePLR])``。
因此必须用 ``run_async_func`` 调度并传入 ``resources=[deck]``
不能传 ``resource_name``/``resource_data`` 这两个不存在的关键字。
"""
ros_node = getattr(self, "_ros_node", None)
if ros_node is None:
return
deck = getattr(self, "deck", None)
if deck is None:
return
update_resource_callable = getattr(ros_node, "update_resource", None)
if update_resource_callable is None:
return
try:
if hasattr(ros_node, "update_resource"):
deck = getattr(self, "deck", None)
if deck is not None:
ros_node.update_resource(
resource_name=deck.name,
resource_data=deck.serialize() if hasattr(deck, "serialize") else {},
)
try:
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode # type: ignore
except Exception: # pragma: no cover - 轻量环境无 ros2
ROS2DeviceNode = None # type: ignore[assignment]
if ROS2DeviceNode is not None and hasattr(ROS2DeviceNode, "run_async_func"):
ROS2DeviceNode.run_async_func(
update_resource_callable,
True,
**{"resources": [deck]},
)
logger.info(f"已调度 deck '{deck.name}' 的资源树更新async")
else:
# 轻量/测试场景:直接调用,便于测试通过 monkeypatch 验证关键字。
update_resource_callable(resources=[deck])
except TypeError as exc:
# 严格定位错误调用形态,便于回归。
logger.error(f"resource tree 更新失败 (调用签名错误): {exc}")
raise
except Exception as exc:
logger.warning(f"resource tree 更新失败 (非阻塞): {exc}")
@@ -1871,6 +2320,170 @@ class BioyondSirnaStation(BioyondWorkstation):
return False
class SirnaResourceSynchronizer(BioyondResourceSynchronizer):
"""Sirna-specific resource synchronizer.
Phase 4 of the resource-system mega plan: external Bioyond stock-material
rows are partitioned into ``slot_labware`` rows (handled by the existing
``BioyondResourceSynchronizer.sync_from_external`` path) and reagent
``liquid_content`` rows that should attach to a parent trough on the deck.
The base implementation goes through ``resource_bioyond_to_plr`` which can
silently fall back to ``RegularContainer`` for unmapped types. The Sirna
override classifies before calling the base path, attaches reagent contents
to already-placed parent labware, and logs (with Bioyond ids) any rows that
need a parent that doesn't exist yet.
The override does not double-sync: when rows are taken over here they are
excluded from the data passed to the base sync.
"""
def sync_from_external(self) -> bool: # type: ignore[override]
rpc = getattr(self.workstation, "hardware_interface", None)
if rpc is None:
logger.error("Bioyond API 客户端未初始化")
return False
try:
type1 = rpc.stock_material('{"typeMode": 1, "includeDetail": true}') or []
type2 = rpc.stock_material('{"typeMode": 2, "includeDetail": true}') or []
type0 = rpc.stock_material('{"typeMode": 0, "includeDetail": true}') or []
except Exception as exc: # pragma: no cover - 网络
logger.error(f"[Sirna sync] stock_material 调用失败: {exc}")
return False
all_rows: List[Dict[str, Any]] = []
for batch in (type0, type1, type2):
if isinstance(batch, list):
all_rows.extend(item for item in batch if isinstance(item, dict))
labware_rows, liquid_rows = self._partition_external_rows(all_rows)
# Apply liquid attachments first so duplicate writes from a re-sync are
# detected via the unilabos_extra reagent_bioyond_ids list.
deferred_liquids: List[Dict[str, Any]] = []
for row in liquid_rows:
attached = self._attach_external_liquid_row(row)
if attached.get("status") != "ok":
deferred_liquids.append({**row, "_skip_reason": attached.get("reason", "")})
# Delegate labware path to the base implementation by temporarily replacing
# the stock-material results. We re-run only when there is something to do.
if labware_rows:
try:
from unilabos.resources.graphio import resource_bioyond_to_plr
resource_bioyond_to_plr(
labware_rows,
type_mapping=self.workstation.bioyond_config.get("material_type_mappings", {}),
deck=self.workstation.deck,
)
except Exception as exc: # pragma: no cover - graphio failure
logger.error(f"[Sirna sync] labware 转换失败: {exc}")
if deferred_liquids:
logger.warning(
"[Sirna sync] %d 条 reagent 行无父 labware已延后: %s",
len(deferred_liquids),
[r.get("_skip_reason") for r in deferred_liquids[:5]],
)
return True
def _partition_external_rows(
self, rows: List[Dict[str, Any]]
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""Split external rows by classifier rules.
Reagent rows whose ``typeName`` is mapped to a known PLR class become
labware. Reagent rows without a mapped class are treated as liquid
content rows. ``Sample`` / ``Consumables`` rows always go to labware.
"""
from unilabos.resources.bioyond.sirna_materials import (
BioyondSirna_ReagentTrough,
get_material_class_by_type_code,
)
type_mapping = self.workstation.bioyond_config.get("material_type_mappings", {})
reverse: Dict[str, Any] = {}
for key, value in type_mapping.items():
if isinstance(value, (tuple, list)) and value:
display = value[0]
if display:
reverse.setdefault(display, key)
labware: List[Dict[str, Any]] = []
liquid: List[Dict[str, Any]] = []
for row in rows:
type_name = row.get("typeName") or ""
type_mode_int = row.get("typeMode") # external API uses int 0/1/2
mapped_key = reverse.get(type_name)
mapped_class = None
# Use mapped_key to derive class via type code if possible. Sirna materials
# uses code-based map; here we just rely on the type_name → mapping presence.
if mapped_key:
# try to find class via id; fall back to non-trough.
# In practice, Sirna materials map by code, so trough check is by name.
if "试剂槽" in type_name and BioyondSirna_ReagentTrough is not None:
mapped_class = BioyondSirna_ReagentTrough
else:
mapped_class = object # any non-trough sentinel
is_reagent_mode = (type_mode_int == 2)
if mapped_class is not None and mapped_class is not BioyondSirna_ReagentTrough:
labware.append(row)
continue
if is_reagent_mode and mapped_class is None:
liquid.append(row)
continue
if is_reagent_mode and mapped_class is BioyondSirna_ReagentTrough:
# Trough labware: PLR class exists; treat as labware.
labware.append(row)
continue
# Default: labware so we don't accidentally drop unmapped sample rows.
labware.append(row)
return labware, liquid
def _attach_external_liquid_row(self, row: Dict[str, Any]) -> Dict[str, Any]:
"""Attach an external reagent row to a parent labware on the deck.
Locates the parent through ``locations[0].whName + locations[0].code`` and
delegates to the station's ``_attach_liquid_to_parent`` helper.
"""
deck = getattr(self.workstation, "deck", None)
if deck is None:
return {"status": "deferred", "reason": "no_deck"}
locations = row.get("locations") or []
if not isinstance(locations, list) or not locations:
return {"status": "deferred", "reason": "no_locations"}
loc = locations[0] if isinstance(locations[0], dict) else {}
wh_name = str(loc.get("whName") or "")
code = str(loc.get("code") or "")
warehouse = None
for child in getattr(deck, "children", []):
if getattr(child, "name", "") == wh_name:
warehouse = child
break
if warehouse is None or not code:
return {"status": "deferred", "reason": f"no_warehouse_or_code:{wh_name}/{code}"}
# Build a synthetic mat dict in create-order shape so we can reuse the
# station-side helper without duplicating logic.
mat = {
"materialId": row.get("id"),
"materialCode": row.get("code"),
"materialName": row.get("name"),
"materialTypeId": row.get("typeId"),
"materialTypeName": row.get("typeName"),
"quantity": row.get("quantity"),
"locationId": loc.get("id"),
}
attach = getattr(self.workstation, "_attach_liquid_to_parent", None)
if attach is None:
return {"status": "deferred", "reason": "station_missing_helper"}
return attach(
mat=mat,
warehouse=warehouse,
location_code=code,
resolved={"warehouse_id": "", "warehouse_name": wh_name, "source": "stock-material"},
)
def main() -> int:
"""命令行入口:读取配置并拉取工作流列表。"""
assert DEBUG_CLI_ENABLED == True, "main 是调试/CLI 快捷入口,运行时不应调用 sirna_station.py 的 CLI 路径"

View File

@@ -0,0 +1 @@
from . import sirna_materials # noqa: F401 ensure @resource classes are importable for PLR deserialize

View File

@@ -1,4 +1,5 @@
from os import name
from pylabrobot.resources import Deck, Coordinate, Rotation
from unilabos.registry.decorators import resource
@@ -112,6 +113,16 @@ class BIOYOND_PolymerPreparationStation_Deck(Deck):
icon="配液站.webp",
)
class BIOYOND_SirnaStation_Deck(Deck):
WAREHOUSE_BIOYOND_AXIS = {
"G3移液站": "xy_col_row",
"自动化堆栈": "xy_col_row",
"离心机配平板堆栈": "xy_col_row",
}
# Bioyond warehouse UUID -> 本地仓库名称 映射。
# 留空时由配置station config 的 ``warehouse_bioyond_ids``)注入。
# graph 节点也可在 deck.config.warehouse_bioyond_ids 覆盖。
WAREHOUSE_BIOYOND_IDS: dict = {}
def __init__(
self,
name: str = "SirnaStation_Deck",
@@ -119,9 +130,15 @@ class BIOYOND_SirnaStation_Deck(Deck):
size_y: float = 1080.0,
size_z: float = 1500.0,
category: str = "deck",
setup: bool = False
setup: bool = False,
warehouse_bioyond_ids: dict | None = None,
**kwargs,
) -> None:
super().__init__(name=name, size_x=size_x, size_y=size_y, size_z=size_z)
# 按需写入实例级覆盖;保留默认空 mapping避免改动模型常量。
self.warehouse_bioyond_ids: dict = dict(self.WAREHOUSE_BIOYOND_IDS)
if warehouse_bioyond_ids:
self.warehouse_bioyond_ids.update(warehouse_bioyond_ids)
if setup:
self.setup()
@@ -130,7 +147,15 @@ class BIOYOND_SirnaStation_Deck(Deck):
if data.get("children") and data.get("setup") is True:
data = data.copy()
data["setup"] = False
return super().deserialize(data, allow_marshal=allow_marshal)
result = super().deserialize(data, allow_marshal=allow_marshal)
result._ensure_sirna_warehouse_axis()
return result
def _ensure_sirna_warehouse_axis(self) -> None:
for child in getattr(self, "children", []):
axis = self.WAREHOUSE_BIOYOND_AXIS.get(getattr(child, "name", ""))
if axis and not hasattr(child, "bioyond_axis"):
child.bioyond_axis = axis
def setup(self) -> None:
# Sirna 读接口 /api/storage/location/locations-by-type 返回完整固定堆栈清单。

View File

@@ -4,6 +4,19 @@ from pylabrobot.resources.carrier import ResourceHolder, create_homogeneous_reso
from unilabos.resources.warehouse import WareHouse, warehouse_factory
class BioyondWareHouse(WareHouse):
"""Bioyond 仓库,额外保存服务端 x/y 坐标语义。"""
def __init__(self, *args, bioyond_axis: str = "xy_row_col", **kwargs):
super().__init__(*args, **kwargs)
self.bioyond_axis = bioyond_axis
def serialize(self) -> dict:
data = super().serialize()
data["bioyond_axis"] = self.bioyond_axis
return data
def bioyond_warehouse_numeric_stack(
name: str,
rows: int = 10,
@@ -44,7 +57,7 @@ def bioyond_warehouse_numeric_stack(
for row in range(num_items_y)
for col in range(num_items_x)
]
warehouse = WareHouse(
warehouse = BioyondWareHouse(
name=name,
size_x=dx + item_dx * num_items_x,
size_y=dy + item_dy * num_items_y,
@@ -55,8 +68,8 @@ def bioyond_warehouse_numeric_stack(
ordering_layout="row-major",
sites={key: holder for key, holder in zip(keys, holders.values())},
category="warehouse",
bioyond_axis=bioyond_axis,
)
warehouse.bioyond_axis = bioyond_axis
return warehouse