diff --git a/test/experiments/HPLC.json b/test/experiments/HPLC.json
index 9e511b3c..6d866a9a 100644
--- a/test/experiments/HPLC.json
+++ b/test/experiments/HPLC.json
@@ -5,7 +5,7 @@
"name": "HPLC",
"parent": null,
"type": "device",
- "class": "hplc",
+ "class": "hplc.agilent",
"position": {
"x": 620.6111111111111,
"y": 171,
@@ -19,8 +19,8 @@
},
{
"id": "BottlesRack3",
- "name": "Revvity上样盘3",
- "parent": "Revvity",
+ "name": "上样盘3",
+ "parent": "HPLC",
"type": "plate",
"class": null,
"position": {
diff --git a/unilabos/app/main.py b/unilabos/app/main.py
index d418c5c1..f75a3295 100644
--- a/unilabos/app/main.py
+++ b/unilabos/app/main.py
@@ -58,6 +58,18 @@ def parse_args():
default=None,
help="配置文件路径,支持.py格式的Python配置文件",
)
+ parser.add_argument(
+ "--port",
+ type=int,
+ default=8002,
+ help="信息页web服务的启动端口",
+ )
+ parser.add_argument(
+ "--open_browser",
+ type=bool,
+ default=True,
+ help="是否在启动时打开信息页",
+ )
return parser.parse_args()
@@ -84,6 +96,9 @@ def main():
# 设置BasicConfig参数
BasicConfig.is_host_mode = not args_dict.get("without_host", False)
BasicConfig.slave_no_host = args_dict.get("slave_no_host", False)
+ machine_name = os.popen("hostname").read().strip()
+ machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name])
+ BasicConfig.machine_name = machine_name
from unilabos.resources.graphio import (
read_node_link_json,
@@ -151,7 +166,7 @@ def main():
mqtt_client.start()
start_backend(**args_dict)
- start_server()
+ start_server(port=args_dict.get("port", 8002), open_browser=args_dict.get("open_browser", False))
if __name__ == "__main__":
diff --git a/unilabos/app/mq.py b/unilabos/app/mq.py
index 8ed35f95..a6123fb2 100644
--- a/unilabos/app/mq.py
+++ b/unilabos/app/mq.py
@@ -146,7 +146,7 @@ class MQTTClient:
if self.mqtt_disable:
return
status = {"data": device_status.get(device_id, {}), "device_id": device_id}
- address = f"labs/{MQConfig.lab_id}/devices"
+ address = f"labs/{MQConfig.lab_id}/devices/"
self.client.publish(address, json.dumps(status), qos=2)
logger.critical(f"Device status published: address: {address}, {status}")
@@ -168,11 +168,8 @@ class MQTTClient:
if self.mqtt_disable:
return
address = f"labs/{MQConfig.lab_id}/actions/"
- action_type_name = action_info["title"]
- action_info["title"] = action_id
- action_data = json.dumps({action_type_name: action_info}, ensure_ascii=False)
- self.client.publish(address, action_data, qos=2)
- logger.debug(f"Action data published: address: {address}, {action_data}")
+ self.client.publish(address, json.dumps(action_info), qos=2)
+ logger.debug(f"Action data published: address: {address}, {action_id}, {action_info}")
mqtt_client = MQTTClient()
diff --git a/unilabos/app/web/pages.py b/unilabos/app/web/pages.py
index 7216a66c..a08cebb5 100644
--- a/unilabos/app/web/pages.py
+++ b/unilabos/app/web/pages.py
@@ -92,19 +92,7 @@ def setup_web_pages(router: APIRouter) -> None:
# 获取已加载的设备
if lab_registry:
- # 设备类型
- for device_id, device_info in lab_registry.device_type_registry.items():
- msg = {
- "id": device_id,
- "name": device_info.get("name", "未命名"),
- "file_path": device_info.get("file_path", ""),
- "class_json": json.dumps(
- device_info.get("class", {}), indent=4, ensure_ascii=False, cls=TypeEncoder
- ),
- }
- mqtt_client.publish_registry(device_id, device_info)
- devices.append(msg)
-
+ devices = json.loads(json.dumps(lab_registry.obtain_registry_device_info(), ensure_ascii=False, cls=TypeEncoder))
# 资源类型
for resource_id, resource_info in lab_registry.resource_type_registry.items():
resources.append(
diff --git a/unilabos/app/web/templates/status.html b/unilabos/app/web/templates/status.html
index 30c3e6b8..e1105b7b 100644
--- a/unilabos/app/web/templates/status.html
+++ b/unilabos/app/web/templates/status.html
@@ -96,17 +96,19 @@
- {{ device.class_json }}
-
+ {% if device.class %}
+ {{ device.class | tojson(indent=4) }}
+ {% else %}
+
+ // No data
+ {% endif %}
+
{% if device.is_online %}
在线
{% endif %}
@@ -362,7 +371,12 @@
- {{ action_info|tojson(indent=2) }}
+ {% if action_info %}
+ {{ action_info | tojson(indent=4) }}
+ {% else %}
+
+ // No data
+ {% endif %}
diff --git a/unilabos/app/web/utils/host_utils.py b/unilabos/app/web/utils/host_utils.py
index 0df5e816..a9070486 100644
--- a/unilabos/app/web/utils/host_utils.py
+++ b/unilabos/app/web/utils/host_utils.py
@@ -30,20 +30,19 @@ def get_host_node_info() -> Dict[str, Any]:
return host_info
host_info["available"] = True
host_info["devices"] = {
- device_id: {
+ edge_device_id: {
"namespace": namespace,
- "is_online": f"{namespace}/{device_id}" in host_node._online_devices,
- "key": f"{namespace}/{device_id}" if namespace.startswith("/") else f"/{namespace}/{device_id}",
+ "is_online": f"{namespace}/{edge_device_id}" in host_node._online_devices,
+ "key": f"{namespace}/{edge_device_id}" if namespace.startswith("/") else f"/{namespace}/{edge_device_id}",
+ "machine_name": host_node.device_machine_names.get(edge_device_id, "未知"),
}
- for device_id, namespace in host_node.devices_names.items()
+ for edge_device_id, namespace in host_node.devices_names.items()
}
# 获取已订阅的主题
host_info["subscribed_topics"] = sorted(list(host_node._subscribed_topics))
# 获取动作客户端信息
for action_id, client in host_node._action_clients.items():
- host_info["action_clients"] = {
- action_id: get_action_info(client, full_name=action_id)
- }
+ host_info["action_clients"] = {action_id: get_action_info(client, full_name=action_id)}
# 获取设备状态
host_info["device_status"] = host_node.device_status
diff --git a/unilabos/app/web/utils/ros_utils.py b/unilabos/app/web/utils/ros_utils.py
index 01733510..8e1c767c 100644
--- a/unilabos/app/web/utils/ros_utils.py
+++ b/unilabos/app/web/utils/ros_utils.py
@@ -12,6 +12,7 @@ from unilabos.app.web.utils.action_utils import get_action_info
# 存储 ROS 节点信息的全局变量
ros_node_info = {"online_devices": {}, "device_topics": {}, "device_actions": {}}
+
def get_ros_node_info() -> Dict[str, Any]:
"""获取 ROS 节点信息,包括设备节点、发布的状态和动作
@@ -35,6 +36,13 @@ def update_ros_node_info() -> Dict[str, Any]:
try:
from unilabos.ros.nodes.base_device_node import registered_devices
+ from unilabos.ros.nodes.presets.host_node import HostNode
+
+ # 尝试获取主机节点实例
+ host_node = HostNode.get_instance(0)
+ device_machine_names = {}
+ if host_node:
+ device_machine_names = host_node.device_machine_names
for device_id, device_info in registered_devices.items():
# 设备基本信息
@@ -42,6 +50,7 @@ def update_ros_node_info() -> Dict[str, Any]:
"node_name": device_info["node_name"],
"namespace": device_info["namespace"],
"uuid": device_info["uuid"],
+ "machine_name": device_machine_names.get(device_id, "本地"),
}
# 设备话题(状态)信息
@@ -55,10 +64,7 @@ def update_ros_node_info() -> Dict[str, Any]:
}
# 设备动作信息
- result["device_actions"][device_id] = {
- k: get_action_info(v, k)
- for k, v in device_info["actions"].items()
- }
+ result["device_actions"][device_id] = {k: get_action_info(v, k) for k, v in device_info["actions"].items()}
# 更新全局变量
ros_node_info = result
except Exception as e:
diff --git a/unilabos/config/config.py b/unilabos/config/config.py
index 5c1f7b9c..3f3d8dd7 100644
--- a/unilabos/config/config.py
+++ b/unilabos/config/config.py
@@ -12,6 +12,7 @@ class BasicConfig:
config_path = ""
is_host_mode = True # 从registry.py移动过来
slave_no_host = False # 是否跳过rclient.wait_for_service()
+ machine_name = "undefined"
# MQTT配置
diff --git a/unilabos/registry/devices/characterization_optic.yaml b/unilabos/registry/devices/characterization_optic.yaml
index 0164ae4a..52ffc87a 100644
--- a/unilabos/registry/devices/characterization_optic.yaml
+++ b/unilabos/registry/devices/characterization_optic.yaml
@@ -19,6 +19,49 @@ raman_home_made:
status:
type: string
required:
- - status
+ - status
additionalProperties: false
- type: object
\ No newline at end of file
+ type: object
+hplc.agilent:
+ description: HPLC device
+ class:
+ module: unilabos.devices.hplc.AgilentHPLC:HPLCDriver
+ type: python
+ status_types:
+ device_status: String
+ could_run: Bool
+ driver_init_ok: Bool
+ is_running: Bool
+ finish_status: String
+ status_text: String
+ action_value_mappings:
+ execute_command_from_outer:
+ type: SendCmd
+ goal:
+ command: command
+ feedback: {}
+ result:
+ success: success
+ schema:
+ properties:
+ device_status:
+ type: string
+ could_run:
+ type: boolean
+ driver_init_ok:
+ type: boolean
+ is_running:
+ type: boolean
+ finish_status:
+ type: string
+ status_text:
+ type: string
+ required:
+ - device_status
+ - could_run
+ - driver_init_ok
+ - is_running
+ - finish_status
+ - status_text
+ additionalProperties: false
+ type: object
diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py
index 9c7a95b9..feba3fef 100644
--- a/unilabos/registry/registry.py
+++ b/unilabos/registry/registry.py
@@ -1,3 +1,4 @@
+import json
import os
import sys
from pathlib import Path
@@ -6,8 +7,9 @@ from typing import Any
import yaml
from unilabos.utils import logger
-from unilabos.ros.msgs.message_converter import msg_converter_manager
+from unilabos.ros.msgs.message_converter import msg_converter_manager, ros_action_to_json_schema
from unilabos.utils.decorator import singleton
+from unilabos.utils.type_check import TypeEncoder
DEFAULT_PATHS = [Path(__file__).absolute().parent]
@@ -129,6 +131,7 @@ class Registry:
action_config["type"] = self._replace_type_with_class(
action_config["type"], device_id, f"动作 {action_name}"
)
+ action_config["schema"] = ros_action_to_json_schema(action_config["type"])
self.device_type_registry.update(data)
@@ -143,6 +146,16 @@ class Registry:
f"[UniLab Registry] Device File-{i+1}/{len(files)} Not Valid YAML File: {file.absolute()}"
)
+ def obtain_registry_device_info(self):
+ devices = []
+ for device_id, device_info in self.device_type_registry.items():
+ msg = {
+ "id": device_id,
+ **device_info
+ }
+ devices.append(msg)
+ return devices
+
# 全局单例实例
lab_registry = Registry()
diff --git a/unilabos/resources/graphio.py b/unilabos/resources/graphio.py
index e5c16bc4..71e4aaeb 100644
--- a/unilabos/resources/graphio.py
+++ b/unilabos/resources/graphio.py
@@ -1,4 +1,5 @@
import importlib
+import inspect
import json
from typing import Union
import numpy as np
@@ -384,7 +385,11 @@ def resource_ulab_to_plr(resource: dict, plr_model=False) -> "ResourcePLR":
d = resource_ulab_to_plr_inner(resource)
"""无法通过Resource进行反序列化,例如TipSpot必须内部序列化好,直接用TipSpot序列化会多参数,导致出错"""
from pylabrobot.utils.object_parsing import find_subclass
- resource_plr = find_subclass(d["type"], ResourcePLR).deserialize(d, allow_marshal=True)
+ sub_cls = find_subclass(d["type"], ResourcePLR)
+ spect = inspect.signature(sub_cls)
+ if "category" not in spect.parameters:
+ d.pop("category")
+ resource_plr = sub_cls.deserialize(d, allow_marshal=True)
resource_plr.load_all_state(all_states)
return resource_plr
diff --git a/unilabos/ros/main_slave_run.py b/unilabos/ros/main_slave_run.py
index 1ae37906..9ac96748 100644
--- a/unilabos/ros/main_slave_run.py
+++ b/unilabos/ros/main_slave_run.py
@@ -1,22 +1,25 @@
+import copy
+import json
import os
-import traceback
+import threading
from typing import Optional, Dict, Any, List
import rclpy
from unilabos_msgs.msg import Resource # type: ignore
-from unilabos_msgs.srv import ResourceAdd # type: ignore
+from unilabos_msgs.srv import ResourceAdd, SerialCommand # type: ignore
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from rclpy.timer import Timer
+from unilabos.registry.registry import lab_registry
from unilabos.ros.initialize_device import initialize_device_from_dict
from unilabos.ros.msgs.message_converter import (
convert_to_ros_msg,
)
from unilabos.ros.nodes.presets.host_node import HostNode
-from unilabos.ros.x.rclpyx import run_event_loop_in_thread
from unilabos.utils import logger
from unilabos.config.config import BasicConfig
+from unilabos.utils.type_check import TypeEncoder
def exit() -> None:
@@ -59,16 +62,11 @@ def main(
discovery_interval,
)
- executor.add_node(host_node)
- # run_event_loop_in_thread()
+ thread = threading.Thread(target=executor.spin, daemon=True, name="host_executor_thread")
+ thread.start()
- try:
- executor.spin()
- except Exception as e:
- logger.error(traceback.format_exc())
- print(f"Exception caught: {e}")
- finally:
- exit()
+ while True:
+ input()
def slave(
@@ -82,7 +80,7 @@ def slave(
"""从节点函数"""
rclpy.init(args=args)
rclpy.__executor = executor = MultiThreadedExecutor()
-
+ devices_config_copy = copy.deepcopy(devices_config)
for device_id, device_config in devices_config.items():
d = initialize_device_from_dict(device_id, device_config)
if d is None:
@@ -93,32 +91,36 @@ def slave(
# else:
# print(f"Warning: Device {device_id} could not be initialized or is not a valid Node")
- machine_name = os.popen("hostname").read().strip()
- machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name])
- n = Node(f"slaveMachine_{machine_name}", parameter_overrides=[])
+ n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[])
executor.add_node(n)
- if BasicConfig.slave_no_host:
- # 确保ResourceAdd存在
- if "ResourceAdd" in globals():
- rclient = n.create_client(ResourceAdd, "/resources/add")
- rclient.wait_for_service() # FIXME 可能一直等待,加一个参数
+ thread = threading.Thread(target=executor.spin, daemon=True, name="slave_executor_thread")
+ thread.start()
- request = ResourceAdd.Request()
- request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources_config]
- response = rclient.call_async(request)
- else:
- print("Warning: ResourceAdd service not available")
+ if not BasicConfig.slave_no_host:
+ sclient = n.create_client(SerialCommand, "/node_info_update")
+ sclient.wait_for_service()
- run_event_loop_in_thread()
+ request = SerialCommand.Request()
+ request.command = json.dumps({
+ "machine_name": BasicConfig.machine_name,
+ "type": "slave",
+ "devices_config": devices_config_copy,
+ "registry_config": lab_registry.obtain_registry_device_info()
+ }, ensure_ascii=False, cls=TypeEncoder)
+ response = sclient.call_async(request).result()
+ logger.info(f"Slave node info updated.")
- try:
- executor.spin()
- except Exception as e:
- print(f"Exception caught: {e}")
- finally:
- exit()
+ rclient = n.create_client(ResourceAdd, "/resources/add")
+ rclient.wait_for_service() # FIXME 可能一直等待,加一个参数
+ request = ResourceAdd.Request()
+ request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources_config]
+ response = rclient.call_async(request).result()
+ logger.info(f"Slave resource added.")
+
+ while True:
+ input()
if __name__ == "__main__":
main()
diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py
index 0ff03a68..7e032064 100644
--- a/unilabos/ros/nodes/base_device_node.py
+++ b/unilabos/ros/nodes/base_device_node.py
@@ -1,3 +1,4 @@
+import json
import threading
import time
import traceback
@@ -13,15 +14,17 @@ from rclpy.action import ActionServer
from rclpy.action.server import ServerGoalHandle
from rclpy.client import Client
from rclpy.callback_groups import ReentrantCallbackGroup
+from rclpy.service import Service
from unilabos.resources.graphio import convert_resources_to_type, convert_resources_from_type
from unilabos.ros.msgs.message_converter import (
convert_to_ros_msg,
convert_from_ros_msg,
convert_from_ros_msg_with_mapping,
- convert_to_ros_msg_with_mapping,
+ convert_to_ros_msg_with_mapping, ros_action_to_json_schema,
)
-from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList # type: ignore
+from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList, \
+ SerialCommand # type: ignore
from unilabos_msgs.msg import Resource # type: ignore
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker
@@ -29,7 +32,7 @@ from unilabos.ros.x.rclpyx import get_event_loop
from unilabos.ros.utils.driver_creator import ProtocolNodeCreator, PyLabRobotCreator, DeviceClassCreator
from unilabos.utils.async_util import run_async_func
from unilabos.utils.log import info, debug, warning, error, critical, logger
-from unilabos.utils.type_check import get_type_class
+from unilabos.utils.type_check import get_type_class, TypeEncoder
T = TypeVar("T")
@@ -44,19 +47,17 @@ class ROSLoggerAdapter:
@property
def identifier(self):
- return f"{self.namespace}/{self.node_name}"
+ return f"{self.namespace}"
- def __init__(self, ros_logger, node_name, namespace):
+ def __init__(self, ros_logger, namespace):
"""
初始化日志适配器
Args:
ros_logger: ROS2日志记录器
- node_name: 节点名称
namespace: 命名空间
"""
self.ros_logger = ros_logger
- self.node_name = node_name
self.namespace = namespace
self.level_2_logger_func = {
"info": info,
@@ -258,9 +259,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
self.lab_logger().critical("资源跟踪器未初始化,请检查")
# 创建自定义日志记录器
- self._lab_logger = ROSLoggerAdapter(self.get_logger(), self.node_name, self.namespace)
+ self._lab_logger = ROSLoggerAdapter(self.get_logger(), self.namespace)
- self._action_servers = {}
+ self._action_servers: Dict[str, ActionServer] = {}
self._property_publishers = {}
self._status_types = status_types
self._action_value_mappings = action_value_mappings
@@ -284,7 +285,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
self.create_ros_action_server(action_name, action_value_mapping)
# 创建线程池执行器
- self._executor = ThreadPoolExecutor(max_workers=max(len(action_value_mappings), 1))
+ self._executor = ThreadPoolExecutor(max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}")
# 创建资源管理客户端
self._resource_clients: Dict[str, Client] = {
@@ -295,6 +296,18 @@ class BaseROS2DeviceNode(Node, Generic[T]):
"resource_list": self.create_client(ResourceList, "/resources/list"),
}
+ def query_host_name_cb(req, res):
+ self.register_device()
+ self.lab_logger().info("Host要求重新注册当前节点")
+ res.response = ""
+ return res
+
+ self._service_server: Dict[str, Service] = {
+ "query_host_name": self.create_service(
+ SerialCommand, f"/srv{self.namespace}/query_host_name", query_host_name_cb, callback_group=self.callback_group
+ ),
+ }
+
# 向全局在线设备注册表添加设备信息
self.register_device()
rclpy.get_global_executor().add_node(self)
@@ -318,6 +331,31 @@ class BaseROS2DeviceNode(Node, Generic[T]):
)
# 加入全局注册表
registered_devices[self.device_id] = device_info
+ from unilabos.config.config import BasicConfig
+ if not BasicConfig.is_host_mode:
+ sclient = self.create_client(SerialCommand, "/node_info_update")
+ # 启动线程执行发送任务
+ threading.Thread(
+ target=self.send_slave_node_info,
+ args=(sclient,),
+ daemon=True,
+ name=f"ROSDevice{self.device_id}_send_slave_node_info"
+ ).start()
+
+ def send_slave_node_info(self, sclient):
+ sclient.wait_for_service()
+ request = SerialCommand.Request()
+ from unilabos.config.config import BasicConfig
+ request.command = json.dumps({
+ "SYNC_SLAVE_NODE_INFO": {
+ "machine_name": BasicConfig.machine_name,
+ "type": "slave",
+ "edge_device_id": self.device_id
+ }}, ensure_ascii=False, cls=TypeEncoder)
+
+ # 发送异步请求并等待结果
+ future = sclient.call_async(request)
+ response = future.result()
def lab_logger(self):
"""
diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py
index 34132b35..5a739773 100644
--- a/unilabos/ros/nodes/presets/host_node.py
+++ b/unilabos/ros/nodes/presets/host_node.py
@@ -1,4 +1,5 @@
import copy
+import json
import threading
import time
import traceback
@@ -7,12 +8,13 @@ from typing import Optional, Dict, Any, List, ClassVar, Set
from action_msgs.msg import GoalStatus
from unilabos_msgs.msg import Resource # type: ignore
-from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList # type: ignore
+from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList, SerialCommand # type: ignore
from rclpy.action import ActionClient, get_action_server_names_and_types_by_node
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.service import Service
from unique_identifier_msgs.msg import UUID
+from unilabos.registry.registry import lab_registry
from unilabos.resources.registry import add_schema
from unilabos.ros.initialize_device import initialize_device_from_dict
from unilabos.ros.msgs.message_converter import (
@@ -20,10 +22,12 @@ from unilabos.ros.msgs.message_converter import (
get_ros_type_by_msgname,
convert_from_ros_msg,
convert_to_ros_msg,
- msg_converter_manager, ros_action_to_json_schema,
+ msg_converter_manager,
+ ros_action_to_json_schema,
)
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker
from unilabos.ros.nodes.presets.controller_node import ControllerNode
+from unilabos.utils.type_check import TypeEncoder
class HostNode(BaseROS2DeviceNode):
@@ -95,6 +99,7 @@ class HostNode(BaseROS2DeviceNode):
# 创建设备、动作客户端和目标存储
self.devices_names: Dict[str, str] = {} # 存储设备名称和命名空间的映射
self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例
+ self.device_machine_names: Dict[str, str] = {device_id: "本地", } # 存储设备ID到机器名称的映射
self._action_clients: Dict[str, ActionClient] = {} # 用来存储多个ActionClient实例
self._action_value_mappings: Dict[str, Dict] = (
{}
@@ -106,18 +111,24 @@ class HostNode(BaseROS2DeviceNode):
self._subscribed_topics = set() # 用于跟踪已订阅的话题
# 创建物料增删改查服务(非客户端)
- self._init_resource_service()
+ self._init_host_service()
self.device_status = {} # 用来存储设备状态
self.device_status_timestamps = {} # 用来存储设备状态最后更新时间
+ from unilabos.app.mq import mqtt_client
+ for device_config in lab_registry.obtain_registry_device_info():
+ mqtt_client.publish_registry(device_config["id"], device_config)
+
# 首次发现网络中的设备
self._discover_devices()
# 初始化所有本机设备节点,多一次过滤,防止重复初始化
for device_id, device_config in devices_config.items():
if device_config.get("type", "device") != "device":
- self.lab_logger().debug(f"[Host Node] Skipping type {device_config['type']} {device_id} already existed, skipping.")
+ self.lab_logger().debug(
+ f"[Host Node] Skipping type {device_config['type']} {device_id} already existed, skipping."
+ )
continue
if device_id not in self.devices_names:
self.initialize_device(device_id, device_config)
@@ -150,6 +161,13 @@ class HostNode(BaseROS2DeviceNode):
self.lab_logger().info("[Host Node] Host node initialized.")
HostNode._ready_event.set()
+ def _send_re_register(self, sclient):
+ sclient.wait_for_service()
+ request = SerialCommand.Request()
+ request.command = ""
+ future = sclient.call_async(request)
+ response = future.result()
+
def _discover_devices(self) -> None:
"""
发现网络中的设备
@@ -166,23 +184,37 @@ class HostNode(BaseROS2DeviceNode):
current_devices = set()
for device_id, namespace in nodes_and_names:
- if not namespace.startswith("/devices"):
+ if not namespace.startswith("/devices/"):
continue
-
+ edge_device_id = namespace[9:]
# 将设备添加到当前设备集合
- device_key = f"{namespace}/{device_id}"
+ device_key = f"{namespace}/{edge_device_id}" # namespace已经包含device_id了,这里复写一遍
current_devices.add(device_key)
# 如果是新设备,记录并创建ActionClient
- if device_id not in self.devices_names:
+ if edge_device_id not in self.devices_names:
self.lab_logger().info(f"[Host Node] Discovered new device: {device_key}")
- self.devices_names[device_id] = namespace
+ self.devices_names[edge_device_id] = namespace
self._create_action_clients_for_device(device_id, namespace)
self._online_devices.add(device_key)
+ sclient = self.create_client(SerialCommand, f"/srv{namespace}/query_host_name")
+ threading.Thread(
+ target=self._send_re_register,
+ args=(sclient,),
+ daemon=True,
+ name=f"ROSDevice{self.device_id}_query_host_name_{namespace}"
+ ).start()
elif device_key not in self._online_devices:
# 设备重新上线
self.lab_logger().info(f"[Host Node] Device reconnected: {device_key}")
self._online_devices.add(device_key)
+ sclient = self.create_client(SerialCommand, f"/srv{namespace}/query_host_name")
+ threading.Thread(
+ target=self._send_re_register,
+ args=(sclient,),
+ daemon=True,
+ name=f"ROSDevice{self.device_id}_query_host_name_{namespace}"
+ ).start()
# 检测离线设备
offline_devices = self._online_devices - current_devices
@@ -224,16 +256,22 @@ class HostNode(BaseROS2DeviceNode):
self._action_clients[action_id] = ActionClient(
self, action_type, action_id, callback_group=self.callback_group
)
- self.lab_logger().debug(f"[Host Node] Created ActionClient: {action_id}")
+ self.lab_logger().debug(f"[Host Node] Created ActionClient (Discovery): {action_id}")
+ action_name = action_id[len(namespace) + 1:]
+ edge_device_id = namespace[9:]
from unilabos.app.mq import mqtt_client
info_with_schema = ros_action_to_json_schema(action_type)
- mqtt_client.publish_actions(action_id, info_with_schema)
+ mqtt_client.publish_actions(action_name, {
+ "device_id": edge_device_id,
+ "action_name": action_name,
+ "schema": info_with_schema,
+ })
except Exception as e:
self.lab_logger().error(f"[Host Node] Failed to create ActionClient for {action_id}: {str(e)}")
def initialize_device(self, device_id: str, device_config: Dict[str, Any]) -> None:
"""
- 根据配置初始化设备
+ 根据配置初始化设备,
此函数根据提供的设备配置动态导入适当的设备类并创建其实例。
同时为设备的动作值映射设置动作客户端。
@@ -249,7 +287,8 @@ class HostNode(BaseROS2DeviceNode):
if d is None:
return
# noinspection PyProtectedMember
- self.devices_names[device_id] = d._ros_node.namespace
+ self.devices_names[device_id] = d._ros_node.namespace # 这里不涉及二级device_id
+ self.device_machine_names[device_id] = "本地"
self.devices_instances[device_id] = d
# noinspection PyProtectedMember
for action_name, action_value_mapping in d._ros_node._action_value_mappings.items():
@@ -257,13 +296,17 @@ class HostNode(BaseROS2DeviceNode):
if action_id not in self._action_clients:
action_type = action_value_mapping["type"]
self._action_clients[action_id] = ActionClient(self, action_type, action_id)
- self.lab_logger().debug(f"[Host Node] Created ActionClient: {action_id}")
+ self.lab_logger().debug(f"[Host Node] Created ActionClient (Local): {action_id}") # 子设备再创建用的是Discover发现的
from unilabos.app.mq import mqtt_client
info_with_schema = ros_action_to_json_schema(action_type)
- mqtt_client.publish_actions(action_id, info_with_schema)
+ mqtt_client.publish_actions(action_name, {
+ "device_id": device_id,
+ "action_name": action_name,
+ "schema": info_with_schema,
+ })
else:
self.lab_logger().warning(f"[Host Node] ActionClient {action_id} already exists.")
- device_key = f"{self.devices_names[device_id]}/{device_id}"
+ device_key = f"{self.devices_names[device_id]}/{device_id}" # 这里不涉及二级device_id
# 添加到在线设备列表
self._online_devices.add(device_key)
@@ -285,8 +328,8 @@ class HostNode(BaseROS2DeviceNode):
# 解析设备名和属性名
parts = topic.split("/")
- if len(parts) >= 4:
- device_id = parts[-2]
+ if len(parts) >= 4: # 可能有ProtocolNode,创建更长的设备
+ device_id = "/".join(parts[2:-1])
property_name = parts[-1]
# 初始化设备状态字典
@@ -473,7 +516,7 @@ class HostNode(BaseROS2DeviceNode):
"""Resource"""
- def _init_resource_service(self):
+ def _init_host_service(self):
self._resource_services: Dict[str, Service] = {
"resource_add": self.create_service(
ResourceAdd, "/resources/add", self._resource_add_callback, callback_group=ReentrantCallbackGroup()
@@ -496,8 +539,39 @@ class HostNode(BaseROS2DeviceNode):
"resource_list": self.create_service(
ResourceList, "/resources/list", self._resource_list_callback, callback_group=ReentrantCallbackGroup()
),
+ "node_info_update": self.create_service(
+ SerialCommand,
+ "/node_info_update",
+ self._node_info_update_callback,
+ callback_group=ReentrantCallbackGroup(),
+ ),
}
+ def _node_info_update_callback(self, request, response):
+ """
+ 更新节点信息回调
+ """
+ self.lab_logger().info(f"[Host Node] Node info update request received: {request}")
+ try:
+ from unilabos.app.mq import mqtt_client
+
+ info = json.loads(request.command)
+ if "SYNC_SLAVE_NODE_INFO" in info:
+ info = info["SYNC_SLAVE_NODE_INFO"]
+ machine_name = info["machine_name"]
+ edge_device_id = info["edge_device_id"]
+ self.device_machine_names[edge_device_id] = machine_name
+ else:
+ registry_config = info["registry_config"]
+ for device_config in registry_config:
+ mqtt_client.publish_registry(device_config["id"], device_config)
+ self.lab_logger().debug(f"[Host Node] Node info update: {info}")
+ response.response = "OK"
+ except Exception as e:
+ self.lab_logger().error(f"[Host Node] Error updating node info: {e.args}")
+ response.response = "ERROR"
+ return response
+
def _resource_add_callback(self, request, response):
"""
添加资源回调
diff --git a/unilabos/ros/utils/driver_creator.py b/unilabos/ros/utils/driver_creator.py
index b2402b5e..2ea30856 100644
--- a/unilabos/ros/utils/driver_creator.py
+++ b/unilabos/ros/utils/driver_creator.py
@@ -224,8 +224,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
if hasattr(self.device_instance, "setup") and asyncio.iscoroutinefunction(getattr(self.device_instance, "setup")):
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode
ROS2DeviceNode.run_async_func(getattr(self.device_instance, "setup")).add_done_callback(lambda x: logger.debug(f"PyLabRobot设备实例 {self.device_instance} 设置完成"))
-# 2486229810384
-#2486232539792
+
class ProtocolNodeCreator(DeviceClassCreator[T]):
"""
|