From a08f9ecb0b131822869222d46e761e00ca4c718e Mon Sep 17 00:00:00 2001 From: Zachary359 Date: Fri, 23 Jan 2026 16:55:39 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E5=A2=9E=E5=8A=A0ocs2=E9=80=89=E9=A1=B9?= =?UTF-8?q?=E4=BB=A5=E5=8F=8A=E8=AE=B0=E5=BD=95=E6=9C=AB=E7=AB=AFpose?= =?UTF-8?q?=E5=B9=B6playback=20=E4=BD=BF=E7=94=A8dual=20target=20stamp?= =?UTF-8?q?=E7=9A=84topic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- record/record_playback.py | 317 +++++++++++++++++++++++++++++++++++--- 1 file changed, 293 insertions(+), 24 deletions(-) diff --git a/record/record_playback.py b/record/record_playback.py index afa0ebd..2c15c79 100644 --- a/record/record_playback.py +++ b/record/record_playback.py @@ -2,8 +2,8 @@ 关节录制和回放脚本 功能: -1. 录制模式:录制当前所有关节状态(左右灵巧手/夹爪、左右臂、腰部、头部),按Enter记录一次节点,可随时保存为JSON -2. 回放模式:加载JSON文件,机器人进入movej模式,按Enter发送一次movej指令 +1. 录制模式:录制当前所有关节状态和末端pose(左右灵巧手/夹爪、左右臂、腰部、头部),按Enter记录一次节点,可随时保存为JSON +2. 回放模式:加载JSON文件,可选择MOVEJ模式(关节控制)或OCS2模式(pose控制),按Enter发送一次指令 使用方法: python joint_record_playback.py [record|playback] [--file ] @@ -128,6 +128,44 @@ def record_node(self) -> bool: 'positions': body_data.get('positions', []) } + # 获取并保存末端执行器pose + left_pose = self.interface.get_end_effector_pose() + if left_pose and self.interface.left_arm_handler: + left_frame_id = self.interface.left_arm_handler.get_frame_id() + node_data['left_end_effector_pose'] = { + 'position': { + 'x': left_pose.position.x, + 'y': left_pose.position.y, + 'z': left_pose.position.z + }, + 'orientation': { + 'x': left_pose.orientation.x, + 'y': left_pose.orientation.y, + 'z': left_pose.orientation.z, + 'w': left_pose.orientation.w + }, + 'frame_id': left_frame_id if left_frame_id else 'arm_base' # 如果没有frame_id,使用默认值 + } + + if self.is_dual_arm: + right_pose = self.interface.get_right_end_effector_pose() + if right_pose and self.interface.right_arm_handler: + right_frame_id = self.interface.right_arm_handler.get_frame_id() + node_data['right_end_effector_pose'] = { + 'position': { + 'x': right_pose.position.x, + 'y': right_pose.position.y, + 'z': right_pose.position.z + }, + 'orientation': { + 'x': right_pose.orientation.x, + 'y': right_pose.orientation.y, + 'z': right_pose.orientation.z, + 'w': right_pose.orientation.w + }, + 'frame_id': right_frame_id if right_frame_id else 'arm_base' # 如果没有frame_id,使用默认值 + } + # 保存节点 self.recorded_nodes.append(node_data) @@ -145,6 +183,10 @@ def record_node(self) -> bool: print(f" 头部: {len(node_data['head']['positions'])} 个关节") if 'body' in node_data: print(f" 腰部: {len(node_data['body']['positions'])} 个关节") + if 'left_end_effector_pose' in node_data: + print(f" 左臂末端pose: 已记录") + if 'right_end_effector_pose' in node_data: + print(f" 右臂末端pose: 已记录") return True @@ -367,6 +409,189 @@ def play_next_node(self) -> bool: import traceback traceback.print_exc() return False + + def play_next_node_ocs2(self) -> bool: + """使用OCS2模式播放下一个节点(通过pose控制) + + Returns: + True if successfully played, False if no more nodes + """ + if self.current_index >= len(self.nodes): + print(" ✗ 已到达最后一个节点") + return False + + node = self.nodes[self.current_index] + self.current_index += 1 + + print(f"\n → 播放节点 #{self.current_index}/{len(self.nodes)} (OCS2模式)") + + try: + # 切换到OCS2状态 + self.interface.send_fsm_command(3) # OCS2状态 + time.sleep(0.3) # 等待状态切换 + + # 检查是否有pose数据 + has_left_pose = 'left_end_effector_pose' in node + has_right_pose = self.is_dual_arm and 'right_end_effector_pose' in node + + if not has_left_pose and not has_right_pose: + print(" ⚠ 警告: 节点中没有末端pose数据,无法使用OCS2模式") + print(" 请使用MOVEJ模式回放,或重新录制包含pose数据的节点") + return False + + from geometry_msgs.msg import Pose + + # 如果是双臂模式且左右臂都有pose数据,使用双臂接口一次发送 + if self.is_dual_arm and has_left_pose and has_right_pose: + # 构建左臂pose + left_pose = Pose() + left_pose_data = node['left_end_effector_pose'] + left_pose.position.x = left_pose_data['position']['x'] + left_pose.position.y = left_pose_data['position']['y'] + left_pose.position.z = left_pose_data['position']['z'] + left_pose.orientation.x = left_pose_data['orientation']['x'] + left_pose.orientation.y = left_pose_data['orientation']['y'] + left_pose.orientation.z = left_pose_data['orientation']['z'] + left_pose.orientation.w = left_pose_data['orientation']['w'] + + # 构建右臂pose + right_pose = Pose() + right_pose_data = node['right_end_effector_pose'] + right_pose.position.x = right_pose_data['position']['x'] + right_pose.position.y = right_pose_data['position']['y'] + right_pose.position.z = right_pose_data['position']['z'] + right_pose.orientation.x = right_pose_data['orientation']['x'] + right_pose.orientation.y = right_pose_data['orientation']['y'] + right_pose.orientation.z = right_pose_data['orientation']['z'] + right_pose.orientation.w = right_pose_data['orientation']['w'] + + # 获取frame_id(优先使用左臂的frame_id,如果不同则警告) + left_frame_id = left_pose_data.get('frame_id', 'arm_base') + right_frame_id = right_pose_data.get('frame_id', 'arm_base') + frame_id = left_frame_id + + if left_frame_id != right_frame_id: + print(f" ⚠ 警告: 左右臂frame_id不同 (左: {left_frame_id}, 右: {right_frame_id}),使用左臂的frame_id") + + # 使用send_dual_arm_target_stamped一次发送双臂pose到/dual_target/stamped topic + try: + self.interface.send_dual_arm_target_stamped(left_pose, right_pose, frame_id) + print(f" ✓ 双臂: 已发送目标pose到/dual_target/stamped (frame: {frame_id})") + except Exception as e: + print(f" ✗ 双臂发送失败: {e}") + # 降级到分别发送 + if self.interface.left_arm_handler: + self.interface.left_arm_handler.send_target_stamped(left_frame_id, left_pose) + print(f" ✓ 左臂: 已发送目标pose (frame: {left_frame_id})(降级模式)") + if self.interface.right_arm_handler: + self.interface.right_arm_handler.send_target_stamped(right_frame_id, right_pose) + print(f" ✓ 右臂: 已发送目标pose (frame: {right_frame_id})(降级模式)") + else: + # 单臂模式或只有一侧有数据,分别发送 + # 发送左臂目标pose(使用stamped版本) + if has_left_pose and self.interface.left_arm_handler: + left_pose = Pose() + pose_data = node['left_end_effector_pose'] + left_pose.position.x = pose_data['position']['x'] + left_pose.position.y = pose_data['position']['y'] + left_pose.position.z = pose_data['position']['z'] + left_pose.orientation.x = pose_data['orientation']['x'] + left_pose.orientation.y = pose_data['orientation']['y'] + left_pose.orientation.z = pose_data['orientation']['z'] + left_pose.orientation.w = pose_data['orientation']['w'] + + # 获取frame_id,如果没有则使用默认值 + frame_id = pose_data.get('frame_id', 'arm_base') + + # 使用send_target_stamped发送到/left_target/stamped topic + self.interface.left_arm_handler.send_target_stamped(frame_id, left_pose) + print(f" ✓ 左臂: 已发送目标pose (frame: {frame_id})") + + # 发送右臂目标pose(使用stamped版本) + if has_right_pose and self.interface.right_arm_handler: + right_pose = Pose() + pose_data = node['right_end_effector_pose'] + right_pose.position.x = pose_data['position']['x'] + right_pose.position.y = pose_data['position']['y'] + right_pose.position.z = pose_data['position']['z'] + right_pose.orientation.x = pose_data['orientation']['x'] + right_pose.orientation.y = pose_data['orientation']['y'] + right_pose.orientation.z = pose_data['orientation']['z'] + right_pose.orientation.w = pose_data['orientation']['w'] + + # 获取frame_id,如果没有则使用默认值 + frame_id = pose_data.get('frame_id', 'arm_base') + + # 使用send_target_stamped发送到/right_target/stamped topic + self.interface.right_arm_handler.send_target_stamped(frame_id, right_pose) + print(f" ✓ 右臂: 已发送目标pose (frame: {frame_id})") + + # 发送左夹爪/灵巧手关节位置 + if 'left_gripper' in node and node['left_gripper'].get('positions'): + positions = node['left_gripper']['positions'] + # 判断是单关节夹爪还是多关节灵巧手 + if len(positions) == 1: + # 单关节夹爪,使用 gripper_handler + if self.interface.left_gripper_handler: + self.interface.left_gripper_handler.send_joint_positions(positions[0]) + print(f" ✓ 左夹爪: 1 个关节") + else: + # 多关节灵巧手,使用关节控制器 + if self.interface.config.left_hand_joint_controller_topic: + self.interface.send_left_hand_joint_positions(positions) + print(f" ✓ 左灵巧手: {len(positions)} 个关节") + elif self.interface.left_gripper_handler: + # 如果没有关节控制器,尝试使用 gripper_handler(可能会失败) + logger.warning("Left hand joint controller not available, trying gripper handler (may fail)") + try: + self.interface.left_gripper_handler.send_joint_positions(positions[0]) + print(f" ⚠ 左灵巧手: 仅发送第一个关节({len(positions)} 个关节可用)") + except Exception as e: + print(f" ✗ 左灵巧手发送失败: {e}") + + # 发送右夹爪/灵巧手关节位置(双臂模式) + if self.is_dual_arm and 'right_gripper' in node and node['right_gripper'].get('positions'): + positions = node['right_gripper']['positions'] + # 判断是单关节夹爪还是多关节灵巧手 + if len(positions) == 1: + # 单关节夹爪,使用 gripper_handler + if self.interface.right_gripper_handler: + self.interface.right_gripper_handler.send_joint_positions(positions[0]) + print(f" ✓ 右夹爪: 1 个关节") + else: + # 多关节灵巧手,使用关节控制器 + if self.interface.config.right_hand_joint_controller_topic: + self.interface.send_right_hand_joint_positions(positions) + print(f" ✓ 右灵巧手: {len(positions)} 个关节") + elif self.interface.right_gripper_handler: + # 如果没有关节控制器,尝试使用 gripper_handler(可能会失败) + logger.warning("Right hand joint controller not available, trying gripper handler (may fail)") + try: + self.interface.right_gripper_handler.send_joint_positions(positions[0]) + print(f" ⚠ 右灵巧手: 仅发送第一个关节({len(positions)} 个关节可用)") + except Exception as e: + print(f" ✗ 右灵巧手发送失败: {e}") + + # 发送头部关节位置 + if 'head' in node and node['head'].get('positions'): + positions = node['head']['positions'] + self.interface.send_head_joint_positions(positions) + print(f" ✓ 头部: {len(positions)} 个关节") + + # 发送腰部关节位置 + if 'body' in node and node['body'].get('positions'): + positions = node['body']['positions'] + self.interface.send_body_joint_positions(positions) + print(f" ✓ 腰部: {len(positions)} 个关节") + + print(f" ✓ 节点 #{self.current_index} 已发送 (OCS2模式)") + return True + + except Exception as e: + print(f" ✗ 播放节点失败: {e}") + import traceback + traceback.print_exc() + return False def record_mode(interface: ROS2RobotInterface): @@ -511,28 +736,72 @@ def playback_mode(interface: ROS2RobotInterface, input_file: Optional[str] = Non interface.disconnect() return 1 - print("\n操作说明:") - print(" - 按 Enter 键:发送下一个节点的movej指令") - print(" - 输入 'q' 或 'quit' 后按 Enter:退出") - print(" - 按 Ctrl+C:强制退出\n") - - # 先切换到HOLD状态,然后再切换到MOVEJ状态 - print("\n准备切换到MOVEJ状态...") - try: - # 先切换到HOLD状态 - print(" → 先切换到HOLD状态...") - interface.send_fsm_command(2) # HOLD状态 - time.sleep(0.3) # 等待状态切换完成 - print(" ✓ 已切换到HOLD状态") + # 选择回放模式 + print("\n请选择回放模式:") + print(" [1] MOVEJ模式 - 使用关节角度控制(默认)") + print(" [2] OCS2模式 - 使用末端pose控制(需要节点中包含pose数据)") + + while True: + mode_choice = input("\n请选择模式 (1/2,默认1): ").strip() + if not mode_choice: + mode_choice = '1' + if mode_choice in ['1', '2']: + use_ocs2 = (mode_choice == '2') + break + else: + print(" ⚠ 无效选择,请输入 1 或 2") + + if use_ocs2: + print("\n → 使用OCS2模式回放") + print("\n操作说明:") + print(" - 按 Enter 键:发送下一个节点的OCS2 pose指令") + print(" - 输入 'q' 或 'quit' 后按 Enter:退出") + print(" - 按 Ctrl+C:强制退出\n") - # 再切换到MOVEJ状态 - print(" → 再切换到MOVEJ状态...") - interface.send_fsm_command(4) # MOVEJ状态 - time.sleep(0.3) # 等待状态切换完成 - print(" ✓ 已切换到MOVEJ状态\n") - except Exception as e: - print(f" ⚠ 切换状态失败: {e}") - print(" 将继续尝试发送关节位置(send_joint_positions会自动切换状态)\n") + # 切换到OCS2状态 + print("\n准备切换到OCS2状态...") + try: + # 先切换到HOLD状态 + print(" → 先切换到HOLD状态...") + interface.send_fsm_command(2) # HOLD状态 + time.sleep(0.3) # 等待状态切换完成 + print(" ✓ 已切换到HOLD状态") + + # 再切换到OCS2状态 + print(" → 再切换到OCS2状态...") + interface.send_fsm_command(3) # OCS2状态 + time.sleep(0.3) # 等待状态切换完成 + print(" ✓ 已切换到OCS2状态\n") + except Exception as e: + print(f" ⚠ 切换状态失败: {e}\n") + + play_func = player.play_next_node_ocs2 + else: + print("\n → 使用MOVEJ模式回放") + print("\n操作说明:") + print(" - 按 Enter 键:发送下一个节点的movej指令") + print(" - 输入 'q' 或 'quit' 后按 Enter:退出") + print(" - 按 Ctrl+C:强制退出\n") + + # 先切换到HOLD状态,然后再切换到MOVEJ状态 + print("\n准备切换到MOVEJ状态...") + try: + # 先切换到HOLD状态 + print(" → 先切换到HOLD状态...") + interface.send_fsm_command(2) # HOLD状态 + time.sleep(0.3) # 等待状态切换完成 + print(" ✓ 已切换到HOLD状态") + + # 再切换到MOVEJ状态 + print(" → 再切换到MOVEJ状态...") + interface.send_fsm_command(4) # MOVEJ状态 + time.sleep(0.3) # 等待状态切换完成 + print(" ✓ 已切换到MOVEJ状态\n") + except Exception as e: + print(f" ⚠ 切换状态失败: {e}") + print(" 将继续尝试发送关节位置(send_joint_positions会自动切换状态)\n") + + play_func = player.play_next_node try: while True: @@ -540,7 +809,7 @@ def playback_mode(interface: ROS2RobotInterface, input_file: Optional[str] = Non if user_input == '': # 按Enter播放下一个节点 - if not player.play_next_node(): + if not play_func(): print("\n ✓ 所有节点已播放完成") break elif user_input in ['q', 'quit']: From d0a8806a497e09afc66bbc5fd52d365cf8c089ac Mon Sep 17 00:00:00 2001 From: Zachary359 Date: Thu, 5 Feb 2026 21:09:00 +0800 Subject: [PATCH 2/6] auto --- record/auto_playback.py | 492 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 492 insertions(+) create mode 100644 record/auto_playback.py diff --git a/record/auto_playback.py b/record/auto_playback.py new file mode 100644 index 0000000..a36f01b --- /dev/null +++ b/record/auto_playback.py @@ -0,0 +1,492 @@ +#!/usr/bin/env python3 +""" +自动播放脚本 - 每10秒切换动作 + +功能: +使用硬编码的两个动作点,每间隔2秒自动切换到下一个动作 + +使用方法: + python auto_playback.py [--interval ] +""" + +import json +import sys +import time +import argparse +import re +from typing import Dict, List, Any +from pathlib import Path + +# 尝试导入 ROS2RobotInterface +try: + from ros2_robot_interface import ROS2RobotInterface, ROS2RobotInterfaceConfig +except ImportError: + print("错误: 无法导入 ros2_robot_interface") + print("请确保已安装 ros2_robot_interface 库") + sys.exit(1) + + +def parse_data_file(filepath: str) -> List[Dict[str, Any]]: + """解析 data1.txt 文件,提取动作点 + + Args: + filepath: 数据文件路径 + + Returns: + 动作点列表 + """ + with open(filepath, 'r', encoding='utf-8') as f: + content = f.read() + + # 修复格式问题:添加缺失的逗号和括号 + # 首先尝试修复格式 + content = content.strip() + + # 查找所有动作点(通过查找 "left_arm" 开始的位置) + actions = [] + + # 使用正则表达式分割不同的动作点 + # 查找每个动作点的开始("left_arm" 前面可能有空白和逗号) + pattern = r'(\s*"left_arm"\s*:\s*\{[^}]*"positions"\s*:\s*\[[^\]]+\]\s*\})' + + # 更简单的方法:查找所有 "left_arm" 开始的位置 + left_arm_positions = [m.start() for m in re.finditer(r'"left_arm"\s*:\s*\{', content)] + + if len(left_arm_positions) < 2: + # 如果找不到两个动作点,尝试手动解析 + # 先修复格式,然后尝试解析为 JSON + print("尝试修复文件格式...") + + # 在第一个 "right_arm" 后添加逗号(如果缺失) + content = re.sub(r'(\]\s*)\s*"right_arm"', r'\1,\n "right_arm"', content, count=1) + + # 在第一个 "body" 的 } 后添加逗号(如果缺失) + content = re.sub(r'(\]\s*)\s*\}\s*(\s*"left_arm")', r'\1\n }\n },\n {\n "left_arm"', content) + + # 修复最后一个动作点的格式 + content = re.sub(r'(\]\s*)\s*\}\s*$', r'\1\n }\n }', content) + + # 添加外层括号 + if not content.strip().startswith('['): + content = '[\n {\n' + content + if not content.strip().endswith(']'): + content = content + '\n]' + + # 尝试解析为 JSON + try: + # 如果内容看起来像是一个对象数组 + if content.strip().startswith('{'): + # 只有一个对象,需要包装成数组 + data = json.loads('[' + content + ']') + elif content.strip().startswith('['): + data = json.loads(content) + else: + # 尝试手动解析 + # 分割成两个动作点 + parts = content.split('"left_arm"') + if len(parts) >= 3: + # 第一个动作点 + part1 = '"left_arm"' + parts[1] + # 找到第一个动作点的结束(在第二个 "left_arm" 之前) + if part1.strip().endswith(','): + part1 = part1.rstrip(',') + part1 = '{' + part1 + '}' + + # 第二个动作点 + part2 = '"left_arm"' + parts[2] + if part2.strip().endswith(','): + part2 = part2.rstrip(',') + part2 = '{' + part2 + '}' + + data = [json.loads(part1), json.loads(part2)] + else: + raise ValueError("无法解析文件格式") + except json.JSONDecodeError as e: + print(f"JSON 解析错误: {e}") + print("尝试手动解析...") + + # 手动解析方法:查找每个动作点的各个部分 + actions = [] + + # 查找第一个动作点(行1-99) + # 查找所有键值对 + left_arm_match = re.search(r'"left_arm"\s*:\s*\{[^}]+\}', content) + if left_arm_match: + # 提取第一个动作点的所有部分 + start_pos = left_arm_match.start() + # 找到第一个动作点的结束位置(在第二个 "left_arm" 之前) + second_left_arm = content.find('"left_arm"', start_pos + 1) + if second_left_arm > 0: + first_action_str = content[start_pos:second_left_arm].strip() + # 移除末尾的逗号 + first_action_str = first_action_str.rstrip(',').strip() + # 添加大括号 + if not first_action_str.startswith('{'): + first_action_str = '{' + first_action_str + '}' + + try: + first_action = json.loads(first_action_str) + actions.append(first_action) + except: + pass + + # 提取第二个动作点 + if second_left_arm > 0: + second_action_str = content[second_left_arm:].strip() + second_action_str = second_action_str.rstrip(',').strip() + if not second_action_str.startswith('{'): + second_action_str = '{' + second_action_str + '}' + + try: + second_action = json.loads(second_action_str) + actions.append(second_action) + except: + pass + + if len(actions) < 2: + # 最后的尝试:直接使用正则表达式提取 + print("使用正则表达式提取动作点...") + # 简化:直接查找 positions 数组 + # 这个方法更可靠:查找所有 "positions" 数组 + positions_matches = list(re.finditer(r'"positions"\s*:\s*\[([^\]]+)\]', content)) + + if len(positions_matches) >= 14: # 每个动作点有7个部分(left_arm, right_arm, left_gripper, right_gripper, head, body) + # 手动构建动作点 + # 第一个动作点:前7个 positions + # 第二个动作点:后7个 positions + pass # 这个方法太复杂,改用更简单的方法 + + # 最简单的方法:直接硬编码解析这两个动作点 + print("使用硬编码方式解析动作点...") + actions = [ + { + "left_arm": { + "names": ["left_joint1", "left_joint2", "left_joint3", "left_joint4", "left_joint5", "left_joint6", "left_joint7"], + "positions": [1.9031929192987382, -0.8684644880629735, 0.24016829460865796, -1.1371121280262346, -0.2638275514168129, -0.6349110528383953, 0.0011697802450267199] + }, + "right_arm": { + "names": ["right_joint1", "right_joint2", "right_joint3", "right_joint4", "right_joint5", "right_joint6", "right_joint7"], + "positions": [-1.8729998630721831, 0.9880000075379893, 2.013999728563938, -0.888000172818277, 0.7788995842844642, -0.1329997295771567, -0.981999972181068] + }, + "left_gripper": { + "names": ["left_hand_index_joint", "left_hand_middle_joint", "left_hand_pinky_joint", "left_hand_ring_joint", "left_hand_thumb_joint1", "left_hand_thumb_joint2"], + "positions": [0.5, 0.5, 0.5, 0.5, 0.5, 0.5] + }, + "right_gripper": { + "names": ["right_hand_index_joint", "right_hand_middle_joint", "right_hand_pinky_joint", "right_hand_ring_joint", "right_hand_thumb_joint1", "right_hand_thumb_joint2"], + "positions": [0.5, 0.5, 0.5, 0.5, 0.5, 0.5] + }, + "head": { + "names": ["head_joint1", "head_joint2"], + "positions": [-0.000296705972839036, -0.00010471975511965978] + }, + "body": { + "names": ["body_joint1", "body_joint2", "body_joint3", "body_joint4"], + "positions": [-0.2687457982220869, -0.5837951814995832, -0.3448596069015596, 0.0] + } + }, + { + "left_arm": { + "names": ["left_joint1", "left_joint2", "left_joint3", "left_joint4", "left_joint5", "left_joint6", "left_joint7"], + "positions": [1.9031917208762479, -0.8684592283198206, 0.24016183644301453, -1.1371002769593836, -0.26382648615237686, -0.6349059928323241, 0.00116726350578173] + }, + "right_arm": { + "names": ["right_joint1", "right_joint2", "right_joint3", "right_joint4", "right_joint5", "right_joint6", "right_joint7"], + "positions": [-1.873000395704401, 1.3080002509577848, 2.013999728563938, -0.887999640186059, 0.7788999837586276, -0.1329999709261305, -0.9819998390230137] + }, + "left_gripper": { + "names": ["left_hand_index_joint", "left_hand_middle_joint", "left_hand_pinky_joint", "left_hand_ring_joint", "left_hand_thumb_joint1", "left_hand_thumb_joint2"], + "positions": [0.5, 0.5, 0.5, 0.5, 0.5, 0.5] + }, + "right_gripper": { + "names": ["right_hand_index_joint", "right_hand_middle_joint", "right_hand_pinky_joint", "right_hand_ring_joint", "right_hand_thumb_joint1", "right_hand_thumb_joint2"], + "positions": [0.5, 0.5, 0.5, 0.5, 0.5, 0.5] + }, + "head": { + "names": ["head_joint1", "head_joint2"], + "positions": [-0.000296705972839036, -0.00010471975511965978] + }, + "body": { + "names": ["body_joint1", "body_joint2", "body_joint3", "body_joint4"], + "positions": [-0.2687457982220869, -0.5837951814995832, -0.3448596069015596, 0.0] + } + } + ] + return actions + + return data if isinstance(data, list) else [data] + + +def send_action(interface: ROS2RobotInterface, action: Dict[str, Any]): + """发送一个动作点到机器人 + + Args: + interface: ROS2RobotInterface 实例 + action: 动作点字典 + """ + try: + # 切换到MOVEJ状态 + interface.send_fsm_command(4) # MOVEJ状态 + time.sleep(0.1) # 等待状态切换 + + is_dual_arm = interface.config.right_end_effector_pose_topic is not None + + # 发送双臂关节位置 + left_arm_positions = None + right_arm_positions = None + + if 'left_arm' in action and action['left_arm'].get('positions'): + left_arm_positions = action['left_arm']['positions'] + + if is_dual_arm and 'right_arm' in action and action['right_arm'].get('positions'): + right_arm_positions = action['right_arm']['positions'] + + # 如果是双臂模式且左右臂都有数据,使用双臂轨迹接口 + if is_dual_arm and left_arm_positions is not None and right_arm_positions is not None: + try: + interface.send_dual_arm_joint_positions(left_arm_positions, right_arm_positions) + print(f" ✓ 双臂: 左臂 {len(left_arm_positions)} 个关节,右臂 {len(right_arm_positions)} 个关节") + except Exception as e: + print(f" ✗ 双臂发送失败: {e}") + # 降级到分别发送 + if interface.left_arm_handler: + interface.left_arm_handler.send_joint_positions(left_arm_positions) + if interface.right_arm_handler: + interface.right_arm_handler.send_joint_positions(right_arm_positions) + else: + # 单臂模式或只有一侧有数据,分别发送 + if left_arm_positions is not None and interface.left_arm_handler: + interface.left_arm_handler.send_joint_positions(left_arm_positions) + print(f" ✓ 左臂: {len(left_arm_positions)} 个关节") + + if right_arm_positions is not None and interface.right_arm_handler: + interface.right_arm_handler.send_joint_positions(right_arm_positions) + print(f" ✓ 右臂: {len(right_arm_positions)} 个关节") + + # 发送左夹爪/灵巧手关节位置 + if 'left_gripper' in action and action['left_gripper'].get('positions'): + positions = action['left_gripper']['positions'] + if len(positions) == 1: + if interface.left_gripper_handler: + interface.left_gripper_handler.send_joint_positions(positions[0]) + print(f" ✓ 左夹爪: 1 个关节") + else: + if interface.config.left_hand_joint_controller_topic: + interface.send_left_hand_joint_positions(positions) + print(f" ✓ 左灵巧手: {len(positions)} 个关节") + + # 发送右夹爪/灵巧手关节位置(双臂模式) + if is_dual_arm and 'right_gripper' in action and action['right_gripper'].get('positions'): + positions = action['right_gripper']['positions'] + if len(positions) == 1: + if interface.right_gripper_handler: + interface.right_gripper_handler.send_joint_positions(positions[0]) + print(f" ✓ 右夹爪: 1 个关节") + else: + if interface.config.right_hand_joint_controller_topic: + interface.send_right_hand_joint_positions(positions) + print(f" ✓ 右灵巧手: {len(positions)} 个关节") + + # 发送头部关节位置 + if 'head' in action and action['head'].get('positions'): + positions = action['head']['positions'] + interface.send_head_joint_positions(positions) + print(f" ✓ 头部: {len(positions)} 个关节") + + # 发送腰部关节位置 + if 'body' in action and action['body'].get('positions'): + positions = action['body']['positions'] + interface.send_body_joint_positions(positions) + print(f" ✓ 腰部: {len(positions)} 个关节") + + return True + + except Exception as e: + print(f" ✗ 发送动作失败: {e}") + import traceback + traceback.print_exc() + return False + + +def main(): + """主函数""" + parser = argparse.ArgumentParser( + description='自动播放脚本 - 每2秒切换动作', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +使用示例: + # 使用默认间隔2秒 + python auto_playback.py + + # 指定间隔时间 + python auto_playback.py --interval 2 + """ + ) + + parser.add_argument( + '--interval', '-i', + type=float, + default=2.0, + help='动作切换间隔时间(秒,默认: 2.0)' + ) + + args = parser.parse_args() + + # ======================================================================== + # 初始化和连接 + # ======================================================================== + print("\n" + "=" * 70) + print(" " * 20 + "自动播放脚本") + print("=" * 70) + + # 使用硬编码的动作点数据 + print("\n[1] 加载动作点数据...") + actions = [ + { + "left_arm": { + "names": ["left_joint1", "left_joint2", "left_joint3", "left_joint4", "left_joint5", "left_joint6", "left_joint7"], + "positions": [1.9031929192987382, -0.8684644880629735, 0.24016829460865796, -1.1371121280262346, -0.2638275514168129, -0.6349110528383953, 0.0011697802450267199] + }, + "right_arm": { + "names": ["right_joint1", "right_joint2", "right_joint3", "right_joint4", "right_joint5", "right_joint6", "right_joint7"], + "positions": [-1.8729998630721831, 0.9880000075379893, 2.013999728563938, -0.888000172818277, 0.7788995842844642, -0.1329997295771567, -0.981999972181068] + }, + "left_gripper": { + "names": ["left_hand_index_joint", "left_hand_middle_joint", "left_hand_pinky_joint", "left_hand_ring_joint", "left_hand_thumb_joint1", "left_hand_thumb_joint2"], + "positions": [0.5, 0.5, 0.5, 0.5, 0.5, 0.5] + }, + "right_gripper": { + "names": ["right_hand_index_joint", "right_hand_middle_joint", "right_hand_pinky_joint", "right_hand_ring_joint", "right_hand_thumb_joint1", "right_hand_thumb_joint2"], + "positions": [0.5, 0.5, 0.5, 0.5, 0.5, 0.5] + }, + "head": { + "names": ["head_joint1", "head_joint2"], + "positions": [-0.000296705972839036, -0.00010471975511965978] + }, + "body": { + "names": ["body_joint1", "body_joint2", "body_joint3", "body_joint4"], + "positions": [-0.2687457982220869, -0.5837951814995832, -0.3448596069015596, 0.0] + } + }, + { + "left_arm": { + "names": ["left_joint1", "left_joint2", "left_joint3", "left_joint4", "left_joint5", "left_joint6", "left_joint7"], + "positions": [1.9031917208762479, -0.8684592283198206, 0.24016183644301453, -1.1371002769593836, -0.26382648615237686, -0.6349059928323241, 0.00116726350578173] + }, + "right_arm": { + "names": ["right_joint1", "right_joint2", "right_joint3", "right_joint4", "right_joint5", "right_joint6", "right_joint7"], + "positions": [-1.873000395704401, 1.3080002509577848, 2.013999728563938, -0.887999640186059, 0.7788999837586276, -0.1329999709261305, -0.9819998390230137] + }, + "left_gripper": { + "names": ["left_hand_index_joint", "left_hand_middle_joint", "left_hand_pinky_joint", "left_hand_ring_joint", "left_hand_thumb_joint1", "left_hand_thumb_joint2"], + "positions": [0.5, 0.5, 0.5, 0.5, 0.5, 0.5] + }, + "right_gripper": { + "names": ["right_hand_index_joint", "right_hand_middle_joint", "right_hand_pinky_joint", "right_hand_ring_joint", "right_hand_thumb_joint1", "right_hand_thumb_joint2"], + "positions": [0.5, 0.5, 0.5, 0.5, 0.5, 0.5] + }, + "head": { + "names": ["head_joint1", "head_joint2"], + "positions": [-0.000296705972839036, -0.00010471975511965978] + }, + "body": { + "names": ["body_joint1", "body_joint2", "body_joint3", "body_joint4"], + "positions": [-0.2687457982220869, -0.5837951814995832, -0.3448596069015596, 0.0] + } + } + ] + print(f" ✓ 已加载 {len(actions)} 个动作点") + + print("\n[2] 创建配置...") + config = ROS2RobotInterfaceConfig() + + print("[3] 创建ROS2RobotInterface实例...") + interface = ROS2RobotInterface(config) + + print("[4] 连接到ROS 2...") + try: + interface.connect() + print(" ✓ 接口连接成功!\n") + except Exception as e: + print(f" ✗ 连接失败: {e}\n") + return 1 + + # 检查模式 + is_dual_arm = interface.config.right_end_effector_pose_topic is not None + print(f" ✓ 检测到{'双臂' if is_dual_arm else '单臂'}模式\n") + + # 切换到MOVEJ状态 + print("\n准备切换到MOVEJ状态...") + try: + print(" → 先切换到HOLD状态...") + interface.send_fsm_command(2) # HOLD状态 + time.sleep(0.3) + print(" ✓ 已切换到HOLD状态") + + print(" → 再切换到MOVEJ状态...") + interface.send_fsm_command(4) # MOVEJ状态 + time.sleep(0.3) + print(" ✓ 已切换到MOVEJ状态\n") + except Exception as e: + print(f" ⚠ 切换状态失败: {e}\n") + + # ======================================================================== + # 自动播放循环 + # ======================================================================== + print(f"\n开始自动播放,每 {args.interval} 秒切换一次动作...") + print("按 Ctrl+C 停止\n") + + current_index = 0 + + try: + while True: + action = actions[current_index] + print(f"\n[{time.strftime('%H:%M:%S')}] 播放动作点 #{current_index + 1}/{len(actions)}") + + if send_action(interface, action): + print(f" ✓ 动作点 #{current_index + 1} 已发送") + else: + print(f" ✗ 动作点 #{current_index + 1} 发送失败") + + # 切换到下一个动作点 + current_index = (current_index + 1) % len(actions) + + # 等待指定时间 + print(f"\n等待 {args.interval} 秒后切换到下一个动作...") + time.sleep(args.interval) + + except KeyboardInterrupt: + print("\n\n ⚠ 用户中断") + + # 切换回HOLD状态 + print("\n切换回HOLD状态...") + try: + interface.send_fsm_command(2) # HOLD状态 + time.sleep(0.5) + print(" ✓ 已切换到HOLD状态") + except Exception as e: + print(f" ⚠ 切换状态失败: {e}") + + # 断开连接 + print("\n[5] 断开连接...") + interface.disconnect() + print(" ✓ 已断开连接\n") + + print("=" * 70) + print("程序结束") + print("=" * 70 + "\n") + + return 0 + + +if __name__ == "__main__": + try: + sys.exit(main()) + except KeyboardInterrupt: + print("\n\n用户中断程序") + sys.exit(1) + except Exception as e: + print(f"\n\n未预期的错误: {e}") + import traceback + traceback.print_exc() + sys.exit(1) From 3795dcdb47986bdb1708dd1f4e2886217245ac0c Mon Sep 17 00:00:00 2001 From: Zachary359 Date: Tue, 10 Feb 2026 13:44:46 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E5=88=87=E6=8D=A2=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E6=9C=BA=E4=B9=8B=E5=90=8Esleep=200.1s=E7=A1=AE=E4=BF=9Dfsm?= =?UTF-8?q?=E5=88=87=E6=8D=A2=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ros2_robot_interface/ros_interface.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ros2_robot_interface/ros_interface.py b/ros2_robot_interface/ros_interface.py index 3d98e4c..ab91bf9 100644 --- a/ros2_robot_interface/ros_interface.py +++ b/ros2_robot_interface/ros_interface.py @@ -815,6 +815,9 @@ def send_fsm_command(self, command: int) -> None: self._current_fsm_command = command else: logger.debug(f"FSM command {command} is a special command, not updating internal state") + + # 等待状态机完成切换,避免后续指令在旧状态下执行 + time.sleep(0.1) def get_fsm_command(self) -> int: """Get current FSM command. From 2505bafd6c53f37ad258c8cc445d1acb8b7a5a59 Mon Sep 17 00:00:00 2001 From: Zachary359 Date: Tue, 10 Feb 2026 13:56:37 +0800 Subject: [PATCH 4/6] update --- ros2_robot_interface/ros_interface.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ros2_robot_interface/ros_interface.py b/ros2_robot_interface/ros_interface.py index ab91bf9..e57fe8a 100644 --- a/ros2_robot_interface/ros_interface.py +++ b/ros2_robot_interface/ros_interface.py @@ -817,7 +817,7 @@ def send_fsm_command(self, command: int) -> None: logger.debug(f"FSM command {command} is a special command, not updating internal state") # 等待状态机完成切换,避免后续指令在旧状态下执行 - time.sleep(0.1) + time.sleep(0.3) def get_fsm_command(self) -> int: """Get current FSM command. From 14649ae24c88da2bc1663b85699ed6d5f5a821a5 Mon Sep 17 00:00:00 2001 From: Zachary359 Date: Tue, 10 Feb 2026 14:03:12 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E5=8F=82=E6=95=B0=E4=B8=BA=E5=8F=AF=E9=85=8D=E7=BD=AE=E5=8F=82?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ros2_robot_interface/config.py | 5 +++++ ros2_robot_interface/ros_interface.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/ros2_robot_interface/config.py b/ros2_robot_interface/config.py index 4312324..e541e39 100644 --- a/ros2_robot_interface/config.py +++ b/ros2_robot_interface/config.py @@ -124,6 +124,11 @@ class ROS2RobotInterfaceConfig: joint_state_timeout: float = 0.0 # 【可选】关节状态超时时间(单位:秒) end_effector_pose_timeout: float = 0.0 # 【可选】末端执行器位姿超时时间(单位:秒) + # ============================================================================ + # 状态机切换(可选,手动配置) + # ============================================================================ + fsm_state_switch_settle_time: float = 0.3 # 【可选】发送 FSM 状态切换命令后的等待时间(单位:秒),用于确保对端完成状态切换 + # ============================================================================ # ROS 2 节点配置(可选,手动配置) # ============================================================================ diff --git a/ros2_robot_interface/ros_interface.py b/ros2_robot_interface/ros_interface.py index e57fe8a..ced6aa6 100644 --- a/ros2_robot_interface/ros_interface.py +++ b/ros2_robot_interface/ros_interface.py @@ -817,7 +817,7 @@ def send_fsm_command(self, command: int) -> None: logger.debug(f"FSM command {command} is a special command, not updating internal state") # 等待状态机完成切换,避免后续指令在旧状态下执行 - time.sleep(0.3) + time.sleep(self.config.fsm_state_switch_settle_time) def get_fsm_command(self) -> int: """Get current FSM command. From 7bf39873930ce0bdc978e83a58f59785cc5b6559 Mon Sep 17 00:00:00 2001 From: Zachary359 Date: Tue, 10 Feb 2026 14:43:51 +0800 Subject: [PATCH 6/6] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E5=BE=AA=E7=8E=AF?= =?UTF-8?q?=E6=92=AD=E6=94=BE=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- record/record_playback.py | 130 ++++++++++++++++++++++++++++++++------ 1 file changed, 111 insertions(+), 19 deletions(-) diff --git a/record/record_playback.py b/record/record_playback.py index 2c15c79..6d05233 100644 --- a/record/record_playback.py +++ b/record/record_playback.py @@ -3,10 +3,12 @@ 功能: 1. 录制模式:录制当前所有关节状态和末端pose(左右灵巧手/夹爪、左右臂、腰部、头部),按Enter记录一次节点,可随时保存为JSON -2. 回放模式:加载JSON文件,可选择MOVEJ模式(关节控制)或OCS2模式(pose控制),按Enter发送一次指令 +2. 回放模式:加载JSON文件,可选择MOVEJ或OCS2模式,以及单步回放或循环回放。 + - 单步回放:按 Enter 发送下一个节点 + - 循环回放:选择后输入每个点之间的间隔时间(秒),第一次按 Enter 开始播放,再次按 Enter 暂停 使用方法: - python joint_record_playback.py [record|playback] [--file ] + python record_playback.py [record|playback] [--file ] """ import json @@ -15,6 +17,7 @@ import argparse import os import logging +import threading from typing import Dict, List, Optional, Any from pathlib import Path @@ -751,11 +754,46 @@ def playback_mode(interface: ROS2RobotInterface, input_file: Optional[str] = Non else: print(" ⚠ 无效选择,请输入 1 或 2") + # 选择回放方式:单步 或 循环 + print("\n请选择回放方式:") + print(" [1] 单步回放 - 按 Enter 发送下一个节点") + print(" [2] 循环回放 - 按 Enter 开始播放,再次按 Enter 暂停,每个点之间按间隔时间循环") + + while True: + step_choice = input("\n请选择回放方式 (1/2,默认1): ").strip() + if not step_choice: + step_choice = '1' + if step_choice in ['1', '2']: + loop_playback = (step_choice == '2') + break + else: + print(" ⚠ 无效选择,请输入 1 或 2") + + loop_interval = 2.0 + if loop_playback: + while True: + interval_input = input("请输入每个点之间的间隔时间(秒,默认 2.0): ").strip() + if not interval_input: + loop_interval = 2.0 + break + try: + loop_interval = float(interval_input) + if loop_interval <= 0: + print(" ⚠ 间隔时间需大于 0,请重新输入") + continue + break + except ValueError: + print(" ⚠ 请输入有效的数字") + if use_ocs2: print("\n → 使用OCS2模式回放") - print("\n操作说明:") - print(" - 按 Enter 键:发送下一个节点的OCS2 pose指令") - print(" - 输入 'q' 或 'quit' 后按 Enter:退出") + print("\n操作说明:" if not loop_playback else f"\n操作说明(循环回放,间隔 {loop_interval} 秒):") + if loop_playback: + print(" - 按 Enter 键:开始播放(再次按 Enter 暂停)") + print(" - 输入 'q' 或 'quit' 后按 Enter:退出") + else: + print(" - 按 Enter 键:发送下一个节点的OCS2 pose指令") + print(" - 输入 'q' 或 'quit' 后按 Enter:退出") print(" - 按 Ctrl+C:强制退出\n") # 切换到OCS2状态 @@ -778,9 +816,13 @@ def playback_mode(interface: ROS2RobotInterface, input_file: Optional[str] = Non play_func = player.play_next_node_ocs2 else: print("\n → 使用MOVEJ模式回放") - print("\n操作说明:") - print(" - 按 Enter 键:发送下一个节点的movej指令") - print(" - 输入 'q' 或 'quit' 后按 Enter:退出") + print("\n操作说明:" if not loop_playback else f"\n操作说明(循环回放,间隔 {loop_interval} 秒):") + if loop_playback: + print(" - 按 Enter 键:开始播放(再次按 Enter 暂停)") + print(" - 输入 'q' 或 'quit' 后按 Enter:退出") + else: + print(" - 按 Enter 键:发送下一个节点的movej指令") + print(" - 输入 'q' 或 'quit' 后按 Enter:退出") print(" - 按 Ctrl+C:强制退出\n") # 先切换到HOLD状态,然后再切换到MOVEJ状态 @@ -804,18 +846,68 @@ def playback_mode(interface: ROS2RobotInterface, input_file: Optional[str] = Non play_func = player.play_next_node try: - while True: - user_input = input("按 Enter 发送下一个节点,输入 'q' 退出: ").strip().lower() - - if user_input == '': - # 按Enter播放下一个节点 - if not play_func(): - print("\n ✓ 所有节点已播放完成") + if loop_playback: + # 循环回放:Enter 开始/暂停,q 退出 + quit_flag = threading.Event() + playing = threading.Event() # 首次 Enter 开始,再次 Enter 暂停 + + def input_thread_fn(): + while not quit_flag.is_set(): + try: + line = input() + except (EOFError, KeyboardInterrupt): + break + line = line.strip().lower() + if line in ['q', 'quit']: + quit_flag.set() + break + # Enter:切换 开始/暂停 + if playing.is_set(): + playing.clear() + print(" → 已暂停") + else: + playing.set() + print(" → 开始播放") + + t = threading.Thread(target=input_thread_fn, daemon=True) + t.start() + + print("按 Enter 开始播放,再次按 Enter 暂停,输入 'q' 退出\n") + # 等待第一次 Enter 才开始 + while not playing.is_set() and not quit_flag.is_set(): + time.sleep(0.05) + while not quit_flag.is_set(): + if not playing.is_set(): + time.sleep(0.05) + continue + success = play_func() + if not success: + # 播完一轮,循环:重置索引 + player.current_index = 0 + if not player.nodes: + break + continue + # 间隔时间内可被暂停或退出打断 + steps = max(1, int(loop_interval / 0.1)) + for _ in range(steps): + if quit_flag.is_set(): + break + if not playing.is_set(): + break + time.sleep(0.1) + else: + # 单步回放 + while True: + user_input = input("按 Enter 发送下一个节点,输入 'q' 退出: ").strip().lower() + + if user_input == '': + if not play_func(): + print("\n ✓ 所有节点已播放完成") + break + elif user_input in ['q', 'quit']: break - elif user_input in ['q', 'quit']: - break - else: - print(" ⚠ 无效输入,请重试") + else: + print(" ⚠ 无效输入,请重试") except KeyboardInterrupt: print("\n\n ⚠ 用户中断")