diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index 3c51b349..72c079a1 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -6,6 +6,8 @@ HTTP客户端模块 import json import os +import time +from threading import Thread from typing import List, Dict, Any, Optional import requests @@ -84,14 +86,14 @@ class HTTPClient: f"{self.remote_addr}/edge/material", json={"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid}, headers={"Authorization": f"Lab {self.auth}"}, - timeout=100, + timeout=60, ) else: response = requests.put( f"{self.remote_addr}/edge/material", json={"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid}, headers={"Authorization": f"Lab {self.auth}"}, - timeout=100, + timeout=10, ) with open(os.path.join(BasicConfig.working_dir, "res_resource_tree_add.json"), "w", encoding="utf-8") as f: diff --git a/unilabos/devices/workstation/bioyond_studio/station.py b/unilabos/devices/workstation/bioyond_studio/station.py index 33975d8c..21957cd2 100644 --- a/unilabos/devices/workstation/bioyond_studio/station.py +++ b/unilabos/devices/workstation/bioyond_studio/station.py @@ -4,6 +4,7 @@ Bioyond Workstation Implementation 集成Bioyond物料管理的工作站示例 """ +import time import traceback from datetime import datetime from typing import Dict, Any, List, Optional, Union diff --git a/unilabos/resources/graphio.py b/unilabos/resources/graphio.py index 70e569ba..b6cec380 100644 --- a/unilabos/resources/graphio.py +++ b/unilabos/resources/graphio.py @@ -535,6 +535,7 @@ def resource_ulab_to_plr(resource: dict, plr_model=False) -> "ResourcePLR": def resource_ulab_to_plr_inner(resource: dict): all_states[resource["name"]] = resource["data"] + extra = resource.pop("extra", {}) d = { "name": resource["name"], "type": resource["type"], diff --git a/unilabos/ros/main_slave_run.py b/unilabos/ros/main_slave_run.py index b57d30b0..d9ad3682 100644 --- a/unilabos/ros/main_slave_run.py +++ b/unilabos/ros/main_slave_run.py @@ -10,7 +10,7 @@ from unilabos.ros.nodes.presets.resource_mesh_manager import ResourceMeshManager from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet from unilabos.devices.ros_dev.liquid_handler_joint_publisher import LiquidHandlerJointPublisher from unilabos_msgs.srv import SerialCommand # type: ignore -from rclpy.executors import MultiThreadedExecutor +from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor from rclpy.node import Node from rclpy.timer import Timer diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 289fe513..f1063123 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -49,7 +49,7 @@ from unilabos_msgs.msg import Resource # type: ignore from unilabos.ros.nodes.resource_tracker import ( DeviceNodeResourceTracker, - ResourceTreeSet, + ResourceTreeSet, ResourceTreeInstance, ) from unilabos.ros.x.rclpyx import get_event_loop from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator @@ -338,12 +338,12 @@ class BaseROS2DeviceNode(Node, Generic[T]): # 创建资源管理客户端 self._resource_clients: Dict[str, Client] = { - "resource_add": self.create_client(ResourceAdd, "/resources/add"), - "resource_get": self.create_client(SerialCommand, "/resources/get"), - "resource_delete": self.create_client(ResourceDelete, "/resources/delete"), - "resource_update": self.create_client(ResourceUpdate, "/resources/update"), - "resource_list": self.create_client(ResourceList, "/resources/list"), - "c2s_update_resource_tree": self.create_client(SerialCommand, "/c2s_update_resource_tree"), + "resource_add": self.create_client(ResourceAdd, "/resources/add", callback_group=self.callback_group), + "resource_get": self.create_client(SerialCommand, "/resources/get", callback_group=self.callback_group), + "resource_delete": self.create_client(ResourceDelete, "/resources/delete", callback_group=self.callback_group), + "resource_update": self.create_client(ResourceUpdate, "/resources/update", callback_group=self.callback_group), + "resource_list": self.create_client(ResourceList, "/resources/list", callback_group=self.callback_group), + "c2s_update_resource_tree": self.create_client(SerialCommand, "/c2s_update_resource_tree", callback_group=self.callback_group), } def re_register_device(req, res): @@ -573,6 +573,52 @@ class BaseROS2DeviceNode(Node, Generic[T]): self.lab_logger().error(traceback.format_exc()) self.lab_logger().debug(f"资源更新结果: {response}") + def transfer_to_new_resource(self, plr_resource: "ResourcePLR", tree: ResourceTreeInstance, additional_add_params: Dict[str, Any]): + parent_uuid = tree.root_node.res_content.parent_uuid + if parent_uuid: + parent_resource: ResourcePLR = self.resource_tracker.uuid_to_resources.get(parent_uuid) + if parent_resource is None: + self.lab_logger().warning( + f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_uuid}不存在" + ) + else: + try: + # 特殊兼容所有plr的物料的assign方法,和create_resource append_resource后期同步 + additional_params = {} + extra = getattr(plr_resource, "unilabos_extra", {}) + if len(extra): + self.lab_logger().info(f"发现物料{plr_resource}额外参数: " + str(extra)) + if "update_resource_site" in extra: + additional_add_params["site"] = extra["update_resource_site"] + site = additional_add_params.get("site", None) + spec = inspect.signature(parent_resource.assign_child_resource) + if "spot" in spec.parameters: + ordering_dict: Dict[str, Any] = getattr(parent_resource, "_ordering") + if ordering_dict: + site = list(ordering_dict.keys()).index(site) + additional_params["spot"] = site + old_parent = plr_resource.parent + if old_parent is not None: + # plr并不支持同一个deck的加载和卸载 + self.lab_logger().warning( + f"物料{plr_resource}请求从{old_parent}卸载" + ) + old_parent.unassign_child_resource(plr_resource) + self.lab_logger().warning( + f"物料{plr_resource}请求挂载到{parent_resource},额外参数:{additional_params}" + ) + parent_resource.assign_child_resource( + plr_resource, location=None, **additional_params + ) + func = getattr(self.driver_instance, "resource_tree_transfer", None) + if callable(func): + # 分别是 物料的原来父节点,当前物料的状态,物料的新父节点(此时物料已经重新assign了) + func(old_parent, plr_resource, parent_resource) + except Exception as e: + self.lab_logger().warning( + f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_resource}[{parent_uuid}]失败!\n{traceback.format_exc()}" + ) + async def s2c_resource_tree(self, req: SerialCommand_Request, res: SerialCommand_Response): """ 处理资源树更新请求 @@ -613,28 +659,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): plr_resources = tree_set.to_plr_resources() for plr_resource, tree in zip(plr_resources, tree_set.trees): self.resource_tracker.add_resource(plr_resource) - parent_uuid = tree.root_node.res_content.parent_uuid - if parent_uuid: - parent_resource: ResourcePLR = self.resource_tracker.uuid_to_resources.get(parent_uuid) - if parent_resource is None: - self.lab_logger().warning( - f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_uuid}不存在" - ) - else: - try: - # 特殊兼容所有plr的物料的assign方法,和create_resource append_resource后期同步 - additional_params = {} - site = additional_add_params.get("site", None) - spec = inspect.signature(parent_resource.assign_child_resource) - if "spot" in spec.parameters: - additional_params["spot"] = site - parent_resource.assign_child_resource( - plr_resource, location=None, **additional_params - ) - except Exception as e: - self.lab_logger().warning( - f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_resource}[{parent_uuid}]失败!\n{traceback.format_exc()}" - ) + self.transfer_to_new_resource(plr_resource, tree, additional_add_params) func = getattr(self.driver_instance, "resource_tree_add", None) if callable(func): func(plr_resources) @@ -647,6 +672,17 @@ class BaseROS2DeviceNode(Node, Generic[T]): original_instance: ResourcePLR = self.resource_tracker.figure_resource( {"uuid": tree.root_node.res_content.uuid}, try_mode=False ) + original_parent_resource = original_instance.parent + original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None) + target_parent_resource_uuid = tree.root_node.res_content.uuid_parent + self.lab_logger().info( + f"物料{original_instance} 原始父节点{original_parent_resource_uuid} 目标父节点{target_parent_resource_uuid} 更新" + ) + # todo: 对extra进行update + if getattr(plr_resource, "unilabos_extra", None) is not None: + original_instance.unilabos_extra = getattr(plr_resource, "unilabos_extra") + if original_parent_resource_uuid != target_parent_resource_uuid and original_parent_resource is not None: + self.transfer_to_new_resource(original_instance, tree, additional_add_params) original_instance.load_all_state(states) self.lab_logger().info( f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] 及其子节点 {len(original_instance.get_all_children())} 个" @@ -879,7 +915,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): action_type, action_name, execute_callback=self._create_execute_callback(action_name, action_value_mapping), - callback_group=ReentrantCallbackGroup(), + callback_group=self.callback_group, ) self.lab_logger().trace(f"发布动作: {action_name}, 类型: {str_action_type}") @@ -1500,7 +1536,7 @@ class ROS2DeviceNode: asyncio.set_event_loop(loop) loop.run_forever() - ROS2DeviceNode._loop_thread = threading.Thread(target=run_event_loop, daemon=True, name="ROS2DeviceNode") + ROS2DeviceNode._loop_thread = threading.Thread(target=run_event_loop, daemon=True, name="ROS2DeviceNodeLoop") ROS2DeviceNode._loop_thread.start() logger.info(f"循环线程已启动") diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 932c4885..43d16e8d 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -285,7 +285,7 @@ class HostNode(BaseROS2DeviceNode): # 创建定时器,定期发现设备 self._discovery_timer = self.create_timer( - discovery_interval, self._discovery_devices_callback, callback_group=ReentrantCallbackGroup() + discovery_interval, self._discovery_devices_callback, callback_group=self.callback_group ) # 添加ping-pong相关属性 @@ -618,7 +618,7 @@ class HostNode(BaseROS2DeviceNode): topic, lambda msg, d=device_id, p=property_name: self.property_callback(msg, d, p), 1, - callback_group=ReentrantCallbackGroup(), + callback_group=self.callback_group, ) # 标记为已订阅 self._subscribed_topics.add(topic) @@ -829,37 +829,37 @@ class HostNode(BaseROS2DeviceNode): def _init_host_service(self): self._resource_services: Dict[str, Service] = { "resource_add": self.create_service( - ResourceAdd, "/resources/add", self._resource_add_callback, callback_group=ReentrantCallbackGroup() + ResourceAdd, "/resources/add", self._resource_add_callback, callback_group=self.callback_group ), "resource_get": self.create_service( - SerialCommand, "/resources/get", self._resource_get_callback, callback_group=ReentrantCallbackGroup() + SerialCommand, "/resources/get", self._resource_get_callback, callback_group=self.callback_group ), "resource_delete": self.create_service( ResourceDelete, "/resources/delete", self._resource_delete_callback, - callback_group=ReentrantCallbackGroup(), + callback_group=self.callback_group, ), "resource_update": self.create_service( ResourceUpdate, "/resources/update", self._resource_update_callback, - callback_group=ReentrantCallbackGroup(), + callback_group=self.callback_group, ), "resource_list": self.create_service( - ResourceList, "/resources/list", self._resource_list_callback, callback_group=ReentrantCallbackGroup() + ResourceList, "/resources/list", self._resource_list_callback, callback_group=self.callback_group ), "node_info_update": self.create_service( SerialCommand, "/node_info_update", self._node_info_update_callback, - callback_group=ReentrantCallbackGroup(), + callback_group=self.callback_group, ), "c2s_update_resource_tree": self.create_service( SerialCommand, "/c2s_update_resource_tree", self._resource_tree_update_callback, - callback_group=ReentrantCallbackGroup(), + callback_group=self.callback_group, ), } diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index dc1175d6..af1afab5 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -194,7 +194,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): action_type, action_name, execute_callback=self._create_protocol_execute_callback(action_name, protocol_steps_generator), - callback_group=ReentrantCallbackGroup(), + callback_group=self.callback_group, ) self.lab_logger().trace(f"发布动作: {action_name}, 类型: {str_action_type}") return diff --git a/unilabos/ros/nodes/resource_tracker.py b/unilabos/ros/nodes/resource_tracker.py index 1506007b..c958fe7b 100644 --- a/unilabos/ros/nodes/resource_tracker.py +++ b/unilabos/ros/nodes/resource_tracker.py @@ -32,7 +32,7 @@ class ResourceDictPositionObject(BaseModel): class ResourceDictPosition(BaseModel): size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize) scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale) - layout: Literal["2d", "x-y", "z-y", "x-z", ""] = Field(description="Resource layout", default="x-y") + layout: Literal["2d", "x-y", "z-y", "x-z"] = Field(description="Resource layout", default="x-y") position: ResourceDictPositionObject = Field( description="Resource position", default_factory=ResourceDictPositionObject ) @@ -42,7 +42,9 @@ class ResourceDictPosition(BaseModel): rotation: ResourceDictPositionObject = Field( description="Resource rotation", default_factory=ResourceDictPositionObject ) - cross_section_type: Literal["rectangle", "circle", "rounded_rectangle", ""] = Field(description="Cross section type", default="rectangle") + cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"] = Field( + description="Cross section type", default="rectangle" + ) # 统一的资源字典模型,parent 自动序列化为 parent_uuid,children 不序列化 @@ -51,7 +53,9 @@ class ResourceDict(BaseModel): uuid: str = Field(description="Resource UUID") name: str = Field(description="Resource name") description: str = Field(description="Resource description", default="") - resource_schema: Dict[str, Any] = Field(description="Resource schema", default_factory=dict, serialization_alias="schema", validation_alias="schema") + resource_schema: Dict[str, Any] = Field( + description="Resource schema", default_factory=dict, serialization_alias="schema", validation_alias="schema" + ) model: Dict[str, Any] = Field(description="Resource model", default_factory=dict) icon: str = Field(description="Resource icon", default="") parent_uuid: Optional["str"] = Field(description="Parent resource uuid", default=None) # 先设定parent_uuid @@ -62,6 +66,7 @@ class ResourceDict(BaseModel): pose: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition) config: Dict[str, Any] = Field(description="Resource configuration") data: Dict[str, Any] = Field(description="Resource data") + extra: Dict[str, Any] = Field(description="Extra data") @field_serializer("parent_uuid") def _serialize_parent(self, parent_uuid: Optional["ResourceDict"]): @@ -138,6 +143,8 @@ class ResourceDictInstance(object): content["config"] = {} if not content.get("data"): content["data"] = {} + if not content.get("extra"): # MagicCode + content["extra"] = {} if "pose" not in content: content["pose"] = content.get("position", {}) return ResourceDictInstance(ResourceDict.model_validate(content)) @@ -322,21 +329,25 @@ class ResourceTreeSet(object): print("转换pylabrobot的时候,出现未知类型", source) return source - def build_uuid_mapping(res: "PLRResource", uuid_list: list): - """递归构建uuid映射字典""" + def build_uuid_mapping(res: "PLRResource", uuid_list: list, parent_uuid: Optional[str] = None): + """递归构建uuid和extra映射字典,返回(current_uuid, parent_uuid, extra)元组列表""" uid = getattr(res, "unilabos_uuid", "") if not uid: uid = str(uuid.uuid4()) res.unilabos_uuid = uid logger.warning(f"{res}没有uuid,请设置后再传入,默认填充{uid}!\n{traceback.format_exc()}") - uuid_list.append(uid) + + # 获取unilabos_extra,默认为空字典 + extra = getattr(res, "unilabos_extra", {}) + + uuid_list.append((uid, parent_uuid, extra)) for child in res.children: - build_uuid_mapping(child, uuid_list) + build_uuid_mapping(child, uuid_list, uid) def resource_plr_inner( d: dict, parent_resource: Optional[ResourceDict], states: dict, uuids: list ) -> ResourceDictInstance: - current_uuid = uuids.pop(0) + current_uuid, parent_uuid, extra = uuids.pop(0) raw_pos = ( {"x": d["location"]["x"], "y": d["location"]["y"], "z": d["location"]["z"]} @@ -359,13 +370,30 @@ class ResourceTreeSet(object): "uuid": current_uuid, "name": d["name"], "parent": parent_resource, # 直接传入 ResourceDict 对象 + "parent_uuid": parent_uuid, # 使用 parent_uuid 而不是 parent 对象 "type": replace_plr_type(d.get("category", "")), "class": d.get("class", ""), "position": pos, "pose": pos, - "config": {k: v for k, v in d.items() if k not in - ["name", "children", "parent_name", "location", "rotation", "size_x", "size_y", "size_z", "cross_section_type", "bottom_type"]}, + "config": { + k: v + for k, v in d.items() + if k + not in [ + "name", + "children", + "parent_name", + "location", + "rotation", + "size_x", + "size_y", + "size_z", + "cross_section_type", + "bottom_type", + ] + }, "data": states[d["name"]], + "extra": extra, } # 先转换为 ResourceDictInstance,获取其中的 ResourceDict @@ -383,7 +411,7 @@ class ResourceTreeSet(object): for resource in resources: # 构建uuid列表 uuid_list = [] - build_uuid_mapping(resource, uuid_list) + build_uuid_mapping(resource, uuid_list, getattr(resource.parent, "unilabos_uuid", None)) serialized_data = resource.serialize() all_states = resource.serialize_all_state() @@ -408,12 +436,13 @@ class ResourceTreeSet(object): # 类型映射 TYPE_MAP = {"plate": "Plate", "well": "Well", "deck": "Deck", "container": "RegularContainer"} - def collect_node_data(node: ResourceDictInstance, name_to_uuid: dict, all_states: dict): - """一次遍历收集 name_to_uuid 和 all_states""" + def collect_node_data(node: ResourceDictInstance, name_to_uuid: dict, all_states: dict, name_to_extra: dict): + """一次遍历收集 name_to_uuid, all_states 和 name_to_extra""" name_to_uuid[node.res_content.name] = node.res_content.uuid all_states[node.res_content.name] = node.res_content.data + name_to_extra[node.res_content.name] = node.res_content.extra for child in node.children: - collect_node_data(child, name_to_uuid, all_states) + collect_node_data(child, name_to_uuid, all_states, name_to_extra) def node_to_plr_dict(node: ResourceDictInstance, has_model: bool): """转换节点为 PLR 字典格式""" @@ -423,6 +452,7 @@ class ResourceTreeSet(object): logger.warning(f"未知类型 {res.type}") d = { + **res.config, "name": res.name, "type": res.config.get("type", plr_type), "size_x": res.config.get("size_x", 0), @@ -438,33 +468,35 @@ class ResourceTreeSet(object): "category": res.config.get("category", plr_type), "children": [node_to_plr_dict(child, has_model) for child in node.children], "parent_name": res.parent_instance_name, - **res.config, } if has_model: d["model"] = res.config.get("model", None) return d plr_resources = [] - trees = [] tracker = DeviceNodeResourceTracker() for tree in self.trees: name_to_uuid: Dict[str, str] = {} all_states: Dict[str, Any] = {} - collect_node_data(tree.root_node, name_to_uuid, all_states) + name_to_extra: Dict[str, dict] = {} + collect_node_data(tree.root_node, name_to_uuid, all_states, name_to_extra) has_model = tree.root_node.res_content.type != "deck" plr_dict = node_to_plr_dict(tree.root_node, has_model) try: sub_cls = find_subclass(plr_dict["type"], PLRResource) if sub_cls is None: - raise ValueError(f"无法找到类型 {plr_dict['type']} 对应的 PLR 资源类。原始信息:{tree.root_node.res_content}") + raise ValueError( + f"无法找到类型 {plr_dict['type']} 对应的 PLR 资源类。原始信息:{tree.root_node.res_content}" + ) spec = inspect.signature(sub_cls) if "category" not in spec.parameters: plr_dict.pop("category", None) plr_resource = sub_cls.deserialize(plr_dict, allow_marshal=True) plr_resource.load_all_state(all_states) - # 使用 DeviceNodeResourceTracker 设置 UUID + # 使用 DeviceNodeResourceTracker 设置 UUID 和 Extra tracker.loop_set_uuid(plr_resource, name_to_uuid) + tracker.loop_set_extra(plr_resource, name_to_extra) plr_resources.append(plr_resource) except Exception as e: @@ -806,6 +838,20 @@ class DeviceNodeResourceTracker(object): else: setattr(resource, "unilabos_uuid", new_uuid) + @staticmethod + def set_resource_extra(resource, extra: dict): + """ + 设置资源的 extra,统一处理 dict 和 instance 两种类型 + + Args: + resource: 资源对象(dict或实例) + extra: extra字典值 + """ + if isinstance(resource, dict): + resource["extra"] = extra + else: + setattr(resource, "unilabos_extra", extra) + def _traverse_and_process(self, resource, process_func) -> int: """ 递归遍历资源树,对每个节点执行处理函数 @@ -854,6 +900,29 @@ class DeviceNodeResourceTracker(object): return self._traverse_and_process(resource, process) + def loop_set_extra(self, resource, name_to_extra_map: Dict[str, dict]) -> int: + """ + 递归遍历资源树,根据 name 设置所有节点的 extra + + Args: + resource: 资源对象(可以是dict或实例) + name_to_extra_map: name到extra的映射字典,{name: extra} + + Returns: + 更新的资源数量 + """ + + def process(res): + resource_name = self._get_resource_attr(res, "name") + if resource_name and resource_name in name_to_extra_map: + extra = name_to_extra_map[resource_name] + self.set_resource_extra(res, extra) + logger.debug(f"设置资源Extra: {resource_name} -> {extra}") + return 1 + return 0 + + return self._traverse_and_process(resource, process) + def loop_update_uuid(self, resource, uuid_map: Dict[str, str]) -> int: """ 递归遍历资源树,更新所有节点的uuid @@ -896,7 +965,9 @@ class DeviceNodeResourceTracker(object): if current_uuid: old = self.uuid_to_resources.get(current_uuid) self.uuid_to_resources[current_uuid] = res - logger.debug(f"收集资源UUID映射: {current_uuid} -> {res} {'' if old is None else f'(覆盖旧值: {old})'}") + logger.debug( + f"收集资源UUID映射: {current_uuid} -> {res} {'' if old is None else f'(覆盖旧值: {old})'}" + ) return 0 self._traverse_and_process(resource, process)