ROS 2 Communication Model
Nodes
In ROS 2, a Node is the fundamental unit of execution. Nodes are processes that perform computation and communicate with other nodes through messages. Each node typically performs a specific task or function within the larger robotic system.
Node Characteristics
- Process-based: Each node runs as a separate process
- Single-threaded execution: By default, nodes execute callbacks in a single thread
- Namespaced: Nodes can have namespaces for organization
- Composable: Multiple nodes can be combined into a single process for efficiency
Creating Nodes
Nodes are created by inheriting from the Node class in your chosen client library (rclpy for Python, rclcpp for C++):
import rclpy
from rclpy.node import Node
class MyNode(Node):
def __init__(self):
super().__init__('my_node_name')
# Initialize node-specific components here
def main(args=None):
rclpy.init(args=args)
node = MyNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Topics and Publish/Subscribe Data Flow
Topics are named buses over which nodes exchange messages. The publish/subscribe pattern is the most common communication paradigm in ROS 2.
Publishers and Subscribers
- Publishers: Send messages to topics
- Subscribers: Receive messages from topics
- Anonymous: Publishers and subscribers don't know about each other
- Decoupled: Publishers and subscribers can start and stop at any time
Example: Publisher
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class Talker(Node):
def __init__(self):
super().__init__('talker')
self.publisher = self.create_publisher(String, 'chatter', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = String()
msg.data = f'Hello World: {self.i}'
self.publisher.publish(msg)
self.get_logger().info(f'Publishing: "{msg.data}"')
self.i += 1
def main(args=None):
rclpy.init(args=args)
talker = Talker()
rclpy.spin(talker)
talker.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Example: Subscriber
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class Listener(Node):
def __init__(self):
super().__init__('listener')
self.subscription = self.create_subscription(
String,
'chatter',
self.listener_callback,
10)
self.subscription # prevent unused variable warning
def listener_callback(self, msg):
self.get_logger().info(f'I heard: "{msg.data}"')
def main(args=None):
rclpy.init(args=args)
listener = Listener()
rclpy.spin(listener)
listener.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Services
Services provide a request/reply communication pattern where one node sends a request and another node provides a response.
Service Characteristics
- Synchronous: The client waits for a response
- Request/Response: Defined message types for request and response
- One-to-one: Typically one server responds to one client at a time
Example: Service Server
from add_two_ints_srv.srv import AddTwoInts
import rclpy
from rclpy.node import Node
class MinimalService(Node):
def __init__(self):
super().__init__('minimal_service')
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info(f'Returning: {response.sum}')
return response
def main(args=None):
rclpy.init(args=args)
minimal_service = MinimalService()
rclpy.spin(minimal_service)
rclpy.shutdown()
if __name__ == '__main__':
main()
Example: Service Client
import sys
from add_two_ints_srv.srv import AddTwoInts
import rclpy
from rclpy.node import Node
class MinimalClientAsync(Node):
def __init__(self):
super().__init__('minimal_client_async')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting again...')
self.req = AddTwoInts.Request()
def send_request(self, a, b):
self.req.a = a
self.req.b = b
self.future = self.cli.call_async(self.req)
rclpy.spin_until_future_complete(self, self.future)
return self.future.result()
def main(args=None):
rclpy.init(args=args)
minimal_client = MinimalClientAsync()
response = minimal_client.send_request(int(sys.argv[1]), int(sys.argv[2]))
minimal_client.get_logger().info(f'Result of add_two_ints: {response.sum}')
minimal_client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Bridging Python AI Agents to Robot Controllers using rclpy
One of the most powerful aspects of ROS 2 is its ability to bridge AI agents written in Python with robot controllers. This allows sophisticated AI algorithms to control physical robots through standardized interfaces.
rclpy Overview
rclpy is the Python client library for ROS 2. It provides Python bindings for the ROS 2 client library (rcl) and the underlying middleware.
Integration Pattern
The typical pattern for bridging AI agents to robot controllers involves:
- Perception Nodes: Process sensor data and publish to topics
- AI Decision Node: Subscribes to sensor data, runs AI algorithms, publishes commands
- Controller Nodes: Subscribe to commands and control robot actuators
Example: AI Agent Bridge
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
import numpy as np
class AIAgentBridge(Node):
def __init__(self):
super().__init__('ai_agent_bridge')
# Subscribe to sensor data
self.subscription = self.create_subscription(
LaserScan,
'scan',
self.laser_callback,
10)
# Publish commands to robot
self.publisher = self.create_publisher(Twist, 'cmd_vel', 10)
# Timer for AI decision making
self.timer = self.create_timer(0.1, self.ai_decision_callback)
self.laser_data = None
def laser_callback(self, msg):
# Store laser scan data for AI processing
self.laser_data = msg.ranges
def ai_decision_callback(self):
if self.laser_data is None:
return
# Simple AI decision making
# Find minimum distance in front of robot
front_distances = self.laser_data[0:30] + self.laser_data[-30:]
min_distance = min(front_distances)
# Create velocity command based on AI decision
cmd_vel = Twist()
if min_distance < 1.0: # Too close to obstacle
cmd_vel.linear.x = 0.0
cmd_vel.angular.z = 0.5 # Turn away
else:
cmd_vel.linear.x = 0.5 # Move forward
cmd_vel.angular.z = 0.0
self.publisher.publish(cmd_vel)
def main(args=None):
rclpy.init(args=args)
ai_agent = AIAgentBridge()
rclpy.spin(ai_agent)
ai_agent.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
The Perception → Decision → Action Loop
The perception-decision-action loop is a fundamental concept in robotics and AI that describes how intelligent agents interact with their environment.
Loop Components
- Perception: Gathering information from sensors about the environment
- Decision: Processing sensor data to determine appropriate actions
- Action: Executing commands that affect the environment
- Feedback: New sensor data reflecting the results of actions
ROS 2 Implementation
In ROS 2, this loop is typically implemented using:
- Perception: Sensor drivers publishing to topics
- Decision: AI nodes subscribing to sensor data and publishing commands
- Action: Controller nodes executing commands on the robot
- Feedback: Sensor data reflecting the results of actions
Example Complete Loop
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, LaserScan
from geometry_msgs.msg import Twist
from cv_bridge import CvBridge
import cv2
import numpy as np
class PerceptionDecisionActionLoop(Node):
def __init__(self):
super().__init__('pda_loop')
# Initialize components
self.cv_bridge = CvBridge()
# Perception: Subscribe to sensor data
self.image_sub = self.create_subscription(
Image, 'camera/image_raw', self.image_callback, 10)
self.laser_sub = self.create_subscription(
LaserScan, 'scan', self.laser_callback, 10)
# Action: Publish commands
self.cmd_pub = self.create_publisher(Twist, 'cmd_vel', 10)
# Timer for decision making
self.timer = self.create_timer(0.1, self.decision_callback)
# Internal state
self.latest_image = None
self.latest_laser = None
def image_callback(self, msg):
"""Perception: Process camera image"""
try:
cv_image = self.cv_bridge.imgmsg_to_cv2(msg, "bgr8")
self.latest_image = cv_image
except Exception as e:
self.get_logger().error(f'Image conversion failed: {e}')
def laser_callback(self, msg):
"""Perception: Process laser scan"""
self.latest_laser = msg.ranges
def decision_callback(self):
"""Decision: Make decisions based on sensor data"""
if self.latest_laser is None:
return
# Simple decision logic
cmd_vel = Twist()
# Analyze laser data for obstacles
front_ranges = self.latest_laser[0:30] + self.latest_laser[-30:]
min_front = min([r for r in front_ranges if r > 0.1], default=float('inf'))
# Decision making
if min_front < 0.8: # Obstacle detected
cmd_vel.linear.x = 0.0
cmd_vel.angular.z = 0.8 # Turn right
else:
cmd_vel.linear.x = 0.4 # Move forward
cmd_vel.angular.z = 0.0
# Action: Execute command
self.cmd_pub.publish(cmd_vel)
def main(args=None):
rclpy.init(args=args)
loop_node = PerceptionDecisionActionLoop()
rclpy.spin(loop_node)
loop_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Summary
The ROS 2 communication model provides a flexible and robust framework for building complex robotic systems. The combination of nodes, topics, and services enables the creation of distributed systems where AI agents can seamlessly interface with robot controllers. The perception-decision-action loop provides a fundamental pattern for implementing intelligent behavior in robotic systems.