实验笔记之——动捕服数据采集

2025-11-01

引言

之前博客对动捕服的配置及使用做了说明。

本博文对动捕服数据采集对流程做记录~

本博文仅供本人学习记录用~

数采命令及代码:

首先开启主控,然后开启映射代码

cd ~/vla/nr-retargeting-demo
# 运行下面命令前,动捕人员的手臂最好摆成机器人本体当前的样子
python ./python_demo/mocap_retargeting_arm_ros_30.py --config-root config tiangong2
python time_consistency_sub

天工机器人配置

摄像头开启

  1. 摇动机器人的头部位置
ros2 topic pub /head/cmd_pos bodyctrl_msgs/msg/CmdSetMotorPosition "{header: {stamp: {sec: 0, nanosec: 0}, frame_id: ''}, cmds: [{name: 1, pos: 0, spd: 1, cur: 1},{name: 2, pos: 0, spd: 1, cur: 1},{name: 3, pos: 0, spd: 1, cur: 1}]}"
  1. 打开天工机器人摄像头(对应IP为10.1.52.63)

PS:摄像头对应的配置文件在:”/home/nvidia/orbbec_camera_ros2.bk.20250812_173917/install/orbbec_camera/share/orbbec_camera/launch/gemini_330_series.launch.py”

# 打开摄像头
./start_tiangong_head_camera.sh

# 用MobaXterm开启rviz查看图片
source /opt/ros/humble/setup.bash
rviz2

查看所有的topic,ros2 topic list

/camera/accel/imu_info
/camera/color/camera_info
/camera/color/image_raw
/camera/color/metadata
/camera/depth/camera_info
/camera/depth/image_raw
/camera/depth/metadata
/camera/depth_filter_status
/camera/depth_to_accel
/camera/depth_to_color
/camera/depth_to_gyro
/camera/gyro/imu_info
/camera/gyro_accel/sample
/clicked_point
/diagnostics
/goal_pose
/initialpose
/parameter_events
/rosout
/tf
/tf_static

一些常用命令记录

# 查看消息内容
ros2 topic echo /camera/color/image_raw
# 单纯查看时间戳
ros2 topic echo /camera/color/image_raw | grep -A 3 "stamp"
ros2 topic echo /camera/color/image_raw --field header.stamp
ros2 topic echo /camera/color/image_raw --field height #查看图像高度
ros2 topic echo /camera/color/image_raw --field width  #查看图像宽度

# 查看话题的发布者和订阅者数量等基本信息
ros2 topic info /camera/color/image_raw
# 确认消息类型定义,查看 header 字段的位置
ros2 interface show sensor_msgs/msg/Image

发布的图像分辨率为720*1280,

/camera/color/image_raw频率为30HZ左右

/camera/depth/image_raw频率为25HZ

机械臂开启

  1. 开启主控
ssh tg-x86-root
# 开启tmux,因为怕主控突然断掉
tmux
ros2 launch body_control body.launch.py

开启后,在x86或者orin上均可查询到机械臂的topic

/alarm
/arm/cmd_ctrl
/arm/cmd_current
/arm/cmd_dis
/arm/cmd_pos
/arm/cmd_set_zero
/arm/cmd_vel
/arm/motor_status
/arm/status
/arm_6dof_left
/arm_6dof_right
/audio/config/notify
/bodycontrol_state
/head/cmd_ctrl
/head/cmd_dis
/head/cmd_pos
/head/cmd_set_zero
/head/cmd_vel
/head/motor_status
/head/status
/imu
/imu/offset_notify
/imu/status
/inspire_hand/ctrl/left_hand
/inspire_hand/ctrl/right_hand
/inspire_hand/error/left_hand
/inspire_hand/error/right_hand
/inspire_hand/state/left_hand
/inspire_hand/state/right_hand
/leg/cmd_ctrl
/leg/cmd_dis
/leg/cmd_pos
/leg/cmd_set_zero
/leg/cmd_vel
/leg/motor_status
/leg/status
/parameter_events
/power/battery/status
/power/board/ctrl
/power/board/key_status
/power/board/status
/proc_manager/config/notify
/rosout
/sbus_data
/sbus_data/event
/waist/cmd_ctrl
/waist/cmd_dis
/waist/cmd_pos
/waist/cmd_set_zero
/waist/cmd_vel
/waist/motor_status
/waist/status

/arm/status的频率为500HZ

/inspire_hand/state/left_hand的频率为25HZ左右

键盘控制机械手的张合

  • 通过下面命令来实现键盘控制手的张合
ros2 run tg_arm_move hand_motion_service_node.py
python ~/ws_tiangong_roobot/hand_motion_keyboard_control.py
  • hand_motion_keyboard_control.py代码:
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node
from tg_arm_move.srv import PlayHandMotion
from std_msgs.msg import Header
import sys
import termios
import tty

class HandMotionKeyboardControl(Node):
    def __init__(self):
        super().__init__('hand_motion_keyboard_control')

        # Create service client
        self.client = self.create_client(PlayHandMotion, '/play_hand_motion')

        # Create publisher for action keys
        self.action_publisher = self.create_publisher(Header, '/hand_motion_keyboard_action', 10)

        # Wait for service to be available
        self.get_logger().info('Waiting for /play_hand_motion service...')
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Service not available, waiting...')

        self.get_logger().info('Service is available!')
        self.get_logger().info('Right hand: "o" for open_for_grasp, "g" for grasp')
        self.get_logger().info('Left hand:  "i" for open_for_grasp, "f" for grasp')
        self.get_logger().info('Press "q" to quit')

        # Flag to track if a request is in progress
        self.request_in_progress = False

    def call_service(self, hand, action_name):
        """Call the service with the specified hand and action in blocking mode."""
        if self.request_in_progress:
            self.get_logger().warn('Request already in progress, please wait...')
            return

        self.request_in_progress = True

        # Create request
        request = PlayHandMotion.Request()
        request.hand = hand
        request.action_names = [action_name]
        request.delay = 0.0

        self.get_logger().info(f'Calling service with hand: {hand}, action: {action_name}')

        # Call service synchronously (blocking)
        future = self.client.call_async(request)

        # Wait for the result (blocking)
        rclpy.spin_until_future_complete(self, future)

        if future.result() is not None:
            response = future.result()
            self.get_logger().info(f'Service call completed: {response}')
        else:
            self.get_logger().error(f'Service call failed: {future.exception()}')

        self.request_in_progress = False

    def publish_action_key(self, key):
        """Publish the action key to the topic."""
        msg = Header()
        msg.stamp = self.get_clock().now().to_msg()
        msg.frame_id = key
        self.action_publisher.publish(msg)
        self.get_logger().info(f'Published action key: {key} at timestamp: {msg.stamp.sec}.{msg.stamp.nanosec}')

    def get_key(self):
        """Get a single keypress from the user."""
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            tty.setraw(fd)
            key = sys.stdin.read(1)
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
        return key

    def run(self):
        """Main loop to listen for keyboard input."""
        try:
            while rclpy.ok():
                key = self.get_key()

                # Make it case insensitive
                key_lower = key.lower()

                if key_lower == 'o':
                    self.publish_action_key(key_lower)
                    self.call_service('right', 'open_for_grasp')
                elif key_lower == 'g':
                    self.publish_action_key(key_lower)
                    self.call_service('right', 'grasp')
                elif key_lower == 'i':
                    self.publish_action_key(key_lower)
                    self.call_service('left', 'open_for_grasp')
                elif key_lower == 'f':
                    self.publish_action_key(key_lower)
                    self.call_service('left', 'grasp')
                elif key_lower == 'q':
                    self.get_logger().info('Quitting...')
                    break
                elif key == '\x03':  # Ctrl+C
                    break
                else:
                    self.get_logger().info(f'Unknown key: {key}')
        except KeyboardInterrupt:
            self.get_logger().info('Keyboard interrupt received')

def main(args=None):
    rclpy.init(args=args)

    node = HandMotionKeyboardControl()

    try:
        node.run()
    except Exception as e:
        node.get_logger().error(f'Error: {e}')
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

数据录制

订阅topic并且进行时间同步与记录

消息的类型如下:

/camera/color/image_raw (Type: sensor_msgs/msg/Image)
/camera/depth/image_raw (Type: sensor_msgs/msg/Image)

/arm/motor_status (Type: bodyctrl_msgs/msg/MotorStatusMsg)
/arm/status (Type: bodyctrl_msgs/msg/MotorStatusMsg)
# /arm_6dof_left (Type: geometry_msgs/msg/WrenchStamped)
# /arm_6dof_right (Type: geometry_msgs/msg/WrenchStamped)

/inspire_hand/state/left_hand (Type: sensor_msgs/msg/JointState)
/inspire_hand/state/right_hand(Type: sensor_msgs/msg/JointState)

实现代码如下:

```p y import rclpy from rclpy.node import Node import message_filters #进行时间同步 from sensor_msgs.msg import Image, JointState # JointState对应于/inspire_hand/state/left_hand和/inspire_hand/state/right_hand from geometry_msgs.msg import WrenchStamped #对应于/arm_6dof_left和/arm_6dof_right消息 from bodyctrl_msgs.msg import MotorStatusMsg #对应于 /arm/status import threading import time import os from datetime import datetime import argparse import sys

导入rosbag2相关模块

try: import rosbag2_py from rosbag2_py import SequentialWriter, StorageOptions, ConverterOptions ROSBAG2_AVAILABLE = True except ImportError: ROSBAG2_AVAILABLE = False print(“警告: rosbag2_py 不可用,录制功能将禁用!!!!!!”)

class MultiTopicSyncNode(Node): def init(self, record_flag=False): super().init(‘multi_topic_sync_node’)

    # 录制标志
    self.record_flag = record_flag
    
    # 定义需要订阅的topic列表
    self.required_topics = [
        '/camera/color/image_raw',
        # '/camera/depth/image_raw', 
        '/arm/status' #,
        # '/inspire_hand/state/left_hand',
        # '/inspire_hand/state/right_hand'
    ]
    
    # 用于跟踪已确认的topic
    self.confirmed_topics = {}
    self.confirmation_lock = threading.Lock()
    
    # 创建单次topic确认订阅器
    self.confirmation_subscribers = []
    
    # 主同步订阅器(稍后初始化)
    self.sync_subscribers = {}
    self.ts = None
    self.sync_initialized = False
    
    # Rosbag2录制相关变量
    self.bag_writer = None
    self.recording_enabled = self.record_flag and ROSBAG2_AVAILABLE
    self.bag_path = None
    
    # 同步统计
    self.sync_count = 0
    self.start_time = time.time()
    
    # # 如果启用录制,创建bag路径
    # if self.recording_enabled:
    #     self.bag_path = self.create_bag_path()
    
    self.get_logger().info(f'开始确认topic可用性... 录制模式: {"启用" if self.recording_enabled else "禁用"}')

    #调用函数,首先确认哪些topic可用
    self.start_topic_confirmation()
    
def create_bag_path(self):
    """创建rosbag存储路径 - 修改为返回路径但不创建目录"""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    bag_dir = f"vla_sync_data_{timestamp}"
    # 移除了 os.makedirs(bag_dir, exist_ok=True)
    self.get_logger().info(f'Rosbag将保存到: {os.path.abspath(bag_dir)}')
    return bag_dir
    
def start_topic_confirmation(self):
    """为每个topic创建临时订阅器来确认其存在和活跃"""
    for topic in self.required_topics:
        # 根据topic名称确定消息类型
        msg_type = self.get_message_type(topic)#根据topic的名字来获取消息类型
        if msg_type:
            # 创建单次确认订阅器
            sub = self.create_subscription(
                msg_type,
                topic,
                self.create_topic_callback(topic),
                10  # QoS队列大小
            )
            self.confirmation_subscribers.append(sub)
            self.get_logger().info(f'正在确认topic: {topic}')
        else:
            self.get_logger().warning(f'报错!!!无法确定topic {topic}的消息类型')
    
    # 启动超时检查
    self.confirmation_timer = self.create_timer(5.0, self.check_confirmation_status)
    
def get_message_type(self, topic):
    """根据topic名称返回对应的消息类型"""
    if 'image_raw' in topic:
        return Image
    elif 'hand/state' in topic:
        return JointState
    elif 'arm/status' in topic:
        return MotorStatusMsg
    else:
        return None

def create_topic_callback(self, topic_name):
    """为每个topic创建确认回调函数"""
    def topic_confirmation_callback(msg):
        with self.confirmation_lock:
            if topic_name not in self.confirmed_topics:
                self.confirmed_topics[topic_name] = True
                
                # 计算确认百分比
                confirmed_percent = (len(self.confirmed_topics) / len(self.required_topics)) * 100
                
                self.get_logger().info(
                    f'✓ 确认topic可用: {topic_name} '
                    f'({len(self.confirmed_topics)}/{len(self.required_topics)}, {confirmed_percent:.1f}%)'
                )
                
                # 检查是否所有topic都已确认,如果确认就开始订阅。
                self.check_all_topics_confirmed()
    return topic_confirmation_callback

def check_all_topics_confirmed(self):
    """检查是否所有需要的topic都已确认"""
    confirmed_count = len(self.confirmed_topics)
    total_count = len(self.required_topics)
    
    confirmed_percent = (confirmed_count / total_count) * 100
    
    # 根据确认的topic百分比决定是否开始同步
    if confirmed_percent >= 100.0 and not self.sync_initialized:
        self.get_logger().info('所有topic都已确认(100%),开始初始化时间同步器...')
        self.initialize_synchronizer()
    elif confirmed_percent <100.0 and not self.sync_initialized:
        self.get_logger().warning(
            f'已确认{confirmed_percent:.1f}%的topic ({confirmed_count}/{total_count}),'
            f'开始部分同步(缺少: {[t for t in self.required_topics if t not in self.confirmed_topics]})'
        )
        return

def check_confirmation_status(self):
    """定时检查topic确认状态"""
    if self.sync_initialized:
        self.confirmation_timer.cancel()
        return
        
    confirmed_count = len(self.confirmed_topics)
    total_count = len(self.required_topics)
    confirmed_percent = (confirmed_count / total_count) * 100
    
    missing_topics = [topic for topic in self.required_topics if topic not in self.confirmed_topics]
    if missing_topics:
        self.get_logger().warning(
            f'等待以下topic ({confirmed_percent:.1f}% 已确认): {missing_topics}'
        )

        # 如果录制已启用但topic不完整,发出警告
        if self.recording_enabled:
            self.get_logger().warning(
                f'录制功能已启用,但topic不完整({confirmed_percent:.1f}%),无法开始录制'
            )
    else:
        self.get_logger().info('所有topic确认完成!')

def initialize_synchronizer(self):
    """初始化时间同步器"""
    try:
        # 创建同步订阅器
        self.sync_subscribers['color_image'] = message_filters.Subscriber(self, Image, '/camera/color/image_raw')
        self.sync_subscribers['depth_image'] = message_filters.Subscriber(self, Image, '/camera/depth/image_raw')
        self.sync_subscribers['arm_status'] = message_filters.Subscriber(self, MotorStatusMsg, '/arm/status')
        self.sync_subscribers['left_hand'] = message_filters.Subscriber(self, JointState, '/inspire_hand/state/left_hand')
        self.sync_subscribers['right_hand'] = message_filters.Subscriber(self, JointState, '/inspire_hand/state/right_hand')
        
        # 创建近似时间同步器
        self.ts = message_filters.ApproximateTimeSynchronizer(
            list(self.sync_subscribers.values()),
            queue_size=20,
            slop=0.1  # 100ms的时间容差
        )
        
        # 注册同步回调函数
        self.ts.registerCallback(self.sync_callback)
        
        # 如果启用录制,初始化rosbag录制
        if self.recording_enabled:
            self.initialize_rosbag_recorder() #进行rosbag的录制
        
        self.sync_initialized = True
        
        # 清理确认用的临时订阅器
        for sub in self.confirmation_subscribers:
            self.destroy_subscription(sub)
        self.confirmation_subscribers.clear()
        
        self.get_logger().info('✓ 多Topic时间同步器已成功启动并运行')
        
    except Exception as e:
        self.get_logger().error(f'初始化时间同步器失败: {str(e)}')

def initialize_rosbag_recorder(self):
    """初始化rosbag录制器"""
    try:
        # 在初始化录制器时才创建路径
        if self.bag_path is None:
            self.bag_path = self.create_bag_path()

        storage_options = StorageOptions(
            uri=self.bag_path,
            storage_id='sqlite3'
        )
        converter_options = ConverterOptions(
            input_serialization_format='cdr',
            output_serialization_format='cdr'
        )
        
        self.bag_writer = SequentialWriter()
        self.bag_writer.open(storage_options, converter_options)
        
        # 为每个确认的topic创建录制通道
        topic_types = {
            '/camera/color/image_raw': 'sensor_msgs/msg/Image',
            # '/camera/depth/image_raw': 'sensor_msgs/msg/Image',
            '/arm/status': 'bodyctrl_msgs/msg/MotorStatusMsg' #,
            # '/inspire_hand/state/left_hand': 'sensor_msgs/msg/JointState',
            # '/inspire_hand/state/right_hand': 'sensor_msgs/msg/JointState'
        }
        
        for topic in self.confirmed_topics:
            if topic in topic_types:
                # 创建 TopicMetadata 对象
                topic_metadata = rosbag2_py.TopicMetadata(
                    name=topic,
                    type=topic_types[topic],
                    serialization_format='cdr'
                )
                self.bag_writer.create_topic(topic_metadata)
        
        self.get_logger().info(f'✓ Rosbag录制已启动: {self.bag_path}')
        
    except Exception as e:
        self.get_logger().error(f'初始化rosbag录制失败: {str(e)}')
        self.bag_writer = None

def write_to_bag(self, topic, msg):
    """将消息写入rosbag"""
    if self.bag_writer is not None:
        try:
            # 序列化消息
            from rclpy.serialization import serialize_message
            serialized_msg = serialize_message(msg)
            
            # 获取时间戳(转换为纳秒)
            if hasattr(msg, 'header') and hasattr(msg.header, 'stamp'):
                timestamp = msg.header.stamp
                timestamp_ns = timestamp.sec * 10**9 + timestamp.nanosec
            else:
                now = self.get_clock().now()
                timestamp_ns = now.nanoseconds
            
            # 使用正确的参数格式写入
            self.bag_writer.write(topic, serialized_msg, timestamp_ns)
            
        except Exception as e:
            self.get_logger().error(f'{str(topic)}写入rosbag失败')

def sync_callback(self, color_image, depth_image, arm_status, left_hand_state, right_hand_state):
    """
    同步回调函数,当所有Topic的消息时间同步后调用
    """
    try:
        # 获取时间戳
        sync_stamp = color_image.header.stamp
        
        # 如果启用录制,录制到rosbag
        if self.recording_enabled:
            self.write_to_bag('/camera/color/image_raw', color_image)
            # self.write_to_bag('/camera/depth/image_raw', depth_image)
            self.write_to_bag('/arm/status', arm_status)
            # self.write_to_bag('/inspire_hand/state/left_hand', left_hand_state)
            # self.write_to_bag('/inspire_hand/state/right_hand', right_hand_state)
        
        self.sync_count += 1
        elapsed_time = time.time() - self.start_time
        
        if self.sync_count % 10 == 0:  # 每10次同步打印一次统计
            self.get_logger().info(
                f'同步统计: {self.sync_count} 次同步, '
                f'频率: {self.sync_count/elapsed_time:.2f} Hz'
            )
        
    except Exception as e:
        self.get_logger().error(f'同步回调处理错误: {str(e)}')

def destroy_node(self):
    """重写销毁节点方法,确保正确关闭rosbag"""
    if self.bag_writer is not None:
        try:
            # 这里需要调用rosbag2_py的关闭方法
            # 注意: SequentialWriter没有显式的close方法,依赖析构函数
            self.bag_writer = None
            self.get_logger().info(f'Rosbag录制已完成: {self.bag_path}')
        except Exception as e:
            self.get_logger().error(f'关闭rosbag失败: {str(e)}')
    
    super().destroy_node()

def parse_arguments(): “"”解析命令行参数””” parser = argparse.ArgumentParser(description=’多Topic时间同步节点’) parser.add_argument( ‘–record’, action=’store_true’, help=’启用rosbag录制功能’ ) return parser.parse_args()

def main(args=None): # 解析命令行参数 cli_args = parse_arguments()

rclpy.init(args=args)

# 检查rosbag2可用性
if cli_args.record and not ROSBAG2_AVAILABLE:
    print("错误: rosbag2_py 不可用,无法启用录制功能")
    print("请安装: sudo apt install ros-{你的ROS2版本}-rosbag2-py")
    return 1

# 创建节点,传入录制标志
node = MultiTopicSyncNode(record_flag=cli_args.record)

try:
    # 运行节点
    rclpy.spin(node)
except KeyboardInterrupt:
    node.get_logger().info('收到中断信号,正在关闭...')
finally:
    # 打印最终统计
    if node.sync_count > 0:
        elapsed_time = time.time() - node.start_time
        node.get_logger().info(
            f'最终统计: {node.sync_count} 次同步, '
            f'平均频率: {node.sync_count/elapsed_time:.2f} Hz'
        )
    
    # 清理资源
    node.destroy_node()
    rclpy.shutdown()

return 0

if name == ‘main’: sys.exit(main())


运行命令 `python3 sync_camera_joint.py --record`。其中,添加了--record flag就表示正式录制。而下面命令则是查看rosbag:`ros2 bag info ROSBAG_NAME`

### SSH脚本一键启动

脚本内容如下,通过运行sh \~/vla/nr-retargeting-demo/data\_collection.sh控制tmux来实现:

```bash
#! /bin/bash

echo "正在启动数据采集流程..."

# 关闭所有名为 data_collection 的 tmux 会话
echo "关闭所有现有的 data_collection 会话..."
for session in $(tmux list-sessions -F "#{session_name}" 2>/dev/null | grep "data_collection"); do
    echo "关闭会话: $session"
    tmux kill-session -t "$session"
done

# 创建新的 tmux 会话
tmux new-session -d -s data_collection -n "ssh_head"

# 第一个窗口:SSH 连接并运行头部摄像头脚本
echo "设置 SSH 连接..."
tmux new-session -d -s data_collection -n "ssh_head"
tmux send-keys -t data_collection "sshpass -p ',624^Y)qENwVu;OD' ssh -o StrictHostKeyChecking=no nvidia@10.1.52.63" Enter
sleep 3  # 等待 SSH 连接建立
tmux send-keys -t data_collection "./start_tiangong_head_camera.sh" Enter

# # 第二个窗口:运行 mocap retargeting
# tmux new-session -d -s data_collection_1 -n "mocap"
# tmux send-keys -t data_collection_1 "cd ~/vla/nr-retargeting-demo" Enter
# tmux send-keys -t data_collection_1 "python ./python_demo/mocap_retargeting_arm_ros.py --config-root config tiangong2" Enter

# 第三个窗口:运行手部运动控制
tmux new-session -d -s data_collection_2 -n "hand_control_node"
tmux send-keys -t data_collection_2 "ros2 run tg_arm_move hand_motion_service_node.py" Enter

# # 第四个终端:手部开合控制:oi左右开,gf左右合
# tmux new-session -d -s data_collection_3 -n "hand_control"
# tmux send-keys -t data_collection_3 "python hand_motion_keyboard_control.py" Enter

echo "所有 tmux 会话已创建:"
echo "  data_collection    - SSH 头部摄像头"
echo "  data_collection_1  - Mocap Retargeting"
echo "  data_collection_2  - 手部运动控制节点"
echo "  data_collection_3  - 手部键盘控制"
echo ""
echo "使用以下命令查看会话:"
echo "  tmux list-sessions"
echo "  tmux attach -t data_collection    (查看 SSH 头部摄像头)"
echo "  tmux attach -t data_collection_1  (查看 Mocap Retargeting)"
echo "  tmux attach -t data_collection_2  (查看手部运动控制节点)"
echo "  tmux attach -t data_collection_3  (查看手部键盘控制)"

rosbag录制

python3 ~/vla/nr-retargeting-demo/record_rosbag2.py left_hand_experiment /arm/status /camera/color/image_raw

  • record_rosbag2.py的代码如下
#!/usr/bin/env python3
import subprocess
import sys
import argparse
from datetime import datetime
import os

def record_rosbag(exp_name, topics):
    """
    录制 ROS2 bag,自动在实验名称后添加时间戳
    
    Args:
        exp_name: 实验名称
        topics: 要录制的主题列表
    """
    # 获取当前时间戳
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    
    # 构建输出文件名
    output_name = f"{exp_name}_{timestamp}"
    
    # 构建命令
    cmd = ["ros2", "bag", "record", "-o", output_name] + topics
    
    print(f"开始录制 ROS2 bag: {output_name}")
    print(f"录制主题: {', '.join(topics)}")
    print(f"命令: {' '.join(cmd)}")
    
    # 执行命令
    try:
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError as e:
        print(f"录制过程中出错: {e}")
    except KeyboardInterrupt:
        print("\n录制已停止")

def main():
    parser = argparse.ArgumentParser(description="录制 ROS2 bag 并自动添加时间戳")
    parser.add_argument("exp_name", help="实验名称")
    parser.add_argument("topics", nargs="+", help="要录制的主题")
    
    args = parser.parse_args()
    
    record_rosbag(args.exp_name, args.topics)

if __name__ == "__main__":
    main()

流程mark

完整流程

  1. 开启天工机器人主控
    ssh tg-x86-root
    tmux
    ros2 launch body_control body.launch.py
    
  2. 开启摄像头
ssh 10.1.52.63

./start_tiangong_head_camera.sh

  1. 开启映射代码:
ssh tg-x86
cd ~/vla/nr-retargeting-demo
# 运行下面命令前,动捕人员的手臂最好摆成机器人本体当前的样子
python ./python_demo/mocap_retargeting_arm_ros.py --config-root config tiangong2

# python time_consistency_sub
  1. 机械手开合
ros2 run tg_arm_move hand_motion_service_node.py

python ~/ws_tiangong_roobot/hand_motion_keyboard_control.py
  1. 数据包录制
# ros2 bag record -o 2025-11-05_left_1 /arm/status /camera/color/image_raw
python3 record_rosbag2.py exp_name /arm/status /camera/color/image_raw
  1. 其他:
# ros2 topic pub /head/cmd_pos bodyctrl_msgs/msg/CmdSetMotorPosition "{header: {stamp: {sec: 0, nanosec: 0}, frame_id: ''}, cmds: [{name: 1, pos: 0, spd: 1, cur: 1},{name: 2, pos: 0, spd: 1, cur: 1},{name: 3, pos: 0, spd: 1, cur: 1}]}"

ros2 topic pub /head/cmd_pos bodyctrl_msgs/msg/CmdSetMotorPosition "{header: {stamp: {sec: 0, nanosec: 0}, frame_id: ''}, cmds: [{name: 1, pos: 0, spd: 1, cur: 1},{name: 2, pos: 0.35, spd: 1, cur: 1},{name: 3, pos: 0, spd: 1, cur: 1}]}"

ros2 bag info ***
python3 sync_camera_joint.py --record

simple vesrion

  1. 开启天工机器人主控
    ssh tg-x86-root
    tmux
    ros2 launch body_control body.launch.py
    
  2. sh ~/vla/nr-retargeting-demo/data_collection.sh

  3. 开启映射代码:
cd ~/vla/nr-retargeting-demo
# 运行下面命令前,动捕人员的手臂最好摆成机器人本体当前的样子
python ./python_demo/mocap_retargeting_arm_ros.py --config-root config tiangong2
  1. 数据包录制
cd ~/vla/nr-retargeting-demo
python3 record_rosbag2.py exp_name /arm/status /camera/color/image_raw

其他补充

  • 动捕服重映射Link
  • 运控程序Link