14 ROS2 理论课-AI 大模型辅助开发通信节点:生成、审计、优化与故障分析
ROS2 AI 大模型辅助开发通信节点:生成、审计、优化与故障分析
关联:索引
- 场景提问:同样是“发消息”,为什么 Topic/Service/Action 不能混用?为什么“能运行”不等价于“业务正确”?
1. 需求描述 → 代码生成 → 审计优化 → 测试验证(四步闭环)
- 代码生成:让 AI 输出“文件结构 + 关键代码 + 运行命令 + 测试命令”。
- 审计优化:按清单逐项过(语法规范、逻辑正确性、业务适配性)。
你是 ROS2 Humble 通信节点开发专家。请基于以下业务生成可运行的 ROS2 Python 节点代码,并给出运行/测试命令。
业务背景:分拣系统
模块:
- perception_node:发布检测结果 Topic
- dispatcher_node:提供任务分配 Service
- arm_action_server:提供机械臂执行 Action(可反馈/可取消)
约束:
0)实现范围:只实现 1 条最小闭环链路(Topic + Service + Action),不要扩展其它接口或额外节点。
1)命名统一:topic/service/action 名称遵循 /sorting/<module>/<name>
2)接口契约必须严格对齐上节课自定义接口(不得自造字段/类型):
- Topic:/sorting/perception/detections 使用 ros2_comm_design_interfaces/msg/DetectedItemArray
- 必填字段:stamp, task_id, frame_id, items[]
- items[] 元素类型 DetectedItem(至少填 stamp, task_id, class_id, score, pose)
- Service:/sorting/dispatcher/assign_task 使用 ros2_comm_design_interfaces/srv/AssignTask
- Request:request_id, task_id, target_class, target_count
- Response:ok, task_id, error_code, message
- Action:/sorting/arm/execute_path 使用 ros2_comm_design_interfaces/action/ExecuteArmPath
- Goal:task_id, path_id, speed, timeout_sec
- Result:ok, error_code, message, final_state(final_state 需对齐 action_msgs/msg/GoalStatus 的 STATUS_* 语义)
- Feedback:progress(0.0~1.0), current_step, safety_state
3)每条交互必须可验收:给出 ros2 CLI 测试命令与期望关键输出(至少 Topic/Service/Action 各 1 条)
4)必须处理异常:Service 参数校验失败;Action 取消与超时(timeout_sec 生效);失败必须返回 error_code!=0 且 message 可解释
5)日志要可定位:关键路径打印 task_id/request_id 与 error_code(包含正常与异常路径)
6)性能与 QoS:Topic 使用 qos_profile_sensor_data;发布频率 30Hz(约 0.033s)
输出要求:
- 给出 3 个节点的完整代码(可放在一个包内的多个文件)
- 给出 package 结构建议(minimal)
- 给出如何运行每个节点的命令
- 给出 Topic/Service/Action 的测试命令(至少各 1 条)
- 本讲仅实现 1 条最小闭环链路(方案 A):用尽量少的代码把 Topic/Service/Action 跑通,并把“生成→审计→优化→测试→故障分析”的证据链跑完整。
- Topic:perception_node 发布
/sorting/perception/detections - Service:dispatcher_node 提供
/sorting/dispatcher/assign_task - Action:arm_action_server 提供
/sorting/arm/execute_path(含 feedback、result、cancel)
1.1 接口契约绑定(来自上节课,先对齐再生成)
本讲不再“现场编接口字段”,而是直接复用上节课的接口契约,并要求 AI 生成代码时严格按字段与类型实现:
ros2_comm_design_interfaces/msg/DetectedItemArray(包含stamp/task_id/frame_id/items)ros2_comm_design_interfaces/srv/AssignTask(Request:request_id/task_id/target_class/target_count;Response:ok/task_id/error_code/message)ros2_comm_design_interfaces/action/ExecuteArmPath(Goal:task_id/path_id/speed/timeout_sec;Result:ok/error_code/message/final_state;Feedback:progress/current_step/safety_state)
colcon build --packages-select ros2_comm_design_interfaces
source install/setup.bash
ros2 interface show ros2_comm_design_interfaces/msg/DetectedItemArray
ros2 interface show ros2_comm_design_interfaces/srv/AssignTask
ros2 interface show ros2_comm_design_interfaces/action/ExecuteArmPath
ros2_ws/src/ros2_comm_ai_nodes/
package.xml
setup.py
setup.cfg
resource/ros2_comm_ai_nodes
ros2_comm_ai_nodes/__init__.py
ros2_comm_ai_nodes/perception_node.py
ros2_comm_ai_nodes/dispatcher_node.py
ros2_comm_ai_nodes/arm_action_server.py
创建包与编译(示例命令)
cd ~/ros2_ws/src
ros2 pkg create ros2_comm_ai_nodes --build-type ament_python --dependencies rclpy geometry_msgs action_msgs ros2_comm_design_interfaces
cd ~/ros2_ws
colcon build --packages-select ros2_comm_design_interfaces ros2_comm_ai_nodes
source install/setup.bash
1.3 可复制运行代码(基于上节课接口)
setup.py
from setuptools import setup
package_name = "ros2_comm_ai_nodes"
setup(
name=package_name,
version="0.0.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
],
install_requires=["setuptools"],
zip_safe=True,
entry_points={
"console_scripts": [
"perception_node = ros2_comm_ai_nodes.perception_node:main",
"dispatcher_node = ros2_comm_ai_nodes.dispatcher_node:main",
"arm_action_server = ros2_comm_ai_nodes.arm_action_server:main",
],
},
)
setup.cfg
[develop]
script_dir=$base/lib/ros2_comm_ai_nodes
[install]
install_scripts=$base/lib/ros2_comm_ai_nodes
package.xml(最小依赖)
<?xml version="1.0"?>
<package format="3">
<name>ros2_comm_ai_nodes</name>
<version>0.0.0</version>
<description>AI assisted ROS2 communication nodes based on ros2_comm_design_interfaces</description>
<maintainer email="[email protected]">teacher</maintainer>
<license>Apache-2.0</license>
<exec_depend>rclpy</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
<exec_depend>action_msgs</exec_depend>
<exec_depend>ros2_comm_design_interfaces</exec_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
ros2_comm_ai_nodes/perception_node.py(Topic 发布:DetectedItemArray)
import random
import rclpy
from geometry_msgs.msg import Pose
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from ros2_comm_design_interfaces.msg import DetectedItem, DetectedItemArray
class PerceptionNode(Node):
def __init__(self) -> None:
super().__init__("perception_node")
self._publisher = self.create_publisher(
DetectedItemArray,
"/sorting/perception/detections",
qos_profile_sensor_data,
)
self._seq = 0
self._timer = self.create_timer(0.033, self._on_timer)
def _on_timer(self) -> None:
self._seq += 1
msg = DetectedItemArray()
msg.stamp = self.get_clock().now().to_msg()
msg.task_id = f"detect_{self._seq:06d}"
msg.frame_id = "camera_link"
item = DetectedItem()
item.stamp = msg.stamp
item.task_id = msg.task_id
item.class_id = "apple"
item.score = float(random.random())
pose = Pose()
pose.position.x = float(random.uniform(0.2, 0.6))
pose.position.y = float(random.uniform(-0.2, 0.2))
pose.position.z = float(random.uniform(0.1, 0.4))
pose.orientation.w = 1.0
item.pose = pose
msg.items = [item]
self._publisher.publish(msg)
if self._seq % 30 == 0:
self.get_logger().info(f"published detections task_id={msg.task_id} frame_id={msg.frame_id} items={len(msg.items)}")
def main() -> None:
rclpy.init()
node = PerceptionNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
ros2_comm_ai_nodes/dispatcher_node.py(Service 服务:AssignTask)
import uuid
import rclpy
from rclpy.node import Node
from ros2_comm_design_interfaces.srv import AssignTask
class DispatcherNode(Node):
def __init__(self) -> None:
super().__init__("dispatcher_node")
self._service = self.create_service(
AssignTask,
"/sorting/dispatcher/assign_task",
self._handle_assign_task,
)
def _handle_assign_task(self, request: AssignTask.Request, response: AssignTask.Response) -> AssignTask.Response:
if not request.request_id.strip():
response.ok = False
response.task_id = request.task_id
response.error_code = 2003
response.message = "request_id is empty"
self.get_logger().error(f"assign_task failed request_id=<empty> task_id={request.task_id} error_code={response.error_code}")
return response
if not request.target_class.strip():
response.ok = False
response.task_id = request.task_id
response.error_code = 2001
response.message = "target_class is empty"
self.get_logger().error(f"assign_task failed request_id={request.request_id} task_id={request.task_id} error_code={response.error_code}")
return response
if request.target_count <= 0:
response.ok = False
response.task_id = request.task_id
response.error_code = 2002
response.message = "target_count must be > 0"
self.get_logger().error(f"assign_task failed request_id={request.request_id} task_id={request.task_id} error_code={response.error_code}")
return response
task_id = request.task_id.strip()
if not task_id:
task_id = f"t_{uuid.uuid4().hex[:8]}"
response.ok = True
response.task_id = task_id
response.error_code = 0
response.message = "assigned"
self.get_logger().info(
f"assign_task ok request_id={request.request_id} task_id={response.task_id} target_class={request.target_class} target_count={request.target_count}"
)
return response
def main() -> None:
rclpy.init()
node = DispatcherNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
ros2_comm_ai_nodes/arm_action_server.py(Action 服务:ExecuteArmPath)
import time
import rclpy
from action_msgs.msg import GoalStatus
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from ros2_comm_design_interfaces.action import ExecuteArmPath
class ArmActionServer(Node):
def __init__(self) -> None:
super().__init__("arm_action_server")
self._action_server = ActionServer(
self,
ExecuteArmPath,
"/sorting/arm/execute_path",
execute_callback=self._execute,
goal_callback=self._goal_callback,
cancel_callback=self._cancel_callback,
)
def _goal_callback(self, goal_request: ExecuteArmPath.Goal) -> GoalResponse:
return GoalResponse.ACCEPT
def _cancel_callback(self, goal_handle) -> CancelResponse:
return CancelResponse.ACCEPT
def _execute(self, goal_handle):
goal = goal_handle.request
if not goal.task_id.strip():
goal_handle.abort()
result = ExecuteArmPath.Result()
result.ok = False
result.error_code = 4005
result.message = "task_id is empty"
result.final_state = GoalStatus.STATUS_ABORTED
return result
if not goal.path_id.strip():
goal_handle.abort()
result = ExecuteArmPath.Result()
result.ok = False
result.error_code = 4006
result.message = "path_id is empty"
result.final_state = GoalStatus.STATUS_ABORTED
return result
if goal.speed <= 0.0 or goal.speed > 1.0:
goal_handle.abort()
result = ExecuteArmPath.Result()
result.ok = False
result.error_code = 4001
result.message = "invalid speed (expected 0.0 < speed <= 1.0)"
result.final_state = GoalStatus.STATUS_ABORTED
return result
if goal.timeout_sec <= 0.0:
goal_handle.abort()
result = ExecuteArmPath.Result()
result.ok = False
result.error_code = 4004
result.message = "invalid timeout_sec (expected > 0)"
result.final_state = GoalStatus.STATUS_ABORTED
return result
steps = 10
step_sleep = 0.2
start_time = time.time()
self.get_logger().info(f"execute_path start task_id={goal.task_id} path_id={goal.path_id} speed={goal.speed} timeout_sec={goal.timeout_sec}")
feedback = ExecuteArmPath.Feedback()
for i in range(steps):
if goal_handle.is_cancel_requested:
goal_handle.canceled()
result = ExecuteArmPath.Result()
result.ok = False
result.error_code = 4002
result.message = "canceled"
result.final_state = GoalStatus.STATUS_CANCELED
self.get_logger().warn(f"execute_path canceled task_id={goal.task_id} path_id={goal.path_id} error_code={result.error_code}")
return result
if (time.time() - start_time) > float(goal.timeout_sec):
goal_handle.abort()
result = ExecuteArmPath.Result()
result.ok = False
result.error_code = 4003
result.message = "timeout"
result.final_state = GoalStatus.STATUS_ABORTED
self.get_logger().error(f"execute_path timeout task_id={goal.task_id} path_id={goal.path_id} error_code={result.error_code}")
return result
feedback.progress = float(i + 1) / float(steps)
feedback.current_step = "EXECUTING"
feedback.safety_state = 0
goal_handle.publish_feedback(feedback)
time.sleep(step_sleep)
goal_handle.succeed()
result = ExecuteArmPath.Result()
result.ok = True
result.error_code = 0
result.message = "succeeded"
result.final_state = GoalStatus.STATUS_SUCCEEDED
self.get_logger().info(f"execute_path succeeded task_id={goal.task_id} path_id={goal.path_id}")
return result
def main() -> None:
rclpy.init()
node = ArmActionServer()
executor = MultiThreadedExecutor()
executor.add_node(node)
try:
executor.spin()
finally:
node.destroy_node()
rclpy.shutdown()
source ~/ros2_ws/install/setup.bash
ros2 run ros2_comm_ai_nodes perception_node
新终端:
source ~/ros2_ws/install/setup.bash
ros2 topic echo /sorting/perception/detections --once
再开终端运行服务端并调用:
source ~/ros2_ws/install/setup.bash
ros2 run ros2_comm_ai_nodes dispatcher_node
新终端:
source ~/ros2_ws/install/setup.bash
ros2 service call /sorting/dispatcher/assign_task ros2_comm_design_interfaces/srv/AssignTask "{request_id: 'r1', task_id: '', target_class: 'apple', target_count: 3}"
再开终端运行 Action 服务端并测试:
source ~/ros2_ws/install/setup.bash
ros2 run ros2_comm_ai_nodes arm_action_server
新终端:
source ~/ros2_ws/install/setup.bash
ros2 action send_goal /sorting/arm/execute_path ros2_comm_design_interfaces/action/ExecuteArmPath "{task_id: 't001', path_id: 'p001', speed: 0.5, timeout_sec: 10.0}" --feedback
2. AI 输出检查点(拿到初稿先看什么)
-
是否给出清晰文件结构(例如
nodes/perception_node.py等)。 -
是否包含节点启动入口(
rclpy.init()、spin()、destroy_node())。 -
是否给出可执行的测试命令(topic echo / service call / action send_goal)。
-
是否明确 task_id/request_id 的来源与贯穿路径。
-
是否给出取消与超时语义(Action 必须有 cancel 处理)。
-
Topic:能
ros2 topic list看到话题;ros2 topic echo --once能看到结构化输出。 -
Service:能
ros2 service list看到服务;ros2 service call能返回ok/error_code/message。 -
Action:能
ros2 action list看到动作;ros2 action send_goal --feedback能看到 feedback;能取消并得到明确 final_state。
任务:以小组自选题项目为主(无项目则用分拣系统),每组只做 1 条最小闭环链路,完成“需求描述 → AI 生成代码初稿”。
- AI 生成的代码片段/文件结构(至少覆盖 1 个 Topic + 1 个 Service + 1 个 Action)
- 测试命令列表(Topic/Service/Action 各至少 1 条)
1. 语法规范审计(能跑与可维护)
- 导入与依赖:是否缺包/错包;是否使用 Humble 兼容 API。
- 节点生命周期:是否正确 init/spin/shutdown;是否资源泄露。
- 日志规范:关键路径必须打印 task_id/request_id;错误必须打印 error_code 与建议动作。
- 参数与常量:硬编码是否可接受;关键 topic/service/action 名称是否集中管理。
2. 逻辑正确性审计(能跑但不对)
- Topic:发布频率是否符合需求;数据字段是否完整;时间戳/坐标系是否合理。
- Service:返回语义是否清晰;失败原因是否可定位(error_code/message);幂等/重复请求是否有口径。
- Action:feedback 是否合理;超时与取消是否区分;最终 result 是否表达成功/失败/取消/超时。
3. 业务适配性审计(对不对项目)
-
命名是否符合项目约定(统一路径、统一接口名风格)。
-
状态码/错误码是否有码表口径;是否能用于上层策略(重试/停机/报警)。
-
与你们链路的主键是否一致(task_id/fruit_id 的贯穿策略是否写清)。
-
问题清单:问题描述 → 影响 → 修改建议 → 修改后证据(命令/日志)
-
修改记录:保留 AI 原输出 + 人工修改差异点(至少 3 条)
1. 最小测试矩阵(必须跑)
-
正常路径:Topic 能收到;Service 正常返回;Action 有反馈并最终成功。
-
异常路径(至少 1 个):Service 参数错误/超时;Action 取消/超时/失败之一。
-
回归:修复后重复跑一次正常路径,避免“修一处坏一处”。
-
Service 参数错误:把
target_count设为0,要求返回ok=false且error_code!=0,并能从message看出原因。 -
Action 取消:执行过程中另开终端取消目标,要求最终
final_state=5(CANCELED),并且ok=false、error_code!=0。 -
Action 超时:发送
timeout_sec很小的目标,要求最终final_state=6(ABORTED),并且ok=false、error_code!=0,日志能定位为 timeout。
对应命令参考(示例口径)
# Service 参数错误
ros2 service call /sorting/dispatcher/assign_task ros2_comm_design_interfaces/srv/AssignTask "{request_id: 'r_bad', task_id: '', target_class: 'apple', target_count: 0}"
# Action 超时(timeout_sec 小于总执行时间)
ros2 action send_goal /sorting/arm/execute_path ros2_comm_design_interfaces/action/ExecuteArmPath "{task_id: 't_timeout', path_id: 'p001', speed: 0.5, timeout_sec: 0.5}" --feedback
2. 推荐命令(示例口径)
- Topic:
ros2 topic echo /xxx --once - Service:
ros2 service call /xxx <pkg>/srv/<Srv> "{...}" - Action:
ros2 action send_goal /xxx <pkg>/action/<Action> "{...}" --feedback - 观察:
ros2 node list、ros2 topic list -t、ros2 interface show ...
现象描述 → 收集证据(命令输出/日志)→ 提出假设(可能原因)→ 验证(补充命令/对比)→ 修复 → 回归验证
2. 常见故障与证据点(速查)
-
找不到话题/服务/动作:命名不一致、节点没启动、namespace 不一致。
-
收不到消息:QoS 不匹配、domain id 不一致、未 source 环境、网络隔离。
-
调用失败:类型不匹配、接口包没编译、服务端异常退出、请求字段缺失。
-
Action 无反馈/无法取消:未实现 feedback 发布、cancel 回调缺失、final_state 语义混乱。
-
字段名写错:例如把
target_count写成count,导致 service call 失败或逻辑误判。 -
类型不一致:例如把
progress当成 int 或百分比 0100,导致上层判断错误(接口卡里必须写清 0.01.0)。 -
语义错但不报错:例如 Action 成功/取消/失败都写同一个
final_state,导致调度策略做错。
3. 故障定位提示词模板(可直接复制)
我遇到 ROS2 Humble 通信故障,请按“现象→证据→假设→验证→修复→回归”的结构回答,不要只给猜测。
现象:
- (例如)topic echo 收不到 /sorting/perception/detections
证据:
- ros2 node list 输出:...
- ros2 topic list -t 输出:...
- 节点日志关键行:...
约束:
- 我使用的是 ROS2 Humble
- 需要给出具体可执行的验证命令
- 修复建议要落到具体改动点(代码/命令/配置)