Как написать метод, который проверяет, выполнил ли агент задачу в Osbrain? - PullRequest
1 голос
/ 26 мая 2020

У меня есть вопрос, как написать правильную функцию, которая проверяет, выполнил ли агент свою задачу в Osbrain. У меня есть три агента: агент транспорта, агент узла и агент координатора. Основная задача агента-координатора - синхронизировать действия других агентов. Агенты координатора связываются с SYNC_PUB, а узлы и транспортные агенты SUB - с агентом координатора. Моя первоначальная реализация зависла после первого временного шага / итерации. Я неправильно реализую метод status_checker ?

from osbrain import run_nameserver, run_agent, Agent

import time

SYNCHRONIZER_CHANNEL_1 = 'coordinator1'


class TransportAgent(Agent):
    def transportAgent_first_handler(self, message):
        # time.sleep(2)
        self.log_info(message)
        self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')

    def process_reply(self, message):
        yield 1


class NodeAgent(Agent):
    def NodeAgent_first_handler(self, message):
        time.sleep(2)
        self.log_info(message)
        self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')

    def process_reply(self, message):
        yield 1


class SynchronizerCoordinatorAgent(Agent):
    def on_init(self):
        self.network_agent_addr = self.bind('SYNC_PUB', alias=SYNCHRONIZER_CHANNEL_1, handler='status_handler')
        self.status_list = []

    def first_synchronization(self, time_step, iteration):
        self.send(SYNCHRONIZER_CHANNEL_1, message={'time_step': time_step, 'iteration': iteration},
                  topic='first_synchronization')

    def status_handler(self, message):
        yield 'I have added you to the status_list'
        self.status_list.append(message)

    def status_checker(self):
        count = 0
        while len(self.status_list) < 2:
            count += 1
            time.sleep(1)
            return
        self.status_list.clear()


    def init_environment(self):
        self.TransportAgent = run_agent('TransportAgent', base=TransportAgent)

        self.NodeAgent = run_agent('NodeAgent', base=NodeAgent)

        self.TransportAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
                                    handler={'first_synchronization': TransportAgent.transportAgent_first_handler})
        self.NodeAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
                               handler={'first_synchronization': NodeAgent.NodeAgent_first_handler})


if __name__ == '__main__':

    ns = run_nameserver()
    synchronizer_coordinator_agent = run_agent('Synchronizer_CoordinatorAgent',
                                               base=SynchronizerCoordinatorAgent)
    synchronizer_coordinator_agent.init_environment()

    for iteration in range(1, 2):
        for time_step in range(0, 90, 30):
            synchronizer_coordinator_agent.first_synchronization(time_step=time_step, iteration=iteration)
            synchronizer_coordinator_agent.status_checker()
    time.sleep(1)

Он печатает это, а затем зависает

(NetworkAgent): {'time_step': 0, 'iteration': 1}

(RMOAgent): {'time_step': 0, 'iteration': 1}

1 Ответ

0 голосов
/ 26 мая 2020

Ага, похоже, ваш метод status_checker() сломан. Я предполагаю, что вы хотите, чтобы этот метод блокировался до тех пор, пока в status_list не будет 2 сообщения (одно от агента узла, а другое от транспортного агента).

Так что вы можете искать что-то вроде:

def status_checker(self):
    while True:
        if len(self.status_list) == 2:
            break
        time.sleep(1)
    self.status_list.clear()

Однако, когда вы вызываете этот метод из прокси:

synchronizer_coordinator_agent.status_checker()

Координатор блокирует выполнение этого вызова, поэтому он не будет обрабатывать другие входящие сообщения. Быстрый и грязный обходной путь - использовать unsafe вызов , например:

synchronizer_coordinator_agent.unsafe.status_checker()

Основная проблема, которую я вижу здесь, - это то, как вы справляетесь с этим. проверка статуса от __main__. Вы должны переместить вашу синхронизацию / шаги в вашего координатора. Это означает:

  • Вызвать метод .start_iterations() на вашем координаторе из __main__, чтобы координатор выполнил первый шаг
  • Затем сделайте вашего координатора реактивным на входящие сообщения (как только он получает 2 сообщения, он выполняет следующий шаг)
  • Координатор продолжает выполнять шаги до завершения
  • Из основного, просто периодически отслеживайте своего агента-координатора, чтобы видеть, когда он done
  • Завершите работу вашей системы с помощью ns.shutdown()

Ваш main может выглядеть следующим образом:

if __name__ == "__main__":

    ns = run_nameserver()

    coordinator = run_agent(...)
    coordinator.init_environment()
    coordinator.start_iterations()

    while not coordinator.finished():
        time.sleep(0.5)

    ns.shutdown()

На самом деле это не связано с этим, но учтите, что ваш current range(1, 2) приведет только к одной итерации (хотя это может быть сделано намеренно). Если вам нужно 2 итерации, вы можете использовать range(2).

...