From edd67e4880b73b06286368a167b8816b8655cb38 Mon Sep 17 00:00:00 2001 From: q434343 <554662886@qq.com> Date: Fri, 27 Mar 2026 19:50:51 +0800 Subject: [PATCH] Merge branch 'dev' into feat/lab_resource --- unilabos/resources/resource_tracker.py | 151 +++---------------------- 1 file changed, 16 insertions(+), 135 deletions(-) diff --git a/unilabos/resources/resource_tracker.py b/unilabos/resources/resource_tracker.py index 1b037676..3fb945b6 100644 --- a/unilabos/resources/resource_tracker.py +++ b/unilabos/resources/resource_tracker.py @@ -123,24 +123,6 @@ class ResourceDictType(TypedDict): machine_name: str -class ResourceDictType(TypedDict): - id: str - uuid: str - name: str - description: str - resource_schema: Dict[str, Any] - model: Dict[str, Any] - icon: str - parent_uuid: Optional[str] - parent: Optional["ResourceDictType"] - type: Union[Literal["device"], str] - klass: str - pose: ResourceDictPositionType - config: Dict[str, Any] - data: Dict[str, Any] - extra: Dict[str, Any] - - # 统一的资源字典模型,parent 自动序列化为 parent_uuid,children 不序列化 class ResourceDict(BaseModel): id: str = Field(description="Resource ID") @@ -571,17 +553,10 @@ class ResourceTreeSet(object): trees.append(tree_instance) return cls(trees) - def to_plr_resources( - self, skip_devices: bool = True, requested_uuids: Optional[List[str]] = None - ) -> List["PLRResource"]: + def to_plr_resources(self, skip_devices=True) -> List["PLRResource"]: """ 将 ResourceTreeSet 转换为 PLR 资源列表 - Args: - skip_devices: 是否跳过 device 类型节点 - requested_uuids: 若指定,则按此 UUID 顺序返回对应资源(用于批量查询时一一对应), - 否则返回各树的根节点列表 - Returns: List[PLRResource]: PLR 资源实例列表 """ @@ -637,71 +612,6 @@ class ResourceTreeSet(object): d["model"] = res.config.get("model", None) return d - # deserialize 会单独处理的元数据 key,不传给构造函数 - _META_KEYS = {"type", "parent_name", "location", "children", "rotation", "barcode"} - # deserialize 自定义逻辑使用的 key(如 TipSpot 用 prototype_tip 构建 make_tip),需保留 - _DESERIALIZE_PRESERVED_KEYS = {"prototype_tip"} - - def remove_incompatible_params(plr_d: dict) -> None: - """递归移除 PLR 类不接受的参数,避免 deserialize 报错。 - - 移除构造函数不接受的参数(如 compute_height_from_volume、ordering、category) - - 对 TubeRack:将 ordering 转为 ordered_items - - 保留 deserialize 自定义逻辑需要的 key(如 prototype_tip) - """ - if "type" in plr_d: - sub_cls = find_subclass(plr_d["type"], PLRResource) - if sub_cls is not None: - spec = inspect.signature(sub_cls) - valid_params = set(spec.parameters.keys()) - # TubeRack 特殊处理:先转换 ordering,再参与后续过滤 - if "ordering" not in valid_params and "ordering" in plr_d: - ordering = plr_d.pop("ordering", None) - if sub_cls.__name__ == "TubeRack": - plr_d["ordered_items"] = ( - _ordering_to_ordered_items(plr_d, ordering) - if ordering - else {} - ) - # 移除构造函数不接受的参数(保留 META 和 deserialize 自定义逻辑需要的 key) - for key in list(plr_d.keys()): - if ( - key not in _META_KEYS - and key not in _DESERIALIZE_PRESERVED_KEYS - and key not in valid_params - ): - plr_d.pop(key, None) - for child in plr_d.get("children", []): - remove_incompatible_params(child) - - def _ordering_to_ordered_items(plr_d: dict, ordering: dict) -> dict: - """将 ordering 转为 ordered_items,从 children 构建 Tube 对象""" - from pylabrobot.resources import Tube, Coordinate - from pylabrobot.serializer import deserialize as plr_deserialize - - children = plr_d.get("children", []) - ordered_items = {} - for idx, (ident, child_name) in enumerate(ordering.items()): - child_data = children[idx] if idx < len(children) else None - if child_data is None: - continue - loc_data = child_data.get("location") - loc = ( - plr_deserialize(loc_data) - if loc_data - else Coordinate(0, 0, 0) - ) - tube = Tube( - name=child_data.get("name", child_name or ident), - size_x=child_data.get("size_x", 10), - size_y=child_data.get("size_y", 10), - size_z=child_data.get("size_z", 50), - max_volume=child_data.get("max_volume", 1000), - ) - tube.location = loc - ordered_items[ident] = tube - plr_d["children"] = [] # 已并入 ordered_items,避免重复反序列化 - return ordered_items - plr_resources = [] tracker = DeviceNodeResourceTracker() @@ -721,7 +631,9 @@ class ResourceTreeSet(object): raise ValueError( f"无法找到类型 {plr_dict['type']} 对应的 PLR 资源类。原始信息:{tree.root_node.res_content}" ) - remove_incompatible_params(plr_dict) + spec = inspect.signature(sub_cls) + if "category" not in spec.parameters: + plr_dict.pop("category", None) plr_resource = sub_cls.deserialize(plr_dict, allow_marshal=True) from pylabrobot.resources import Coordinate from pylabrobot.serializer import deserialize @@ -735,47 +647,12 @@ class ResourceTreeSet(object): plr_resources.append(plr_resource) except Exception as e: - logger.error(f"转换 PLR 资源失败: {e}") + logger.error(f"转换 PLR 资源失败: {e} {str(plr_dict)[:1000]}") import traceback logger.error(f"堆栈: {traceback.format_exc()}") raise - if requested_uuids: - # 按请求的 UUID 顺序返回对应资源(从整棵树中按 uuid 提取) - # 优先使用 tracker.uuid_to_resources;若映射缺失,再递归遍历 PLR 树兜底搜索。 - def _find_plr_by_uuid(roots: List["PLRResource"], uid: str) -> Optional["PLRResource"]: - stack = list(roots) - while stack: - node = stack.pop() - node_uid = getattr(node, "unilabos_uuid", None) - if node_uid == uid: - return node - children = getattr(node, "children", None) or [] - stack.extend(children) - return None - - result = [] - missing_uuids = [] - for uid in requested_uuids: - found = tracker.uuid_to_resources.get(uid) - if found is None: - found = _find_plr_by_uuid(plr_resources, uid) - if found is not None: - # 回填缓存,后续相同 uuid 可直接命中 - tracker.uuid_to_resources[uid] = found - if found is None: - missing_uuids.append(uid) - else: - result.append(found) - - if missing_uuids: - raise ValueError( - f"请求的 UUID 未在资源树中找到: {missing_uuids}。" - f"可用 UUID 数量: {len(tracker.uuid_to_resources)}," - f"资源树数量: {len(self.trees)}" - ) - return result return plr_resources @classmethod @@ -793,13 +670,7 @@ class ResourceTreeSet(object): ValueError: 当建立关系时发现不一致 """ # 第一步:将字典列表转换为 ResourceDictInstance 列表 - parsed_list = [] - for node_dict in raw_list: - if isinstance(node_dict, str): - import json - node_dict = json.loads(node_dict) - parsed_list.append(node_dict) - instances = [ResourceDictInstance.get_resource_instance_from_dict(node_dict) for node_dict in parsed_list] + instances = [ResourceDictInstance.get_resource_instance_from_dict(node_dict) for node_dict in raw_list] # 第二步:建立映射关系 uuid_to_instance: Dict[str, ResourceDictInstance] = {} @@ -870,6 +741,16 @@ class ResourceTreeSet(object): """ return [tree.root_node for tree in self.trees] + @property + def root_nodes_uuid(self) -> List[ResourceDictInstance]: + """ + 获取所有树的根节点 + + Returns: + 所有根节点的资源实例列表 + """ + return [tree.root_node.res_content.uuid for tree in self.trees] + @property def all_nodes(self) -> List[ResourceDictInstance]: """