From 01ac3415ae907eff6e681f69a05be3c2b0a72d73 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Thu, 1 May 2025 14:58:36 +0800 Subject: [PATCH] Closes #3. Closes #12. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #3. Closes #12. * Update README and MQTTClient for installation instructions and code improvements * feat: 支持local_config启动 add: 增加对crt path的说明,为传入config.py的相对路径 move: web component * add: registry description * feat: node_info_update srv fix: OTDeck cant create * close #12 feat: slave node registry * feat: show machine name fix: host node registry not uploaded * feat: add hplc registry * feat: add hplc registry * fix: hplc status typo * fix: devices/ * fix: device.class possible null * fix: HPLC additions with online service * fix: slave mode spin not working * fix: slave mode spin not working * feat: 多ProtocolNode 允许子设备ID相同 feat: 上报发现的ActionClient feat: Host重启动,通过discover机制要求slaveNode重新注册,实现信息及时上报 --------- Co-authored-by: Harvey Que --- test/experiments/HPLC.json | 6 +- unilabos/app/main.py | 17 ++- unilabos/app/mq.py | 9 +- unilabos/app/web/pages.py | 14 +-- unilabos/app/web/templates/status.html | 22 +++- unilabos/app/web/utils/host_utils.py | 13 +- unilabos/app/web/utils/ros_utils.py | 14 ++- unilabos/config/config.py | 1 + .../devices/characterization_optic.yaml | 47 +++++++- unilabos/registry/registry.py | 15 ++- unilabos/resources/graphio.py | 7 +- unilabos/ros/main_slave_run.py | 68 +++++------ unilabos/ros/nodes/base_device_node.py | 58 +++++++-- unilabos/ros/nodes/presets/host_node.py | 112 +++++++++++++++--- unilabos/ros/utils/driver_creator.py | 3 +- 15 files changed, 300 insertions(+), 106 deletions(-) diff --git a/test/experiments/HPLC.json b/test/experiments/HPLC.json index 9e511b3c..6d866a9a 100644 --- a/test/experiments/HPLC.json +++ b/test/experiments/HPLC.json @@ -5,7 +5,7 @@ "name": "HPLC", "parent": null, "type": "device", - "class": "hplc", + "class": "hplc.agilent", "position": { "x": 620.6111111111111, "y": 171, @@ -19,8 +19,8 @@ }, { "id": "BottlesRack3", - "name": "Revvity上样盘3", - "parent": "Revvity", + "name": "上样盘3", + "parent": "HPLC", "type": "plate", "class": null, "position": { diff --git a/unilabos/app/main.py b/unilabos/app/main.py index d418c5c1..f75a3295 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -58,6 +58,18 @@ def parse_args(): default=None, help="配置文件路径,支持.py格式的Python配置文件", ) + parser.add_argument( + "--port", + type=int, + default=8002, + help="信息页web服务的启动端口", + ) + parser.add_argument( + "--open_browser", + type=bool, + default=True, + help="是否在启动时打开信息页", + ) return parser.parse_args() @@ -84,6 +96,9 @@ def main(): # 设置BasicConfig参数 BasicConfig.is_host_mode = not args_dict.get("without_host", False) BasicConfig.slave_no_host = args_dict.get("slave_no_host", False) + machine_name = os.popen("hostname").read().strip() + machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name]) + BasicConfig.machine_name = machine_name from unilabos.resources.graphio import ( read_node_link_json, @@ -151,7 +166,7 @@ def main(): mqtt_client.start() start_backend(**args_dict) - start_server() + start_server(port=args_dict.get("port", 8002), open_browser=args_dict.get("open_browser", False)) if __name__ == "__main__": diff --git a/unilabos/app/mq.py b/unilabos/app/mq.py index 8ed35f95..a6123fb2 100644 --- a/unilabos/app/mq.py +++ b/unilabos/app/mq.py @@ -146,7 +146,7 @@ class MQTTClient: if self.mqtt_disable: return status = {"data": device_status.get(device_id, {}), "device_id": device_id} - address = f"labs/{MQConfig.lab_id}/devices" + address = f"labs/{MQConfig.lab_id}/devices/" self.client.publish(address, json.dumps(status), qos=2) logger.critical(f"Device status published: address: {address}, {status}") @@ -168,11 +168,8 @@ class MQTTClient: if self.mqtt_disable: return address = f"labs/{MQConfig.lab_id}/actions/" - action_type_name = action_info["title"] - action_info["title"] = action_id - action_data = json.dumps({action_type_name: action_info}, ensure_ascii=False) - self.client.publish(address, action_data, qos=2) - logger.debug(f"Action data published: address: {address}, {action_data}") + self.client.publish(address, json.dumps(action_info), qos=2) + logger.debug(f"Action data published: address: {address}, {action_id}, {action_info}") mqtt_client = MQTTClient() diff --git a/unilabos/app/web/pages.py b/unilabos/app/web/pages.py index 7216a66c..a08cebb5 100644 --- a/unilabos/app/web/pages.py +++ b/unilabos/app/web/pages.py @@ -92,19 +92,7 @@ def setup_web_pages(router: APIRouter) -> None: # 获取已加载的设备 if lab_registry: - # 设备类型 - for device_id, device_info in lab_registry.device_type_registry.items(): - msg = { - "id": device_id, - "name": device_info.get("name", "未命名"), - "file_path": device_info.get("file_path", ""), - "class_json": json.dumps( - device_info.get("class", {}), indent=4, ensure_ascii=False, cls=TypeEncoder - ), - } - mqtt_client.publish_registry(device_id, device_info) - devices.append(msg) - + devices = json.loads(json.dumps(lab_registry.obtain_registry_device_info(), ensure_ascii=False, cls=TypeEncoder)) # 资源类型 for resource_id, resource_info in lab_registry.resource_type_registry.items(): resources.append( diff --git a/unilabos/app/web/templates/status.html b/unilabos/app/web/templates/status.html index 30c3e6b8..e1105b7b 100644 --- a/unilabos/app/web/templates/status.html +++ b/unilabos/app/web/templates/status.html @@ -96,17 +96,19 @@ 设备ID 命名空间 + 机器名称 状态 {% for device_id, device_info in host_node_info.devices.items() %} {{ device_id }} {{ device_info.namespace }} + {{ device_info.machine_name }} {{ "在线" if device_info.is_online else "离线" }} {% else %} - 没有发现已管理的设备 + 没有发现已管理的设备 {% endfor %} @@ -218,6 +220,7 @@ Device ID 节点名称 命名空间 + 机器名称 状态项 动作数 @@ -227,6 +230,7 @@ {{ device_id }} {{ device_info.node_name }} {{ device_info.namespace }} + {{ device_info.machine_name|default("本地") }} {{ ros_node_info.device_topics.get(device_id, {})|length }} {{ ros_node_info.device_actions.get(device_id, {})|length }} @@ -329,8 +333,13 @@
-
{{ device.class_json }}
- + {% if device.class %} +
{{ device.class | tojson(indent=4) }}
+ {% else %} + +
// No data
+ {% endif %} + {% if device.is_online %}
在线
{% endif %} @@ -362,7 +371,12 @@
diff --git a/unilabos/app/web/utils/host_utils.py b/unilabos/app/web/utils/host_utils.py index 0df5e816..a9070486 100644 --- a/unilabos/app/web/utils/host_utils.py +++ b/unilabos/app/web/utils/host_utils.py @@ -30,20 +30,19 @@ def get_host_node_info() -> Dict[str, Any]: return host_info host_info["available"] = True host_info["devices"] = { - device_id: { + edge_device_id: { "namespace": namespace, - "is_online": f"{namespace}/{device_id}" in host_node._online_devices, - "key": f"{namespace}/{device_id}" if namespace.startswith("/") else f"/{namespace}/{device_id}", + "is_online": f"{namespace}/{edge_device_id}" in host_node._online_devices, + "key": f"{namespace}/{edge_device_id}" if namespace.startswith("/") else f"/{namespace}/{edge_device_id}", + "machine_name": host_node.device_machine_names.get(edge_device_id, "未知"), } - for device_id, namespace in host_node.devices_names.items() + for edge_device_id, namespace in host_node.devices_names.items() } # 获取已订阅的主题 host_info["subscribed_topics"] = sorted(list(host_node._subscribed_topics)) # 获取动作客户端信息 for action_id, client in host_node._action_clients.items(): - host_info["action_clients"] = { - action_id: get_action_info(client, full_name=action_id) - } + host_info["action_clients"] = {action_id: get_action_info(client, full_name=action_id)} # 获取设备状态 host_info["device_status"] = host_node.device_status diff --git a/unilabos/app/web/utils/ros_utils.py b/unilabos/app/web/utils/ros_utils.py index 01733510..8e1c767c 100644 --- a/unilabos/app/web/utils/ros_utils.py +++ b/unilabos/app/web/utils/ros_utils.py @@ -12,6 +12,7 @@ from unilabos.app.web.utils.action_utils import get_action_info # 存储 ROS 节点信息的全局变量 ros_node_info = {"online_devices": {}, "device_topics": {}, "device_actions": {}} + def get_ros_node_info() -> Dict[str, Any]: """获取 ROS 节点信息,包括设备节点、发布的状态和动作 @@ -35,6 +36,13 @@ def update_ros_node_info() -> Dict[str, Any]: try: from unilabos.ros.nodes.base_device_node import registered_devices + from unilabos.ros.nodes.presets.host_node import HostNode + + # 尝试获取主机节点实例 + host_node = HostNode.get_instance(0) + device_machine_names = {} + if host_node: + device_machine_names = host_node.device_machine_names for device_id, device_info in registered_devices.items(): # 设备基本信息 @@ -42,6 +50,7 @@ def update_ros_node_info() -> Dict[str, Any]: "node_name": device_info["node_name"], "namespace": device_info["namespace"], "uuid": device_info["uuid"], + "machine_name": device_machine_names.get(device_id, "本地"), } # 设备话题(状态)信息 @@ -55,10 +64,7 @@ def update_ros_node_info() -> Dict[str, Any]: } # 设备动作信息 - result["device_actions"][device_id] = { - k: get_action_info(v, k) - for k, v in device_info["actions"].items() - } + result["device_actions"][device_id] = {k: get_action_info(v, k) for k, v in device_info["actions"].items()} # 更新全局变量 ros_node_info = result except Exception as e: diff --git a/unilabos/config/config.py b/unilabos/config/config.py index 5c1f7b9c..3f3d8dd7 100644 --- a/unilabos/config/config.py +++ b/unilabos/config/config.py @@ -12,6 +12,7 @@ class BasicConfig: config_path = "" is_host_mode = True # 从registry.py移动过来 slave_no_host = False # 是否跳过rclient.wait_for_service() + machine_name = "undefined" # MQTT配置 diff --git a/unilabos/registry/devices/characterization_optic.yaml b/unilabos/registry/devices/characterization_optic.yaml index 0164ae4a..52ffc87a 100644 --- a/unilabos/registry/devices/characterization_optic.yaml +++ b/unilabos/registry/devices/characterization_optic.yaml @@ -19,6 +19,49 @@ raman_home_made: status: type: string required: - - status + - status additionalProperties: false - type: object \ No newline at end of file + type: object +hplc.agilent: + description: HPLC device + class: + module: unilabos.devices.hplc.AgilentHPLC:HPLCDriver + type: python + status_types: + device_status: String + could_run: Bool + driver_init_ok: Bool + is_running: Bool + finish_status: String + status_text: String + action_value_mappings: + execute_command_from_outer: + type: SendCmd + goal: + command: command + feedback: {} + result: + success: success + schema: + properties: + device_status: + type: string + could_run: + type: boolean + driver_init_ok: + type: boolean + is_running: + type: boolean + finish_status: + type: string + status_text: + type: string + required: + - device_status + - could_run + - driver_init_ok + - is_running + - finish_status + - status_text + additionalProperties: false + type: object diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 9c7a95b9..feba3fef 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -1,3 +1,4 @@ +import json import os import sys from pathlib import Path @@ -6,8 +7,9 @@ from typing import Any import yaml from unilabos.utils import logger -from unilabos.ros.msgs.message_converter import msg_converter_manager +from unilabos.ros.msgs.message_converter import msg_converter_manager, ros_action_to_json_schema from unilabos.utils.decorator import singleton +from unilabos.utils.type_check import TypeEncoder DEFAULT_PATHS = [Path(__file__).absolute().parent] @@ -129,6 +131,7 @@ class Registry: action_config["type"] = self._replace_type_with_class( action_config["type"], device_id, f"动作 {action_name}" ) + action_config["schema"] = ros_action_to_json_schema(action_config["type"]) self.device_type_registry.update(data) @@ -143,6 +146,16 @@ class Registry: f"[UniLab Registry] Device File-{i+1}/{len(files)} Not Valid YAML File: {file.absolute()}" ) + def obtain_registry_device_info(self): + devices = [] + for device_id, device_info in self.device_type_registry.items(): + msg = { + "id": device_id, + **device_info + } + devices.append(msg) + return devices + # 全局单例实例 lab_registry = Registry() diff --git a/unilabos/resources/graphio.py b/unilabos/resources/graphio.py index e5c16bc4..71e4aaeb 100644 --- a/unilabos/resources/graphio.py +++ b/unilabos/resources/graphio.py @@ -1,4 +1,5 @@ import importlib +import inspect import json from typing import Union import numpy as np @@ -384,7 +385,11 @@ def resource_ulab_to_plr(resource: dict, plr_model=False) -> "ResourcePLR": d = resource_ulab_to_plr_inner(resource) """无法通过Resource进行反序列化,例如TipSpot必须内部序列化好,直接用TipSpot序列化会多参数,导致出错""" from pylabrobot.utils.object_parsing import find_subclass - resource_plr = find_subclass(d["type"], ResourcePLR).deserialize(d, allow_marshal=True) + sub_cls = find_subclass(d["type"], ResourcePLR) + spect = inspect.signature(sub_cls) + if "category" not in spect.parameters: + d.pop("category") + resource_plr = sub_cls.deserialize(d, allow_marshal=True) resource_plr.load_all_state(all_states) return resource_plr diff --git a/unilabos/ros/main_slave_run.py b/unilabos/ros/main_slave_run.py index 1ae37906..9ac96748 100644 --- a/unilabos/ros/main_slave_run.py +++ b/unilabos/ros/main_slave_run.py @@ -1,22 +1,25 @@ +import copy +import json import os -import traceback +import threading from typing import Optional, Dict, Any, List import rclpy from unilabos_msgs.msg import Resource # type: ignore -from unilabos_msgs.srv import ResourceAdd # type: ignore +from unilabos_msgs.srv import ResourceAdd, SerialCommand # type: ignore from rclpy.executors import MultiThreadedExecutor from rclpy.node import Node from rclpy.timer import Timer +from unilabos.registry.registry import lab_registry from unilabos.ros.initialize_device import initialize_device_from_dict from unilabos.ros.msgs.message_converter import ( convert_to_ros_msg, ) from unilabos.ros.nodes.presets.host_node import HostNode -from unilabos.ros.x.rclpyx import run_event_loop_in_thread from unilabos.utils import logger from unilabos.config.config import BasicConfig +from unilabos.utils.type_check import TypeEncoder def exit() -> None: @@ -59,16 +62,11 @@ def main( discovery_interval, ) - executor.add_node(host_node) - # run_event_loop_in_thread() + thread = threading.Thread(target=executor.spin, daemon=True, name="host_executor_thread") + thread.start() - try: - executor.spin() - except Exception as e: - logger.error(traceback.format_exc()) - print(f"Exception caught: {e}") - finally: - exit() + while True: + input() def slave( @@ -82,7 +80,7 @@ def slave( """从节点函数""" rclpy.init(args=args) rclpy.__executor = executor = MultiThreadedExecutor() - + devices_config_copy = copy.deepcopy(devices_config) for device_id, device_config in devices_config.items(): d = initialize_device_from_dict(device_id, device_config) if d is None: @@ -93,32 +91,36 @@ def slave( # else: # print(f"Warning: Device {device_id} could not be initialized or is not a valid Node") - machine_name = os.popen("hostname").read().strip() - machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name]) - n = Node(f"slaveMachine_{machine_name}", parameter_overrides=[]) + n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[]) executor.add_node(n) - if BasicConfig.slave_no_host: - # 确保ResourceAdd存在 - if "ResourceAdd" in globals(): - rclient = n.create_client(ResourceAdd, "/resources/add") - rclient.wait_for_service() # FIXME 可能一直等待,加一个参数 + thread = threading.Thread(target=executor.spin, daemon=True, name="slave_executor_thread") + thread.start() - request = ResourceAdd.Request() - request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources_config] - response = rclient.call_async(request) - else: - print("Warning: ResourceAdd service not available") + if not BasicConfig.slave_no_host: + sclient = n.create_client(SerialCommand, "/node_info_update") + sclient.wait_for_service() - run_event_loop_in_thread() + request = SerialCommand.Request() + request.command = json.dumps({ + "machine_name": BasicConfig.machine_name, + "type": "slave", + "devices_config": devices_config_copy, + "registry_config": lab_registry.obtain_registry_device_info() + }, ensure_ascii=False, cls=TypeEncoder) + response = sclient.call_async(request).result() + logger.info(f"Slave node info updated.") - try: - executor.spin() - except Exception as e: - print(f"Exception caught: {e}") - finally: - exit() + rclient = n.create_client(ResourceAdd, "/resources/add") + rclient.wait_for_service() # FIXME 可能一直等待,加一个参数 + request = ResourceAdd.Request() + request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources_config] + response = rclient.call_async(request).result() + logger.info(f"Slave resource added.") + + while True: + input() if __name__ == "__main__": main() diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 0ff03a68..7e032064 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -1,3 +1,4 @@ +import json import threading import time import traceback @@ -13,15 +14,17 @@ from rclpy.action import ActionServer from rclpy.action.server import ServerGoalHandle from rclpy.client import Client from rclpy.callback_groups import ReentrantCallbackGroup +from rclpy.service import Service from unilabos.resources.graphio import convert_resources_to_type, convert_resources_from_type from unilabos.ros.msgs.message_converter import ( convert_to_ros_msg, convert_from_ros_msg, convert_from_ros_msg_with_mapping, - convert_to_ros_msg_with_mapping, + convert_to_ros_msg_with_mapping, ros_action_to_json_schema, ) -from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList # type: ignore +from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList, \ + SerialCommand # type: ignore from unilabos_msgs.msg import Resource # type: ignore from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker @@ -29,7 +32,7 @@ from unilabos.ros.x.rclpyx import get_event_loop from unilabos.ros.utils.driver_creator import ProtocolNodeCreator, PyLabRobotCreator, DeviceClassCreator from unilabos.utils.async_util import run_async_func from unilabos.utils.log import info, debug, warning, error, critical, logger -from unilabos.utils.type_check import get_type_class +from unilabos.utils.type_check import get_type_class, TypeEncoder T = TypeVar("T") @@ -44,19 +47,17 @@ class ROSLoggerAdapter: @property def identifier(self): - return f"{self.namespace}/{self.node_name}" + return f"{self.namespace}" - def __init__(self, ros_logger, node_name, namespace): + def __init__(self, ros_logger, namespace): """ 初始化日志适配器 Args: ros_logger: ROS2日志记录器 - node_name: 节点名称 namespace: 命名空间 """ self.ros_logger = ros_logger - self.node_name = node_name self.namespace = namespace self.level_2_logger_func = { "info": info, @@ -258,9 +259,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): self.lab_logger().critical("资源跟踪器未初始化,请检查") # 创建自定义日志记录器 - self._lab_logger = ROSLoggerAdapter(self.get_logger(), self.node_name, self.namespace) + self._lab_logger = ROSLoggerAdapter(self.get_logger(), self.namespace) - self._action_servers = {} + self._action_servers: Dict[str, ActionServer] = {} self._property_publishers = {} self._status_types = status_types self._action_value_mappings = action_value_mappings @@ -284,7 +285,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): self.create_ros_action_server(action_name, action_value_mapping) # 创建线程池执行器 - self._executor = ThreadPoolExecutor(max_workers=max(len(action_value_mappings), 1)) + self._executor = ThreadPoolExecutor(max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}") # 创建资源管理客户端 self._resource_clients: Dict[str, Client] = { @@ -295,6 +296,18 @@ class BaseROS2DeviceNode(Node, Generic[T]): "resource_list": self.create_client(ResourceList, "/resources/list"), } + def query_host_name_cb(req, res): + self.register_device() + self.lab_logger().info("Host要求重新注册当前节点") + res.response = "" + return res + + self._service_server: Dict[str, Service] = { + "query_host_name": self.create_service( + SerialCommand, f"/srv{self.namespace}/query_host_name", query_host_name_cb, callback_group=self.callback_group + ), + } + # 向全局在线设备注册表添加设备信息 self.register_device() rclpy.get_global_executor().add_node(self) @@ -318,6 +331,31 @@ class BaseROS2DeviceNode(Node, Generic[T]): ) # 加入全局注册表 registered_devices[self.device_id] = device_info + from unilabos.config.config import BasicConfig + if not BasicConfig.is_host_mode: + sclient = self.create_client(SerialCommand, "/node_info_update") + # 启动线程执行发送任务 + threading.Thread( + target=self.send_slave_node_info, + args=(sclient,), + daemon=True, + name=f"ROSDevice{self.device_id}_send_slave_node_info" + ).start() + + def send_slave_node_info(self, sclient): + sclient.wait_for_service() + request = SerialCommand.Request() + from unilabos.config.config import BasicConfig + request.command = json.dumps({ + "SYNC_SLAVE_NODE_INFO": { + "machine_name": BasicConfig.machine_name, + "type": "slave", + "edge_device_id": self.device_id + }}, ensure_ascii=False, cls=TypeEncoder) + + # 发送异步请求并等待结果 + future = sclient.call_async(request) + response = future.result() def lab_logger(self): """ diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 34132b35..5a739773 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -1,4 +1,5 @@ import copy +import json import threading import time import traceback @@ -7,12 +8,13 @@ from typing import Optional, Dict, Any, List, ClassVar, Set from action_msgs.msg import GoalStatus from unilabos_msgs.msg import Resource # type: ignore -from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList # type: ignore +from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList, SerialCommand # type: ignore from rclpy.action import ActionClient, get_action_server_names_and_types_by_node from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.service import Service from unique_identifier_msgs.msg import UUID +from unilabos.registry.registry import lab_registry from unilabos.resources.registry import add_schema from unilabos.ros.initialize_device import initialize_device_from_dict from unilabos.ros.msgs.message_converter import ( @@ -20,10 +22,12 @@ from unilabos.ros.msgs.message_converter import ( get_ros_type_by_msgname, convert_from_ros_msg, convert_to_ros_msg, - msg_converter_manager, ros_action_to_json_schema, + msg_converter_manager, + ros_action_to_json_schema, ) from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker from unilabos.ros.nodes.presets.controller_node import ControllerNode +from unilabos.utils.type_check import TypeEncoder class HostNode(BaseROS2DeviceNode): @@ -95,6 +99,7 @@ class HostNode(BaseROS2DeviceNode): # 创建设备、动作客户端和目标存储 self.devices_names: Dict[str, str] = {} # 存储设备名称和命名空间的映射 self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例 + self.device_machine_names: Dict[str, str] = {device_id: "本地", } # 存储设备ID到机器名称的映射 self._action_clients: Dict[str, ActionClient] = {} # 用来存储多个ActionClient实例 self._action_value_mappings: Dict[str, Dict] = ( {} @@ -106,18 +111,24 @@ class HostNode(BaseROS2DeviceNode): self._subscribed_topics = set() # 用于跟踪已订阅的话题 # 创建物料增删改查服务(非客户端) - self._init_resource_service() + self._init_host_service() self.device_status = {} # 用来存储设备状态 self.device_status_timestamps = {} # 用来存储设备状态最后更新时间 + from unilabos.app.mq import mqtt_client + for device_config in lab_registry.obtain_registry_device_info(): + mqtt_client.publish_registry(device_config["id"], device_config) + # 首次发现网络中的设备 self._discover_devices() # 初始化所有本机设备节点,多一次过滤,防止重复初始化 for device_id, device_config in devices_config.items(): if device_config.get("type", "device") != "device": - self.lab_logger().debug(f"[Host Node] Skipping type {device_config['type']} {device_id} already existed, skipping.") + self.lab_logger().debug( + f"[Host Node] Skipping type {device_config['type']} {device_id} already existed, skipping." + ) continue if device_id not in self.devices_names: self.initialize_device(device_id, device_config) @@ -150,6 +161,13 @@ class HostNode(BaseROS2DeviceNode): self.lab_logger().info("[Host Node] Host node initialized.") HostNode._ready_event.set() + def _send_re_register(self, sclient): + sclient.wait_for_service() + request = SerialCommand.Request() + request.command = "" + future = sclient.call_async(request) + response = future.result() + def _discover_devices(self) -> None: """ 发现网络中的设备 @@ -166,23 +184,37 @@ class HostNode(BaseROS2DeviceNode): current_devices = set() for device_id, namespace in nodes_and_names: - if not namespace.startswith("/devices"): + if not namespace.startswith("/devices/"): continue - + edge_device_id = namespace[9:] # 将设备添加到当前设备集合 - device_key = f"{namespace}/{device_id}" + device_key = f"{namespace}/{edge_device_id}" # namespace已经包含device_id了,这里复写一遍 current_devices.add(device_key) # 如果是新设备,记录并创建ActionClient - if device_id not in self.devices_names: + if edge_device_id not in self.devices_names: self.lab_logger().info(f"[Host Node] Discovered new device: {device_key}") - self.devices_names[device_id] = namespace + self.devices_names[edge_device_id] = namespace self._create_action_clients_for_device(device_id, namespace) self._online_devices.add(device_key) + sclient = self.create_client(SerialCommand, f"/srv{namespace}/query_host_name") + threading.Thread( + target=self._send_re_register, + args=(sclient,), + daemon=True, + name=f"ROSDevice{self.device_id}_query_host_name_{namespace}" + ).start() elif device_key not in self._online_devices: # 设备重新上线 self.lab_logger().info(f"[Host Node] Device reconnected: {device_key}") self._online_devices.add(device_key) + sclient = self.create_client(SerialCommand, f"/srv{namespace}/query_host_name") + threading.Thread( + target=self._send_re_register, + args=(sclient,), + daemon=True, + name=f"ROSDevice{self.device_id}_query_host_name_{namespace}" + ).start() # 检测离线设备 offline_devices = self._online_devices - current_devices @@ -224,16 +256,22 @@ class HostNode(BaseROS2DeviceNode): self._action_clients[action_id] = ActionClient( self, action_type, action_id, callback_group=self.callback_group ) - self.lab_logger().debug(f"[Host Node] Created ActionClient: {action_id}") + self.lab_logger().debug(f"[Host Node] Created ActionClient (Discovery): {action_id}") + action_name = action_id[len(namespace) + 1:] + edge_device_id = namespace[9:] from unilabos.app.mq import mqtt_client info_with_schema = ros_action_to_json_schema(action_type) - mqtt_client.publish_actions(action_id, info_with_schema) + mqtt_client.publish_actions(action_name, { + "device_id": edge_device_id, + "action_name": action_name, + "schema": info_with_schema, + }) except Exception as e: self.lab_logger().error(f"[Host Node] Failed to create ActionClient for {action_id}: {str(e)}") def initialize_device(self, device_id: str, device_config: Dict[str, Any]) -> None: """ - 根据配置初始化设备 + 根据配置初始化设备, 此函数根据提供的设备配置动态导入适当的设备类并创建其实例。 同时为设备的动作值映射设置动作客户端。 @@ -249,7 +287,8 @@ class HostNode(BaseROS2DeviceNode): if d is None: return # noinspection PyProtectedMember - self.devices_names[device_id] = d._ros_node.namespace + self.devices_names[device_id] = d._ros_node.namespace # 这里不涉及二级device_id + self.device_machine_names[device_id] = "本地" self.devices_instances[device_id] = d # noinspection PyProtectedMember for action_name, action_value_mapping in d._ros_node._action_value_mappings.items(): @@ -257,13 +296,17 @@ class HostNode(BaseROS2DeviceNode): if action_id not in self._action_clients: action_type = action_value_mapping["type"] self._action_clients[action_id] = ActionClient(self, action_type, action_id) - self.lab_logger().debug(f"[Host Node] Created ActionClient: {action_id}") + self.lab_logger().debug(f"[Host Node] Created ActionClient (Local): {action_id}") # 子设备再创建用的是Discover发现的 from unilabos.app.mq import mqtt_client info_with_schema = ros_action_to_json_schema(action_type) - mqtt_client.publish_actions(action_id, info_with_schema) + mqtt_client.publish_actions(action_name, { + "device_id": device_id, + "action_name": action_name, + "schema": info_with_schema, + }) else: self.lab_logger().warning(f"[Host Node] ActionClient {action_id} already exists.") - device_key = f"{self.devices_names[device_id]}/{device_id}" + device_key = f"{self.devices_names[device_id]}/{device_id}" # 这里不涉及二级device_id # 添加到在线设备列表 self._online_devices.add(device_key) @@ -285,8 +328,8 @@ class HostNode(BaseROS2DeviceNode): # 解析设备名和属性名 parts = topic.split("/") - if len(parts) >= 4: - device_id = parts[-2] + if len(parts) >= 4: # 可能有ProtocolNode,创建更长的设备 + device_id = "/".join(parts[2:-1]) property_name = parts[-1] # 初始化设备状态字典 @@ -473,7 +516,7 @@ class HostNode(BaseROS2DeviceNode): """Resource""" - def _init_resource_service(self): + def _init_host_service(self): self._resource_services: Dict[str, Service] = { "resource_add": self.create_service( ResourceAdd, "/resources/add", self._resource_add_callback, callback_group=ReentrantCallbackGroup() @@ -496,8 +539,39 @@ class HostNode(BaseROS2DeviceNode): "resource_list": self.create_service( ResourceList, "/resources/list", self._resource_list_callback, callback_group=ReentrantCallbackGroup() ), + "node_info_update": self.create_service( + SerialCommand, + "/node_info_update", + self._node_info_update_callback, + callback_group=ReentrantCallbackGroup(), + ), } + def _node_info_update_callback(self, request, response): + """ + 更新节点信息回调 + """ + self.lab_logger().info(f"[Host Node] Node info update request received: {request}") + try: + from unilabos.app.mq import mqtt_client + + info = json.loads(request.command) + if "SYNC_SLAVE_NODE_INFO" in info: + info = info["SYNC_SLAVE_NODE_INFO"] + machine_name = info["machine_name"] + edge_device_id = info["edge_device_id"] + self.device_machine_names[edge_device_id] = machine_name + else: + registry_config = info["registry_config"] + for device_config in registry_config: + mqtt_client.publish_registry(device_config["id"], device_config) + self.lab_logger().debug(f"[Host Node] Node info update: {info}") + response.response = "OK" + except Exception as e: + self.lab_logger().error(f"[Host Node] Error updating node info: {e.args}") + response.response = "ERROR" + return response + def _resource_add_callback(self, request, response): """ 添加资源回调 diff --git a/unilabos/ros/utils/driver_creator.py b/unilabos/ros/utils/driver_creator.py index b2402b5e..2ea30856 100644 --- a/unilabos/ros/utils/driver_creator.py +++ b/unilabos/ros/utils/driver_creator.py @@ -224,8 +224,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): if hasattr(self.device_instance, "setup") and asyncio.iscoroutinefunction(getattr(self.device_instance, "setup")): from unilabos.ros.nodes.base_device_node import ROS2DeviceNode ROS2DeviceNode.run_async_func(getattr(self.device_instance, "setup")).add_done_callback(lambda x: logger.debug(f"PyLabRobot设备实例 {self.device_instance} 设置完成")) -# 2486229810384 -#2486232539792 + class ProtocolNodeCreator(DeviceClassCreator[T]): """