🤖 Программирование роботов для совместной работы

От теории к практике многоагентных систем

👥 Команда • 🔄 Синхронизация • 💻 Программирование • 🎯 Задачи
7 класс • Технология • 45 минут
mw285748 • 15.06.2025

🎯 Цель урока

💡 Научимся:

  • Программировать синхронизацию действий роботов
  • Создавать протоколы коммуникации между роботами
  • Распределять задачи между агентами системы
  • Отлаживать распределенные робототехнические системы

🤖 Результат: Функционирующая команда роботов для совместного решения задач!

🌟 От одиночки к команде

🔍 Сравнение подходов

Один робот VS Команда роботов:

📊 Производительность:

\[P_{team} = \sum_{i=1}^{n} P_i \times k_{coordination} \times k_{specialization}\]

где:

  • P_i = производительность i-го робота
  • k_coordination = коэффициент координации (0.7-1.2)
  • k_specialization = коэффициент специализации (1.0-2.0)

⚡ Время выполнения:

\[T_{team} = \frac{T_{single}}{n_{effective}} + T_{coordination}\]

🎬 Примеры совместной работы

🏗️ Строительные роботы:

  • 🏗️ Один робот кладет кирпичи
  • 🚚 Другой доставляет материалы
  • 📐 Третий контролирует качество
  • ⏱️ Результат: здание за 1/3 времени

📦 Складские системы:

  • 🤖 100+ роботов работают одновременно
  • 📍 Каждый знает свою зону ответственности
  • 🚫 Избегают столкновений автоматически
  • 📈 Эффективность на 300% выше

🔬 Исследование Марса:

  • 🚁 Вертолет Ingenuity разведывает маршруты
  • 🚗 Perseverance движется по безопасному пути
  • 📡 Орбитальные спутники передают данные на Землю

🏗️ Архитектуры взаимодействия

👑 Централизованная модель

Структура:

class CentralizedSystem:
    def __init__(self):
        self.coordinator = MainController()
        self.robots = [Robot(i) for i in range(3)]
        
    def execute_task(self, task):
        # Центральный планировщик
        subtasks = self.coordinator.split_task(task)
        
        # Распределение подзадач
        for i, subtask in enumerate(subtasks):
            self.robots[i].assign_task(subtask)
            
        # Мониторинг выполнения
        while not self.all_completed():
            status = self.coordinator.monitor_progress()
            if status.need_rebalancing:
                self.coordinator.rebalance_tasks()

Применение: Складские системы, производственные линии

🌐 Децентрализованная модель

Принцип роевого интеллекта:

class SwarmRobot:
    def __init__(self, robot_id):
        self.id = robot_id
        self.neighbors = []
        self.local_goal = None
        
    def swarm_behavior(self):
        while True:
            # Обмен информацией с соседями
            neighbor_info = self.communicate_with_neighbors()
            
            # Локальное принятие решений
            local_decision = self.make_local_decision(neighbor_info)
            
            # Выполнение действия
            self.execute_action(local_decision)
            
            # Адаптация поведения
            self.adapt_behavior()
            
    def make_local_decision(self, neighbor_info):
        # Простые правила роевого поведения
        if self.too_crowded(neighbor_info):
            return "spread_out"
        elif self.isolated():
            return "join_group"
        else:
            return "continue_task"

Применение: Дроны-исследователи, поисковые операции

🔄 Гибридная модель

Иерархическая структура:

class HybridSystem:
    def __init__(self):
        self.global_coordinator = GlobalPlanner()
        self.local_groups = [LocalGroup(i) for i in range(3)]
        
    def execute_mission(self, mission):
        # Глобальное планирование
        global_plan = self.global_coordinator.create_plan(mission)
        
        # Распределение между группами
        for group, task in zip(self.local_groups, global_plan.tasks):
            group.assign_group_task(task)
            
        # Локальная координация в группах
        for group in self.local_groups:
            group.coordinate_locally()

class LocalGroup:
    def coordinate_locally(self):
        # Децентрализованная координация внутри группы
        for robot in self.robots:
            robot.negotiate_with_neighbors()
            robot.execute_local_plan()

📡 Протоколы коммуникации

💬 Типы сообщений

Статусные сообщения:

class StatusMessage:
    def __init__(self, sender_id):
        self.sender_id = sender_id
        self.timestamp = time.now()
        self.position = get_current_position()
        self.battery_level = read_battery()
        self.current_task = get_active_task()
        self.status = "active"  # active, busy, error, idle
        
    def to_json(self):
        return {
            "type": "status",
            "sender": self.sender_id,
            "data": {
                "position": self.position,
                "battery": self.battery_level,
                "task": self.current_task,
                "status": self.status
            }
        }

Запросы координации:

class CoordinationRequest:
    def __init__(self, sender_id, request_type):
        self.sender_id = sender_id
        self.request_type = request_type  # help, permission, negotiation
        
    def create_help_request(self, task_description, required_robots=1):
        return {
            "type": "help_request",
            "sender": self.sender_id,
            "task": task_description,
            "robots_needed": required_robots,
            "priority": self.calculate_priority(),
            "deadline": self.estimate_deadline()
        }

🔄 Алгоритмы синхронизации

Барьерная синхронизация:

class BarrierSync:
    def __init__(self, robot_count):
        self.total_robots = robot_count
        self.arrived_robots = set()
        
    def wait_for_all(self, robot_id):
        self.arrived_robots.add(robot_id)
        
        # Оповещение о прибытии
        self.broadcast(f"Robot {robot_id} reached checkpoint")
        
        # Ожидание остальных
        while len(self.arrived_robots) < self.total_robots:
            time.sleep(0.1)
            
        # Все роботы готовы - продолжаем
        self.broadcast("All robots synchronized - continue")
        return True

# Использование в роботе
def synchronized_movement():
    move_to_checkpoint()
    barrier.wait_for_all(my_robot_id)  # Ждем всех
    continue_mission()  # Продолжаем вместе

Консенсус-алгоритм:

class ConsensusProtocol:
    def __init__(self, robot_id, neighbors):
        self.robot_id = robot_id
        self.neighbors = neighbors
        self.proposals = {}
        
    def propose_action(self, action):
        # Предлагаем действие соседям
        proposal = {
            "action": action,
            "sender": self.robot_id,
            "votes": 0
        }
        
        # Рассылка предложения
        for neighbor in self.neighbors:
            neighbor.send("proposal", proposal)
            
        # Сбор голосов
        votes = self.collect_votes(proposal["id"])
        
        # Принятие решения по большинству
        if votes > len(self.neighbors) / 2:
            return self.execute_action(action)
        else:
            return self.wait_for_alternative()

🛡️ Обработка ошибок связи

Обнаружение потери связи:

class CommunicationMonitor:
    def __init__(self):
        self.heartbeat_interval = 1.0  # секунда
        self.timeout_threshold = 3.0   # 3 секунды
        self.last_seen = {}
        
    def monitor_robots(self, robot_list):
        while True:
            current_time = time.now()
            
            for robot_id in robot_list:
                last_contact = self.last_seen.get(robot_id, 0)
                
                if current_time - last_contact > self.timeout_threshold:
                    self.handle_lost_robot(robot_id)
                    
            time.sleep(0.5)
            
    def handle_lost_robot(self, robot_id):
        print(f"Lost contact with robot {robot_id}")
        
        # Перераспределение задач
        lost_tasks = self.get_robot_tasks(robot_id)
        self.redistribute_tasks(lost_tasks)
        
        # Уведомление команды
        self.broadcast(f"Robot {robot_id} offline - tasks redistributed")

🛠️ Практическая работа

📋 Проектные задачи

🚚 Задача 1: “Транспортная команда”

  • 2 робота несут длинный груз
  • Синхронизация скорости и направления
  • Координированные повороты
  • Остановка при препятствиях

🗺️ Задача 2: “Исследователи территории”

  • 3 робота исследуют разные зоны
  • Обмен картами местности
  • Избежание дублирования работы
  • Сбор в точке рандеву

🔧 Задача 3: “Сборочная линия”

  • Конвейер из 3 роботов
  • Каждый выполняет свой этап сборки
  • Передача изделий между роботами
  • Контроль качества на каждом этапе

🏗️ Задача 4: “Строительная бригада”

  • Роботы строят башню из блоков
  • Один подает материалы
  • Другой устанавливает блоки
  • Третий контролирует устойчивость

💻 Базовый код для старта

Класс робота с коммуникацией:

class CollaborativeRobot:
    def __init__(self, robot_id, communication_module):
        self.id = robot_id
        self.comm = communication_module
        self.position = Position(0, 0)
        self.task_queue = []
        self.partners = []
        
    def send_status(self):
        status = {
            "id": self.id,
            "position": self.position,
            "battery": self.get_battery_level(),
            "available": len(self.task_queue) == 0
        }
        self.comm.broadcast("status", status)
        
    def receive_message(self, message):
        if message["type"] == "task_assignment":
            self.assign_task(message["task"])
        elif message["type"] == "coordination_request":
            self.handle_coordination_request(message)
        elif message["type"] == "emergency":
            self.handle_emergency(message)
            
    def collaborate_with(self, partner_id, task):
        # Установление партнерства
        self.partners.append(partner_id)
        
        # Планирование совместной работы
        joint_plan = self.plan_collaboration(task)
        
        # Отправка плана партнеру
        self.comm.send_to(partner_id, "joint_plan", joint_plan)
        
        # Выполнение своей части
        self.execute_plan_part(joint_plan.my_part)

⏱️ План работы (60 минут)

Этап 1: Выбор задачи и планирование (10 мин)
• Выбор проектной задачи
• Определение ролей роботов
• Планирование алгоритма взаимодействия

Этап 2: Разработка протоколов (15 мин)
• Создание схемы коммуникации
• Определение типов сообщений
• Проектирование алгоритмов синхронизации

Этап 3: Программирование (25 мин)
• Написание кода для каждого робота
• Реализация протоколов связи
• Программирование логики координации

Этап 4: Тестирование и отладка (15 мин)
• Проверка взаимодействия роботов
• Отладка синхронизации
• Оптимизация производительности

Этап 5: Демонстрация (10 мин)
• Презентация решения
• Демонстрация работы команды роботов

🎯 Алгоритмы для задач

🚚 Транспортная команда

class TransportTeam:
    def __init__(self):
        self.leader = Robot("transport_leader")
        self.follower = Robot("transport_follower")
        self.cargo_length = 50  # см
        
    def synchronized_movement(self, target):
        # Лидер планирует маршрут
        path = self.leader.plan_path_to(target)
        
        # Отправка плана ведомому
        self.leader.send_to_follower("path_plan", path)
        
        # Синхронное движение
        for waypoint in path:
            # Лидер движется к точке
            self.leader.move_to(waypoint)
            
            # Ведомый следует с учетом длины груза
            follower_target = self.calculate_follower_position(waypoint)
            self.follower.move_to(follower_target)
            
            # Ожидание завершения движения
            self.wait_for_synchronization()
            
    def coordinated_turn(self, angle):
        # Специальный алгоритм для поворота с грузом
        turn_center = self.calculate_turn_center()
        
        # Лидер поворачивает по внешнему радиусу
        leader_radius = self.cargo_length / 2 + self.leader.width / 2
        
        # Ведомый - по внутреннему радиусу
        follower_radius = self.cargo_length / 2 - self.follower.width / 2
        
        # Синхронное выполнение поворота
        self.execute_synchronized_turn(leader_radius, follower_radius, angle)

🗺️ Исследователи территории

class ExplorationTeam:
    def __init__(self, territory_bounds):
        self.robots = [Robot(f"explorer_{i}") for i in range(3)]
        self.territory = territory_bounds
        self.shared_map = SharedMap()
        
    def divide_territory(self):
        # Разделение территории на зоны
        zones = self.territory.split_into_zones(len(self.robots))
        
        # Назначение зон роботам
        for robot, zone in zip(self.robots, zones):
            robot.assign_exploration_zone(zone)
            
    def explore_collaboratively(self):
        while not self.territory_fully_explored():
            for robot in self.robots:
                # Исследование своей зоны
                new_discoveries = robot.explore_step()
                
                # Обновление общей карты
                self.shared_map.update(new_discoveries)
                
                # Обмен информацией с командой
                robot.share_discoveries(new_discoveries)
                
                # Проверка на необходимость помощи
                if robot.needs_help():
                    self.request_assistance(robot.id)
                    
    def request_assistance(self, requesting_robot_id):
        # Поиск свободного робота для помощи
        available_robots = [r for r in self.robots 
                          if r.id != requesting_robot_id and r.is_available()]
        
        if available_robots:
            helper = min(available_robots, 
                        key=lambda r: r.distance_to(requesting_robot_id))
            helper.assist_robot(requesting_robot_id)

🔧 Сборочная линия

class AssemblyLine:
    def __init__(self):
        self.stations = [
            AssemblyStation("preparation", 0),
            AssemblyStation("assembly", 1), 
            AssemblyStation("quality_check", 2)
        ]
        self.conveyor = Conveyor()
        
    def process_item(self, item):
        current_station = 0
        
        while current_station < len(self.stations):
            station = self.stations[current_station]
            
            # Обработка на текущей станции
            processed_item = station.process(item)
            
            # Уведомление следующей станции
            if current_station < len(self.stations) - 1:
                next_station = self.stations[current_station + 1]
                next_station.prepare_for_item(processed_item)
                
            # Передача на следующую станцию
            if current_station < len(self.stations) - 1:
                self.conveyor.transfer_item(processed_item, current_station + 1)
                
            current_station += 1
            
class AssemblyStation:
    def process(self, item):
        # Ожидание готовности станции
        while not self.is_ready():
            time.sleep(0.1)
            
        # Выполнение операции
        result = self.perform_operation(item)
        
        # Сигнал о завершении
        self.signal_completion()
        
        return result

🎤 Демонстрация и защита

📊 Презентация решений

План выступления (4 минуты на команду):

  1. Задача - что должна решать команда роботов?
  2. Архитектура - как организовано взаимодействие?
  3. Протоколы - как роботы общаются друг с другом?
  4. Демонстрация - показ работы в действии
  5. Анализ - что получилось, что можно улучшить?

❓ Вопросы для обсуждения:

  • Как система адаптируется к отказу одного робота?
  • Можно ли масштабировать решение на больше роботов?
  • Какие узкие места есть в алгоритме?
  • Как оптимизировать время выполнения задачи?

🏆 Критерии оценки

📊 Матрица оценивания (20 баллов):

КритерийМаксимумОписание
Алгоритмы5Эффективность, оптимальность, обработка ошибок
Техническая реализация5Код, коммуникация, надежность
Выполнение задачи5Успешность, время, устойчивость
Презентация5Демонстрация, понимание, анализ

🎯 Шкала оценок:

  • 18-20: “5” (отлично)
  • 14-17: “4” (хорошо)
  • 10-13: “3” (удовлетворительно)

🔧 Отладка распределенных систем

🐛 Типичные проблемы

Проблема синхронизации:

# ❌ Плохо - роботы не ждут друг друга
def bad_synchronization():
    robot1.start_task()
    robot2.start_task()  # Может начать раньше robot1
    
# ✅ Хорошо - явная синхронизация
def good_synchronization():
    barrier = SynchronizationBarrier(2)
    
    robot1.prepare_task()
    robot2.prepare_task()
    
    barrier.wait()  # Ждем готовности всех
    
    robot1.start_task()
    robot2.start_task()

Проблема deadlock (взаимная блокировка):

# ❌ Deadlock - роботы ждут друг друга вечно
def potential_deadlock():
    robot1.request_resource(resource_A)
    robot1.request_resource(resource_B)  # Может зависнуть
    
    robot2.request_resource(resource_B)
    robot2.request_resource(resource_A)  # Может зависнуть
    
# ✅ Избежание deadlock - упорядоченный захват ресурсов
def avoid_deadlock():
    # Всегда захватываем ресурсы в одинаковом порядке
    resources = sorted([resource_A, resource_B])
    
    for robot in [robot1, robot2]:
        for resource in resources:
            robot.request_resource(resource)

📊 Мониторинг системы

class SystemMonitor:
    def __init__(self, robots):
        self.robots = robots
        self.performance_log = []
        
    def monitor_performance(self):
        while True:
            metrics = {
                "timestamp": time.now(),
                "active_robots": len([r for r in self.robots if r.is_active()]),
                "task_completion_rate": self.calculate_completion_rate(),
                "communication_latency": self.measure_latency(),
                "system_efficiency": self.calculate_efficiency()
            }
            
            self.performance_log.append(metrics)
            
            # Предупреждения о проблемах
            if metrics["communication_latency"] > 1000:  # мс
                self.alert("High communication latency detected")
                
            if metrics["system_efficiency"] < 0.7:
                self.alert("System efficiency below threshold")
                
            time.sleep(5)  # Мониторинг каждые 5 секунд

🤔 Рефлексия “Светофор+”

📝 Анализ работы

🟢 ЗЕЛЕНЫЙ (что получилось хорошо):

  • 🟡 ЖЕЛТЫЙ (что можно улучшить):

  • 🔴 КРАСНЫЙ (что вызвало затруднения):

  • ➕ ПЛЮС (что взять на заметку):

    Оценка работы команды (1-5): ___

    🏠 Домашнее задание

    🎯 Отчет по практической работе

    📋 Структура отчета:

    1. Название проекта многоагентной системы
    2. Цель и задачи системы
    3. Состав команды роботов и их роли
    4. Алгоритмы координации с блок-схемами
    5. Система коммуникации и протоколы
    6. Результаты тестирования и проблемы
    7. Перспективы развития системы
    8. Выводы и полученный опыт

    🌟 Творческое задание

    Предложить улучшения для системы:

    • Добавить машинное обучение для оптимизации
    • Интегрировать с системами IoT
    • Создать веб-интерфейс для мониторинга
    • Разработать алгоритмы самовосстановления

    🎉 Итоги урока

    🏆 Что освоили

    ✅ Научились:

    • Программировать синхронизацию действий роботов
    • Создавать протоколы межроботной коммуникации
    • Распределять задачи между агентами системы
    • Отлаживать сложные распределенные системы

    🧠 Поняли:

    • Координация роботов требует специальных алгоритмов
    • Коммуникация критически важна для совместной работы
    • Отказоустойчивость обеспечивает надежность системы
    • Мониторинг необходим для поддержания производительности

    🌟 Главный принцип

    “Эффективная команда роботов = Правильные алгоритмы + Надежная связь + Умная координация”

    🚀 Следующий шаг: Создание автономных адаптивных систем с машинным обучением

    💡 Теперь ваши роботы умеют работать в команде как настоящие профессионалы!