diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 1af0a4e3..844d4cf8 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -92,7 +92,7 @@ class Registry: test_resource_method_info = host_node_enhanced_info.get("action_methods", {}).get("test_resource", {}) test_resource_schema = self._generate_unilab_json_command_schema( test_resource_method_info.get("args", []), - "auto-test_resource", + "test_resource", test_resource_method_info.get("return_annotation"), ) test_resource_schema["description"] = "用于测试物料、设备和样本。" diff --git a/unilabos/resources/resource_tracker.py b/unilabos/resources/resource_tracker.py index 2054e2ab..e042ef80 100644 --- a/unilabos/resources/resource_tracker.py +++ b/unilabos/resources/resource_tracker.py @@ -38,24 +38,52 @@ class LabSample(TypedDict): extra: Dict[str, Any] +class ResourceDictPositionSizeType(TypedDict): + depth: float + width: float + height: float + + class ResourceDictPositionSize(BaseModel): depth: float = Field(description="Depth", default=0.0) # z width: float = Field(description="Width", default=0.0) # x height: float = Field(description="Height", default=0.0) # y +class ResourceDictPositionScaleType(TypedDict): + x: float + y: float + z: float + + class ResourceDictPositionScale(BaseModel): x: float = Field(description="x scale", default=0.0) y: float = Field(description="y scale", default=0.0) z: float = Field(description="z scale", default=0.0) +class ResourceDictPositionObjectType(TypedDict): + x: float + y: float + z: float + + class ResourceDictPositionObject(BaseModel): x: float = Field(description="X coordinate", default=0.0) y: float = Field(description="Y coordinate", default=0.0) z: float = Field(description="Z coordinate", default=0.0) +class ResourceDictPositionType(TypedDict): + size: ResourceDictPositionSizeType + scale: ResourceDictPositionScaleType + layout: Literal["2d", "x-y", "z-y", "x-z"] + position: ResourceDictPositionObjectType + position3d: ResourceDictPositionObjectType + rotation: ResourceDictPositionObjectType + cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"] + + class ResourceDictPosition(BaseModel): size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize) scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale) @@ -74,6 +102,24 @@ class ResourceDictPosition(BaseModel): ) +class ResourceDictType(TypedDict): + id: str + uuid: str + name: str + description: str + resource_schema: Dict[str, Any] + model: Dict[str, Any] + icon: str + parent_uuid: Optional[str] + parent: Optional["ResourceDictType"] + type: Union[Literal["device"], str] + klass: str + pose: ResourceDictPositionType + config: Dict[str, Any] + data: Dict[str, Any] + extra: Dict[str, Any] + + # 统一的资源字典模型,parent 自动序列化为 parent_uuid,children 不序列化 class ResourceDict(BaseModel): id: str = Field(description="Resource ID") diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 63eda320..30c5d414 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -35,7 +35,7 @@ from unilabos.resources.resource_tracker import ( ResourceTreeInstance, RETURN_UNILABOS_SAMPLES, JSON_UNILABOS_PARAM, - PARAM_SAMPLE_UUIDS, + PARAM_SAMPLE_UUIDS, SampleUUIDsType, LabSample, ) from unilabos.ros.initialize_device import initialize_device_from_dict from unilabos.ros.msgs.message_converter import ( @@ -65,6 +65,7 @@ class DeviceActionStatus: class TestResourceReturn(TypedDict): resources: List[List[ResourceDict]] devices: List[Dict[str, Any]] + unilabos_samples: List[LabSample] class TestLatencyReturn(TypedDict): @@ -1582,6 +1583,7 @@ class HostNode(BaseROS2DeviceNode): def test_resource( self, + sample_uuids: SampleUUIDsType, resource: ResourceSlot = None, resources: List[ResourceSlot] = None, device: DeviceSlot = None, @@ -1596,6 +1598,7 @@ class HostNode(BaseROS2DeviceNode): return { "resources": ResourceTreeSet.from_plr_resources([resource, *resources], known_newly_created=True).dump(), "devices": [device, *devices], + "unilabos_samples": [LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for sample_uuid, content in sample_uuids.items()] } def handle_pong_response(self, pong_data: dict):