From 84944396e90169468b44ab9423f0b563197ff7e5 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Thu, 29 May 2025 20:43:01 +0800 Subject: [PATCH] 34 icon support online (#35) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * unify liquid_handler definition * remove default values * Dev Sync (#25) * Update README and MQTTClient for installation instructions and code improvements * feat: 支持local_config启动 add: 增加对crt path的说明,为传入config.py的相对路径 move: web component * add: registry description * add 3d visualization * 完成在main中启动设备可视化 完成在main中启动设备可视化,并输出物料ID:mesh的对应关系resource_model 添加物料模型管理类,遍历物料与resource_model,完成TF数据收集 * 完成TF发布 * 修改模型方向,在yaml中添加变换属性 * 添加物料tf变化时,发送topic到前端 另外修改了物料初始化的方法,防止在tf还未发布时提前建立物料模型与发布话题 * 添加关节发布节点与物料可视化节点进入unilab * 使用json启动plr与3D模型仿真 * feat: node_info_update srv fix: OTDeck cant create * close #12 feat: slave node registry * feat: show machine name fix: host node registry not uploaded * feat: add hplc registry * feat: add hplc registry * fix: hplc status typo * fix: devices/ * 完成启动OT并联动rviz * add 3d visualization * 完成在main中启动设备可视化 完成在main中启动设备可视化,并输出物料ID:mesh的对应关系resource_model 添加物料模型管理类,遍历物料与resource_model,完成TF数据收集 * 完成TF发布 * 修改模型方向,在yaml中添加变换属性 * 添加物料tf变化时,发送topic到前端 另外修改了物料初始化的方法,防止在tf还未发布时提前建立物料模型与发布话题 * 添加关节发布节点与物料可视化节点进入unilab * 使用json启动plr与3D模型仿真 * 完成启动OT并联动rviz * fix: device.class possible null * fix: HPLC additions with online service * fix: slave mode spin not working * fix: slave mode spin not working * 修复rviz位置问题, 修复rviz位置问题, 在无tf变动时减缓发送频率 在backend中添加物料跟随方法 * feat: 多ProtocolNode 允许子设备ID相同 feat: 上报发现的ActionClient feat: Host重启动,通过discover机制要求slaveNode重新注册,实现信息及时上报 * feat: 支持env设置config * fix: running logic * fix: running logic * fix: missing ot * 在main中直接初始化republisher和物料的mesh节点 * 将joint_republisher和resource_mesh_manager添加进 main_slave_run.py中 * Device visualization (#14) * add 3d visualization * 完成在main中启动设备可视化 完成在main中启动设备可视化,并输出物料ID:mesh的对应关系resource_model 添加物料模型管理类,遍历物料与resource_model,完成TF数据收集 * 完成TF发布 * 修改模型方向,在yaml中添加变换属性 * 添加物料tf变化时,发送topic到前端 另外修改了物料初始化的方法,防止在tf还未发布时提前建立物料模型与发布话题 * 添加关节发布节点与物料可视化节点进入unilab * 使用json启动plr与3D模型仿真 * 完成启动OT并联动rviz * add 3d visualization * 完成在main中启动设备可视化 完成在main中启动设备可视化,并输出物料ID:mesh的对应关系resource_model 添加物料模型管理类,遍历物料与resource_model,完成TF数据收集 * 完成TF发布 * 修改模型方向,在yaml中添加变换属性 * 添加物料tf变化时,发送topic到前端 另外修改了物料初始化的方法,防止在tf还未发布时提前建立物料模型与发布话题 * 添加关节发布节点与物料可视化节点进入unilab * 使用json启动plr与3D模型仿真 * 完成启动OT并联动rviz * 修复rviz位置问题, 修复rviz位置问题, 在无tf变动时减缓发送频率 在backend中添加物料跟随方法 * fix: running logic * fix: running logic * fix: missing ot * 在main中直接初始化republisher和物料的mesh节点 * 将joint_republisher和resource_mesh_manager添加进 main_slave_run.py中 --------- Co-authored-by: zhangshixiang <@zhangshixiang> Co-authored-by: wznln <18435084+Xuwznln@users.noreply.github.com> * fix: missing hostname in devices_names fix: upload_file for model file * fix: missing paho-mqtt package bump version to 0.9.0 * fix startup add ResourceCreateFromOuter.action * fix type hint * update actions * update actions * host node add_resource_from_outer fix cmake list * pass device config to device class * add: bind_parent_ids to resource create action fix: message convert string * fix: host node should not be re_discovered * feat: resource tracker support dict * feat: add more necessary params * feat: fix boolean null in registry action data * feat: add outer resource * 编写mesh添加action * feat: append resource * add action * feat: vis 2d for plr * fix * fix: browser on rviz * fix: cloud bridge error fallback to local * fix: salve auto run rviz * 初始化两个plate * Device visualization (#22) * add 3d visualization * 完成在main中启动设备可视化 完成在main中启动设备可视化,并输出物料ID:mesh的对应关系resource_model 添加物料模型管理类,遍历物料与resource_model,完成TF数据收集 * 完成TF发布 * 修改模型方向,在yaml中添加变换属性 * 添加物料tf变化时,发送topic到前端 另外修改了物料初始化的方法,防止在tf还未发布时提前建立物料模型与发布话题 * 添加关节发布节点与物料可视化节点进入unilab * 使用json启动plr与3D模型仿真 * 完成启动OT并联动rviz * add 3d visualization * 完成在main中启动设备可视化 完成在main中启动设备可视化,并输出物料ID:mesh的对应关系resource_model 添加物料模型管理类,遍历物料与resource_model,完成TF数据收集 * 完成TF发布 * 修改模型方向,在yaml中添加变换属性 * 添加物料tf变化时,发送topic到前端 另外修改了物料初始化的方法,防止在tf还未发布时提前建立物料模型与发布话题 * 添加关节发布节点与物料可视化节点进入unilab * 使用json启动plr与3D模型仿真 * 完成启动OT并联动rviz * 修复rviz位置问题, 修复rviz位置问题, 在无tf变动时减缓发送频率 在backend中添加物料跟随方法 * fix: running logic * fix: running logic * fix: missing ot * 在main中直接初始化republisher和物料的mesh节点 * 将joint_republisher和resource_mesh_manager添加进 main_slave_run.py中 * 编写mesh添加action * add action * fix * fix: browser on rviz * fix: cloud bridge error fallback to local * fix: salve auto run rviz * 初始化两个plate --------- Co-authored-by: zhangshixiang <@zhangshixiang> Co-authored-by: wznln <18435084+Xuwznln@users.noreply.github.com> * fix: multi channel * fix: aspirate * fix: aspirate * fix: aspirate * fix: aspirate * 提交 * fix: jobadd * fix: jobadd * fix: msg converter * tijiao * add resource creat easy action * identify debug msg * mq client id --------- Co-authored-by: Harvey Que Co-authored-by: zhangshixiang <@zhangshixiang> Co-authored-by: q434343 <73513873+q434343@users.noreply.github.com> * remove default behavior for visualization * change liquidhandler name * resource create from outer easy * add easy resource creation * easy resource creation logic * remove wrongly debug msg from others * remove wrongly debug msg from others * add missing action clients * fix device_id * fix slot_on_deck * fix registry typo * complete require packages msg converter support array string implements create resource logic * 修复port输入 * 修复必须两次启动edge后端才有节点生成的bug 新增resources报送 * 新增延迟统计 --------- Co-authored-by: Junhan Chang Co-authored-by: Harvey Que Co-authored-by: q434343 <73513873+q434343@users.noreply.github.com> --- .gitignore | 1 + unilabos/app/controler.py | 4 +- unilabos/app/model.py | 5 +- unilabos/app/mq.py | 39 +- unilabos/registry/registry.py | 146 ++++---- unilabos/ros/nodes/presets/host_node.py | 334 ++++++++++++++---- .../action/LiquidHandlerTransfer.action | 1 - 7 files changed, 399 insertions(+), 131 deletions(-) diff --git a/.gitignore b/.gitignore index 333df5bf..e2c20639 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ __pycache__/ .vscode *.py[cod] *$py.class +service # C extensions *.so diff --git a/unilabos/app/controler.py b/unilabos/app/controler.py index f58f53ab..5d552565 100644 --- a/unilabos/app/controler.py +++ b/unilabos/app/controler.py @@ -31,6 +31,6 @@ def job_add(req: JobAddReq) -> JobData: action_kwargs = {"command": json.dumps(action_kwargs)} elif "command" in action_kwargs: action_kwargs = action_kwargs["command"] - print(f"job_add:{req.device_id} {action_name} {action_kwargs}") - HostNode.get_instance().send_goal(req.device_id, action_name=action_name, action_kwargs=action_kwargs, goal_uuid=req.job_id) + # print(f"job_add:{req.device_id} {action_name} {action_kwargs}") + HostNode.get_instance().send_goal(req.device_id, action_name=action_name, action_kwargs=action_kwargs, goal_uuid=req.job_id, server_info=req.server_info) return JobData(jobId=req.job_id) diff --git a/unilabos/app/model.py b/unilabos/app/model.py index ee7568fa..a5b8c786 100644 --- a/unilabos/app/model.py +++ b/unilabos/app/model.py @@ -51,8 +51,9 @@ class Resp(BaseModel): class JobAddReq(BaseModel): device_id: str = Field(examples=["Gripper"], description="device id") data: dict = Field(examples=[{"position": 30, "torque": 5, "action": "push_to"}]) - job_id: str = Field(examples=["sfsfsfeq"], description="goal uuid") - node_id: str = Field(examples=["sfsfsfeq"], description="node uuid") + job_id: str = Field(examples=["job_id"], description="goal uuid") + node_id: str = Field(examples=["node_id"], description="node uuid") + server_info: dict = Field(examples=[{"send_timestamp": 1717000000.0}], description="server info") class JobStepFinishReq(BaseModel): diff --git a/unilabos/app/mq.py b/unilabos/app/mq.py index 0bac96f1..9f870691 100644 --- a/unilabos/app/mq.py +++ b/unilabos/app/mq.py @@ -12,7 +12,7 @@ import tempfile import os from unilabos.config.config import MQConfig -from unilabos.app.controler import devices, job_add +from unilabos.app.controler import job_add from unilabos.app.model import JobAddReq from unilabos.utils import logger from unilabos.utils.type_check import TypeEncoder @@ -43,13 +43,10 @@ class MQTTClient: def _on_connect(self, client, userdata, flags, rc, properties=None): logger.info("[MQTT] Connected with result code " + str(rc)) client.subscribe(f"labs/{MQConfig.lab_id}/job/start/", 0) - isok, data = devices() - if not isok: - logger.error("[MQTT] on_connect ErrorHostNotInit") - return + client.subscribe(f"labs/{MQConfig.lab_id}/pong/", 0) def _on_message(self, client, userdata, msg) -> None: - logger.info("[MQTT] on_message<<<< " + msg.topic + " " + str(msg.payload)) + # logger.info("[MQTT] on_message<<<< " + msg.topic + " " + str(msg.payload)) try: payload_str = msg.payload.decode("utf-8") payload_json = json.loads(payload_str) @@ -63,6 +60,14 @@ class MQTTClient: job_req = JobAddReq.model_validate(payload_json) data = job_add(job_req) return + elif msg.topic == f"labs/{MQConfig.lab_id}/pong/": + # 处理pong响应,通知HostNode + from unilabos.ros.nodes.presets.host_node import HostNode + + host_instance = HostNode.get_instance(0) + if host_instance: + host_instance.handle_pong_response(payload_json) + return except json.JSONDecodeError as e: logger.error(f"[MQTT] JSON 解析错误: {e}") @@ -179,6 +184,28 @@ class MQTTClient: self.client.publish(address, json.dumps(action_info), qos=2) logger.debug(f"Action data published: address: {address}, {action_id}, {action_info}") + def send_ping(self, ping_id: str, timestamp: float): + """发送ping消息到服务端""" + if self.mqtt_disable: + return + address = f"labs/{MQConfig.lab_id}/ping/" + ping_data = {"ping_id": ping_id, "client_timestamp": timestamp, "type": "ping"} + self.client.publish(address, json.dumps(ping_data), qos=2) + + def setup_pong_subscription(self): + """设置pong消息订阅""" + if self.mqtt_disable: + return + pong_topic = f"labs/{MQConfig.lab_id}/pong/" + self.client.subscribe(pong_topic, 0) + logger.debug(f"Subscribed to pong topic: {pong_topic}") + + def handle_pong(self, pong_data: dict): + """处理pong响应(这个方法会在收到pong消息时被调用)""" + logger.debug(f"Pong received: {pong_data}") + # 这里会被HostNode的ping-pong处理逻辑调用 + pass + mqtt_client = MQTTClient() diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 91789b71..c68e0d8d 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -25,58 +25,10 @@ class Registry: self.ResourceCreateFromOuterEasy = self._replace_type_with_class( "ResourceCreateFromOuterEasy", "host_node", f"动作 create_resource" ) - self.device_type_registry = { - "host_node": { - "description": "UniLabOS主机节点", - "class": { - "module": "unilabos.ros.nodes.presets.host_node", - "type": "python", - "status_types": {}, - "action_value_mappings": { - "create_resource_detailed": { - "type": msg_converter_manager.search_class("ResourceCreateFromOuter"), - "goal": { - "resources": "resources", - "device_ids": "device_ids", - "bind_parent_ids": "bind_parent_ids", - "bind_locations": "bind_locations", - "other_calling_params": "other_calling_params", - }, - "feedback": {}, - "result": { - "success": "success" - }, - "schema": ros_action_to_json_schema(self.ResourceCreateFromOuter) - }, - "create_resource": { - "type": msg_converter_manager.search_class("ResourceCreateFromOuterEasy"), - "goal": { - "res_id": "res_id", - "class_name": "class_name", - "parent": "parent", - "device_id": "device_id", - "bind_locations": "bind_locations", - "liquid_input_slot": "liquid_input_slot[]", - "liquid_type": "liquid_type[]", - "liquid_volume": "liquid_volume[]", - "slot_on_deck": "slot_on_deck", - }, - "feedback": {}, - "result": { - "success": "success" - }, - "schema": ros_action_to_json_schema(self.ResourceCreateFromOuterEasy) - } - } - }, - "schema": { - "properties": {}, - "additionalProperties": False, - "type": "object" - }, - "file_path": "/" - } - } + self.EmptyIn = self._replace_type_with_class( + "EmptyIn", "host_node", f"" + ) + self.device_type_registry = {} self.resource_type_registry = {} self._setup_called = False # 跟踪setup是否已调用 # 其他状态变量 @@ -88,9 +40,70 @@ class Registry: logger.critical("[UniLab Registry] setup方法已被调用过,不允许多次调用") return - # 标记setup已被调用 - self._setup_called = True + from unilabos.app.web.utils.action_utils import get_yaml_from_goal_type + self.device_type_registry.update( + { + "host_node": { + "description": "UniLabOS主机节点", + "class": { + "module": "unilabos.ros.nodes.presets.host_node", + "type": "python", + "status_types": {}, + "action_value_mappings": { + "create_resource_detailed": { + "type": self.ResourceCreateFromOuter, + "goal": { + "resources": "resources", + "device_ids": "device_ids", + "bind_parent_ids": "bind_parent_ids", + "bind_locations": "bind_locations", + "other_calling_params": "other_calling_params", + }, + "feedback": {}, + "result": {"success": "success"}, + "schema": ros_action_to_json_schema(self.ResourceCreateFromOuter), + "goal_default": yaml.safe_load( + io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuter.Goal)) + ), + }, + "create_resource": { + "type": self.ResourceCreateFromOuterEasy, + "goal": { + "res_id": "res_id", + "class_name": "class_name", + "parent": "parent", + "device_id": "device_id", + "bind_locations": "bind_locations", + "liquid_input_slot": "liquid_input_slot[]", + "liquid_type": "liquid_type[]", + "liquid_volume": "liquid_volume[]", + "slot_on_deck": "slot_on_deck", + }, + "feedback": {}, + "result": {"success": "success"}, + "schema": ros_action_to_json_schema(self.ResourceCreateFromOuterEasy), + "goal_default": yaml.safe_load( + io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal)) + ), + }, + "test_latency": { + "type": self.EmptyIn, + "goal": {}, + "feedback": {}, + "result": {"latency_ms": "latency_ms", "time_diff_ms": "time_diff_ms"}, + "schema": ros_action_to_json_schema(self.EmptyIn), + "goal_default": {}, + }, + }, + }, + "icon": "icon_device.webp", + "registry_type": "device", + "schema": {"properties": {}, "additionalProperties": False, "type": "object"}, + "file_path": "/", + } + } + ) logger.debug(f"[UniLab Registry] ----------Setup----------") self.registry_paths = [Path(path).absolute() for path in self.registry_paths] for i, path in enumerate(self.registry_paths): @@ -100,6 +113,8 @@ class Registry: self.load_device_types(path) self.load_resource_types(path) logger.info("[UniLab Registry] 注册表设置完成") + # 标记setup已被调用 + self._setup_called = True def load_resource_types(self, path: os.PathLike): abs_path = Path(path).absolute() @@ -115,6 +130,9 @@ class Registry: resource_info["file_path"] = str(file.absolute()).replace("\\", "/") if "description" not in resource_info: resource_info["description"] = "" + if "icon" not in resource_info: + resource_info["icon"] = "" + resource_info["registry_type"] = "resource" self.resource_type_registry.update(data) logger.debug( f"[UniLab Registry] Resource-{current_resource_number} File-{i+1}/{len(files)} " @@ -164,6 +182,7 @@ class Registry: ) current_device_number = len(self.device_type_registry) + 1 from unilabos.app.web.utils.action_utils import get_yaml_from_goal_type + for i, file in enumerate(files): data = yaml.safe_load(open(file, encoding="utf-8")) if data: @@ -173,6 +192,9 @@ class Registry: device_config["file_path"] = str(file.absolute()).replace("\\", "/") if "description" not in device_config: device_config["description"] = "" + if "icon" not in device_config: + device_config["icon"] = "" + device_config["registry_type"] = "device" if "class" in device_config: # 处理状态类型 if "status_types" in device_config["class"]: @@ -189,7 +211,9 @@ class Registry: action_config["type"], device_id, f"动作 {action_name}" ) if action_config["type"] is not None: - action_config["goal_default"] = yaml.safe_load(io.StringIO(get_yaml_from_goal_type(action_config["type"].Goal))) + action_config["goal_default"] = yaml.safe_load( + io.StringIO(get_yaml_from_goal_type(action_config["type"].Goal)) + ) action_config["schema"] = ros_action_to_json_schema(action_config["type"]) else: logger.warning( @@ -212,13 +236,17 @@ class Registry: def obtain_registry_device_info(self): devices = [] for device_id, device_info in self.device_type_registry.items(): - msg = { - "id": device_id, - **device_info - } + msg = {"id": device_id, **device_info} devices.append(msg) return devices + def obtain_registry_resource_info(self): + resources = [] + for resource_id, resource_info in self.resource_type_registry.items(): + msg = {"id": resource_id, **resource_info} + resources.append(msg) + return resources + # 全局单例实例 lab_registry = Registry() diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index a8a3299a..732e8bbd 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -12,8 +12,14 @@ from rclpy.action import ActionClient, get_action_server_names_and_types_by_node from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.service import Service from unilabos_msgs.msg import Resource # type: ignore -from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList, \ - SerialCommand # type: ignore +from unilabos_msgs.srv import ( + ResourceAdd, + ResourceGet, + ResourceDelete, + ResourceUpdate, + ResourceList, + SerialCommand, +) # type: ignore from unique_identifier_msgs.msg import UUID from unilabos.registry.registry import lab_registry @@ -87,6 +93,7 @@ class HostNode(BaseROS2DeviceNode): self.__class__._instance = self # 初始化配置 + self.server_latest_timestamp = 0.0 # self.devices_config = devices_config self.resources_config = resources_config self.physical_setup_graph = physical_setup_graph @@ -100,16 +107,32 @@ class HostNode(BaseROS2DeviceNode): # 创建设备、动作客户端和目标存储 self.devices_names: Dict[str, str] = {device_id: self.namespace} # 存储设备名称和命名空间的映射 self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例 - self.device_machine_names: Dict[str, str] = {device_id: "本地", } # 存储设备ID到机器名称的映射 + self.device_machine_names: Dict[str, str] = { + device_id: "本地", + } # 存储设备ID到机器名称的映射 self._action_clients: Dict[str, ActionClient] = { # 为了方便了解实际的数据类型,host的默认写好 "/devices/host_node/create_resource": ActionClient( - self, lab_registry.ResourceCreateFromOuterEasy, "/devices/host_node/create_resource", callback_group=self.callback_group + self, + lab_registry.ResourceCreateFromOuterEasy, + "/devices/host_node/create_resource", + callback_group=self.callback_group, ), "/devices/host_node/create_resource_detailed": ActionClient( - self, lab_registry.ResourceCreateFromOuter, "/devices/host_node/create_resource_detailed", callback_group=self.callback_group - ) + self, + lab_registry.ResourceCreateFromOuter, + "/devices/host_node/create_resource_detailed", + callback_group=self.callback_group, + ), + "/devices/host_node/test_latency": ActionClient( + self, + lab_registry.EmptyIn, + "/devices/host_node/test_latency", + callback_group=self.callback_group, + ), } # 用来存储多个ActionClient实例 - self._action_value_mappings: Dict[str, Dict] = {} # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系 + self._action_value_mappings: Dict[str, Dict] = ( + {} + ) # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系 self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态 self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备 self._last_discovery_time = 0.0 # 上次设备发现的时间 @@ -123,8 +146,11 @@ class HostNode(BaseROS2DeviceNode): self.device_status_timestamps = {} # 用来存储设备状态最后更新时间 from unilabos.app.mq import mqtt_client - for device_config in lab_registry.obtain_registry_device_info(): - mqtt_client.publish_registry(device_config["id"], device_config) + + for device_info in lab_registry.obtain_registry_device_info(): + mqtt_client.publish_registry(device_info["id"], device_info) + for resource_info in lab_registry.obtain_registry_resource_info(): + mqtt_client.publish_registry(resource_info["id"], resource_info) # 首次发现网络中的设备 self._discover_devices() @@ -149,21 +175,20 @@ class HostNode(BaseROS2DeviceNode): ].items(): controller_config["update_rate"] = update_rate self.initialize_controller(controller_id, controller_config) - resources_config.insert(0, { - "id": "host_node", - "name": "host_node", - "parent": None, - "type": "device", - "class": "host_node", - "position": { - "x": 0, - "y": 0, - "z": 0 + resources_config.insert( + 0, + { + "id": "host_node", + "name": "host_node", + "parent": None, + "type": "device", + "class": "host_node", + "position": {"x": 0, "y": 0, "z": 0}, + "config": {}, + "data": {}, + "children": [], }, - "config": {}, - "data": {}, - "children": [] - }) + ) resource_with_parent_name = [] resource_ids_to_instance = {i["id"]: i for i in resources_config} for res in resources_config: @@ -189,6 +214,10 @@ class HostNode(BaseROS2DeviceNode): discovery_interval, self._discovery_devices_callback, callback_group=ReentrantCallbackGroup() ) + # 添加ping-pong相关属性 + self._ping_responses = {} # 存储ping响应 + self._ping_lock = threading.Lock() + self.lab_logger().info("[Host Node] Host node initialized.") HostNode._ready_event.set() @@ -233,7 +262,7 @@ class HostNode(BaseROS2DeviceNode): target=self._send_re_register, args=(sclient,), daemon=True, - name=f"ROSDevice{self.device_id}_query_host_name_{namespace}" + name=f"ROSDevice{self.device_id}_query_host_name_{namespace}", ).start() elif device_key not in self._online_devices: # 设备重新上线 @@ -244,7 +273,7 @@ class HostNode(BaseROS2DeviceNode): target=self._send_re_register, args=(sclient,), daemon=True, - name=f"ROSDevice{self.device_id}_query_host_name_{namespace}" + name=f"ROSDevice{self.device_id}_query_host_name_{namespace}", ).start() # 检测离线设备 @@ -288,7 +317,7 @@ class HostNode(BaseROS2DeviceNode): self, action_type, action_id, callback_group=self.callback_group ) self.lab_logger().debug(f"[Host Node] Created ActionClient (Discovery): {action_id}") - action_name = action_id[len(namespace) + 1:] + action_name = action_id[len(namespace) + 1 :] edge_device_id = namespace[9:] # from unilabos.app.mq import mqtt_client # info_with_schema = ros_action_to_json_schema(action_type) @@ -301,54 +330,83 @@ class HostNode(BaseROS2DeviceNode): except Exception as e: self.lab_logger().error(f"[Host Node] Failed to create ActionClient for {action_id}: {str(e)}") - def create_resource_detailed(self, resources: list["Resource"], device_ids: list[str], bind_parent_ids: list[str], bind_locations: list[Point], other_calling_params: list[str]): - for resource, device_id, bind_parent_id, bind_location, other_calling_param in zip(resources, device_ids, bind_parent_ids, bind_locations, other_calling_params): + def create_resource_detailed( + self, + resources: list["Resource"], + device_ids: list[str], + bind_parent_ids: list[str], + bind_locations: list[Point], + other_calling_params: list[str], + ): + for resource, device_id, bind_parent_id, bind_location, other_calling_param in zip( + resources, device_ids, bind_parent_ids, bind_locations, other_calling_params + ): # 这里要求device_id传入必须是edge_device_id namespace = "/devices/" + device_id srv_address = f"/srv{namespace}/append_resource" sclient = self.create_client(SerialCommand, srv_address) sclient.wait_for_service() request = SerialCommand.Request() - request.command = json.dumps({ - "resource": resource, # 单个/单组 可为 list[list[Resource]] - "namespace": namespace, - "edge_device_id": device_id, - "bind_parent_id": bind_parent_id, - "bind_location": { - "x": bind_location.x, - "y": bind_location.y, - "z": bind_location.z, + request.command = json.dumps( + { + "resource": resource, # 单个/单组 可为 list[list[Resource]] + "namespace": namespace, + "edge_device_id": device_id, + "bind_parent_id": bind_parent_id, + "bind_location": { + "x": bind_location.x, + "y": bind_location.y, + "z": bind_location.z, + }, + "other_calling_param": json.loads(other_calling_param) if other_calling_param else {}, }, - "other_calling_param": json.loads(other_calling_param) if other_calling_param else {}, - }, ensure_ascii=False) + ensure_ascii=False, + ) response = sclient.call(request) pass pass - def create_resource(self, device_id: str, res_id: str, class_name: str, parent: str, bind_locations: Point, liquid_input_slot: list[int], liquid_type: list[str], liquid_volume: list[int], slot_on_deck: int): - init_new_res = initialize_resource({ - "name": res_id, - "class": class_name, - "parent": parent, - "position": { - "x": bind_locations.x, - "y": bind_locations.y, - "z": bind_locations.z, + def create_resource( + self, + device_id: str, + res_id: str, + class_name: str, + parent: str, + bind_locations: Point, + liquid_input_slot: list[int], + liquid_type: list[str], + liquid_volume: list[int], + slot_on_deck: int, + ): + init_new_res = initialize_resource( + { + "name": res_id, + "class": class_name, + "parent": parent, + "position": { + "x": bind_locations.x, + "y": bind_locations.y, + "z": bind_locations.z, + }, } - }) # flatten的格式 - resources = [init_new_res] - device_id = [device_id] + ) # flatten的格式 + resources = init_new_res # initialize_resource已经返回list[dict] + device_ids = [device_id] bind_parent_id = [parent] bind_location = [bind_locations] - other_calling_param = [json.dumps({ - "ADD_LIQUID_TYPE": liquid_type, - "LIQUID_VOLUME": liquid_volume, - "LIQUID_INPUT_SLOT": liquid_input_slot, - "initialize_full": False, - "slot": slot_on_deck - })] + other_calling_param = [ + json.dumps( + { + "ADD_LIQUID_TYPE": liquid_type, + "LIQUID_VOLUME": liquid_volume, + "LIQUID_INPUT_SLOT": liquid_input_slot, + "initialize_full": False, + "slot": slot_on_deck, + } + ) + ] - return self.create_resource_detailed(resources, device_id, bind_parent_id, bind_location, other_calling_param) + return self.create_resource_detailed(resources, device_ids, bind_parent_id, bind_location, other_calling_param) def initialize_device(self, device_id: str, device_config: Dict[str, Any]) -> None: """ @@ -377,7 +435,9 @@ class HostNode(BaseROS2DeviceNode): if action_id not in self._action_clients: action_type = action_value_mapping["type"] self._action_clients[action_id] = ActionClient(self, action_type, action_id) - self.lab_logger().debug(f"[Host Node] Created ActionClient (Local): {action_id}") # 子设备再创建用的是Discover发现的 + self.lab_logger().debug( + f"[Host Node] Created ActionClient (Local): {action_id}" + ) # 子设备再创建用的是Discover发现的 # from unilabos.app.mq import mqtt_client # info_with_schema = ros_action_to_json_schema(action_type) # mqtt_client.publish_actions(action_name, { @@ -477,7 +537,12 @@ class HostNode(BaseROS2DeviceNode): ) def send_goal( - self, device_id: str, action_name: str, action_kwargs: Dict[str, Any], goal_uuid: Optional[str] = None + self, + device_id: str, + action_name: str, + action_kwargs: Dict[str, Any], + goal_uuid: Optional[str] = None, + server_info: Optional[Dict[str, Any]] = None, ) -> None: """ 向设备发送目标请求 @@ -489,6 +554,8 @@ class HostNode(BaseROS2DeviceNode): goal_uuid: 目标UUID,如果为None则自动生成 """ action_id = f"/devices/{device_id}/{action_name}" + if action_name == "test_latency" and server_info is not None: + self.server_latest_timestamp = server_info.get("send_timestamp", 0.0) if action_id not in self._action_clients: self.lab_logger().error(f"[Host Node] ActionClient {action_id} not found.") return @@ -783,3 +850,148 @@ class HostNode(BaseROS2DeviceNode): # 这里可以实现返回资源列表的逻辑 self.lab_logger().debug(f"[Host Node-Resource] List parameters: {request}") return response + + def test_latency(self): + """ + 测试网络延迟的action实现 + 通过5次ping-pong机制校对时间误差并计算实际延迟 + """ + import time + import uuid as uuid_module + + self.lab_logger().info("=" * 60) + self.lab_logger().info("开始网络延迟测试...") + + # 记录任务开始执行的时间 + task_start_time = time.time() + + # 进行5次ping-pong测试 + ping_results = [] + + for i in range(5): + self.lab_logger().info(f"第{i+1}/5次ping-pong测试...") + + # 生成唯一的ping ID + ping_id = str(uuid_module.uuid4()) + + # 记录发送时间 + send_timestamp = time.time() + + # 发送ping + from unilabos.app.mq import mqtt_client + + mqtt_client.send_ping(ping_id, send_timestamp) + + # 等待pong响应 + timeout = 10.0 + start_wait_time = time.time() + + while time.time() - start_wait_time < timeout: + with self._ping_lock: + if ping_id in self._ping_responses: + pong_data = self._ping_responses.pop(ping_id) + break + time.sleep(0.001) + else: + self.lab_logger().error(f"❌ 第{i+1}次测试超时") + continue + + # 计算本次测试结果 + receive_timestamp = time.time() + client_timestamp = pong_data["client_timestamp"] + server_timestamp = pong_data["server_timestamp"] + + # 往返时间 + rtt_ms = (receive_timestamp - send_timestamp) * 1000 + + # 客户端与服务端时间差(客户端时间 - 服务端时间) + # 假设网络延迟对称,取中间点的服务端时间 + mid_point_time = send_timestamp + (receive_timestamp - send_timestamp) / 2 + time_diff_ms = (mid_point_time - server_timestamp) * 1000 + + ping_results.append({"rtt_ms": rtt_ms, "time_diff_ms": time_diff_ms}) + + self.lab_logger().info(f"✅ 第{i+1}次: 往返时间={rtt_ms:.2f}ms, 时间差={time_diff_ms:.2f}ms") + + time.sleep(0.1) + + if not ping_results: + self.lab_logger().error("❌ 所有ping-pong测试都失败了") + return {"status": "all_timeout"} + + # 统计分析 + rtts = [r["rtt_ms"] for r in ping_results] + time_diffs = [r["time_diff_ms"] for r in ping_results] + + avg_rtt_ms = sum(rtts) / len(rtts) + avg_time_diff_ms = sum(time_diffs) / len(time_diffs) + max_time_diff_error_ms = max(abs(min(time_diffs)), abs(max(time_diffs))) + + self.lab_logger().info("-" * 50) + self.lab_logger().info("[测试统计]") + self.lab_logger().info(f"有效测试次数: {len(ping_results)}/5") + self.lab_logger().info(f"平均往返时间: {avg_rtt_ms:.2f}ms") + self.lab_logger().info(f"平均时间差: {avg_time_diff_ms:.2f}ms") + self.lab_logger().info(f"时间差范围: {min(time_diffs):.2f}ms ~ {max(time_diffs):.2f}ms") + self.lab_logger().info(f"最大时间误差: ±{max_time_diff_error_ms:.2f}ms") + + # 计算任务执行延迟 + if hasattr(self, "server_latest_timestamp") and self.server_latest_timestamp > 0: + self.lab_logger().info("-" * 50) + self.lab_logger().info("[任务执行延迟分析]") + self.lab_logger().info(f"服务端任务下发时间: {self.server_latest_timestamp:.6f}") + self.lab_logger().info(f"客户端任务开始时间: {task_start_time:.6f}") + + # 原始时间差(不考虑时间同步误差) + raw_delay_ms = (task_start_time - self.server_latest_timestamp) * 1000 + + # 考虑时间同步误差后的延迟(用平均时间差校正) + corrected_delay_ms = raw_delay_ms - avg_time_diff_ms + + self.lab_logger().info(f"📊 原始时间差: {raw_delay_ms:.2f}ms") + self.lab_logger().info(f"🔧 时间同步校正: {avg_time_diff_ms:.2f}ms") + self.lab_logger().info(f"⏰ 实际任务延迟: {corrected_delay_ms:.2f}ms") + self.lab_logger().info(f"📏 误差范围: ±{max_time_diff_error_ms:.2f}ms") + + # 给出延迟范围 + min_delay = corrected_delay_ms - max_time_diff_error_ms + max_delay = corrected_delay_ms + max_time_diff_error_ms + self.lab_logger().info(f"📋 延迟范围: {min_delay:.2f}ms ~ {max_delay:.2f}ms") + + else: + self.lab_logger().warning("⚠️ 无法获取服务端任务下发时间,跳过任务延迟分析") + corrected_delay_ms = -1 + + self.lab_logger().info("=" * 60) + + return { + "avg_rtt_ms": avg_rtt_ms, + "avg_time_diff_ms": avg_time_diff_ms, + "max_time_error_ms": max_time_diff_error_ms, + "task_delay_ms": corrected_delay_ms if corrected_delay_ms > 0 else -1, + "raw_delay_ms": ( + raw_delay_ms if hasattr(self, "server_latest_timestamp") and self.server_latest_timestamp > 0 else -1 + ), + "test_count": len(ping_results), + "status": "success", + } + + def handle_pong_response(self, pong_data: dict): + """ + 处理pong响应 + """ + ping_id = pong_data.get("ping_id") + if ping_id: + with self._ping_lock: + self._ping_responses[ping_id] = pong_data + + # 详细信息合并为一条日志 + client_timestamp = pong_data.get("client_timestamp", 0) + server_timestamp = pong_data.get("server_timestamp", 0) + current_time = time.time() + + self.lab_logger().debug( + f"📨 Pong | ID:{ping_id[:8]}.. | C→S→C: {client_timestamp:.3f}→{server_timestamp:.3f}→{current_time:.3f}" + ) + else: + self.lab_logger().warning("⚠️ 收到无效的Pong响应(缺少ping_id)") diff --git a/unilabos_msgs/action/LiquidHandlerTransfer.action b/unilabos_msgs/action/LiquidHandlerTransfer.action index d815a6cb..39df59bb 100644 --- a/unilabos_msgs/action/LiquidHandlerTransfer.action +++ b/unilabos_msgs/action/LiquidHandlerTransfer.action @@ -1,4 +1,3 @@ -# Bio float64[] asp_vols float64[] dis_vols Resource[] sources