本博文通过python脚本实现订阅ros2的多个topic,并且利用message filter进行时间同步后,再将topic录制为ros2的bag
通过命令即可运行,python3 sync_camera_joint.py --record,其中,加了--record flag就表示正式录制
import rclpy
from rclpy.node import Node
import message_filters #进行时间同步
from sensor_msgs.msg import Image, JointState # JointState对应于/inspire_hand/state/left_hand和/inspire_hand/state/right_hand
from geometry_msgs.msg import WrenchStamped #对应于/arm_6dof_left和/arm_6dof_right消息
from bodyctrl_msgs.msg import MotorStatusMsg #对应于 /arm/status
import threading
import time
import os
from datetime import datetime
import argparse
import sys
# 导入rosbag2相关模块
try:
import rosbag2_py
from rosbag2_py import SequentialWriter, StorageOptions, ConverterOptions
ROSBAG2_AVAILABLE = True
except ImportError:
ROSBAG2_AVAILABLE = False
print("警告: rosbag2_py 不可用,录制功能将禁用!!!!!!")
class MultiTopicSyncNode(Node):
def __init__(self, record_flag=False):
super().__init__('multi_topic_sync_node')
# 录制标志
self.record_flag = record_flag
# 定义需要订阅的topic列表
self.required_topics = [
'/camera/color/image_raw',
'/camera/depth/image_raw',
'/arm/status',
'/inspire_hand/state/left_hand',
'/inspire_hand/state/right_hand'
]
# 用于跟踪已确认的topic
self.confirmed_topics = {}
self.confirmation_lock = threading.Lock()
# 创建单次topic确认订阅器
self.confirmation_subscribers = []
# 主同步订阅器(稍后初始化)
self.sync_subscribers = {}
self.ts = None
self.sync_initialized = False
# Rosbag2录制相关变量
self.bag_writer = None
self.recording_enabled = self.record_flag and ROSBAG2_AVAILABLE
self.bag_path = None
# 同步统计
self.sync_count = 0
self.start_time = time.time()
# 如果启用录制,创建bag路径
if self.recording_enabled:
self.bag_path = self.create_bag_path()
self.get_logger().info(f'开始确认topic可用性... 录制模式: {"启用" if self.recording_enabled else "禁用"}')
#调用函数,首先确认哪些topic可用
self.start_topic_confirmation()
def create_bag_path(self):
"""创建rosbag存储路径"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
bag_dir = f"vla_sync_data_{timestamp}" #包的名字+时间
os.makedirs(bag_dir, exist_ok=True)
self.get_logger().info(f'Rosbag将保存到: {os.path.abspath(bag_dir)}')
return bag_dir
def start_topic_confirmation(self):
"""为每个topic创建临时订阅器来确认其存在和活跃"""
for topic in self.required_topics:
# 根据topic名称确定消息类型
msg_type = self.get_message_type(topic)#根据topic的名字来获取消息类型
if msg_type:
# 创建单次确认订阅器
sub = self.create_subscription(
msg_type,
topic,
self.create_topic_callback(topic),
10 # QoS队列大小
)
self.confirmation_subscribers.append(sub)
self.get_logger().info(f'正在确认topic: {topic}')
else:
self.get_logger().warning(f'报错!!!无法确定topic {topic}的消息类型')
# 启动超时检查
self.confirmation_timer = self.create_timer(5.0, self.check_confirmation_status)
def get_message_type(self, topic):
"""根据topic名称返回对应的消息类型"""
if 'image_raw' in topic:
return Image
elif 'hand/state' in topic:
return JointState
elif 'arm/status' in topic:
return MotorStatusMsg
else:
return None
def create_topic_callback(self, topic_name):
"""为每个topic创建确认回调函数"""
def topic_confirmation_callback(msg):
with self.confirmation_lock:
if topic_name not in self.confirmed_topics:
self.confirmed_topics[topic_name] = True
# 计算确认百分比
confirmed_percent = (len(self.confirmed_topics) / len(self.required_topics)) * 100
self.get_logger().info(
f'✓ 确认topic可用: {topic_name} '
f'({len(self.confirmed_topics)}/{len(self.required_topics)}, {confirmed_percent:.1f}%)'
)
# 检查是否所有topic都已确认,如果确认就开始订阅。
self.check_all_topics_confirmed()
return topic_confirmation_callback
def check_all_topics_confirmed(self):
"""检查是否所有需要的topic都已确认"""
confirmed_count = len(self.confirmed_topics)
total_count = len(self.required_topics)
confirmed_percent = (confirmed_count / total_count) * 100
# 根据确认的topic百分比决定是否开始同步
if confirmed_percent >= 100.0 and not self.sync_initialized:
self.get_logger().info('所有topic都已确认(100%),开始初始化时间同步器...')
self.initialize_synchronizer()
elif confirmed_percent <100.0 and not self.sync_initialized:
self.get_logger().warning(
f'已确认{confirmed_percent:.1f}%的topic ({confirmed_count}/{total_count}),'
f'开始部分同步(缺少: {[t for t in self.required_topics if t not in self.confirmed_topics]})'
)
return
def check_confirmation_status(self):
"""定时检查topic确认状态"""
if self.sync_initialized:
self.confirmation_timer.cancel()
return
confirmed_count = len(self.confirmed_topics)
total_count = len(self.required_topics)
confirmed_percent = (confirmed_count / total_count) * 100
missing_topics = [topic for topic in self.required_topics if topic not in self.confirmed_topics]
if missing_topics:
self.get_logger().warning(
f'等待以下topic ({confirmed_percent:.1f}% 已确认): {missing_topics}'
)
# 如果录制已启用但topic不完整,发出警告
if self.recording_enabled:
self.get_logger().warning(
f'录制功能已启用,但topic不完整({confirmed_percent:.1f}%),无法开始录制'
)
else:
self.get_logger().info('所有topic确认完成!')
def initialize_synchronizer(self):
"""初始化时间同步器"""
try:
# 创建同步订阅器
self.sync_subscribers['color_image'] = message_filters.Subscriber(self, Image, '/camera/color/image_raw')
self.sync_subscribers['depth_image'] = message_filters.Subscriber(self, Image, '/camera/depth/image_raw')
self.sync_subscribers['arm_status'] = message_filters.Subscriber(self, MotorStatusMsg, '/arm/status')
self.sync_subscribers['left_hand'] = message_filters.Subscriber(self, JointState, '/inspire_hand/state/left_hand')
self.sync_subscribers['right_hand'] = message_filters.Subscriber(self, JointState, '/inspire_hand/state/right_hand')
# 创建近似时间同步器
self.ts = message_filters.ApproximateTimeSynchronizer(
list(self.sync_subscribers.values()),
queue_size=20,
slop=0.1 # 100ms的时间容差
)
# 注册同步回调函数
self.ts.registerCallback(self.sync_callback)
# 如果启用录制,初始化rosbag录制
if self.recording_enabled:
self.initialize_rosbag_recorder() #进行rosbag的录制
self.sync_initialized = True
# 清理确认用的临时订阅器
for sub in self.confirmation_subscribers:
self.destroy_subscription(sub)
self.confirmation_subscribers.clear()
self.get_logger().info('✓ 多Topic时间同步器已成功启动并运行')
except Exception as e:
self.get_logger().error(f'初始化时间同步器失败: {str(e)}')
def initialize_rosbag_recorder(self):
"""初始化rosbag录制器"""
try:
storage_options = StorageOptions(
uri=self.bag_path,
storage_id='sqlite3'
)
converter_options = ConverterOptions(
input_serialization_format='cdr',
output_serialization_format='cdr'
)
self.bag_writer = SequentialWriter()
self.bag_writer.open(storage_options, converter_options)
# 为每个确认的topic创建录制通道
topic_types = {
'/camera/color/image_raw': 'sensor_msgs/msg/Image',
'/camera/depth/image_raw': 'sensor_msgs/msg/Image',
'/arm/status': 'bodyctrl_msgs/msg/MotorStatusMsg',
'/inspire_hand/state/left_hand': 'sensor_msgs/msg/JointState',
'/inspire_hand/state/right_hand': 'sensor_msgs/msg/JointState'
}
for topic in self.confirmed_topics:
if topic in topic_types:
self.bag_writer.create_topic(
topic,
topic_types[topic],
'cdr',
''
)
self.get_logger().info(f'✓ Rosbag录制已启动: {self.bag_path}')
except Exception as e:
self.get_logger().error(f'初始化rosbag录制失败: {str(e)}')
self.bag_writer = None
def write_to_bag(self, topic, msg):
"""将消息写入rosbag"""
if self.bag_writer is not None:
try:
# 使用消息的时间戳作为rosbag记录的时间戳
if hasattr(msg, 'header') and hasattr(msg.header, 'stamp'):
timestamp = msg.header.stamp
else:
timestamp = self.get_clock().now().to_msg()
self.bag_writer.write(topic, msg, timestamp)
except Exception as e:
self.get_logger().error(f'写入rosbag失败: {str(e)}')
def sync_callback(self, color_image, depth_image, arm_status, left_hand_state, right_hand_state):
"""
同步回调函数,当所有Topic的消息时间同步后调用
"""
try:
# 获取时间戳
sync_stamp = color_image.header.stamp
# 如果启用录制,录制到rosbag
if self.recording_enabled:
self.write_to_bag('/camera/color/image_raw', color_image)
self.write_to_bag('/camera/depth/image_raw', depth_image)
self.write_to_bag('/arm/status', arm_status)
self.write_to_bag('/inspire_hand/state/left_hand', left_hand_state)
self.write_to_bag('/inspire_hand/state/right_hand', right_hand_state)
self.sync_count += 1
elapsed_time = time.time() - self.start_time
if self.sync_count % 10 == 0: # 每10次同步打印一次统计
self.get_logger().info(
f'同步统计: {self.sync_count} 次同步, '
f'频率: {self.sync_count/elapsed_time:.2f} Hz'
)
except Exception as e:
self.get_logger().error(f'同步回调处理错误: {str(e)}')
def destroy_node(self):
"""重写销毁节点方法,确保正确关闭rosbag"""
if self.bag_writer is not None:
try:
# 这里需要调用rosbag2_py的关闭方法
# 注意: SequentialWriter没有显式的close方法,依赖析构函数
self.bag_writer = None
self.get_logger().info(f'Rosbag录制已完成: {self.bag_path}')
except Exception as e:
self.get_logger().error(f'关闭rosbag失败: {str(e)}')
super().destroy_node()
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description='多Topic时间同步节点')
parser.add_argument(
'--record',
action='store_true',
help='启用rosbag录制功能'
)
return parser.parse_args()
def main(args=None):
# 解析命令行参数
cli_args = parse_arguments()
rclpy.init(args=args)
# 检查rosbag2可用性
if cli_args.record and not ROSBAG2_AVAILABLE:
print("错误: rosbag2_py 不可用,无法启用录制功能")
print("请安装: sudo apt install ros-{你的ROS2版本}-rosbag2-py")
return 1
# 创建节点,传入录制标志
node = MultiTopicSyncNode(record_flag=cli_args.record)
try:
# 运行节点
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('收到中断信号,正在关闭...')
finally:
# 打印最终统计
if node.sync_count > 0:
elapsed_time = time.time() - node.start_time
node.get_logger().info(
f'最终统计: {node.sync_count} 次同步, '
f'平均频率: {node.sync_count/elapsed_time:.2f} Hz'
)
# 清理资源
node.destroy_node()
rclpy.shutdown()
return 0
if __name__ == '__main__':
sys.exit(main())
其他补充
- ros2录制bag:
ros2 bag record -o BAG_NAME /arm/status /camera/color/image_raw - 查看录制出话题的信息,比如话题记录的时间,大小,类型,数量:
ros2 bag info BAG_NAME