Akka Cluster: как ждать n маршрутов для регистрации на роутере, прежде чем маршрутизировать сообщения - PullRequest
0 голосов
/ 30 июня 2018

У меня есть кластер Akka, который требует 3 узла:

cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551"
      ,"akka.tcp://ClusterSystem@127.0.0.1:2552"]

    min-nr-of-members = 3
}

А затем я использую Cluster Singleton для запуска актера, который запускает маршрутизатор с поддержкой кластера:

deployment {
        /parent/singleton/router1 {
              router = round-robin-pool
              nr-of-instances = 3
              cluster {
                enabled = on
                max-nr-of-instances-per-node = 1
                allow-local-routees = on
         }
}

Вот как я инициализирую синглтон:

@Override
public void preStart()
{
    ActorRef router =
        getContext().actorOf(
            WorkerActor.props().withRouter(FromConfig.getInstance()),
            "router1");

    Runnable r = () -> {
        for (int i = 0; i < 20; i++) {
            WorkerActor.Message message = WorkerActor.Message.addAccount(
                WorkerActor.Account.of("username_" + i,
                                       "password_" + i)
            );

            router.tell(message, ActorRef.noSender());
        }
    };

    getContext().getSystem().scheduler()
                .scheduleOnce(Duration.ZERO, r,
                              getContext().dispatcher());
}

Когда я запускаю 3 узла кластера, синглтон создается и начинает посылать сообщения на его маршруты, но кажется, что на данный момент существует только один, поэтому все сообщения идут на этот. Я хочу дождаться, пока будут готовы 3 маршрута, и затем перебрать сообщения малиновки между ними.

Единственный способ, которым я смог достичь этого, - это добавить задержку к scheduleOnce, как это

getContext().getSystem().scheduler()
                .scheduleOnce(Duration.ofSeconds(5) , r,
                              getContext().dispatcher());

Но это все равно что скрестить пальцы, что 3 экземпляра будут готовы, прежде чем вы начнете отправлять сообщения.

Итак, вопрос в том, как правильно ожидать готовности всех nr-of-instances требуемых экземпляров Routee, прежде чем начать отправлять им сообщения?

Ответы [ 2 ]

0 голосов
/ 06 июля 2018

Хорошо, я, вероятно, должен прочитать основную документацию более внимательно, прежде чем погружаться в кластеризацию.

Ответ на этот вопрос - отправить сообщение GetRoutees маршрутизатору, а затем обработать ответ, чтобы проверить, сколько маршрутов уже добавлено.

GetRoutees задокументировано здесь: https://doc.akka.io/docs/akka/current/routing.html#management-messages

Отправка akka.routing.GetRoutees субъекту-маршрутизатору заставит его отправить обратно используемые в настоящее время маршруты в сообщении akka.routing.Routees.

И реализация может выглядеть примерно так:

@Override
public void preStart()
{
    ActorRef router =
        getContext()
            .actorOf(Props.empty().withRouter(FromConfig.getInstance()),
                     "router1");

    router.tell(GetRoutees.getInstance(), self());

}

@Override
public Receive createReceive()
{
    return
        ReceiveBuilder.create()
                      .match(Routees.class, r -> {
                          int noOfRoutees = r.getRoutees().size();
                          log().info("No of routees: {}", noOfRoutees);
                          if (noOfRoutees < 3)
                              getSender().tell(GetRoutees.getInstance(),
                                               getSelf());
                          else
                              distributeMessages(getSender());
                      })
                      .matchAny(o -> log().info(
                          "oops, I don't understand this message {}",
                          o.getClass()))
                      .build();
}
0 голосов
/ 30 июня 2018

Возможно, вы захотите использовать обратный вызов registerOnMemberUp. Ниже приведены основные моменты от Akka doc :

С помощью опции конфигурации вы можете определить необходимое количество членов до того, как лидер изменит статус участника "Присоединения" к статусу "Вверх".:

  akka.cluster.min-nr-of-members = 3

Вы можете запустить актеров в обратном вызове registerOnMemberUp, который будет вызвано, когда текущий статус участника изменится на «Вверх», то есть кластер имеет по крайней мере определенное количество членов.

  Cluster.get(system).registerOnMemberUp(new Runnable() {
    @Override
    public void run() {
      // e.g. Send messages to cluster members
    }
  });
...