14 ROS2 理论课-AI 大模型辅助开发通信节点:生成、审计、优化与故障分析

ROS2 AI 大模型辅助开发通信节点:生成、审计、优化与故障分析

关联:索引

  1. 场景提问:同样是“发消息”,为什么 Topic/Service/Action 不能混用?为什么“能运行”不等价于“业务正确”?

1. 需求描述 → 代码生成 → 审计优化 → 测试验证(四步闭环)

你是 ROS2 Humble 通信节点开发专家。请基于以下业务生成可运行的 ROS2 Python 节点代码,并给出运行/测试命令。

业务背景:分拣系统
模块:
- perception_node:发布检测结果 Topic
- dispatcher_node:提供任务分配 Service
- arm_action_server:提供机械臂执行 Action(可反馈/可取消)
约束:
0)实现范围:只实现 1 条最小闭环链路(Topic + Service + Action),不要扩展其它接口或额外节点。
1)命名统一:topic/service/action 名称遵循 /sorting/<module>/<name>
2)接口契约必须严格对齐上节课自定义接口(不得自造字段/类型):
   - Topic:/sorting/perception/detections 使用 ros2_comm_design_interfaces/msg/DetectedItemArray
     - 必填字段:stamp, task_id, frame_id, items[]
     - items[] 元素类型 DetectedItem(至少填 stamp, task_id, class_id, score, pose)
   - Service:/sorting/dispatcher/assign_task 使用 ros2_comm_design_interfaces/srv/AssignTask
     - Request:request_id, task_id, target_class, target_count
     - Response:ok, task_id, error_code, message
   - Action:/sorting/arm/execute_path 使用 ros2_comm_design_interfaces/action/ExecuteArmPath
     - Goal:task_id, path_id, speed, timeout_sec
     - Result:ok, error_code, message, final_state(final_state 需对齐 action_msgs/msg/GoalStatus 的 STATUS_* 语义)
     - Feedback:progress(0.0~1.0), current_step, safety_state
3)每条交互必须可验收:给出 ros2 CLI 测试命令与期望关键输出(至少 Topic/Service/Action 各 1 条)
4)必须处理异常:Service 参数校验失败;Action 取消与超时(timeout_sec 生效);失败必须返回 error_code!=0 且 message 可解释
5)日志要可定位:关键路径打印 task_id/request_id 与 error_code(包含正常与异常路径)
6)性能与 QoS:Topic 使用 qos_profile_sensor_data;发布频率 30Hz(约 0.033s)

输出要求:
- 给出 3 个节点的完整代码(可放在一个包内的多个文件)
- 给出 package 结构建议(minimal)
- 给出如何运行每个节点的命令
- 给出 Topic/Service/Action 的测试命令(至少各 1 条)

1.1 接口契约绑定(来自上节课,先对齐再生成)

本讲不再“现场编接口字段”,而是直接复用上节课的接口契约,并要求 AI 生成代码时严格按字段与类型实现:

colcon build --packages-select ros2_comm_design_interfaces
source install/setup.bash
ros2 interface show ros2_comm_design_interfaces/msg/DetectedItemArray
ros2 interface show ros2_comm_design_interfaces/srv/AssignTask
ros2 interface show ros2_comm_design_interfaces/action/ExecuteArmPath
ros2_ws/src/ros2_comm_ai_nodes/
  package.xml
  setup.py
  setup.cfg
  resource/ros2_comm_ai_nodes
  ros2_comm_ai_nodes/__init__.py
  ros2_comm_ai_nodes/perception_node.py
  ros2_comm_ai_nodes/dispatcher_node.py
  ros2_comm_ai_nodes/arm_action_server.py

创建包与编译(示例命令)

cd ~/ros2_ws/src
ros2 pkg create ros2_comm_ai_nodes --build-type ament_python --dependencies rclpy geometry_msgs action_msgs ros2_comm_design_interfaces
cd ~/ros2_ws
colcon build --packages-select ros2_comm_design_interfaces ros2_comm_ai_nodes
source install/setup.bash

1.3 可复制运行代码(基于上节课接口)

setup.py

from setuptools import setup

package_name = "ros2_comm_ai_nodes"

setup(
    name=package_name,
    version="0.0.0",
    packages=[package_name],
    data_files=[
        ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
        ("share/" + package_name, ["package.xml"]),
    ],
    install_requires=["setuptools"],
    zip_safe=True,
    entry_points={
        "console_scripts": [
            "perception_node = ros2_comm_ai_nodes.perception_node:main",
            "dispatcher_node = ros2_comm_ai_nodes.dispatcher_node:main",
            "arm_action_server = ros2_comm_ai_nodes.arm_action_server:main",
        ],
    },
)

setup.cfg

[develop]
script_dir=$base/lib/ros2_comm_ai_nodes
[install]
install_scripts=$base/lib/ros2_comm_ai_nodes

package.xml(最小依赖)

<?xml version="1.0"?>
<package format="3">
  <name>ros2_comm_ai_nodes</name>
  <version>0.0.0</version>
  <description>AI assisted ROS2 communication nodes based on ros2_comm_design_interfaces</description>
  <maintainer email="[email protected]">teacher</maintainer>
  <license>Apache-2.0</license>

  <exec_depend>rclpy</exec_depend>
  <exec_depend>geometry_msgs</exec_depend>
  <exec_depend>action_msgs</exec_depend>
  <exec_depend>ros2_comm_design_interfaces</exec_depend>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>

ros2_comm_ai_nodes/perception_node.py(Topic 发布:DetectedItemArray)

import random

import rclpy
from geometry_msgs.msg import Pose
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data

from ros2_comm_design_interfaces.msg import DetectedItem, DetectedItemArray

class PerceptionNode(Node):
    def __init__(self) -> None:
        super().__init__("perception_node")
        self._publisher = self.create_publisher(
            DetectedItemArray,
            "/sorting/perception/detections",
            qos_profile_sensor_data,
        )
        self._seq = 0
        self._timer = self.create_timer(0.033, self._on_timer)

    def _on_timer(self) -> None:
        self._seq += 1
        msg = DetectedItemArray()
        msg.stamp = self.get_clock().now().to_msg()
        msg.task_id = f"detect_{self._seq:06d}"
        msg.frame_id = "camera_link"

        item = DetectedItem()
        item.stamp = msg.stamp
        item.task_id = msg.task_id
        item.class_id = "apple"
        item.score = float(random.random())

        pose = Pose()
        pose.position.x = float(random.uniform(0.2, 0.6))
        pose.position.y = float(random.uniform(-0.2, 0.2))
        pose.position.z = float(random.uniform(0.1, 0.4))
        pose.orientation.w = 1.0
        item.pose = pose

        msg.items = [item]
        self._publisher.publish(msg)
        if self._seq % 30 == 0:
            self.get_logger().info(f"published detections task_id={msg.task_id} frame_id={msg.frame_id} items={len(msg.items)}")

def main() -> None:
    rclpy.init()
    node = PerceptionNode()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

ros2_comm_ai_nodes/dispatcher_node.py(Service 服务:AssignTask)

import uuid

import rclpy
from rclpy.node import Node

from ros2_comm_design_interfaces.srv import AssignTask

class DispatcherNode(Node):
    def __init__(self) -> None:
        super().__init__("dispatcher_node")
        self._service = self.create_service(
            AssignTask,
            "/sorting/dispatcher/assign_task",
            self._handle_assign_task,
        )

    def _handle_assign_task(self, request: AssignTask.Request, response: AssignTask.Response) -> AssignTask.Response:
        if not request.request_id.strip():
            response.ok = False
            response.task_id = request.task_id
            response.error_code = 2003
            response.message = "request_id is empty"
            self.get_logger().error(f"assign_task failed request_id=<empty> task_id={request.task_id} error_code={response.error_code}")
            return response

        if not request.target_class.strip():
            response.ok = False
            response.task_id = request.task_id
            response.error_code = 2001
            response.message = "target_class is empty"
            self.get_logger().error(f"assign_task failed request_id={request.request_id} task_id={request.task_id} error_code={response.error_code}")
            return response

        if request.target_count <= 0:
            response.ok = False
            response.task_id = request.task_id
            response.error_code = 2002
            response.message = "target_count must be > 0"
            self.get_logger().error(f"assign_task failed request_id={request.request_id} task_id={request.task_id} error_code={response.error_code}")
            return response

        task_id = request.task_id.strip()
        if not task_id:
            task_id = f"t_{uuid.uuid4().hex[:8]}"

        response.ok = True
        response.task_id = task_id
        response.error_code = 0
        response.message = "assigned"
        self.get_logger().info(
            f"assign_task ok request_id={request.request_id} task_id={response.task_id} target_class={request.target_class} target_count={request.target_count}"
        )
        return response

def main() -> None:
    rclpy.init()
    node = DispatcherNode()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

ros2_comm_ai_nodes/arm_action_server.py(Action 服务:ExecuteArmPath)

import time

import rclpy
from action_msgs.msg import GoalStatus
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node

from ros2_comm_design_interfaces.action import ExecuteArmPath

class ArmActionServer(Node):
    def __init__(self) -> None:
        super().__init__("arm_action_server")
        self._action_server = ActionServer(
            self,
            ExecuteArmPath,
            "/sorting/arm/execute_path",
            execute_callback=self._execute,
            goal_callback=self._goal_callback,
            cancel_callback=self._cancel_callback,
        )

    def _goal_callback(self, goal_request: ExecuteArmPath.Goal) -> GoalResponse:
        return GoalResponse.ACCEPT

    def _cancel_callback(self, goal_handle) -> CancelResponse:
        return CancelResponse.ACCEPT

    def _execute(self, goal_handle):
        goal = goal_handle.request

        if not goal.task_id.strip():
            goal_handle.abort()
            result = ExecuteArmPath.Result()
            result.ok = False
            result.error_code = 4005
            result.message = "task_id is empty"
            result.final_state = GoalStatus.STATUS_ABORTED
            return result

        if not goal.path_id.strip():
            goal_handle.abort()
            result = ExecuteArmPath.Result()
            result.ok = False
            result.error_code = 4006
            result.message = "path_id is empty"
            result.final_state = GoalStatus.STATUS_ABORTED
            return result

        if goal.speed <= 0.0 or goal.speed > 1.0:
            goal_handle.abort()
            result = ExecuteArmPath.Result()
            result.ok = False
            result.error_code = 4001
            result.message = "invalid speed (expected 0.0 < speed <= 1.0)"
            result.final_state = GoalStatus.STATUS_ABORTED
            return result

        if goal.timeout_sec <= 0.0:
            goal_handle.abort()
            result = ExecuteArmPath.Result()
            result.ok = False
            result.error_code = 4004
            result.message = "invalid timeout_sec (expected > 0)"
            result.final_state = GoalStatus.STATUS_ABORTED
            return result

        steps = 10
        step_sleep = 0.2
        start_time = time.time()
        self.get_logger().info(f"execute_path start task_id={goal.task_id} path_id={goal.path_id} speed={goal.speed} timeout_sec={goal.timeout_sec}")

        feedback = ExecuteArmPath.Feedback()
        for i in range(steps):
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                result = ExecuteArmPath.Result()
                result.ok = False
                result.error_code = 4002
                result.message = "canceled"
                result.final_state = GoalStatus.STATUS_CANCELED
                self.get_logger().warn(f"execute_path canceled task_id={goal.task_id} path_id={goal.path_id} error_code={result.error_code}")
                return result

            if (time.time() - start_time) > float(goal.timeout_sec):
                goal_handle.abort()
                result = ExecuteArmPath.Result()
                result.ok = False
                result.error_code = 4003
                result.message = "timeout"
                result.final_state = GoalStatus.STATUS_ABORTED
                self.get_logger().error(f"execute_path timeout task_id={goal.task_id} path_id={goal.path_id} error_code={result.error_code}")
                return result

            feedback.progress = float(i + 1) / float(steps)
            feedback.current_step = "EXECUTING"
            feedback.safety_state = 0
            goal_handle.publish_feedback(feedback)
            time.sleep(step_sleep)

        goal_handle.succeed()
        result = ExecuteArmPath.Result()
        result.ok = True
        result.error_code = 0
        result.message = "succeeded"
        result.final_state = GoalStatus.STATUS_SUCCEEDED
        self.get_logger().info(f"execute_path succeeded task_id={goal.task_id} path_id={goal.path_id}")
        return result

def main() -> None:
    rclpy.init()
    node = ArmActionServer()
    executor = MultiThreadedExecutor()
    executor.add_node(node)
    try:
        executor.spin()
    finally:
        node.destroy_node()
        rclpy.shutdown()
source ~/ros2_ws/install/setup.bash

ros2 run ros2_comm_ai_nodes perception_node

新终端:

source ~/ros2_ws/install/setup.bash
ros2 topic echo /sorting/perception/detections --once

再开终端运行服务端并调用:

source ~/ros2_ws/install/setup.bash
ros2 run ros2_comm_ai_nodes dispatcher_node

新终端:

source ~/ros2_ws/install/setup.bash
ros2 service call /sorting/dispatcher/assign_task ros2_comm_design_interfaces/srv/AssignTask "{request_id: 'r1', task_id: '', target_class: 'apple', target_count: 3}"

再开终端运行 Action 服务端并测试:

source ~/ros2_ws/install/setup.bash
ros2 run ros2_comm_ai_nodes arm_action_server

新终端:

source ~/ros2_ws/install/setup.bash
ros2 action send_goal /sorting/arm/execute_path ros2_comm_design_interfaces/action/ExecuteArmPath "{task_id: 't001', path_id: 'p001', speed: 0.5, timeout_sec: 10.0}" --feedback

2. AI 输出检查点(拿到初稿先看什么)

任务:以小组自选题项目为主(无项目则用分拣系统),每组只做 1 条最小闭环链路,完成“需求描述 → AI 生成代码初稿”。

1. 语法规范审计(能跑与可维护)

2. 逻辑正确性审计(能跑但不对)

3. 业务适配性审计(对不对项目)

1. 最小测试矩阵(必须跑)

对应命令参考(示例口径)

# Service 参数错误
ros2 service call /sorting/dispatcher/assign_task ros2_comm_design_interfaces/srv/AssignTask "{request_id: 'r_bad', task_id: '', target_class: 'apple', target_count: 0}"

# Action 超时(timeout_sec 小于总执行时间)
ros2 action send_goal /sorting/arm/execute_path ros2_comm_design_interfaces/action/ExecuteArmPath "{task_id: 't_timeout', path_id: 'p001', speed: 0.5, timeout_sec: 0.5}" --feedback

2. 推荐命令(示例口径)

现象描述 → 收集证据(命令输出/日志)→ 提出假设(可能原因)→ 验证(补充命令/对比)→ 修复 → 回归验证

2. 常见故障与证据点(速查)

3. 故障定位提示词模板(可直接复制)

我遇到 ROS2 Humble 通信故障,请按“现象→证据→假设→验证→修复→回归”的结构回答,不要只给猜测。

现象:
- (例如)topic echo 收不到 /sorting/perception/detections

证据:
- ros2 node list 输出:...
- ros2 topic list -t 输出:...
- 节点日志关键行:...

约束:
- 我使用的是 ROS2 Humble
- 需要给出具体可执行的验证命令
- 修复建议要落到具体改动点(代码/命令/配置)