mirror of
https://github.com/deepmodeling/Uni-Lab-OS
synced 2026-03-24 13:29:14 +00:00
fast registry load minor fix on skill & registry stripe ros2 schema desc add create-device-skill new registry system backwards to yaml remove not exist resource new registry sys exp. support with add device add ai conventions correct raise create resource error ret info fix revert ret info fix fix prcxi check add create_resource schema re signal host ready event add websocket connection timeout and improve reconnection logic add open_timeout parameter to websocket connection add TimeoutError and InvalidStatus exception handling implement exponential backoff for reconnection attempts simplify reconnection logic flow add gzip change pose extra to any add isFlapY
71 lines
2.9 KiB
Python
71 lines
2.9 KiB
Python
import rclpy
|
||
from rclpy.node import Node
|
||
import cv2
|
||
from sensor_msgs.msg import Image
|
||
from cv_bridge import CvBridge
|
||
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker
|
||
from unilabos.registry.decorators import device
|
||
|
||
|
||
@device(
|
||
id="camera",
|
||
category=["camera"],
|
||
description="""VideoPublisher摄像头设备节点,用于实时视频采集和流媒体发布。该设备通过OpenCV连接本地摄像头(如USB摄像头、内置摄像头等),定时采集视频帧并将其转换为ROS2的sensor_msgs/Image消息格式发布到视频话题。主要用于实验室自动化系统中的视觉监控、图像分析、实时观察等应用场景。支持可配置的摄像头索引、发布频率等参数。""",
|
||
)
|
||
class VideoPublisher(BaseROS2DeviceNode):
|
||
def __init__(self, device_id='video_publisher', registry_name="", device_uuid='', camera_index=0, period: float = 0.1, resource_tracker: DeviceNodeResourceTracker = None):
|
||
# 初始化BaseROS2DeviceNode,使用自身作为driver_instance
|
||
BaseROS2DeviceNode.__init__(
|
||
self,
|
||
driver_instance=self,
|
||
device_id=device_id,
|
||
registry_name=registry_name,
|
||
device_uuid=device_uuid,
|
||
status_types={},
|
||
action_value_mappings={},
|
||
hardware_interface="camera",
|
||
print_publish=False,
|
||
resource_tracker=resource_tracker,
|
||
)
|
||
# 创建一个发布者,发布到 /video 话题,消息类型为 sensor_msgs/Image,队列长度设为 10
|
||
self.publisher_ = self.create_publisher(Image, f'/{device_id}/video', 10)
|
||
# 初始化摄像头(默认设备索引为 0)
|
||
self.cap = cv2.VideoCapture(camera_index)
|
||
if not self.cap.isOpened():
|
||
self.get_logger().error("无法打开摄像头")
|
||
# 用于将 OpenCV 的图像转换为 ROS 图像消息
|
||
self.bridge = CvBridge()
|
||
# 设置定时器,10 Hz 发布一次
|
||
timer_period = period # 单位:秒
|
||
self.timer = self.create_timer(timer_period, self.timer_callback)
|
||
|
||
def timer_callback(self):
|
||
ret, frame = self.cap.read()
|
||
if not ret:
|
||
self.get_logger().error("读取视频帧失败")
|
||
return
|
||
# 将 OpenCV 图像转换为 ROS Image 消息,注意图像编码需与摄像头数据匹配,这里使用 bgr8
|
||
img_msg = self.bridge.cv2_to_imgmsg(frame, encoding="bgr8")
|
||
self.publisher_.publish(img_msg)
|
||
# self.get_logger().info("已发布视频帧")
|
||
|
||
def destroy_node(self):
|
||
# 释放摄像头资源
|
||
if self.cap.isOpened():
|
||
self.cap.release()
|
||
super().destroy_node()
|
||
|
||
def main(args=None):
|
||
rclpy.init(args=args)
|
||
node = VideoPublisher()
|
||
try:
|
||
rclpy.spin(node)
|
||
except KeyboardInterrupt:
|
||
pass
|
||
finally:
|
||
node.destroy_node()
|
||
rclpy.shutdown()
|
||
|
||
if __name__ == '__main__':
|
||
main()
|