Compare commits

...

6 Commits

Author SHA1 Message Date
Xuwznln
45efbfcd12 fix deck & host_node 2026-02-11 17:33:26 +08:00
Xuwznln
8da6fdfd0b set liquid with tube 2026-02-11 16:20:07 +08:00
Xuwznln
29ea9909a5 Merge branch 'dev' into feat/lab_resource 2026-02-11 14:04:49 +08:00
Xuwznln
f9ed6cb3fb add test_resource_schema 2026-02-11 14:02:21 +08:00
Xuwznln
699a0b3ce7 fix test resource schema 2026-02-10 23:08:29 +08:00
Xuwznln
cf3a20ae79 registry update & workflow update 2026-02-10 22:46:07 +08:00
6 changed files with 83 additions and 46 deletions

View File

@@ -21,7 +21,7 @@ from pylabrobot.resources import (
ResourceHolder, ResourceHolder,
Lid, Lid,
Trash, Trash,
Tip, Tip, TubeRack,
) )
from typing_extensions import TypedDict from typing_extensions import TypedDict
@@ -696,10 +696,13 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
如果 liquid_names 和 volumes 为空,但 plate 和 well_names 不为空,直接返回 plate 和 wells。 如果 liquid_names 和 volumes 为空,但 plate 和 well_names 不为空,直接返回 plate 和 wells。
""" """
assert issubclass(plate.__class__, Plate), "plate must be a Plate" assert issubclass(plate.__class__, Plate) or issubclass(plate.__class__, TubeRack) , f"plate must be a Plate, now: {type(plate)}"
plate: Plate = cast(Plate, cast(Resource, plate)) plate: Union[Plate, TubeRack]
# 根据 well_names 获取对应的 Well 对象 # 根据 well_names 获取对应的 Well 对象
wells = [plate.get_well(name) for name in well_names] if issubclass(plate.__class__, Plate):
wells = [plate.get_well(name) for name in well_names]
elif issubclass(plate.__class__, TubeRack):
wells = [plate.get_tube(name) for name in well_names]
res_volumes = [] res_volumes = []
# 如果 liquid_names 和 volumes 都为空,直接返回 # 如果 liquid_names 和 volumes 都为空,直接返回

View File

@@ -91,7 +91,7 @@ class PRCXI9300Deck(Deck):
""" """
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, **kwargs): def __init__(self, name: str, size_x: float, size_y: float, size_z: float, **kwargs):
super().__init__(name, size_x, size_y, size_z) super().__init__(size_x, size_y, size_z, name)
self.slots = [None] * 16 # PRCXI 9300/9320 最大有 16 个槽位 self.slots = [None] * 16 # PRCXI 9300/9320 最大有 16 个槽位
self.slot_locations = [Coordinate(0, 0, 0)] * 16 self.slot_locations = [Coordinate(0, 0, 0)] * 16
@@ -248,14 +248,15 @@ class PRCXI9300TipRack(TipRack):
if ordered_items is not None: if ordered_items is not None:
items = ordered_items items = ordered_items
elif ordering is not None: elif ordering is not None:
# 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况) # 检查 ordering 中的值类型来决定如何处理:
# 如果是字符串,说明这是位置名称,需要让 TipRack 自己创建 Tip 对象 # - 字符串值(从 JSON 反序列化): 只用键创建 ordering_param
# 我们只传递位置信息(键),不传递值,使用 ordering 参数 # - None 值(从第二次往返序列化): 同样只用键创建 ordering_param
if ordering and isinstance(next(iter(ordering.values()), None), str): # - 对象值(已经是实际的 Resource 对象): 直接作为 ordered_items 使用
# ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict first_val = next(iter(ordering.values()), None) if ordering else None
if not ordering or first_val is None or isinstance(first_val, str):
# ordering 的值是字符串或 None只使用键位置信息创建新的 OrderedDict
# 传递 ordering 参数而不是 ordered_items让 TipRack 自己创建 Tip 对象 # 传递 ordering 参数而不是 ordered_items让 TipRack 自己创建 Tip 对象
items = None items = None
# 使用 ordering 参数,只包含位置信息(键)
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys()) ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else: else:
# ordering 的值已经是对象,可以直接使用 # ordering 的值已经是对象,可以直接使用
@@ -397,14 +398,15 @@ class PRCXI9300TubeRack(TubeRack):
items_to_pass = ordered_items items_to_pass = ordered_items
ordering_param = None ordering_param = None
elif ordering is not None: elif ordering is not None:
# 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况) # 检查 ordering 中的值类型来决定如何处理:
# 如果是字符串,说明这是位置名称,需要让 TubeRack 自己创建 Tube 对象 # - 字符串值(从 JSON 反序列化): 只用键创建 ordering_param
# 我们只传递位置信息(键),不传递值,使用 ordering 参数 # - None 值(从第二次往返序列化): 同样只用键创建 ordering_param
if ordering and isinstance(next(iter(ordering.values()), None), str): # - 对象值(已经是实际的 Resource 对象): 直接作为 ordered_items 使用
# ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict first_val = next(iter(ordering.values()), None) if ordering else None
if not ordering or first_val is None or isinstance(first_val, str):
# ordering 的值是字符串或 None只使用键位置信息创建新的 OrderedDict
# 传递 ordering 参数而不是 ordered_items让 TubeRack 自己创建 Tube 对象 # 传递 ordering 参数而不是 ordered_items让 TubeRack 自己创建 Tube 对象
items_to_pass = None items_to_pass = None
# 使用 ordering 参数,只包含位置信息(键)
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys()) ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else: else:
# ordering 的值已经是对象,可以直接使用 # ordering 的值已经是对象,可以直接使用

View File

@@ -89,6 +89,14 @@ class Registry:
) )
test_latency_schema["description"] = "用于测试延迟的动作,返回延迟时间和时间差。" test_latency_schema["description"] = "用于测试延迟的动作,返回延迟时间和时间差。"
test_resource_method_info = host_node_enhanced_info.get("action_methods", {}).get("test_resource", {})
test_resource_schema = self._generate_unilab_json_command_schema(
test_resource_method_info.get("args", []),
"test_resource",
test_resource_method_info.get("return_annotation"),
)
test_resource_schema["description"] = "用于测试物料、设备和样本。"
self.device_type_registry.update( self.device_type_registry.update(
{ {
"host_node": { "host_node": {
@@ -190,32 +198,7 @@ class Registry:
"goal": {}, "goal": {},
"feedback": {}, "feedback": {},
"result": {}, "result": {},
"schema": { "schema": test_resource_schema,
"description": "",
"properties": {
"feedback": {},
"goal": {
"properties": {
"resource": ros_message_to_json_schema(Resource, "resource"),
"resources": {
"items": {
"properties": ros_message_to_json_schema(
Resource, "resources"
),
"type": "object",
},
"type": "array",
},
"device": {"type": "string"},
"devices": {"items": {"type": "string"}, "type": "array"},
},
"type": "object",
},
"result": {},
},
"title": "test_resource",
"type": "object",
},
"placeholder_keys": { "placeholder_keys": {
"device": "unilabos_devices", "device": "unilabos_devices",
"devices": "unilabos_devices", "devices": "unilabos_devices",

View File

@@ -38,24 +38,52 @@ class LabSample(TypedDict):
extra: Dict[str, Any] extra: Dict[str, Any]
class ResourceDictPositionSizeType(TypedDict):
depth: float
width: float
height: float
class ResourceDictPositionSize(BaseModel): class ResourceDictPositionSize(BaseModel):
depth: float = Field(description="Depth", default=0.0) # z depth: float = Field(description="Depth", default=0.0) # z
width: float = Field(description="Width", default=0.0) # x width: float = Field(description="Width", default=0.0) # x
height: float = Field(description="Height", default=0.0) # y height: float = Field(description="Height", default=0.0) # y
class ResourceDictPositionScaleType(TypedDict):
x: float
y: float
z: float
class ResourceDictPositionScale(BaseModel): class ResourceDictPositionScale(BaseModel):
x: float = Field(description="x scale", default=0.0) x: float = Field(description="x scale", default=0.0)
y: float = Field(description="y scale", default=0.0) y: float = Field(description="y scale", default=0.0)
z: float = Field(description="z scale", default=0.0) z: float = Field(description="z scale", default=0.0)
class ResourceDictPositionObjectType(TypedDict):
x: float
y: float
z: float
class ResourceDictPositionObject(BaseModel): class ResourceDictPositionObject(BaseModel):
x: float = Field(description="X coordinate", default=0.0) x: float = Field(description="X coordinate", default=0.0)
y: float = Field(description="Y coordinate", default=0.0) y: float = Field(description="Y coordinate", default=0.0)
z: float = Field(description="Z coordinate", default=0.0) z: float = Field(description="Z coordinate", default=0.0)
class ResourceDictPositionType(TypedDict):
size: ResourceDictPositionSizeType
scale: ResourceDictPositionScaleType
layout: Literal["2d", "x-y", "z-y", "x-z"]
position: ResourceDictPositionObjectType
position3d: ResourceDictPositionObjectType
rotation: ResourceDictPositionObjectType
cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"]
class ResourceDictPosition(BaseModel): class ResourceDictPosition(BaseModel):
size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize) size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize)
scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale) scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale)
@@ -74,6 +102,24 @@ class ResourceDictPosition(BaseModel):
) )
class ResourceDictType(TypedDict):
id: str
uuid: str
name: str
description: str
resource_schema: Dict[str, Any]
model: Dict[str, Any]
icon: str
parent_uuid: Optional[str]
parent: Optional["ResourceDictType"]
type: Union[Literal["device"], str]
klass: str
pose: ResourceDictPositionType
config: Dict[str, Any]
data: Dict[str, Any]
extra: Dict[str, Any]
# 统一的资源字典模型parent 自动序列化为 parent_uuidchildren 不序列化 # 统一的资源字典模型parent 自动序列化为 parent_uuidchildren 不序列化
class ResourceDict(BaseModel): class ResourceDict(BaseModel):
id: str = Field(description="Resource ID") id: str = Field(description="Resource ID")

View File

@@ -460,7 +460,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
} }
res.response = json.dumps(final_response) res.response = json.dumps(final_response)
# 如果driver自己就有assign的方法那就使用driver自己的assign方法 # 如果driver自己就有assign的方法那就使用driver自己的assign方法
if hasattr(self.driver_instance, "create_resource"): if hasattr(self.driver_instance, "create_resource") and self.node_name != "host_node":
create_resource_func = getattr(self.driver_instance, "create_resource") create_resource_func = getattr(self.driver_instance, "create_resource")
try: try:
ret = create_resource_func( ret = create_resource_func(

View File

@@ -35,7 +35,7 @@ from unilabos.resources.resource_tracker import (
ResourceTreeInstance, ResourceTreeInstance,
RETURN_UNILABOS_SAMPLES, RETURN_UNILABOS_SAMPLES,
JSON_UNILABOS_PARAM, JSON_UNILABOS_PARAM,
PARAM_SAMPLE_UUIDS, PARAM_SAMPLE_UUIDS, SampleUUIDsType, LabSample,
) )
from unilabos.ros.initialize_device import initialize_device_from_dict from unilabos.ros.initialize_device import initialize_device_from_dict
from unilabos.ros.msgs.message_converter import ( from unilabos.ros.msgs.message_converter import (
@@ -64,7 +64,8 @@ class DeviceActionStatus:
class TestResourceReturn(TypedDict): class TestResourceReturn(TypedDict):
resources: List[List[ResourceDict]] resources: List[List[ResourceDict]]
devices: List[DeviceSlot] devices: List[Dict[str, Any]]
unilabos_samples: List[LabSample]
class TestLatencyReturn(TypedDict): class TestLatencyReturn(TypedDict):
@@ -1582,6 +1583,7 @@ class HostNode(BaseROS2DeviceNode):
def test_resource( def test_resource(
self, self,
sample_uuids: SampleUUIDsType,
resource: ResourceSlot = None, resource: ResourceSlot = None,
resources: List[ResourceSlot] = None, resources: List[ResourceSlot] = None,
device: DeviceSlot = None, device: DeviceSlot = None,
@@ -1596,6 +1598,7 @@ class HostNode(BaseROS2DeviceNode):
return { return {
"resources": ResourceTreeSet.from_plr_resources([resource, *resources], known_newly_created=True).dump(), "resources": ResourceTreeSet.from_plr_resources([resource, *resources], known_newly_created=True).dump(),
"devices": [device, *devices], "devices": [device, *devices],
"unilabos_samples": [LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for sample_uuid, content in sample_uuids.items()]
} }
def handle_pong_response(self, pong_data: dict): def handle_pong_response(self, pong_data: dict):