mirror of
https://github.com/deepmodeling/Uni-Lab-OS
synced 2026-03-27 15:24:03 +00:00
P1 Edge: 关节桥接重构 — 直接订阅 /joint_states + 资源跟随 + 吞吐优化
- HostNode 直接订阅 /joint_states (JointStateMsg),绕过 JointRepublisher 中间人 - 新增 resource_pose 订阅,实现资源夹取跟随 (gripper attach/detach) - 吞吐优化:死区过滤 (1e-4 rad)、抑频 (~20Hz)、增量 resource_poses - JointRepublisher 修复 str→json.dumps (E1) - communication.py 新增 publish_joint_state 抽象方法 - ws_client.py 实现 push_joint_state action 发送 - 57 项测试覆盖:关节分组、资源跟随、同类型多设备、优化行为 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -50,6 +50,17 @@ class BaseCommunicationClient(ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
def publish_joint_state(self, node_uuid: str, joint_states: dict, resource_poses: dict = None) -> None:
|
||||
"""
|
||||
发布高频关节状态数据(push_joint_state action,不写 DB)
|
||||
|
||||
Args:
|
||||
node_uuid: 设备节点的云端 UUID
|
||||
joint_states: 关节名 → 角度/位置 的映射
|
||||
resource_poses: 物料附着映射(可选)
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def publish_job_status(
|
||||
self, feedback_data: dict, job_id: str, status: str, return_info: Optional[dict] = None
|
||||
|
||||
@@ -1434,6 +1434,21 @@ class WebSocketClient(BaseCommunicationClient):
|
||||
self.message_processor.send_message(message)
|
||||
# logger.trace(f"[WebSocketClient] Device status published: {device_id}.{property_name}")
|
||||
|
||||
def publish_joint_state(self, node_uuid: str, joint_states: dict, resource_poses: dict = None) -> None:
|
||||
"""发布高频关节状态(push_joint_state,不写 DB)"""
|
||||
if self.is_disabled or not self.is_connected():
|
||||
return
|
||||
|
||||
message = {
|
||||
"action": "push_joint_state",
|
||||
"data": {
|
||||
"node_uuid": node_uuid,
|
||||
"joint_states": joint_states or {},
|
||||
"resource_poses": resource_poses or {},
|
||||
},
|
||||
}
|
||||
self.message_processor.send_message(message)
|
||||
|
||||
def publish_job_status(
|
||||
self, feedback_data: dict, item: QueueItem, status: str, return_info: Optional[dict] = None
|
||||
) -> None:
|
||||
|
||||
@@ -9,6 +9,7 @@ from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Unio
|
||||
|
||||
from action_msgs.msg import GoalStatus
|
||||
from geometry_msgs.msg import Point
|
||||
from sensor_msgs.msg import JointState as JointStateMsg
|
||||
from rclpy.action import ActionClient, get_action_server_names_and_types_by_node
|
||||
from rclpy.service import Service
|
||||
from typing_extensions import TypedDict
|
||||
@@ -348,6 +349,10 @@ class HostNode(BaseROS2DeviceNode):
|
||||
else:
|
||||
self.lab_logger().warning(f"[Host Node] Device {device_id} already existed, skipping.")
|
||||
self.update_device_status_subscriptions()
|
||||
|
||||
# 订阅 joint_state_repub topic,桥接关节数据到云端
|
||||
self._init_joint_state_bridge()
|
||||
|
||||
# TODO: 需要验证 初始化所有控制器节点
|
||||
if controllers_config:
|
||||
update_rate = controllers_config["controller_manager"]["ros__parameters"]["update_rate"]
|
||||
@@ -782,6 +787,179 @@ class HostNode(BaseROS2DeviceNode):
|
||||
else:
|
||||
self.lab_logger().trace(f"Status updated: {device_id}.{property_name} = {msg.data}")
|
||||
|
||||
"""关节数据 & 资源跟随桥接"""
|
||||
|
||||
# 吞吐优化参数
|
||||
_JOINT_DEAD_BAND: float = 1e-4 # 关节角度变化小于此值视为无变化
|
||||
_JOINT_MIN_INTERVAL: float = 0.05 # 最小发送间隔 (秒),限制到 ~20Hz
|
||||
|
||||
def _init_joint_state_bridge(self):
|
||||
"""
|
||||
订阅 /joint_states (sensor_msgs/JointState) 和 resource_pose (String),
|
||||
构建 device_id → uuid 映射,并维护 resource_poses 状态。
|
||||
|
||||
吞吐优化:
|
||||
- 死区过滤 (dead band): 关节角度变化 < 阈值时不发送
|
||||
- 抑频 (throttle): 限制最大发送频率,避免 ROS2 1kHz 打满 WS
|
||||
- 增量 resource_poses: 仅在 resource_pose 实际变化时才附带发送
|
||||
"""
|
||||
# 构建 device_id → cloud_uuid 映射(从 devices_config 中获取)
|
||||
self._device_uuid_map: Dict[str, str] = {}
|
||||
for tree in self.devices_config.trees:
|
||||
node = tree.root_node
|
||||
if node.res_content.type == "device" and node.res_content.uuid:
|
||||
self._device_uuid_map[node.res_content.id] = node.res_content.uuid
|
||||
|
||||
# 按 device_id 长度降序排列,最长前缀优先匹配(避免 arm 抢先匹配 arm_left_j1)
|
||||
self._device_ids_sorted = sorted(self._device_uuid_map.keys(), key=len, reverse=True)
|
||||
|
||||
# 资源挂载状态:{resource_id: parent_link_name}
|
||||
self._resource_poses: Dict[str, str] = {}
|
||||
# resource_pose 变化标志,仅在真正变化时随关节数据发送
|
||||
self._resource_poses_dirty: bool = False
|
||||
|
||||
# 吞吐优化状态
|
||||
self._last_joint_values: Dict[str, float] = {} # 上次发送的关节值(全局)
|
||||
self._last_send_time: float = -float("inf") # 上次发送时间戳(初始为-inf确保首条通过)
|
||||
self._last_sent_resource_poses: Dict[str, str] = {} # 上次发送的 resource_poses 快照
|
||||
|
||||
if not self._device_uuid_map:
|
||||
self.lab_logger().debug("[Host Node] 无设备 UUID 映射,跳过关节桥接")
|
||||
return
|
||||
|
||||
# 直接订阅 /joint_states(sensor_msgs/JointState),无需经过 JointRepublisher
|
||||
self.create_subscription(
|
||||
JointStateMsg,
|
||||
"/joint_states",
|
||||
self._joint_state_callback,
|
||||
10,
|
||||
callback_group=self.callback_group,
|
||||
)
|
||||
|
||||
# 订阅 resource_pose(资源挂载变化,由 ResourceMeshManager 发布)
|
||||
from std_msgs.msg import String as StdString
|
||||
self.create_subscription(
|
||||
StdString,
|
||||
"resource_pose",
|
||||
self._resource_pose_callback,
|
||||
10,
|
||||
callback_group=self.callback_group,
|
||||
)
|
||||
|
||||
self.lab_logger().info(
|
||||
f"[Host Node] 已订阅 /joint_states 和 resource_pose,设备映射: {list(self._device_uuid_map.keys())}"
|
||||
)
|
||||
|
||||
def _resource_pose_callback(self, msg):
|
||||
"""
|
||||
接收 ResourceMeshManager 发布的资源挂载变更。
|
||||
|
||||
msg.data 格式: JSON dict,如 {"tip_rack_A1": "gripper_link", "plate_1": "deck_link"}
|
||||
空 dict {} 表示无变化(心跳包)。
|
||||
"""
|
||||
try:
|
||||
data = json.loads(msg.data)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
return
|
||||
if not isinstance(data, dict) or not data:
|
||||
return
|
||||
# 检测实际变化
|
||||
has_change = False
|
||||
for k, v in data.items():
|
||||
if self._resource_poses.get(k) != v:
|
||||
has_change = True
|
||||
break
|
||||
if has_change:
|
||||
self._resource_poses.update(data)
|
||||
self._resource_poses_dirty = True
|
||||
|
||||
def _joint_state_callback(self, msg: JointStateMsg):
|
||||
"""
|
||||
直接接收 /joint_states (sensor_msgs/JointState),按设备分组后通过 bridge 发送到云端。
|
||||
|
||||
吞吐优化:
|
||||
1. 抑频: 距上次发送 < _JOINT_MIN_INTERVAL 则跳过(除非有 resource_pose 变化)
|
||||
2. 死区: 所有关节角度变化 < _JOINT_DEAD_BAND 则跳过(除非有 resource_pose 变化)
|
||||
3. 增量 resource_poses: 仅在 dirty 时附带,否则发空 dict
|
||||
"""
|
||||
names = list(msg.name)
|
||||
positions = list(msg.position)
|
||||
if not names or len(names) != len(positions):
|
||||
return
|
||||
|
||||
now = time.time()
|
||||
resource_dirty = self._resource_poses_dirty
|
||||
|
||||
# 抑频检查:resource_pose 变化时强制发送
|
||||
if not resource_dirty and (now - self._last_send_time) < self._JOINT_MIN_INTERVAL:
|
||||
return
|
||||
|
||||
# 死区过滤:检测是否有关节值实质变化
|
||||
has_significant_change = False
|
||||
for name, pos in zip(names, positions):
|
||||
last_val = self._last_joint_values.get(name)
|
||||
if last_val is None or abs(float(pos) - last_val) >= self._JOINT_DEAD_BAND:
|
||||
has_significant_change = True
|
||||
break
|
||||
|
||||
# 无关节变化且无资源变化 → 跳过
|
||||
if not has_significant_change and not resource_dirty:
|
||||
return
|
||||
|
||||
# 更新上次发送的关节值
|
||||
for name, pos in zip(names, positions):
|
||||
self._last_joint_values[name] = float(pos)
|
||||
self._last_send_time = now
|
||||
|
||||
# 按设备 ID 分组关节数据(最长前缀优先匹配)
|
||||
device_joints: Dict[str, Dict[str, float]] = {}
|
||||
for name, pos in zip(names, positions):
|
||||
matched_device = None
|
||||
for device_id in self._device_ids_sorted:
|
||||
if name.startswith(device_id + "_"):
|
||||
matched_device = device_id
|
||||
break
|
||||
|
||||
if matched_device:
|
||||
if matched_device not in device_joints:
|
||||
device_joints[matched_device] = {}
|
||||
device_joints[matched_device][name] = float(pos)
|
||||
elif len(self._device_uuid_map) == 1:
|
||||
fallback_id = self._device_ids_sorted[0]
|
||||
if fallback_id not in device_joints:
|
||||
device_joints[fallback_id] = {}
|
||||
device_joints[fallback_id][name] = float(pos)
|
||||
|
||||
# 构建设备级 resource_poses(仅在 dirty 时附带实际数据)
|
||||
device_resource_poses: Dict[str, Dict[str, str]] = {}
|
||||
if resource_dirty:
|
||||
for resource_id, link_name in self._resource_poses.items():
|
||||
matched_device = None
|
||||
for device_id in self._device_ids_sorted:
|
||||
if link_name.startswith(device_id + "_"):
|
||||
matched_device = device_id
|
||||
break
|
||||
if matched_device:
|
||||
if matched_device not in device_resource_poses:
|
||||
device_resource_poses[matched_device] = {}
|
||||
device_resource_poses[matched_device][resource_id] = link_name
|
||||
elif len(self._device_uuid_map) == 1:
|
||||
fallback_id = self._device_ids_sorted[0]
|
||||
if fallback_id not in device_resource_poses:
|
||||
device_resource_poses[fallback_id] = {}
|
||||
device_resource_poses[fallback_id][resource_id] = link_name
|
||||
self._resource_poses_dirty = False
|
||||
|
||||
# 通过 bridge 发送 push_joint_state(含 resource_poses)
|
||||
for device_id, joint_states in device_joints.items():
|
||||
node_uuid = self._device_uuid_map.get(device_id)
|
||||
if not node_uuid:
|
||||
continue
|
||||
resource_poses = device_resource_poses.get(device_id, {})
|
||||
for bridge in self.bridges:
|
||||
if hasattr(bridge, "publish_joint_state"):
|
||||
bridge.publish_joint_state(node_uuid, joint_states, resource_poses)
|
||||
|
||||
def send_goal(
|
||||
self,
|
||||
item: "QueueItem",
|
||||
|
||||
@@ -41,7 +41,7 @@ class JointRepublisher(BaseROS2DeviceNode):
|
||||
json_dict["velocity"] = list(msg.velocity)
|
||||
json_dict["effort"] = list(msg.effort)
|
||||
|
||||
self.msg.data = str(json_dict)
|
||||
self.msg.data = json.dumps(json_dict)
|
||||
self.joint_repub.publish(self.msg)
|
||||
# print('-'*20)
|
||||
# print(self.msg.data)
|
||||
|
||||
Reference in New Issue
Block a user