ROS 2 паттерны — Говорим на языке роботов
ROS 2 — это не операционная система, а язык общения для роботов. Представь, что робот — это команда людей, где каждый отвечает за свою задачу. ROS 2 помогает им общаться и работать вместе.
4 способа общения в ROS 2
| Способ | Аналогия из жизни | Когда использовать |
|---|---|---|
| Topics | Радио или чат в Telegram | Постоянный поток данных (сенсоры, видео) |
| Services | Телефонный звонок | Вопрос-ответ, короткие команды |
| Actions | Заказ в ресторане | Длинные задачи с прогрессом и возможностью отмены |
| Parameters | Настройки в телефоне | Конфигурация, настройки |
1. Topics (Топики) — Радио для роботов
Простая аналогия:
- Издатель (Publisher) — ведущий на радиостанции
- Топик (Topic) — частота радиостанции (например, 107.5 FM)
- Подписчик (Subscriber) — радиослушатель
Пример: Датчик температуры публикует данные
# Издатель: датчик температуры (публикует каждую секунду)
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
import random
class TemperatureSensor(Node):
def __init__(self):
super().__init__('temperature_sensor')
# Создаём издателя на топик "temperature"
self.publisher = self.create_publisher(Float32, 'temperature', 10)
# Таймер: публикуем каждую секунду
self.timer = self.create_timer(1.0, self.publish_temperature)
self.get_logger().info('Датчик температуры запущен!')
def publish_temperature(self):
# Генерируем случайную температуру 20-25°C
temp = 20 + random.random() * 5
msg = Float32()
msg.data = temp
# Публикуем сообщение
self.publisher.publish(msg)
self.get_logger().info(f'Отправил: {temp:.1f}°C')
# Подписчик: кондиционер (слушает температуру)
class AirConditioner(Node):
def __init__(self):
super().__init__('air_conditioner')
# Подписываемся на топик "temperature"
self.subscription = self.create_subscription(
Float32,
'temperature',
self.temperature_callback,
10
)
self.get_logger().info('Кондиционер запущен!')
def temperature_callback(self, msg):
temp = msg.data
self.get_logger().info(f'Получил температуру: {temp:.1f}°C')
# Логика работы кондиционера
if temp > 23:
self.get_logger().info('Включил охлаждение!')
elif temp < 21:
self.get_logger().info('Включил обогрев!')
else:
self.get_logger().info('Температура идеальная')
# Запуск в разных терминалах:
# terminal 1: ros2 run my_package temperature_sensor
# terminal 2: ros2 run my_package air_conditioner
Что происходит:
- Датчик каждую секунду публикует температуру
- Кондиционер получает температуру и принимает решение
- Они не знают друг о друге! Можно добавить обогреватель, и он тоже будет слушать
2. Services (Сервисы) — Телефонный звонок
Простая аналогия:
- Клиент (Client) — ты звонишь в пиццерию
- Сервер (Server) — оператор в пиццерии
- Запрос (Request) — “Можно пиццу?”
- Ответ (Response) — “Да, будет через 30 минут”
Пример: Робот-официант принимает заказы
# Сервер: робот-официант (принимает заказы)
import rclpy
from rclpy.node import Node
from my_robot_srvs.srv import OrderFood # Наш кастомный сервис
class WaiterRobot(Node):
def __init__(self):
super().__init__('waiter_robot')
# Создаём сервис "order_food"
self.service = self.create_service(
OrderFood,
'order_food',
self.handle_order
)
self.get_logger().info('Робот-официант готов принимать заказы!')
def handle_order(self, request, response):
# request — что заказали
# response — что отвечаем
dish = request.dish
table = request.table_number
self.get_logger().info(f'Заказ: {dish} для стола {table}')
# Проверяем, есть ли блюдо в меню
menu = ['пицца', 'бургер', 'салат', 'суп']
if dish.lower() in menu:
response.success = True
response.message = f'Заказ "{dish}" принят! Принесу через 5 минут.'
response.order_id = hash(dish + str(table)) % 1000
else:
response.success = False
response.message = f'Извините, "{dish}" нет в меню.'
response.order_id = -1
return response
# Клиент: посетитель (делает заказ)
class Customer(Node):
def __init__(self):
super().__init__('customer')
# Создаём клиента для сервиса "order_food"
self.client = self.create_client(OrderFood, 'order_food')
# Ждём, пока сервис станет доступен
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Жду робота-официанта...')
self.get_logger().info('Робот найден! Делаю заказ...')
self.order_pizza()
def order_pizza(self):
# Создаём запрос
request = OrderFood.Request()
request.dish = 'пицца'
request.table_number = 5
# Отправляем запрос и ЖДЁМ ответ
future = self.client.call_async(request)
# Ожидаем ответ (блокируемся)
rclpy.spin_until_future_complete(self, future)
# Получаем ответ
response = future.result()
if response.success:
self.get_logger().info(f'Ура! {response.message}')
self.get_logger().info(f'Номер заказа: {response.order_id}')
else:
self.get_logger().info(f'Ой: {response.message}')
# Определение сервиса (файл OrderFood.srv):
# string dish # Что заказали
# int32 table_number # Номер стола
# ---
# bool success # Успех?
# string message # Сообщение
# int32 order_id # Номер заказа
Важные особенности сервисов:
- Синхронность — клиент ждёт ответа
- Один-к-одному — один запрос, один ответ
- Гарантия доставки — либо ответ, либо ошибка
3. Actions (Действия) — Длинные задачи с прогрессом
Простая аналогия:
Заказ в ресторане:
- Goal (Цель) — “Хочу стейк с картошкой”
- Feedback (Обратная связь) — “Стейк жарится, осталось 5 минут”
- Result (Результат) — “Вот ваш стейк, приятного аппетита!”
- Cancel (Отмена) — “Извините, я передумал”
Пример: Робот-уборщик убирает комнату
# Сервер действия: робот-уборщик
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from my_robot_actions.action import CleanRoom
import time
class CleanerRobot(Node):
def __init__(self):
super().__init__('cleaner_robot')
# Создаём Action Server "clean_room"
self.action_server = ActionServer(
self,
CleanRoom,
'clean_room',
self.execute_cleanup
)
self.get_logger().info('Робот-уборщик готов к работе!')
async def execute_cleanup(self, goal_handle):
# Получаем цель: какую комнату убирать
room = goal_handle.request.room_name
self.get_logger().info(f'Начинаю уборку комнаты: {room}')
# Этапы уборки
stages = [
'Подметаю пол...',
'Протираю пыль...',
'Мою пол...',
'Выношу мусор...',
'Проветриваю...'
]
feedback_msg = CleanRoom.Feedback()
result = CleanRoom.Result()
for i, stage in enumerate(stages):
# Проверяем, не отменили ли задачу
if goal_handle.is_cancel_requested:
goal_handle.canceled()
result.success = False
result.message = 'Уборка отменена'
return result
# Делаем работу
self.get_logger().info(stage)
time.sleep(2) # Имитируем работу
# Отправляем обратную связь
feedback_msg.progress = (i + 1) * 20 # 20%, 40%, ...
feedback_msg.current_stage = stage
goal_handle.publish_feedback(feedback_msg)
# Задача завершена
goal_handle.succeed()
result.success = True
result.message = f'Комната {room} идеально чиста!'
return result
# Клиент действия: хозяин (дает команду убраться)
class Owner(Node):
def __init__(self):
super().__init__('owner')
# Создаём Action Client
self.client = ActionClient(self, CleanRoom, 'clean_room')
# Ждём сервер
self.get_logger().info('Ищу робота-уборщика...')
self.client.wait_for_server()
# Отправляем цель
self.clean_kitchen()
def clean_kitchen(self):
# Создаём цель
goal_msg = CleanRoom.Goal()
goal_msg.room_name = 'кухня'
# Отправляем цель асинхронно
future = self.client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Робот отказался убираться')
return
self.get_logger().info('Робот принял задачу!')
# Ждём результат
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.result_callback)
def feedback_callback(self, feedback_msg):
# Получаем обратную связь во время выполнения
progress = feedback_msg.feedback.progress
stage = feedback_msg.feedback.current_stage
self.get_logger().info(f'Прогресс: {progress}% - {stage}')
def result_callback(self, future):
result = future.result().result
if result.success:
self.get_logger().info(f'Ура! {result.message}')
else:
self.get_logger().info(f'Неудача: {result.message}')
# Определение действия (файл CleanRoom.action):
# string room_name # Какую комнату убирать
# ---
# bool success # Успешно?
# string message # Сообщение
# ---
# int32 progress # Прогресс в %
# string current_stage # Текущий этап
Почему Actions лучше Services для длинных задач:
- Обратная связь — видишь прогресс
- Возможность отмены — передумал? Отмени!
- Асинхронность — не блокируешься на всё время выполнения
4. Parameters (Параметры) — Настройки робота
Простая аналогия:
Настройки в телефоне — яркость, громкость, язык. Меняешь в одном месте, применяется везде.
import rclpy
from rclpy.node import Node
from rcl_interfaces.msg import ParameterDescriptor
class ConfigurableRobot(Node):
def __init__(self):
super().__init__('configurable_robot')
# Объявляем параметры со значениями по умолчанию
self.declare_parameter('max_speed', 1.0)
self.declare_parameter('color', 'синий')
self.declare_parameter('auto_mode', True)
# Получаем параметры
self.max_speed = self.get_parameter('max_speed').value
self.color = self.get_parameter('color').value
self.auto_mode = self.get_parameter('auto_mode').value
self.get_logger().info(f'Скорость: {self.max_speed} м/с')
self.get_logger().info(f'Цвет: {self.color}')
self.get_logger().info(f'Авторежим: {"вкл" if self.auto_mode else "выкл"}')
# Подписываемся на изменения параметров
self.add_on_set_parameters_callback(self.parameters_callback)
def parameters_callback(self, parameters):
# Вызывается при изменении параметров
for param in parameters:
if param.name == 'max_speed':
self.max_speed = param.value
self.get_logger().info(f'Новая скорость: {self.max_speed}')
elif param.name == 'color':
self.color = param.value
self.get_logger().info(f'Новый цвет: {self.color}')
return SetParametersResult(successful=True)
# Изменение параметров из командной строки:
# ros2 param set /configurable_robot max_speed 2.0
# ros2 param set /configurable_robot color "красный"
# ros2 param dump /configurable_robot # Сохранить все параметры в файл
Практическое задание: Робот-патрульный
Создадим робота, который:
- Издаёт данные с лидара (Topics)
- Отвечает на запросы статуса (Services)
- Выполняет патрулирование по маршруту (Actions)
- Настраивается параметрами (Parameters)
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from sensor_msgs.msg import LaserScan
from my_robot_srvs.srv import GetStatus
from my_robot_actions.action import Patrol
class PatrolRobot(Node):
def __init__(self):
super().__init__('patrol_robot')
# 1. PUBLISHER (Топик) - данные лидара
self.lidar_pub = self.create_publisher(LaserScan, 'scan', 10)
# 2. SERVICE (Сервис) - статус робота
self.status_service = self.create_service(
GetStatus, 'get_status', self.handle_status
)
# 3. ACTION (Действие) - патрулирование
self.patrol_action = ActionServer(
self, Patrol, 'patrol', self.execute_patrol
)
# 4. PARAMETERS (Параметры) - настройки
self.declare_parameter('patrol_speed', 0.5)
self.declare_parameter('battery_level', 100)
self.get_logger().info('Робот-патрульный запущен!')
def publish_lidar_data(self):
# Имитация данных лидара
msg = LaserScan()
msg.ranges = [1.0, 1.1, 1.2, 0.5, 2.0] # Расстояния в метрах
self.lidar_pub.publish(msg)
def handle_status(self, request, response):
# Сервис: отвечаем на запрос статуса
response.battery = self.get_parameter('battery_level').value
response.is_patrolling = self.is_patrolling
response.position = [0.0, 0.0, 0.0]
return response
async def execute_patrol(self, goal_handle):
# Действие: выполняем патрулирование
points = ['A', 'B', 'C', 'D']
feedback_msg = Patrol.Feedback()
for i, point in enumerate(points):
if goal_handle.is_cancel_requested:
goal_handle.canceled()
return Patrol.Result(success=False)
self.get_logger().info(f'Еду к точке {point}')
feedback_msg.current_point = point
feedback_msg.progress = (i + 1) * 25
goal_handle.publish_feedback(feedback_msg)
# Имитация движения
await asyncio.sleep(2)
goal_handle.succeed()
return Patrol.Result(success=True, message='Патруль завершён')
def main():
rclpy.init()
robot = PatrolRobot()
# Главный цикл
try:
rclpy.spin(robot)
except KeyboardInterrupt:
pass
robot.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Как выбрать правильный паттерн?
Диаграмма принятия решений:
Нужно передавать данные постоянно? → Да → TOPICS
↓ Нет
Нужен ответ? → Да → Нужна обратная связь? → Нет → SERVICES
↓ Да
Долгая задача? → Да → ACTIONS
↓ Нет
SERVICES
Шпаргалка:
| Ситуация | Паттерн | Пример |
|---|---|---|
| Данные идут потоком | Topics | Видео с камеры, показания датчиков |
| Нужно быстро получить ответ | Services | Запрос заряда батареи, включение света |
| Долгая задача с прогрессом | Actions | Движение к цели, сканирование помещения |
| Настройки для всего робота | Parameters | Максимальная скорость, цвет подсветки |
Полезные команды ROS 2
# Посмотреть все узлы
ros2 node list
# Посмотреть все топики
ros2 topic list
# Посмотреть сообщения в топике
ros2 topic echo /temperature
# Отправить сообщение в топик
ros2 topic pub /temperature std_msgs/msg/Float32 "{data: 24.5}"
# Посмотреть все сервисы
ros2 service list
# Вызвать сервис
ros2 service call /get_status my_robot_srvs/srv/GetStatus
# Посмотреть все действия
ros2 action list
# Посмотреть параметры узла
ros2 param list /patrol_robot
# Изменить параметр
ros2 param set /patrol_robot patrol_speed 0.8
Что делать дальше?
Простые проекты для начала:
- Два узла, один топик — датчик температуры + дисплей
- Сервис-калькулятор — клиент шлёт числа, сервер возвращает сумму
- Робот с действиями — движение по квадрату с обратной связью
Более сложные:
- Робот-художник — принимает изображение (Service), рисует (Action), публикует прогресс (Topic)
- Умный дом на ROS 2 — датчики (Topics), управление (Services), сценарии (Actions)
- Робот-футболист — видит мяч (CV), планирует путь (Services), бежит (Action)
Советы:
- Начинай с малого — один паттерн за раз
- Используй готовые сообщения (
std_msgs) сначала - Делай много логов (
self.get_logger().info()) - Тестируй через командную строку перед написанием клиента
Помни: ROS 2 — это просто язык. Чем больше ты на нём “говоришь”, тем лучше понимаешь, как думают современные роботы!
