👥 Команда • 🔄 Синхронизация • 💻 Программирование • 🎯 Задачи
7 класс • Технология • 45 минут
mw285748 • 15.06.2025
💡 Научимся:
🤖 Результат: Функционирующая команда роботов для совместного решения задач!
Один робот VS Команда роботов:
📊 Производительность:
\[P_{team} = \sum_{i=1}^{n} P_i \times k_{coordination} \times k_{specialization}\]где:
⚡ Время выполнения:
\[T_{team} = \frac{T_{single}}{n_{effective}} + T_{coordination}\]🏗️ Строительные роботы:
📦 Складские системы:
🔬 Исследование Марса:
Структура:
class CentralizedSystem:
def __init__(self):
self.coordinator = MainController()
self.robots = [Robot(i) for i in range(3)]
def execute_task(self, task):
# Центральный планировщик
subtasks = self.coordinator.split_task(task)
# Распределение подзадач
for i, subtask in enumerate(subtasks):
self.robots[i].assign_task(subtask)
# Мониторинг выполнения
while not self.all_completed():
status = self.coordinator.monitor_progress()
if status.need_rebalancing:
self.coordinator.rebalance_tasks()
Применение: Складские системы, производственные линии
Принцип роевого интеллекта:
class SwarmRobot:
def __init__(self, robot_id):
self.id = robot_id
self.neighbors = []
self.local_goal = None
def swarm_behavior(self):
while True:
# Обмен информацией с соседями
neighbor_info = self.communicate_with_neighbors()
# Локальное принятие решений
local_decision = self.make_local_decision(neighbor_info)
# Выполнение действия
self.execute_action(local_decision)
# Адаптация поведения
self.adapt_behavior()
def make_local_decision(self, neighbor_info):
# Простые правила роевого поведения
if self.too_crowded(neighbor_info):
return "spread_out"
elif self.isolated():
return "join_group"
else:
return "continue_task"
Применение: Дроны-исследователи, поисковые операции
Иерархическая структура:
class HybridSystem:
def __init__(self):
self.global_coordinator = GlobalPlanner()
self.local_groups = [LocalGroup(i) for i in range(3)]
def execute_mission(self, mission):
# Глобальное планирование
global_plan = self.global_coordinator.create_plan(mission)
# Распределение между группами
for group, task in zip(self.local_groups, global_plan.tasks):
group.assign_group_task(task)
# Локальная координация в группах
for group in self.local_groups:
group.coordinate_locally()
class LocalGroup:
def coordinate_locally(self):
# Децентрализованная координация внутри группы
for robot in self.robots:
robot.negotiate_with_neighbors()
robot.execute_local_plan()
Статусные сообщения:
class StatusMessage:
def __init__(self, sender_id):
self.sender_id = sender_id
self.timestamp = time.now()
self.position = get_current_position()
self.battery_level = read_battery()
self.current_task = get_active_task()
self.status = "active" # active, busy, error, idle
def to_json(self):
return {
"type": "status",
"sender": self.sender_id,
"data": {
"position": self.position,
"battery": self.battery_level,
"task": self.current_task,
"status": self.status
}
}
Запросы координации:
class CoordinationRequest:
def __init__(self, sender_id, request_type):
self.sender_id = sender_id
self.request_type = request_type # help, permission, negotiation
def create_help_request(self, task_description, required_robots=1):
return {
"type": "help_request",
"sender": self.sender_id,
"task": task_description,
"robots_needed": required_robots,
"priority": self.calculate_priority(),
"deadline": self.estimate_deadline()
}
Барьерная синхронизация:
class BarrierSync:
def __init__(self, robot_count):
self.total_robots = robot_count
self.arrived_robots = set()
def wait_for_all(self, robot_id):
self.arrived_robots.add(robot_id)
# Оповещение о прибытии
self.broadcast(f"Robot {robot_id} reached checkpoint")
# Ожидание остальных
while len(self.arrived_robots) < self.total_robots:
time.sleep(0.1)
# Все роботы готовы - продолжаем
self.broadcast("All robots synchronized - continue")
return True
# Использование в роботе
def synchronized_movement():
move_to_checkpoint()
barrier.wait_for_all(my_robot_id) # Ждем всех
continue_mission() # Продолжаем вместе
Консенсус-алгоритм:
class ConsensusProtocol:
def __init__(self, robot_id, neighbors):
self.robot_id = robot_id
self.neighbors = neighbors
self.proposals = {}
def propose_action(self, action):
# Предлагаем действие соседям
proposal = {
"action": action,
"sender": self.robot_id,
"votes": 0
}
# Рассылка предложения
for neighbor in self.neighbors:
neighbor.send("proposal", proposal)
# Сбор голосов
votes = self.collect_votes(proposal["id"])
# Принятие решения по большинству
if votes > len(self.neighbors) / 2:
return self.execute_action(action)
else:
return self.wait_for_alternative()
Обнаружение потери связи:
class CommunicationMonitor:
def __init__(self):
self.heartbeat_interval = 1.0 # секунда
self.timeout_threshold = 3.0 # 3 секунды
self.last_seen = {}
def monitor_robots(self, robot_list):
while True:
current_time = time.now()
for robot_id in robot_list:
last_contact = self.last_seen.get(robot_id, 0)
if current_time - last_contact > self.timeout_threshold:
self.handle_lost_robot(robot_id)
time.sleep(0.5)
def handle_lost_robot(self, robot_id):
print(f"Lost contact with robot {robot_id}")
# Перераспределение задач
lost_tasks = self.get_robot_tasks(robot_id)
self.redistribute_tasks(lost_tasks)
# Уведомление команды
self.broadcast(f"Robot {robot_id} offline - tasks redistributed")
🚚 Задача 1: “Транспортная команда”
🗺️ Задача 2: “Исследователи территории”
🔧 Задача 3: “Сборочная линия”
🏗️ Задача 4: “Строительная бригада”
Класс робота с коммуникацией:
class CollaborativeRobot:
def __init__(self, robot_id, communication_module):
self.id = robot_id
self.comm = communication_module
self.position = Position(0, 0)
self.task_queue = []
self.partners = []
def send_status(self):
status = {
"id": self.id,
"position": self.position,
"battery": self.get_battery_level(),
"available": len(self.task_queue) == 0
}
self.comm.broadcast("status", status)
def receive_message(self, message):
if message["type"] == "task_assignment":
self.assign_task(message["task"])
elif message["type"] == "coordination_request":
self.handle_coordination_request(message)
elif message["type"] == "emergency":
self.handle_emergency(message)
def collaborate_with(self, partner_id, task):
# Установление партнерства
self.partners.append(partner_id)
# Планирование совместной работы
joint_plan = self.plan_collaboration(task)
# Отправка плана партнеру
self.comm.send_to(partner_id, "joint_plan", joint_plan)
# Выполнение своей части
self.execute_plan_part(joint_plan.my_part)
Этап 1: Выбор задачи и планирование (10 мин)
• Выбор проектной задачи
• Определение ролей роботов
• Планирование алгоритма взаимодействия
Этап 2: Разработка протоколов (15 мин)
• Создание схемы коммуникации
• Определение типов сообщений
• Проектирование алгоритмов синхронизации
Этап 3: Программирование (25 мин)
• Написание кода для каждого робота
• Реализация протоколов связи
• Программирование логики координации
Этап 4: Тестирование и отладка (15 мин)
• Проверка взаимодействия роботов
• Отладка синхронизации
• Оптимизация производительности
Этап 5: Демонстрация (10 мин)
• Презентация решения
• Демонстрация работы команды роботов
class TransportTeam:
def __init__(self):
self.leader = Robot("transport_leader")
self.follower = Robot("transport_follower")
self.cargo_length = 50 # см
def synchronized_movement(self, target):
# Лидер планирует маршрут
path = self.leader.plan_path_to(target)
# Отправка плана ведомому
self.leader.send_to_follower("path_plan", path)
# Синхронное движение
for waypoint in path:
# Лидер движется к точке
self.leader.move_to(waypoint)
# Ведомый следует с учетом длины груза
follower_target = self.calculate_follower_position(waypoint)
self.follower.move_to(follower_target)
# Ожидание завершения движения
self.wait_for_synchronization()
def coordinated_turn(self, angle):
# Специальный алгоритм для поворота с грузом
turn_center = self.calculate_turn_center()
# Лидер поворачивает по внешнему радиусу
leader_radius = self.cargo_length / 2 + self.leader.width / 2
# Ведомый - по внутреннему радиусу
follower_radius = self.cargo_length / 2 - self.follower.width / 2
# Синхронное выполнение поворота
self.execute_synchronized_turn(leader_radius, follower_radius, angle)
class ExplorationTeam:
def __init__(self, territory_bounds):
self.robots = [Robot(f"explorer_{i}") for i in range(3)]
self.territory = territory_bounds
self.shared_map = SharedMap()
def divide_territory(self):
# Разделение территории на зоны
zones = self.territory.split_into_zones(len(self.robots))
# Назначение зон роботам
for robot, zone in zip(self.robots, zones):
robot.assign_exploration_zone(zone)
def explore_collaboratively(self):
while not self.territory_fully_explored():
for robot in self.robots:
# Исследование своей зоны
new_discoveries = robot.explore_step()
# Обновление общей карты
self.shared_map.update(new_discoveries)
# Обмен информацией с командой
robot.share_discoveries(new_discoveries)
# Проверка на необходимость помощи
if robot.needs_help():
self.request_assistance(robot.id)
def request_assistance(self, requesting_robot_id):
# Поиск свободного робота для помощи
available_robots = [r for r in self.robots
if r.id != requesting_robot_id and r.is_available()]
if available_robots:
helper = min(available_robots,
key=lambda r: r.distance_to(requesting_robot_id))
helper.assist_robot(requesting_robot_id)
class AssemblyLine:
def __init__(self):
self.stations = [
AssemblyStation("preparation", 0),
AssemblyStation("assembly", 1),
AssemblyStation("quality_check", 2)
]
self.conveyor = Conveyor()
def process_item(self, item):
current_station = 0
while current_station < len(self.stations):
station = self.stations[current_station]
# Обработка на текущей станции
processed_item = station.process(item)
# Уведомление следующей станции
if current_station < len(self.stations) - 1:
next_station = self.stations[current_station + 1]
next_station.prepare_for_item(processed_item)
# Передача на следующую станцию
if current_station < len(self.stations) - 1:
self.conveyor.transfer_item(processed_item, current_station + 1)
current_station += 1
class AssemblyStation:
def process(self, item):
# Ожидание готовности станции
while not self.is_ready():
time.sleep(0.1)
# Выполнение операции
result = self.perform_operation(item)
# Сигнал о завершении
self.signal_completion()
return result
План выступления (4 минуты на команду):
❓ Вопросы для обсуждения:
📊 Матрица оценивания (20 баллов):
| Критерий | Максимум | Описание |
|---|---|---|
| Алгоритмы | 5 | Эффективность, оптимальность, обработка ошибок |
| Техническая реализация | 5 | Код, коммуникация, надежность |
| Выполнение задачи | 5 | Успешность, время, устойчивость |
| Презентация | 5 | Демонстрация, понимание, анализ |
🎯 Шкала оценок:
Проблема синхронизации:
# ❌ Плохо - роботы не ждут друг друга
def bad_synchronization():
robot1.start_task()
robot2.start_task() # Может начать раньше robot1
# ✅ Хорошо - явная синхронизация
def good_synchronization():
barrier = SynchronizationBarrier(2)
robot1.prepare_task()
robot2.prepare_task()
barrier.wait() # Ждем готовности всех
robot1.start_task()
robot2.start_task()
Проблема deadlock (взаимная блокировка):
# ❌ Deadlock - роботы ждут друг друга вечно
def potential_deadlock():
robot1.request_resource(resource_A)
robot1.request_resource(resource_B) # Может зависнуть
robot2.request_resource(resource_B)
robot2.request_resource(resource_A) # Может зависнуть
# ✅ Избежание deadlock - упорядоченный захват ресурсов
def avoid_deadlock():
# Всегда захватываем ресурсы в одинаковом порядке
resources = sorted([resource_A, resource_B])
for robot in [robot1, robot2]:
for resource in resources:
robot.request_resource(resource)
class SystemMonitor:
def __init__(self, robots):
self.robots = robots
self.performance_log = []
def monitor_performance(self):
while True:
metrics = {
"timestamp": time.now(),
"active_robots": len([r for r in self.robots if r.is_active()]),
"task_completion_rate": self.calculate_completion_rate(),
"communication_latency": self.measure_latency(),
"system_efficiency": self.calculate_efficiency()
}
self.performance_log.append(metrics)
# Предупреждения о проблемах
if metrics["communication_latency"] > 1000: # мс
self.alert("High communication latency detected")
if metrics["system_efficiency"] < 0.7:
self.alert("System efficiency below threshold")
time.sleep(5) # Мониторинг каждые 5 секунд
🟢 ЗЕЛЕНЫЙ (что получилось хорошо):
🟡 ЖЕЛТЫЙ (что можно улучшить):
🔴 КРАСНЫЙ (что вызвало затруднения):
➕ ПЛЮС (что взять на заметку):
Оценка работы команды (1-5): ___
📋 Структура отчета:
Предложить улучшения для системы:
✅ Научились:
🧠 Поняли:
“Эффективная команда роботов = Правильные алгоритмы + Надежная связь + Умная координация”
🚀 Следующий шаг: Создание автономных адаптивных систем с машинным обучением
💡 Теперь ваши роботы умеют работать в команде как настоящие профессионалы!