Skip to main content

ROS 2 паттерны — Говорим на языке роботов

ROS 2 — это не операционная система, а язык общения для роботов. Представь, что робот — это команда людей, где каждый отвечает за свою задачу. ROS 2 помогает им общаться и работать вместе.

4 способа общения в ROS 2

СпособАналогия из жизниКогда использовать
TopicsРадио или чат в TelegramПостоянный поток данных (сенсоры, видео)
ServicesТелефонный звонокВопрос-ответ, короткие команды
ActionsЗаказ в ресторанеДлинные задачи с прогрессом и возможностью отмены
ParametersНастройки в телефонеКонфигурация, настройки

1. Topics (Топики) — Радио для роботов

Простая аналогия:

  • Издатель (Publisher) — ведущий на радиостанции
  • Топик (Topic) — частота радиостанции (например, 107.5 FM)
  • Подписчик (Subscriber) — радиослушатель

Пример: Датчик температуры публикует данные

# Издатель: датчик температуры (публикует каждую секунду)
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
import random

class TemperatureSensor(Node):
    def __init__(self):
        super().__init__('temperature_sensor')
        
        # Создаём издателя на топик "temperature"
        self.publisher = self.create_publisher(Float32, 'temperature', 10)
        
        # Таймер: публикуем каждую секунду
        self.timer = self.create_timer(1.0, self.publish_temperature)
        self.get_logger().info('Датчик температуры запущен!')
    
    def publish_temperature(self):
        # Генерируем случайную температуру 20-25°C
        temp = 20 + random.random() * 5
        msg = Float32()
        msg.data = temp
        
        # Публикуем сообщение
        self.publisher.publish(msg)
        self.get_logger().info(f'Отправил: {temp:.1f}°C')

# Подписчик: кондиционер (слушает температуру)
class AirConditioner(Node):
    def __init__(self):
        super().__init__('air_conditioner')
        
        # Подписываемся на топик "temperature"
        self.subscription = self.create_subscription(
            Float32,
            'temperature',
            self.temperature_callback,
            10
        )
        self.get_logger().info('Кондиционер запущен!')
    
    def temperature_callback(self, msg):
        temp = msg.data
        self.get_logger().info(f'Получил температуру: {temp:.1f}°C')
        
        # Логика работы кондиционера
        if temp > 23:
            self.get_logger().info('Включил охлаждение!')
        elif temp < 21:
            self.get_logger().info('Включил обогрев!')
        else:
            self.get_logger().info('Температура идеальная')

# Запуск в разных терминалах:
# terminal 1: ros2 run my_package temperature_sensor
# terminal 2: ros2 run my_package air_conditioner

Что происходит:

  1. Датчик каждую секунду публикует температуру
  2. Кондиционер получает температуру и принимает решение
  3. Они не знают друг о друге! Можно добавить обогреватель, и он тоже будет слушать

2. Services (Сервисы) — Телефонный звонок

Простая аналогия:

  • Клиент (Client) — ты звонишь в пиццерию
  • Сервер (Server) — оператор в пиццерии
  • Запрос (Request) — “Можно пиццу?”
  • Ответ (Response) — “Да, будет через 30 минут”

Пример: Робот-официант принимает заказы

# Сервер: робот-официант (принимает заказы)
import rclpy
from rclpy.node import Node
from my_robot_srvs.srv import OrderFood  # Наш кастомный сервис

class WaiterRobot(Node):
    def __init__(self):
        super().__init__('waiter_robot')
        
        # Создаём сервис "order_food"
        self.service = self.create_service(
            OrderFood,
            'order_food',
            self.handle_order
        )
        self.get_logger().info('Робот-официант готов принимать заказы!')
    
    def handle_order(self, request, response):
        # request — что заказали
        # response — что отвечаем
        
        dish = request.dish
        table = request.table_number
        
        self.get_logger().info(f'Заказ: {dish} для стола {table}')
        
        # Проверяем, есть ли блюдо в меню
        menu = ['пицца', 'бургер', 'салат', 'суп']
        
        if dish.lower() in menu:
            response.success = True
            response.message = f'Заказ "{dish}" принят! Принесу через 5 минут.'
            response.order_id = hash(dish + str(table)) % 1000
        else:
            response.success = False
            response.message = f'Извините, "{dish}" нет в меню.'
            response.order_id = -1
        
        return response

# Клиент: посетитель (делает заказ)
class Customer(Node):
    def __init__(self):
        super().__init__('customer')
        
        # Создаём клиента для сервиса "order_food"
        self.client = self.create_client(OrderFood, 'order_food')
        
        # Ждём, пока сервис станет доступен
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Жду робота-официанта...')
        
        self.get_logger().info('Робот найден! Делаю заказ...')
        self.order_pizza()
    
    def order_pizza(self):
        # Создаём запрос
        request = OrderFood.Request()
        request.dish = 'пицца'
        request.table_number = 5
        
        # Отправляем запрос и ЖДЁМ ответ
        future = self.client.call_async(request)
        
        # Ожидаем ответ (блокируемся)
        rclpy.spin_until_future_complete(self, future)
        
        # Получаем ответ
        response = future.result()
        if response.success:
            self.get_logger().info(f'Ура! {response.message}')
            self.get_logger().info(f'Номер заказа: {response.order_id}')
        else:
            self.get_logger().info(f'Ой: {response.message}')

# Определение сервиса (файл OrderFood.srv):
# string dish          # Что заказали
# int32 table_number   # Номер стола
# ---
# bool success         # Успех?
# string message       # Сообщение
# int32 order_id       # Номер заказа

Важные особенности сервисов:

  1. Синхронность — клиент ждёт ответа
  2. Один-к-одному — один запрос, один ответ
  3. Гарантия доставки — либо ответ, либо ошибка

3. Actions (Действия) — Длинные задачи с прогрессом

Простая аналогия:

Заказ в ресторане:

  • Goal (Цель) — “Хочу стейк с картошкой”
  • Feedback (Обратная связь) — “Стейк жарится, осталось 5 минут”
  • Result (Результат) — “Вот ваш стейк, приятного аппетита!”
  • Cancel (Отмена) — “Извините, я передумал”

Пример: Робот-уборщик убирает комнату

# Сервер действия: робот-уборщик
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from my_robot_actions.action import CleanRoom
import time

class CleanerRobot(Node):
    def __init__(self):
        super().__init__('cleaner_robot')
        
        # Создаём Action Server "clean_room"
        self.action_server = ActionServer(
            self,
            CleanRoom,
            'clean_room',
            self.execute_cleanup
        )
        self.get_logger().info('Робот-уборщик готов к работе!')
    
    async def execute_cleanup(self, goal_handle):
        # Получаем цель: какую комнату убирать
        room = goal_handle.request.room_name
        self.get_logger().info(f'Начинаю уборку комнаты: {room}')
        
        # Этапы уборки
        stages = [
            'Подметаю пол...',
            'Протираю пыль...',
            'Мою пол...',
            'Выношу мусор...',
            'Проветриваю...'
        ]
        
        feedback_msg = CleanRoom.Feedback()
        result = CleanRoom.Result()
        
        for i, stage in enumerate(stages):
            # Проверяем, не отменили ли задачу
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                result.success = False
                result.message = 'Уборка отменена'
                return result
            
            # Делаем работу
            self.get_logger().info(stage)
            time.sleep(2)  # Имитируем работу
            
            # Отправляем обратную связь
            feedback_msg.progress = (i + 1) * 20  # 20%, 40%, ...
            feedback_msg.current_stage = stage
            goal_handle.publish_feedback(feedback_msg)
        
        # Задача завершена
        goal_handle.succeed()
        result.success = True
        result.message = f'Комната {room} идеально чиста!'
        return result

# Клиент действия: хозяин (дает команду убраться)
class Owner(Node):
    def __init__(self):
        super().__init__('owner')
        
        # Создаём Action Client
        self.client = ActionClient(self, CleanRoom, 'clean_room')
        
        # Ждём сервер
        self.get_logger().info('Ищу робота-уборщика...')
        self.client.wait_for_server()
        
        # Отправляем цель
        self.clean_kitchen()
    
    def clean_kitchen(self):
        # Создаём цель
        goal_msg = CleanRoom.Goal()
        goal_msg.room_name = 'кухня'
        
        # Отправляем цель асинхронно
        future = self.client.send_goal_async(
            goal_msg,
            feedback_callback=self.feedback_callback
        )
        future.add_done_callback(self.goal_response_callback)
    
    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Робот отказался убираться')
            return
        
        self.get_logger().info('Робот принял задачу!')
        
        # Ждём результат
        result_future = goal_handle.get_result_async()
        result_future.add_done_callback(self.result_callback)
    
    def feedback_callback(self, feedback_msg):
        # Получаем обратную связь во время выполнения
        progress = feedback_msg.feedback.progress
        stage = feedback_msg.feedback.current_stage
        self.get_logger().info(f'Прогресс: {progress}% - {stage}')
    
    def result_callback(self, future):
        result = future.result().result
        if result.success:
            self.get_logger().info(f'Ура! {result.message}')
        else:
            self.get_logger().info(f'Неудача: {result.message}')

# Определение действия (файл CleanRoom.action):
# string room_name          # Какую комнату убирать
# ---
# bool success              # Успешно?
# string message            # Сообщение
# ---
# int32 progress            # Прогресс в %
# string current_stage      # Текущий этап

Почему Actions лучше Services для длинных задач:

  1. Обратная связь — видишь прогресс
  2. Возможность отмены — передумал? Отмени!
  3. Асинхронность — не блокируешься на всё время выполнения

4. Parameters (Параметры) — Настройки робота

Простая аналогия:

Настройки в телефоне — яркость, громкость, язык. Меняешь в одном месте, применяется везде.

import rclpy
from rclpy.node import Node
from rcl_interfaces.msg import ParameterDescriptor

class ConfigurableRobot(Node):
    def __init__(self):
        super().__init__('configurable_robot')
        
        # Объявляем параметры со значениями по умолчанию
        self.declare_parameter('max_speed', 1.0)
        self.declare_parameter('color', 'синий')
        self.declare_parameter('auto_mode', True)
        
        # Получаем параметры
        self.max_speed = self.get_parameter('max_speed').value
        self.color = self.get_parameter('color').value
        self.auto_mode = self.get_parameter('auto_mode').value
        
        self.get_logger().info(f'Скорость: {self.max_speed} м/с')
        self.get_logger().info(f'Цвет: {self.color}')
        self.get_logger().info(f'Авторежим: {"вкл" if self.auto_mode else "выкл"}')
        
        # Подписываемся на изменения параметров
        self.add_on_set_parameters_callback(self.parameters_callback)
    
    def parameters_callback(self, parameters):
        # Вызывается при изменении параметров
        for param in parameters:
            if param.name == 'max_speed':
                self.max_speed = param.value
                self.get_logger().info(f'Новая скорость: {self.max_speed}')
            elif param.name == 'color':
                self.color = param.value
                self.get_logger().info(f'Новый цвет: {self.color}')
        
        return SetParametersResult(successful=True)

# Изменение параметров из командной строки:
# ros2 param set /configurable_robot max_speed 2.0
# ros2 param set /configurable_robot color "красный"
# ros2 param dump /configurable_robot  # Сохранить все параметры в файл

Практическое задание: Робот-патрульный

Создадим робота, который:

  1. Издаёт данные с лидара (Topics)
  2. Отвечает на запросы статуса (Services)
  3. Выполняет патрулирование по маршруту (Actions)
  4. Настраивается параметрами (Parameters)
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from sensor_msgs.msg import LaserScan
from my_robot_srvs.srv import GetStatus
from my_robot_actions.action import Patrol

class PatrolRobot(Node):
    def __init__(self):
        super().__init__('patrol_robot')
        
        # 1. PUBLISHER (Топик) - данные лидара
        self.lidar_pub = self.create_publisher(LaserScan, 'scan', 10)
        
        # 2. SERVICE (Сервис) - статус робота
        self.status_service = self.create_service(
            GetStatus, 'get_status', self.handle_status
        )
        
        # 3. ACTION (Действие) - патрулирование
        self.patrol_action = ActionServer(
            self, Patrol, 'patrol', self.execute_patrol
        )
        
        # 4. PARAMETERS (Параметры) - настройки
        self.declare_parameter('patrol_speed', 0.5)
        self.declare_parameter('battery_level', 100)
        
        self.get_logger().info('Робот-патрульный запущен!')
    
    def publish_lidar_data(self):
        # Имитация данных лидара
        msg = LaserScan()
        msg.ranges = [1.0, 1.1, 1.2, 0.5, 2.0]  # Расстояния в метрах
        self.lidar_pub.publish(msg)
    
    def handle_status(self, request, response):
        # Сервис: отвечаем на запрос статуса
        response.battery = self.get_parameter('battery_level').value
        response.is_patrolling = self.is_patrolling
        response.position = [0.0, 0.0, 0.0]
        return response
    
    async def execute_patrol(self, goal_handle):
        # Действие: выполняем патрулирование
        points = ['A', 'B', 'C', 'D']
        feedback_msg = Patrol.Feedback()
        
        for i, point in enumerate(points):
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                return Patrol.Result(success=False)
            
            self.get_logger().info(f'Еду к точке {point}')
            feedback_msg.current_point = point
            feedback_msg.progress = (i + 1) * 25
            goal_handle.publish_feedback(feedback_msg)
            
            # Имитация движения
            await asyncio.sleep(2)
        
        goal_handle.succeed()
        return Patrol.Result(success=True, message='Патруль завершён')

def main():
    rclpy.init()
    robot = PatrolRobot()
    
    # Главный цикл
    try:
        rclpy.spin(robot)
    except KeyboardInterrupt:
        pass
    
    robot.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Как выбрать правильный паттерн?

Диаграмма принятия решений:

Нужно передавать данные постоянно? → Да → TOPICS
                               ↓ Нет
                          Нужен ответ? → Да → Нужна обратная связь? → Нет → SERVICES
                                              ↓ Да
                                              Долгая задача? → Да → ACTIONS
                                                        ↓ Нет
                                                      SERVICES

Шпаргалка:

СитуацияПаттернПример
Данные идут потокомTopicsВидео с камеры, показания датчиков
Нужно быстро получить ответServicesЗапрос заряда батареи, включение света
Долгая задача с прогрессомActionsДвижение к цели, сканирование помещения
Настройки для всего роботаParametersМаксимальная скорость, цвет подсветки

Полезные команды ROS 2

# Посмотреть все узлы
ros2 node list

# Посмотреть все топики
ros2 topic list

# Посмотреть сообщения в топике
ros2 topic echo /temperature

# Отправить сообщение в топик
ros2 topic pub /temperature std_msgs/msg/Float32 "{data: 24.5}"

# Посмотреть все сервисы
ros2 service list

# Вызвать сервис
ros2 service call /get_status my_robot_srvs/srv/GetStatus

# Посмотреть все действия
ros2 action list

# Посмотреть параметры узла
ros2 param list /patrol_robot

# Изменить параметр
ros2 param set /patrol_robot patrol_speed 0.8

Что делать дальше?

Простые проекты для начала:

  1. Два узла, один топик — датчик температуры + дисплей
  2. Сервис-калькулятор — клиент шлёт числа, сервер возвращает сумму
  3. Робот с действиями — движение по квадрату с обратной связью

Более сложные:

  1. Робот-художник — принимает изображение (Service), рисует (Action), публикует прогресс (Topic)
  2. Умный дом на ROS 2 — датчики (Topics), управление (Services), сценарии (Actions)
  3. Робот-футболист — видит мяч (CV), планирует путь (Services), бежит (Action)

Советы:

  1. Начинай с малого — один паттерн за раз
  2. Используй готовые сообщения (std_msgs) сначала
  3. Делай много логов (self.get_logger().info())
  4. Тестируй через командную строку перед написанием клиента

Помни: ROS 2 — это просто язык. Чем больше ты на нём “говоришь”, тем лучше понимаешь, как думают современные роботы!