24 实践课-设备控制节点开发

设备控制节点开发

关联:索引

兼容说明:

0)快速自检:先确认 MoveIt2 包是否存在(证据优先)

source /opt/ros/humble/setup.bash
ros2 pkg list | grep -E "moveit" | head

逐行解释:

如果查不到(常见于未安装),按需安装(示例):

sudo apt update
sudo apt install -y ros-humble-moveit

逐行解释:

0.1)快速自检:确认 ros2_control 与控制器是否存在(要“真执行”就必须有)

说明:

先用证据确认 ros2_control 相关包是否存在:

source /opt/ros/humble/setup.bash
ros2 pkg list | grep -E "ros2_control|controller_manager|ros2_controllers" | head

逐行解释:

如果查不到(常见于未安装),按需安装(示例):

sudo apt update
sudo apt install -y ros-humble-ros2-control ros-humble-ros2-controllers ros-humble-controller-manager

逐行解释:

安装后你还需要在“运行阶段”给出控制链路证据(后文会做):ros2 control list_controllersros2 topic echo --once /joint_states、以及 MoveIt2 轨迹执行动作是否存在(例如 FollowJointTrajectory)。


系统架构与接口约定(必读)

1)节点分工(推荐最小架构)

2)接口约定(建议口径,可根据你现有视觉节点调整)

视觉输出(与 YOLOv8 视觉检测口径对齐,建议直接复用):

控制节点输出:

分拣箱位(预置):


  1. MoveIt2 demo/bringup 能启动(终端日志无关键报错)。
  2. RViz2 中能看到机器人模型与 Planning Scene(截图)。
  3. 你能证明“规划链路确实存在且可观察”(动作列表/日志/控制器反馈其一即可,但必须可复现)。

1)MoveIt2 在做什么(用一句话串起来)

MoveIt2 = “给定目标(位姿/关节角)+ 约束(姿态/路径/速度/碰撞)→ 规划器算出可行轨迹 → 轨迹交给控制器执行 → 执行结果/状态反馈用于闭环”。

你必须能说清以下四个关键词的关系:

  1. 目标表达是否正确:目标坐标系是什么?目标是否在可达空间?姿态是否奇异?
  2. 场景是否合理:是否发生自碰撞/环境碰撞?附加物体是否误判为碰撞?
  3. 规划配置是否匹配:planning group 名称是否正确?关节限制/速度缩放是否过小?
  4. 执行链路是否通:控制器是否启动?FollowJointTrajectory 是否可用?

0)复用上学期机械臂 URDF(课程资源)

本复用课程资料中的机械臂模型(上一学期已验证通过;URDF 与关节命名不依赖发行版,ROS2 Humble 可直接复用):

建议做法:创建一个“机器人描述包”并放入该 URDF,保证后续工具能通过 package:// 路径找到模型。

source /opt/ros/humble/setup.bash
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python --license MIT robot_arm_description
mkdir -p robot_arm_description/urdf
mkdir -p robot_arm_description/config

逐行解释:

把课程资料中的 robot_arm.urdf 复制到 robot_arm_description/urdf/robot_arm.urdf 后,编译使其在环境中可见:

cd ~/ros2_ws
colcon build --symlink-install
source install/setup.bash

工程规范补全(否则后续 launch/配置文件可能找不到):

示例(片段,按你包的 package_name 与 imports 补齐即可):

data_files=[
    # 注册资源索引:让 `ros2 pkg` 能识别这个包
    ('share/ament_index/resource_index/packages', ['resource/' + package_name]),
    # 安装 package.xml:ament/colcon 运行时需要它来解析包信息
    ('share/' + package_name, ['package.xml']),
    # 安装 URDF:让 launch 里能通过 get_package_share_directory() 找到模型文件
    # 说明:glob 来自 `from glob import glob`
    ('share/' + package_name + '/urdf', glob('urdf/*.urdf')),
    # 安装 ros2_control 控制器配置:controller_manager 读取 YAML
    ('share/' + package_name + '/config', glob('config/*.yaml')),
    # 安装 launch:让你能 `ros2 launch <pkg> <file>.launch.py`
    ('share/' + package_name + '/launch', glob('launch/*.py')),
]

1)用 ros2_control 跑通“能执行到控制器”的最小链路(推荐先做)

本步骤目标是得到两条证据:控制器 active、FollowJointTrajectory 接口存在。

1)控制器配置文件(示例,参考课程资料的标准写法):

arm_controllers.yaml(放到你的描述包或 bringup 包的 config/ 下):

controller_manager:
  ros__parameters:
    update_rate: 20  # 控制循环频率(Hz),过低会影响跟踪质量,过高对 CPU 要求更高

    arm_controller:  # 控制器实例名(后续 spawner/ros2 control 都用这个名字)
      type: joint_trajectory_controller/JointTrajectoryController  # 轨迹控制器:对外提供 FollowJointTrajectory

    hand_controller:
      type: forward_command_controller/ForwardCommandController  # 前向命令控制器:直接写某个接口(这里用 position)

    joint_state_broadcaster:
      type: joint_state_broadcaster/JointStateBroadcaster  # 把关节状态发布到 /joint_states(RViz/MoveIt2 常用)

arm_controller:
  ros__parameters:
    joints:
      # 关节名必须与 URDF 完全一致(拼写/大小写必须一致),否则控制器会报错无法激活
      - joint1
      - joint2
      - joint3
      - joint4
      - joint5
      - joint6
    command_interfaces:
      - position  # 发送给硬件/模拟硬件的“命令接口”(这里以位置控制为例)
    state_interfaces:
      - position  # 从硬件读取的“状态接口”(至少要有 position 才能做状态反馈)
      - velocity  # velocity 可选,但很多控制器/可视化会用到
    allow_nonzero_velocity_at_trajectory_end: true
    # 说明:true 表示允许轨迹结束时速度不严格回到 0(更容易让轨迹顺利结束,但可能带来轻微残余速度)

hand_controller:
  ros__parameters:
    joints:
      - hand_right_joint
    interface_name: position  # hand_controller 将写 hand_right_joint 的 position 接口

robot_arm.urdf 复制为 robot_arm_ros2_control.urdf,在 </robot> 前添加:

<!--
  ros2_control 配置块(FakeSystem):
  - 用 mock_components/GenericSystem 模拟硬件,便于在 RViz/课堂环境先把链路跑通
  - 真机时需要替换为对应硬件驱动插件(HardwareInterface)
-->
<ros2_control name="FakeSystem" type="system">
  <hardware>
    <plugin>mock_components/GenericSystem</plugin>
  </hardware>

  <!-- 每个关节都要声明 command/state 接口;控制器激活时会检查接口是否匹配 -->
  <joint name="joint1">
    <command_interface name="position" />
    <!-- initial_value 用于模拟硬件的初始关节位置 -->
    <state_interface name="position"><param name="initial_value">0</param></state_interface>
    <state_interface name="velocity" />
  </joint>
  <joint name="joint2">
    <command_interface name="position" />
    <state_interface name="position"><param name="initial_value">0</param></state_interface>
    <state_interface name="velocity" />
  </joint>
  <joint name="joint3">
    <command_interface name="position" />
    <state_interface name="position"><param name="initial_value">0</param></state_interface>
    <state_interface name="velocity" />
  </joint>
  <joint name="joint4">
    <command_interface name="position" />
    <state_interface name="position"><param name="initial_value">0</param></state_interface>
    <state_interface name="velocity" />
  </joint>
  <joint name="joint5">
    <command_interface name="position" />
    <state_interface name="position"><param name="initial_value">0</param></state_interface>
    <state_interface name="velocity" />
  </joint>
  <joint name="joint6">
    <command_interface name="position" />
    <state_interface name="position"><param name="initial_value">0</param></state_interface>
    <state_interface name="velocity" />
  </joint>
  <joint name="hand_right_joint">
    <command_interface name="position" />
    <state_interface name="position"><param name="initial_value">0</param></state_interface>
    <state_interface name="velocity" />
  </joint>

  <!-- 说明:hand_left_joint 在 URDF 里是 mimic 关节,通常不作为独立驱动关节,因此这里不单独声明 -->
</ros2_control>

关键解释:

3)准备一个最小启动文件(bringup)

你需要同时启动 robot_state_publisher 与 ros2_control 的 controller_manager,并生成控制器。

示例启动文件 robot_arm_ros2_control.launch.py(放到你选择的包的 launch/ 目录):

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    # 通过 ament 索引找到包的 share 目录(要求 setup.py 已把 urdf/config 安装到 share/<pkg>/)
    pkg_share = get_package_share_directory("robot_arm_description")
    urdf_path = f"{pkg_share}/urdf/robot_arm_ros2_control.urdf"
    controllers_path = f"{pkg_share}/config/arm_controllers.yaml"

    # robot_description 是 URDF 文本;robot_state_publisher、ros2_control_node、MoveIt2 都依赖它
    robot_description = open(urdf_path, encoding="utf-8").read()

    # 发布 TF:基于 robot_description + /joint_states 生成 TF 树
    # 注意:/joint_states 由 joint_state_broadcaster 发布(需要控制器激活)
    robot_state_publisher = Node(
        package="robot_state_publisher",
        executable="robot_state_publisher",
        name="robot_state_publisher",
        output="both",
        parameters=[{"robot_description": robot_description}],
    )

    # controller_manager 节点:负责加载硬件(FakeSystem)并运行控制循环
    # 关键:parameters 同时传 robot_description 与 controllers.yaml(否则控制器无法按 YAML 配置加载)
    ros2_control_node = Node(
        package="controller_manager",
        executable="ros2_control_node",
        output="both",
        parameters=[{"robot_description": robot_description}, controllers_path],
    )

    # spawner:通过 controller_manager 的服务加载并激活控制器
    # 顺序:通常先 joint_state_broadcaster,再 arm_controller/hand_controller(便于先有 /joint_states)
    spawner = Node(
        package="controller_manager",
        executable="spawner",
        arguments=["joint_state_broadcaster", "arm_controller", "hand_controller"],
        output="screen",
    )

    # RViz2:可视化与 MoveIt2 交互(本 launch 不加载固定 rviz 配置,课堂可按需补充 -d)
    rviz2 = Node(
        package="rviz2",
        executable="rviz2",
        output="log",
    )

    return LaunchDescription([robot_state_publisher, ros2_control_node, spawner, rviz2])

工程规范提示(避免“文件找不到/不被安装”):

source /opt/ros/humble/setup.bash
source ~/ros2_ws/install/setup.bash

# 方式 A:launch 放在 robot_arm_description 包内
ros2 launch robot_arm_description robot_arm_ros2_control.launch.py

# 或方式 B:launch 放在 robot_arm_bringup 包内
# ros2 launch robot_arm_bringup robot_arm_ros2_control.launch.py

ros2 control list_controllers
ros2 action list -t | grep FollowJointTrajectory
ros2 topic echo --once /joint_states

2)用 moveit_setup_assistant 生成 MoveIt2 配置包(推荐做法)

适用情况:

快捷复用(可选):

先确认 setup assistant 是否存在(证据):

source /opt/ros/humble/setup.bash
ros2 pkg executables moveit_setup_assistant

逐行解释:

启动 setup assistant(课程资料口径):

ros2 launch moveit_setup_assistant setup_assistant.launch.py

逐行解释:

重要提醒(避免走弯路):

生成配置包后,你需要重点核对 3 件事(否则“能规划但不能动”):

  1. SRDF 的 planning group 与实际关节一致(名称完全匹配)。
  2. TF/robot_state_publisher 正常发布(RViz2 能看到完整模型与关节状态变化)。

1)启动 demo(示例口径)

source /opt/ros/humble/setup.bash
source ~/ros2_ws/install/setup.bash
ros2 launch robot_arm_config demo.launch.py

逐行解释:

结论先行:

你必须用证据判断 demo.launch.py 是否已经负责启动 ros2_control:

# 证据 1:是否存在 controller_manager 节点
ros2 node list | grep -E "controller_manager|ros2_control"

# 证据 2:是否能看到控制器 active
ros2 control list_controllers

(可选)直接检查 demo.launch.py 是否包含 ros2_control 相关节点(证据更直观):

pkg_prefix=$(ros2 pkg prefix robot_arm_config)
launch_dir="$pkg_prefix/share/robot_arm_config/launch"
grep -R -n -E "ros2_control_node|controller_manager|spawner" "$launch_dir" | head

逐行解释:

1.2)两种组合方式(demo 是否带 ros2_control,分别怎么做)

方式 A(推荐):demo.launch.py 已经带 ros2_control

方式 B:demo.launch.py 不带 ros2_control(或带但启动失败)

组合 launch 示例 robot_arm_moveit_with_control.launch.py(示例片段,按你包名调整):

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch_ros.actions import Node

def generate_launch_description():
    demo_pkg = get_package_share_directory("robot_arm_config")
    demo_launch = PythonLaunchDescriptionSource(f"{demo_pkg}/launch/demo.launch.py")

    desc_pkg = get_package_share_directory("robot_arm_description")
    urdf_path = f"{desc_pkg}/urdf/robot_arm_ros2_control.urdf"
    controllers_path = f"{desc_pkg}/config/arm_controllers.yaml"
    robot_description = open(urdf_path, encoding="utf-8").read()

    ros2_control_node = Node(
        package="controller_manager",
        executable="ros2_control_node",
        output="both",
        parameters=[{"robot_description": robot_description}, controllers_path],
    )

    spawner = Node(
        package="controller_manager",
        executable="spawner",
        arguments=["joint_state_broadcaster", "arm_controller", "hand_controller"],
        output="screen",
    )

    return LaunchDescription(
        [
            IncludeLaunchDescription(demo_launch),
            ros2_control_node,
            spawner,
        ]
    )

组合方式的关键点(必须满足,否则会“规划能跑但执行失败/或系统冲突”):

2)用 CLI 给出“规划链路存在”的证据(推荐做法)

ros2 action list | grep -E "follow_joint_trajectory|move_group|execute"

逐行解释:

你要验证两件事:控制器是否在跑、MoveIt2 的轨迹是否能“落到控制器”。

1)控制器状态(证据):

ros2 control list_controllers

逐行解释:

2)关节状态是否发布(证据):

ros2 topic list -t | grep joint_states
ros2 topic echo --once /joint_states

逐行解释:

3)轨迹执行接口是否存在(证据):

ros2 action list -t | grep -E "FollowJointTrajectory|ExecuteTrajectory"

逐行解释:

  1. 写出你 demo 的 planning group 名称与 end-effector link 名称(用 RViz2 / 参数 / 文档查证据,不允许拍脑袋)。
  2. 让机械臂做一次“关节空间目标”规划并截图(只要规划成功即可,是否真实执行以你环境为准)。

1)抓取位姿的三段式(建议统一口径)

2)坐标系与姿态:你必须能回答的三个问题

  1. 预置方块的抓取位姿在哪个坐标系下定义?(建议统一为 base_link
  2. 末端执行器目标姿态用哪个 link 表达?(ee_link / tool0,以你的 MoveIt2 配置为证据)
  3. 抓取位姿偏移(pregrasp/retreat)是沿哪个方向定义的?(建议沿末端坐标系的接近轴,而不是 world 固定轴)

输入:

输出:

示例:用“沿物体法向方向”做预抓取偏移(概念示意)

T_base_pregrasp = T_base_grasp ⊕ translate(-d_pre, 0, 0)  (在夹爪自身坐标系下做偏移)
T_base_retreat  = T_base_grasp ⊕ translate(+d_retreat, 0, 0)

解释要点:

4)姿态规划难点清单(遇到就按证据排查)

1)接口建议(与前序“服务/动作选型”衔接)

本采用“品质 Topic 输入 + 对外 PickPlace Action(核心任务接口)+ 状态 Topic 输出 + 可选使能服务”的口径:

2)创建控制包(Python 版本,最小可跑通骨架)

source /opt/ros/humble/setup.bash
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
ros2 pkg create sorting_arm_interfaces --build-type ament_cmake
ros2 pkg create sorting_arm_control --build-type ament_python --dependencies rclpy std_srvs std_msgs apple_quality_msgs control_msgs trajectory_msgs sorting_arm_interfaces

逐行解释:

sorting_arm_interfaces/action/ 下创建动作文件 PickPlace.action(示例):

string grade
---
bool success
int32 request_id
string message
---
int32 request_id
string phase
string message

补齐接口包的构建配置(否则 action 不会生成):

1)sorting_arm_interfaces/package.xml 关键依赖(示例):

<buildtool_depend>ament_cmake</buildtool_depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

2)sorting_arm_interfaces/CMakeLists.txt 生成 action(示例):

find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "action/PickPlace.action"
)

ament_export_dependencies(rosidl_default_runtime)
ament_package()

构建后你应能看到(证据):

ros2 interface show sorting_arm_interfaces/action/PickPlace
ros2 action list -t | grep pick_place

建议文件路径(你需要把代码放到这里,ros2 run 才能找到):

入口点配置(~/ros2_ws/src/sorting_arm_control/setup.py,把可执行名与 Python 模块绑定):

entry_points={
    'console_scripts': [
        'vision_pick_place_controller = sorting_arm_control.vision_pick_place_controller:main',
    ],
},

逐段解释:

3)视觉分拣控制节点骨架(重点看“状态闭环”和“证据输出”)

说明:下面这份骨架对应上文“路线 A(保底、变量最少)”——直接调用 ros2_control 控制器接口完成定点抓取闭环;如果你要按《11. 机械臂物品抓取仿真》的思路把 MoveIt2 引入执行,请看后面的“3.2)用 MoveIt2 执行预置关节目标”。

import json
import time

import rclpy
from rclpy.action import ActionClient, ActionServer, CancelResponse, GoalResponse
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup, ReentrantCallbackGroup
from rclpy.duration import Duration
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from std_srvs.srv import SetBool
from std_msgs.msg import String, Float64MultiArray

from apple_quality_msgs.msg import AppleQualityArray
from control_msgs.action import FollowJointTrajectory
from trajectory_msgs.msg import JointTrajectoryPoint
from sorting_arm_interfaces.action import PickPlace

class VisionPickPlaceController(Node):
    def __init__(self):
        super().__init__('vision_pick_place_controller')

        self._cb_reentrant = ReentrantCallbackGroup()
        self._cb_enable_service = MutuallyExclusiveCallbackGroup()
        self._cb_auto_client = MutuallyExclusiveCallbackGroup()

        # 使能开关(急停/防误触发):data=false 时不启动新任务
        self._enable_srv = self.create_service(SetBool, '/sorting/arm/enable', self.on_enable, callback_group=self._cb_enable_service)

        # 订阅视觉品质结果(来自 YOLOv8 示例中的 /sorting/perception/apple_quality)
        # QoS 这里用默认 depth=10(默认够用);工程里可根据频率/可靠性调整 QoS
        self._quality_sub = self.create_subscription(AppleQualityArray, '/sorting/perception/apple_quality', self.on_quality, 10)

        # 状态发布:用 JSON 字符串承载结构化字段(工程更推荐自定义 msg)
        self._state_pub = self.create_publisher(String, '/sorting/arm/state', 10)

        # 对外任务动作:抓取放置任务应采用 Action(可反馈/可取消/可复盘)
        self.declare_parameter('pick_place_action_name', '/sorting/arm/pick_place')
        self.declare_parameter('auto_goal_from_vision', True)
        self._pick_place_action_name = str(self.get_parameter('pick_place_action_name').value)
        self._auto_goal_from_vision = bool(self.get_parameter('auto_goal_from_vision').value)

        # 运行状态
        self._enabled = True
        self._busy = False

        # 证据字段:seq 用于全局递增序号,request_id 用于串联同一次任务的多条状态
        self._seq = 0
        self._request_id = 0

        # 最新视觉等级(grade)缓存:课堂用“取第一颗苹果”简化
        self._latest_grade = None

        # 过期窗口:如果视觉消息太旧,就不触发(避免网络抖动/旧结果导致误动作)
        self._stale_sec = 1.0
        self._last_quality_time = time.time()

        # 运动执行接口(ros2_control)
        # 说明:JointTrajectoryController 默认提供 action:/<controller_name>/follow_joint_trajectory
        self._arm_action = ActionClient(self, FollowJointTrajectory, '/arm_controller/follow_joint_trajectory', callback_group=self._cb_reentrant)
        self._hand_pub = self.create_publisher(Float64MultiArray, '/hand_controller/commands', 10)

        self._pick_place_server = ActionServer(
            self,
            PickPlace,
            self._pick_place_action_name,
            execute_callback=self.execute_pick_place,
            goal_callback=self.on_goal,
            cancel_callback=self.on_cancel,
            callback_group=self._cb_reentrant,
        )
        self._auto_goal_client = ActionClient(self, PickPlace, self._pick_place_action_name, callback_group=self._cb_auto_client)

        # 定点抓取参数(课堂要求:至少做到“按 grade 去不同箱位”的定点抓取)
        # 关节顺序必须与控制器 joints 配置一致(示例为 joint1..joint6)
        self._arm_joint_names = ['joint1', 'joint2', 'joint3', 'joint4', 'joint5', 'joint6']

        # 你需要把这些关节角替换为你自己的“抓取点/箱位点”(从 RViz 执行后读取 /joint_states 获取证据)
        self._ready_joints = [0.0, -0.3643, -0.6419, -0.8154, -0.9541, -1.6828]
        self._pick_joints = [0.0, -0.35, -0.70, -0.70, -1.10, 1.55]
        self._bin_joints_by_grade = {
            "A": [0.4, -0.2, -0.8, -0.6, -1.1, 1.55],
            "B": [0.0, -0.2, -0.8, -0.6, -1.1, 1.55],
            "C": [-0.4, -0.2, -0.8, -0.6, -1.1, 1.55],
        }

        self._arm_move_time_sec = 2.0
        self._gripper_open_pos = 0.0
        self._gripper_close_pos = 0.05

    def publish_state(self, request_id: int, phase: str, ok: bool, message: str, grade: str | None = None):
        # 统一状态输出口径:seq/request_id/ts/phase/ok/message/grade
        # 其中 phase 用于“状态机阶段”,ok 用于快速判断成功/失败
        self._seq += 1
        payload = {
            "seq": self._seq,
            "request_id": request_id,
            "ts": time.time(),
            "phase": phase,
            "ok": ok,
            "message": message,
            "grade": grade,
        }
        msg = String()
        msg.data = json.dumps(payload, ensure_ascii=False)
        self._state_pub.publish(msg)
        self.get_logger().info(msg.data)

    def publish_feedback(self, goal_handle, request_id: int, phase: str, message: str):
        fb = PickPlace.Feedback()
        fb.request_id = int(request_id)
        fb.phase = phase
        fb.message = message
        goal_handle.publish_feedback(fb)

    def on_goal(self, goal_request) -> GoalResponse:
        grade = str(goal_request.grade).strip().upper()
        if not grade:
            return GoalResponse.REJECT
        if not self._enabled or self._busy:
            return GoalResponse.REJECT
        return GoalResponse.ACCEPT

    def on_cancel(self, goal_handle) -> CancelResponse:
        return CancelResponse.ACCEPT

    def send_auto_goal(self, grade: str):
        if not self._auto_goal_client.wait_for_server(timeout_sec=1.0):
            return
        goal = PickPlace.Goal()
        goal.grade = grade
        self._auto_goal_client.send_goal_async(goal)

    def send_arm_joint_goal(self, request_id: int, joints: list[float], phase: str, grade: str):
        if len(joints) != len(self._arm_joint_names):
            raise ValueError("joint length mismatch")

        if not self._arm_action.wait_for_server(timeout_sec=2.0):
            raise RuntimeError("arm_controller action server not available")

        goal = FollowJointTrajectory.Goal()
        goal.trajectory.joint_names = list(self._arm_joint_names)

        point = JointTrajectoryPoint()
        point.positions = list(joints)
        point.time_from_start = Duration(seconds=float(self._arm_move_time_sec)).to_msg()
        goal.trajectory.points = [point]

        self.publish_state(request_id, phase, True, f"send FollowJointTrajectory: {joints}", grade=grade)

        send_future = self._arm_action.send_goal_async(goal)
        t0 = time.time()
        while rclpy.ok() and not send_future.done():
            if (time.time() - t0) > 3.0:
                raise TimeoutError("trajectory goal response timeout")
            time.sleep(0.01)

        arm_goal_handle = send_future.result()
        if not arm_goal_handle.accepted:
            raise RuntimeError("trajectory goal rejected")

        result_future = arm_goal_handle.get_result_async()
        t1 = time.time()
        while rclpy.ok() and not result_future.done():
            if (time.time() - t1) > 10.0:
                raise TimeoutError("trajectory result timeout")
            time.sleep(0.02)

        result = result_future.result()
        status = getattr(result, "status", None)
        self.publish_state(request_id, phase, True, f"trajectory done status={status}", grade=grade)

    def set_gripper(self, request_id: int, position: float, phase: str, grade: str):
        msg = Float64MultiArray()
        msg.data = [float(position)]
        self._hand_pub.publish(msg)
        self.publish_state(request_id, phase, True, f"hand_controller command={position}", grade=grade)

    def on_enable(self, request, response):
        # SetBool.srv:request.data 为 bool
        self._enabled = bool(request.data)
        response.success = True
        response.message = "enabled" if self._enabled else "disabled"
        # 注意:这里用当前 request_id 输出 enable 事件,仅作为证据;不代表启动/结束任务
        self.publish_state(self._request_id, "enable", True, response.message)
        return response

    def on_quality(self, msg: AppleQualityArray):
        # 视觉消息里可能包含多个苹果;课堂先取第一个作为决策输入
        if not msg.apples:
            return
        grade = msg.apples[0].grade.strip().upper()
        if not grade:
            return
        self._latest_grade = grade
        self._last_quality_time = time.time()
        if self._auto_goal_from_vision:
            self.maybe_start_job()

    def maybe_start_job(self):
        # 触发条件:已使能、当前不忙、grade 非空、视觉数据未过期
        if not self._enabled or self._busy:
            return
        if not self._latest_grade:
            return
        if (time.time() - self._last_quality_time) > self._stale_sec:
            return
        self.send_auto_goal(self._latest_grade)

    def execute_pick_place(self, goal_handle):
        grade = str(goal_handle.request.grade).strip().upper()
        self._busy = True
        self._request_id += 1
        request_id = self._request_id

        result = PickPlace.Result()
        result.request_id = int(request_id)

        try:
            self.publish_state(request_id, "start", True, "job started", grade=grade)
            self.publish_feedback(goal_handle, request_id, "start", "job started")

            if grade not in self._bin_joints_by_grade:
                raise ValueError(f"unknown grade: {grade}")

            self.publish_feedback(goal_handle, request_id, "ready", "move to ready")
            self.send_arm_joint_goal(request_id, self._ready_joints, "ready", grade)
            self.set_gripper(request_id, self._gripper_open_pos, "open_gripper", grade)

            self.publish_feedback(goal_handle, request_id, "grasp", "move to pick")
            self.send_arm_joint_goal(request_id, self._pick_joints, "grasp", grade)
            self.set_gripper(request_id, self._gripper_close_pos, "close_gripper", grade)

            self.publish_feedback(goal_handle, request_id, "place", f"move to bin {grade}")
            place_joints = self._bin_joints_by_grade[grade]
            self.send_arm_joint_goal(request_id, place_joints, "place", grade)
            self.set_gripper(request_id, self._gripper_open_pos, "release", grade)

            self.send_arm_joint_goal(request_id, self._ready_joints, "done", grade)

            goal_handle.succeed()
            result.success = True
            result.message = "ok"
            return result
        except Exception as e:
            msg = f"{type(e).__name__}: {e}"
            self.publish_state(request_id, "error", False, msg, grade=grade)
            self.publish_feedback(goal_handle, request_id, "error", msg)
            goal_handle.abort()
            result.success = False
            result.message = msg
            return result
        finally:
            self._busy = False

def main(args=None):
    rclpy.init(args=args)
    node = VisionPickPlaceController()
    executor = MultiThreadedExecutor(num_threads=4)
    executor.add_node(node)
    executor.spin()
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

逐段解释(你要能口头讲清):

重要提醒:

思路:用 MoveIt2 demo(RViz2)把机械臂执行到目标姿态,然后把 /joint_states 中的关节角记录下来作为你的 pick_joints / bins_joints.(A|B|C)

1)启动 demo(示例):

source /opt/ros/humble/setup.bash
ros2 launch robot_arm_config demo.launch.py

2)在 RViz2 中执行到你想要的“抓取点/箱位点”后,读取一次关节状态(证据):

ros2 topic echo --once /joint_states

3)按控制器关节顺序整理:

3.2)用 MoveIt2 执行“预置关节目标”(推荐实现,与《11. 机械臂物品抓取仿真》一致)

前置条件(证据化):

ros2 node list | grep move_group
ros2 control list_controllers

环境证据(能 import 到 MoveItPy 才算安装与 Python API 可用):

source /opt/ros/humble/setup.bash
python3 -c "from moveit.planning import MoveItPy; print('MoveItPy OK')"

核心思路(示例片段,仅展示关键 API 形态):

from moveit.core.robot_state import RobotState
from moveit.planning import MoveItPy

robot = MoveItPy(node_name="sorting_moveit_py")
arm = robot.get_planning_component("arm")
hand = robot.get_planning_component("hand")

def plan_and_execute_joint_goal(joints: list[float]):
    robot_state = RobotState(robot.get_robot_model())
    robot_state.set_joint_group_positions("arm", joints)
    arm.set_start_state_to_current_state()
    arm.set_goal_state(robot_state=robot_state)
    plan_result = arm.plan()
    if not plan_result:
        raise RuntimeError("arm plan failed")
    robot.execute(plan_result.trajectory, controllers=[])

def plan_and_execute_named_hand_state(name: str):
    hand.set_start_state_to_current_state()
    hand.set_goal_state(name)  # 例如 "open"/"close"
    plan_result = hand.plan()
    if not plan_result:
        raise RuntimeError("hand plan failed")
    robot.execute(plan_result.trajectory, controllers=[])

把它接到本讲的“视觉分拣”闭环里:

把“预置关节目标”升级为 MoveIt2 pose 目标真规划执行(进阶,按证据对齐接口):

1)先查你当前 bringup 暴露了哪些接口(不要凭感觉写死):

ros2 node list | grep move
ros2 action list -t | grep -E "MoveGroup|ExecuteTrajectory|FollowJointTrajectory"
ros2 service list | grep -E "plan|compute|ik|fk"

逐行解释:

2)推荐的工程落地方式(更稳定、对学生更友好):

编译与启动:

cd ~/ros2_ws
colcon build --symlink-install
source install/setup.bash
ros2 run sorting_arm_control vision_pick_place_controller \
  --ros-args --params-file ~/ros2_ws/src/sorting_arm_control/config/vision_pick_place_controller.yaml

逐行解释:

状态订阅(反馈证据):

ros2 topic echo /sorting/arm/state

动作接口证据(对外 Action):

ros2 action list -t | grep pick_place
ros2 interface show sorting_arm_interfaces/action/PickPlace

手动触发一次抓取放置(不依赖视觉 topic 自动触发):

ros2 action send_goal /sorting/arm/pick_place sorting_arm_interfaces/action/PickPlace "{grade: 'B'}" --feedback

取消任务(证据,Action 具备 cancel 语义):

ros2 action cancel /sorting/arm/pick_place

发布测试检测结果(用你自己的手发一条“视觉检测”来验证闭环):

ros2 topic pub --once /sorting/perception/apple_quality apple_quality_msgs/msg/AppleQualityArray "{
  header: {frame_id: 'camera_link'},
  apples: [
    {
      grade: 'A',
      diameter_mm: 75.0,
      red_ratio: 0.85,
      defect_score: 0.10
    }
  ]
}"

逐段解释:

可选:禁用/启用控制节点(用于急停/防误触发):

ros2 service call /sorting/arm/enable std_srvs/srv/SetBool "{data: false}"
ros2 service call /sorting/arm/enable std_srvs/srv/SetBool "{data: true}"

逐行解释:

作业

1)提交 MoveIt2 环境与 bringup 运行证据:终端关键输出 + RViz2 截图(包含机器人模型与 Planning Scene)。

2)提交“规划与执行链路存在”的证据(至少包含两类):

并用 120 字左右说明:为什么仅有“能规划”不等于“能执行到硬件/仿真”。

3)提交视觉分拣控制节点代码(最小闭环版):

4)提交 1 份视觉品质证据与说明:

5)提交一次失败排查记录(200–300 字):按“现象→证据→定位→修复→复测”结构。问题可选:

必须包含证据输出或截图(日志/ros2 CLI/RViz2 任一即可)。