引言
之前博客对动捕服的配置及使用做了说明。
本博文对动捕服数据采集对流程做记录~
本博文仅供本人学习记录用~
数采命令及代码:
首先开启主控,然后开启映射代码
cd ~/vla/nr-retargeting-demo
# 运行下面命令前,动捕人员的手臂最好摆成机器人本体当前的样子
python ./python_demo/mocap_retargeting_arm_ros_30.py --config-root config tiangong2
python time_consistency_sub
天工机器人配置
摄像头开启
- 摇动机器人的头部位置
ros2 topic pub /head/cmd_pos bodyctrl_msgs/msg/CmdSetMotorPosition "{header: {stamp: {sec: 0, nanosec: 0}, frame_id: ''}, cmds: [{name: 1, pos: 0, spd: 1, cur: 1},{name: 2, pos: 0, spd: 1, cur: 1},{name: 3, pos: 0, spd: 1, cur: 1}]}"
- 打开天工机器人摄像头(对应IP为10.1.52.63)
PS:摄像头对应的配置文件在:”/home/nvidia/orbbec_camera_ros2.bk.20250812_173917/install/orbbec_camera/share/orbbec_camera/launch/gemini_330_series.launch.py”
# 打开摄像头
./start_tiangong_head_camera.sh
# 用MobaXterm开启rviz查看图片
source /opt/ros/humble/setup.bash
rviz2
查看所有的topic,ros2 topic list
/camera/accel/imu_info
/camera/color/camera_info
/camera/color/image_raw
/camera/color/metadata
/camera/depth/camera_info
/camera/depth/image_raw
/camera/depth/metadata
/camera/depth_filter_status
/camera/depth_to_accel
/camera/depth_to_color
/camera/depth_to_gyro
/camera/gyro/imu_info
/camera/gyro_accel/sample
/clicked_point
/diagnostics
/goal_pose
/initialpose
/parameter_events
/rosout
/tf
/tf_static
一些常用命令记录
# 查看消息内容
ros2 topic echo /camera/color/image_raw
# 单纯查看时间戳
ros2 topic echo /camera/color/image_raw | grep -A 3 "stamp"
ros2 topic echo /camera/color/image_raw --field header.stamp
ros2 topic echo /camera/color/image_raw --field height #查看图像高度
ros2 topic echo /camera/color/image_raw --field width #查看图像宽度
# 查看话题的发布者和订阅者数量等基本信息
ros2 topic info /camera/color/image_raw
# 确认消息类型定义,查看 header 字段的位置
ros2 interface show sensor_msgs/msg/Image
发布的图像分辨率为720*1280,
/camera/color/image_raw频率为30HZ左右
/camera/depth/image_raw频率为25HZ
机械臂开启
- 开启主控
ssh tg-x86-root
# 开启tmux,因为怕主控突然断掉
tmux
ros2 launch body_control body.launch.py
开启后,在x86或者orin上均可查询到机械臂的topic
/alarm
/arm/cmd_ctrl
/arm/cmd_current
/arm/cmd_dis
/arm/cmd_pos
/arm/cmd_set_zero
/arm/cmd_vel
/arm/motor_status
/arm/status
/arm_6dof_left
/arm_6dof_right
/audio/config/notify
/bodycontrol_state
/head/cmd_ctrl
/head/cmd_dis
/head/cmd_pos
/head/cmd_set_zero
/head/cmd_vel
/head/motor_status
/head/status
/imu
/imu/offset_notify
/imu/status
/inspire_hand/ctrl/left_hand
/inspire_hand/ctrl/right_hand
/inspire_hand/error/left_hand
/inspire_hand/error/right_hand
/inspire_hand/state/left_hand
/inspire_hand/state/right_hand
/leg/cmd_ctrl
/leg/cmd_dis
/leg/cmd_pos
/leg/cmd_set_zero
/leg/cmd_vel
/leg/motor_status
/leg/status
/parameter_events
/power/battery/status
/power/board/ctrl
/power/board/key_status
/power/board/status
/proc_manager/config/notify
/rosout
/sbus_data
/sbus_data/event
/waist/cmd_ctrl
/waist/cmd_dis
/waist/cmd_pos
/waist/cmd_set_zero
/waist/cmd_vel
/waist/motor_status
/waist/status
/arm/status的频率为500HZ
/inspire_hand/state/left_hand的频率为25HZ左右
键盘控制机械手的张合
- 通过下面命令来实现键盘控制手的张合
ros2 run tg_arm_move hand_motion_service_node.py
python ~/ws_tiangong_roobot/hand_motion_keyboard_control.py
- hand_motion_keyboard_control.py代码:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from tg_arm_move.srv import PlayHandMotion
from std_msgs.msg import Header
import sys
import termios
import tty
class HandMotionKeyboardControl(Node):
def __init__(self):
super().__init__('hand_motion_keyboard_control')
# Create service client
self.client = self.create_client(PlayHandMotion, '/play_hand_motion')
# Create publisher for action keys
self.action_publisher = self.create_publisher(Header, '/hand_motion_keyboard_action', 10)
# Wait for service to be available
self.get_logger().info('Waiting for /play_hand_motion service...')
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting...')
self.get_logger().info('Service is available!')
self.get_logger().info('Right hand: "o" for open_for_grasp, "g" for grasp')
self.get_logger().info('Left hand: "i" for open_for_grasp, "f" for grasp')
self.get_logger().info('Press "q" to quit')
# Flag to track if a request is in progress
self.request_in_progress = False
def call_service(self, hand, action_name):
"""Call the service with the specified hand and action in blocking mode."""
if self.request_in_progress:
self.get_logger().warn('Request already in progress, please wait...')
return
self.request_in_progress = True
# Create request
request = PlayHandMotion.Request()
request.hand = hand
request.action_names = [action_name]
request.delay = 0.0
self.get_logger().info(f'Calling service with hand: {hand}, action: {action_name}')
# Call service synchronously (blocking)
future = self.client.call_async(request)
# Wait for the result (blocking)
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
response = future.result()
self.get_logger().info(f'Service call completed: {response}')
else:
self.get_logger().error(f'Service call failed: {future.exception()}')
self.request_in_progress = False
def publish_action_key(self, key):
"""Publish the action key to the topic."""
msg = Header()
msg.stamp = self.get_clock().now().to_msg()
msg.frame_id = key
self.action_publisher.publish(msg)
self.get_logger().info(f'Published action key: {key} at timestamp: {msg.stamp.sec}.{msg.stamp.nanosec}')
def get_key(self):
"""Get a single keypress from the user."""
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(fd)
key = sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return key
def run(self):
"""Main loop to listen for keyboard input."""
try:
while rclpy.ok():
key = self.get_key()
# Make it case insensitive
key_lower = key.lower()
if key_lower == 'o':
self.publish_action_key(key_lower)
self.call_service('right', 'open_for_grasp')
elif key_lower == 'g':
self.publish_action_key(key_lower)
self.call_service('right', 'grasp')
elif key_lower == 'i':
self.publish_action_key(key_lower)
self.call_service('left', 'open_for_grasp')
elif key_lower == 'f':
self.publish_action_key(key_lower)
self.call_service('left', 'grasp')
elif key_lower == 'q':
self.get_logger().info('Quitting...')
break
elif key == '\x03': # Ctrl+C
break
else:
self.get_logger().info(f'Unknown key: {key}')
except KeyboardInterrupt:
self.get_logger().info('Keyboard interrupt received')
def main(args=None):
rclpy.init(args=args)
node = HandMotionKeyboardControl()
try:
node.run()
except Exception as e:
node.get_logger().error(f'Error: {e}')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
数据录制
订阅topic并且进行时间同步与记录
消息的类型如下:
/camera/color/image_raw (Type: sensor_msgs/msg/Image)
/camera/depth/image_raw (Type: sensor_msgs/msg/Image)
/arm/motor_status (Type: bodyctrl_msgs/msg/MotorStatusMsg)
/arm/status (Type: bodyctrl_msgs/msg/MotorStatusMsg)
# /arm_6dof_left (Type: geometry_msgs/msg/WrenchStamped)
# /arm_6dof_right (Type: geometry_msgs/msg/WrenchStamped)
/inspire_hand/state/left_hand (Type: sensor_msgs/msg/JointState)
/inspire_hand/state/right_hand(Type: sensor_msgs/msg/JointState)
实现代码如下:
```p y import rclpy from rclpy.node import Node import message_filters #进行时间同步 from sensor_msgs.msg import Image, JointState # JointState对应于/inspire_hand/state/left_hand和/inspire_hand/state/right_hand from geometry_msgs.msg import WrenchStamped #对应于/arm_6dof_left和/arm_6dof_right消息 from bodyctrl_msgs.msg import MotorStatusMsg #对应于 /arm/status import threading import time import os from datetime import datetime import argparse import sys
导入rosbag2相关模块
try: import rosbag2_py from rosbag2_py import SequentialWriter, StorageOptions, ConverterOptions ROSBAG2_AVAILABLE = True except ImportError: ROSBAG2_AVAILABLE = False print(“警告: rosbag2_py 不可用,录制功能将禁用!!!!!!”)
class MultiTopicSyncNode(Node): def init(self, record_flag=False): super().init(‘multi_topic_sync_node’)
# 录制标志
self.record_flag = record_flag
# 定义需要订阅的topic列表
self.required_topics = [
'/camera/color/image_raw',
# '/camera/depth/image_raw',
'/arm/status' #,
# '/inspire_hand/state/left_hand',
# '/inspire_hand/state/right_hand'
]
# 用于跟踪已确认的topic
self.confirmed_topics = {}
self.confirmation_lock = threading.Lock()
# 创建单次topic确认订阅器
self.confirmation_subscribers = []
# 主同步订阅器(稍后初始化)
self.sync_subscribers = {}
self.ts = None
self.sync_initialized = False
# Rosbag2录制相关变量
self.bag_writer = None
self.recording_enabled = self.record_flag and ROSBAG2_AVAILABLE
self.bag_path = None
# 同步统计
self.sync_count = 0
self.start_time = time.time()
# # 如果启用录制,创建bag路径
# if self.recording_enabled:
# self.bag_path = self.create_bag_path()
self.get_logger().info(f'开始确认topic可用性... 录制模式: {"启用" if self.recording_enabled else "禁用"}')
#调用函数,首先确认哪些topic可用
self.start_topic_confirmation()
def create_bag_path(self):
"""创建rosbag存储路径 - 修改为返回路径但不创建目录"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
bag_dir = f"vla_sync_data_{timestamp}"
# 移除了 os.makedirs(bag_dir, exist_ok=True)
self.get_logger().info(f'Rosbag将保存到: {os.path.abspath(bag_dir)}')
return bag_dir
def start_topic_confirmation(self):
"""为每个topic创建临时订阅器来确认其存在和活跃"""
for topic in self.required_topics:
# 根据topic名称确定消息类型
msg_type = self.get_message_type(topic)#根据topic的名字来获取消息类型
if msg_type:
# 创建单次确认订阅器
sub = self.create_subscription(
msg_type,
topic,
self.create_topic_callback(topic),
10 # QoS队列大小
)
self.confirmation_subscribers.append(sub)
self.get_logger().info(f'正在确认topic: {topic}')
else:
self.get_logger().warning(f'报错!!!无法确定topic {topic}的消息类型')
# 启动超时检查
self.confirmation_timer = self.create_timer(5.0, self.check_confirmation_status)
def get_message_type(self, topic):
"""根据topic名称返回对应的消息类型"""
if 'image_raw' in topic:
return Image
elif 'hand/state' in topic:
return JointState
elif 'arm/status' in topic:
return MotorStatusMsg
else:
return None
def create_topic_callback(self, topic_name):
"""为每个topic创建确认回调函数"""
def topic_confirmation_callback(msg):
with self.confirmation_lock:
if topic_name not in self.confirmed_topics:
self.confirmed_topics[topic_name] = True
# 计算确认百分比
confirmed_percent = (len(self.confirmed_topics) / len(self.required_topics)) * 100
self.get_logger().info(
f'✓ 确认topic可用: {topic_name} '
f'({len(self.confirmed_topics)}/{len(self.required_topics)}, {confirmed_percent:.1f}%)'
)
# 检查是否所有topic都已确认,如果确认就开始订阅。
self.check_all_topics_confirmed()
return topic_confirmation_callback
def check_all_topics_confirmed(self):
"""检查是否所有需要的topic都已确认"""
confirmed_count = len(self.confirmed_topics)
total_count = len(self.required_topics)
confirmed_percent = (confirmed_count / total_count) * 100
# 根据确认的topic百分比决定是否开始同步
if confirmed_percent >= 100.0 and not self.sync_initialized:
self.get_logger().info('所有topic都已确认(100%),开始初始化时间同步器...')
self.initialize_synchronizer()
elif confirmed_percent <100.0 and not self.sync_initialized:
self.get_logger().warning(
f'已确认{confirmed_percent:.1f}%的topic ({confirmed_count}/{total_count}),'
f'开始部分同步(缺少: {[t for t in self.required_topics if t not in self.confirmed_topics]})'
)
return
def check_confirmation_status(self):
"""定时检查topic确认状态"""
if self.sync_initialized:
self.confirmation_timer.cancel()
return
confirmed_count = len(self.confirmed_topics)
total_count = len(self.required_topics)
confirmed_percent = (confirmed_count / total_count) * 100
missing_topics = [topic for topic in self.required_topics if topic not in self.confirmed_topics]
if missing_topics:
self.get_logger().warning(
f'等待以下topic ({confirmed_percent:.1f}% 已确认): {missing_topics}'
)
# 如果录制已启用但topic不完整,发出警告
if self.recording_enabled:
self.get_logger().warning(
f'录制功能已启用,但topic不完整({confirmed_percent:.1f}%),无法开始录制'
)
else:
self.get_logger().info('所有topic确认完成!')
def initialize_synchronizer(self):
"""初始化时间同步器"""
try:
# 创建同步订阅器
self.sync_subscribers['color_image'] = message_filters.Subscriber(self, Image, '/camera/color/image_raw')
self.sync_subscribers['depth_image'] = message_filters.Subscriber(self, Image, '/camera/depth/image_raw')
self.sync_subscribers['arm_status'] = message_filters.Subscriber(self, MotorStatusMsg, '/arm/status')
self.sync_subscribers['left_hand'] = message_filters.Subscriber(self, JointState, '/inspire_hand/state/left_hand')
self.sync_subscribers['right_hand'] = message_filters.Subscriber(self, JointState, '/inspire_hand/state/right_hand')
# 创建近似时间同步器
self.ts = message_filters.ApproximateTimeSynchronizer(
list(self.sync_subscribers.values()),
queue_size=20,
slop=0.1 # 100ms的时间容差
)
# 注册同步回调函数
self.ts.registerCallback(self.sync_callback)
# 如果启用录制,初始化rosbag录制
if self.recording_enabled:
self.initialize_rosbag_recorder() #进行rosbag的录制
self.sync_initialized = True
# 清理确认用的临时订阅器
for sub in self.confirmation_subscribers:
self.destroy_subscription(sub)
self.confirmation_subscribers.clear()
self.get_logger().info('✓ 多Topic时间同步器已成功启动并运行')
except Exception as e:
self.get_logger().error(f'初始化时间同步器失败: {str(e)}')
def initialize_rosbag_recorder(self):
"""初始化rosbag录制器"""
try:
# 在初始化录制器时才创建路径
if self.bag_path is None:
self.bag_path = self.create_bag_path()
storage_options = StorageOptions(
uri=self.bag_path,
storage_id='sqlite3'
)
converter_options = ConverterOptions(
input_serialization_format='cdr',
output_serialization_format='cdr'
)
self.bag_writer = SequentialWriter()
self.bag_writer.open(storage_options, converter_options)
# 为每个确认的topic创建录制通道
topic_types = {
'/camera/color/image_raw': 'sensor_msgs/msg/Image',
# '/camera/depth/image_raw': 'sensor_msgs/msg/Image',
'/arm/status': 'bodyctrl_msgs/msg/MotorStatusMsg' #,
# '/inspire_hand/state/left_hand': 'sensor_msgs/msg/JointState',
# '/inspire_hand/state/right_hand': 'sensor_msgs/msg/JointState'
}
for topic in self.confirmed_topics:
if topic in topic_types:
# 创建 TopicMetadata 对象
topic_metadata = rosbag2_py.TopicMetadata(
name=topic,
type=topic_types[topic],
serialization_format='cdr'
)
self.bag_writer.create_topic(topic_metadata)
self.get_logger().info(f'✓ Rosbag录制已启动: {self.bag_path}')
except Exception as e:
self.get_logger().error(f'初始化rosbag录制失败: {str(e)}')
self.bag_writer = None
def write_to_bag(self, topic, msg):
"""将消息写入rosbag"""
if self.bag_writer is not None:
try:
# 序列化消息
from rclpy.serialization import serialize_message
serialized_msg = serialize_message(msg)
# 获取时间戳(转换为纳秒)
if hasattr(msg, 'header') and hasattr(msg.header, 'stamp'):
timestamp = msg.header.stamp
timestamp_ns = timestamp.sec * 10**9 + timestamp.nanosec
else:
now = self.get_clock().now()
timestamp_ns = now.nanoseconds
# 使用正确的参数格式写入
self.bag_writer.write(topic, serialized_msg, timestamp_ns)
except Exception as e:
self.get_logger().error(f'{str(topic)}写入rosbag失败')
def sync_callback(self, color_image, depth_image, arm_status, left_hand_state, right_hand_state):
"""
同步回调函数,当所有Topic的消息时间同步后调用
"""
try:
# 获取时间戳
sync_stamp = color_image.header.stamp
# 如果启用录制,录制到rosbag
if self.recording_enabled:
self.write_to_bag('/camera/color/image_raw', color_image)
# self.write_to_bag('/camera/depth/image_raw', depth_image)
self.write_to_bag('/arm/status', arm_status)
# self.write_to_bag('/inspire_hand/state/left_hand', left_hand_state)
# self.write_to_bag('/inspire_hand/state/right_hand', right_hand_state)
self.sync_count += 1
elapsed_time = time.time() - self.start_time
if self.sync_count % 10 == 0: # 每10次同步打印一次统计
self.get_logger().info(
f'同步统计: {self.sync_count} 次同步, '
f'频率: {self.sync_count/elapsed_time:.2f} Hz'
)
except Exception as e:
self.get_logger().error(f'同步回调处理错误: {str(e)}')
def destroy_node(self):
"""重写销毁节点方法,确保正确关闭rosbag"""
if self.bag_writer is not None:
try:
# 这里需要调用rosbag2_py的关闭方法
# 注意: SequentialWriter没有显式的close方法,依赖析构函数
self.bag_writer = None
self.get_logger().info(f'Rosbag录制已完成: {self.bag_path}')
except Exception as e:
self.get_logger().error(f'关闭rosbag失败: {str(e)}')
super().destroy_node()
def parse_arguments(): “"”解析命令行参数””” parser = argparse.ArgumentParser(description=’多Topic时间同步节点’) parser.add_argument( ‘–record’, action=’store_true’, help=’启用rosbag录制功能’ ) return parser.parse_args()
def main(args=None): # 解析命令行参数 cli_args = parse_arguments()
rclpy.init(args=args)
# 检查rosbag2可用性
if cli_args.record and not ROSBAG2_AVAILABLE:
print("错误: rosbag2_py 不可用,无法启用录制功能")
print("请安装: sudo apt install ros-{你的ROS2版本}-rosbag2-py")
return 1
# 创建节点,传入录制标志
node = MultiTopicSyncNode(record_flag=cli_args.record)
try:
# 运行节点
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('收到中断信号,正在关闭...')
finally:
# 打印最终统计
if node.sync_count > 0:
elapsed_time = time.time() - node.start_time
node.get_logger().info(
f'最终统计: {node.sync_count} 次同步, '
f'平均频率: {node.sync_count/elapsed_time:.2f} Hz'
)
# 清理资源
node.destroy_node()
rclpy.shutdown()
return 0
if name == ‘main’: sys.exit(main())
运行命令 `python3 sync_camera_joint.py --record`。其中,添加了--record flag就表示正式录制。而下面命令则是查看rosbag:`ros2 bag info ROSBAG_NAME`
### SSH脚本一键启动
脚本内容如下,通过运行sh \~/vla/nr-retargeting-demo/data\_collection.sh控制tmux来实现:
```bash
#! /bin/bash
echo "正在启动数据采集流程..."
# 关闭所有名为 data_collection 的 tmux 会话
echo "关闭所有现有的 data_collection 会话..."
for session in $(tmux list-sessions -F "#{session_name}" 2>/dev/null | grep "data_collection"); do
echo "关闭会话: $session"
tmux kill-session -t "$session"
done
# 创建新的 tmux 会话
tmux new-session -d -s data_collection -n "ssh_head"
# 第一个窗口:SSH 连接并运行头部摄像头脚本
echo "设置 SSH 连接..."
tmux new-session -d -s data_collection -n "ssh_head"
tmux send-keys -t data_collection "sshpass -p ',624^Y)qENwVu;OD' ssh -o StrictHostKeyChecking=no nvidia@10.1.52.63" Enter
sleep 3 # 等待 SSH 连接建立
tmux send-keys -t data_collection "./start_tiangong_head_camera.sh" Enter
# # 第二个窗口:运行 mocap retargeting
# tmux new-session -d -s data_collection_1 -n "mocap"
# tmux send-keys -t data_collection_1 "cd ~/vla/nr-retargeting-demo" Enter
# tmux send-keys -t data_collection_1 "python ./python_demo/mocap_retargeting_arm_ros.py --config-root config tiangong2" Enter
# 第三个窗口:运行手部运动控制
tmux new-session -d -s data_collection_2 -n "hand_control_node"
tmux send-keys -t data_collection_2 "ros2 run tg_arm_move hand_motion_service_node.py" Enter
# # 第四个终端:手部开合控制:oi左右开,gf左右合
# tmux new-session -d -s data_collection_3 -n "hand_control"
# tmux send-keys -t data_collection_3 "python hand_motion_keyboard_control.py" Enter
echo "所有 tmux 会话已创建:"
echo " data_collection - SSH 头部摄像头"
echo " data_collection_1 - Mocap Retargeting"
echo " data_collection_2 - 手部运动控制节点"
echo " data_collection_3 - 手部键盘控制"
echo ""
echo "使用以下命令查看会话:"
echo " tmux list-sessions"
echo " tmux attach -t data_collection (查看 SSH 头部摄像头)"
echo " tmux attach -t data_collection_1 (查看 Mocap Retargeting)"
echo " tmux attach -t data_collection_2 (查看手部运动控制节点)"
echo " tmux attach -t data_collection_3 (查看手部键盘控制)"
rosbag录制
python3 ~/vla/nr-retargeting-demo/record_rosbag2.py left_hand_experiment /arm/status /camera/color/image_raw
- record_rosbag2.py的代码如下
#!/usr/bin/env python3
import subprocess
import sys
import argparse
from datetime import datetime
import os
def record_rosbag(exp_name, topics):
"""
录制 ROS2 bag,自动在实验名称后添加时间戳
Args:
exp_name: 实验名称
topics: 要录制的主题列表
"""
# 获取当前时间戳
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# 构建输出文件名
output_name = f"{exp_name}_{timestamp}"
# 构建命令
cmd = ["ros2", "bag", "record", "-o", output_name] + topics
print(f"开始录制 ROS2 bag: {output_name}")
print(f"录制主题: {', '.join(topics)}")
print(f"命令: {' '.join(cmd)}")
# 执行命令
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
print(f"录制过程中出错: {e}")
except KeyboardInterrupt:
print("\n录制已停止")
def main():
parser = argparse.ArgumentParser(description="录制 ROS2 bag 并自动添加时间戳")
parser.add_argument("exp_name", help="实验名称")
parser.add_argument("topics", nargs="+", help="要录制的主题")
args = parser.parse_args()
record_rosbag(args.exp_name, args.topics)
if __name__ == "__main__":
main()
流程mark
完整流程
- 开启天工机器人主控
ssh tg-x86-root tmux ros2 launch body_control body.launch.py - 开启摄像头
ssh 10.1.52.63
./start_tiangong_head_camera.sh
- 开启映射代码:
ssh tg-x86
cd ~/vla/nr-retargeting-demo
# 运行下面命令前,动捕人员的手臂最好摆成机器人本体当前的样子
python ./python_demo/mocap_retargeting_arm_ros.py --config-root config tiangong2
# python time_consistency_sub
- 机械手开合
ros2 run tg_arm_move hand_motion_service_node.py
python ~/ws_tiangong_roobot/hand_motion_keyboard_control.py
- 数据包录制
# ros2 bag record -o 2025-11-05_left_1 /arm/status /camera/color/image_raw
python3 record_rosbag2.py exp_name /arm/status /camera/color/image_raw
- 其他:
# ros2 topic pub /head/cmd_pos bodyctrl_msgs/msg/CmdSetMotorPosition "{header: {stamp: {sec: 0, nanosec: 0}, frame_id: ''}, cmds: [{name: 1, pos: 0, spd: 1, cur: 1},{name: 2, pos: 0, spd: 1, cur: 1},{name: 3, pos: 0, spd: 1, cur: 1}]}"
ros2 topic pub /head/cmd_pos bodyctrl_msgs/msg/CmdSetMotorPosition "{header: {stamp: {sec: 0, nanosec: 0}, frame_id: ''}, cmds: [{name: 1, pos: 0, spd: 1, cur: 1},{name: 2, pos: 0.35, spd: 1, cur: 1},{name: 3, pos: 0, spd: 1, cur: 1}]}"
ros2 bag info ***
python3 sync_camera_joint.py --record
simple vesrion
- 开启天工机器人主控
ssh tg-x86-root tmux ros2 launch body_control body.launch.py -
sh ~/vla/nr-retargeting-demo/data_collection.sh - 开启映射代码:
cd ~/vla/nr-retargeting-demo
# 运行下面命令前,动捕人员的手臂最好摆成机器人本体当前的样子
python ./python_demo/mocap_retargeting_arm_ros.py --config-root config tiangong2
- 数据包录制
cd ~/vla/nr-retargeting-demo
python3 record_rosbag2.py exp_name /arm/status /camera/color/image_raw