Akka Scheduler: запускать только после завершения текущего запуска - PullRequest
0 голосов
/ 29 июня 2018

Планирование работы с помощью Akka Scheduler выглядит примерно так (по крайней мере, из документации):

system.scheduler().schedule(
    Duration.Zero(),
    Duration.create(5, TimeUnit.SECONDS),
    workerActor,
    new MessageToTheActor(),
    system.dispatcher(), ActorRef.noSender());

Я, однако, не понимаю, как я могу гарантировать, что следующий прогон произойдет только после завершения текущего прогона. Я безуспешно искал вокруг: (

Ответы [ 3 ]

0 голосов
/ 29 июня 2018
system.scheduler().schedule(
    Duration.Zero(),
    Duration.create(5, TimeUnit.SECONDS),
    workerActor,
    new MessageToTheActor(),
    system.dispatcher(), ActorRef.noSender());

Над кодом означает, что каждые 5 секунд планировщик будет отправлять message актеру workerActor.

И, как вы знаете, субъект по умолчанию действует только с одним потоком (если вы не сконфигурируете с nr-of-instance> 1), это означает, что все ваше сообщение, отправленное на workActor, будет буферизовано в mailbox, потому что только один поток может вызвать receive функцию actor.

Другими словами, вы всегда можете быть уверены, что следующий прогон произойдет только тогда, когда текущий прогон завершен, поскольку для актера по умолчанию одновременно работал только один поток.

0 голосов
/ 29 июня 2018

Планировщик - неправильный инструмент для вашего случая использования.

Альтернативой является Sink.actorRefWithAck от Akka Stream (приведенный ниже код адаптирован из примера в связанной документации и заимствует определенные здесь служебные классы). Вам необходимо настроить рабочего субъекта так, чтобы он обрабатывал несколько сообщений, связанных со статусом потока, и отвечал сообщением подтверждения. Сообщение подтверждения действует как сигнал противодавления и указывает, что субъект готов обработать следующее сообщение MessageToTheActor. Рабочий актер будет выглядеть примерно так:

enum Ack {
  INSTANCE;
}

static class StreamInitialized {}
static class StreamCompleted {}
static class StreamFailure {
  private final Throwable cause;
  public StreamFailure(Throwable cause) { this.cause = cause; }

  public Throwable getCause() { return cause; }
}

public class MyWorker extends AbstractLoggingActor {
  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .match(StreamInitialized.class, init -> {
        log().info("Stream initialized");
        sender().tell(Ack.INSTANCE, self());
      })
      .match(MessageToTheActor.class, msg -> {
        log().info("Received message: {}", msg);
        // do something with the message...
        sender().tell(Ack.INSTANCE, self());
      })
      .match(StreamCompleted.class, completed -> {
        log().info("Stream completed");
      })
      .match(StreamFailure.class, failed -> {
        log().error(failed.getCause(),"Stream failed!");
      })
      .build();
  }
}

Для использования Sink.actorRefWithAck с вышеуказанным актером:

final ActorSystem system = ActorSystem.create("MySystem");
final Materializer materializer = ActorMaterializer.create(system);

ActorRef workerActor = system.actorOf(Props.create(MyWorker.class, "worker"));

Source<MessageToTheActor, NotUsed> messages = Source.repeat(new MessageToTheActor());

Sink<String, NotUsed> sink = Sink.<String>actorRefWithAck(
  workerActor,
  new StreamInitialized(),
  Ack.INSTANCE,
  new StreamCompleted(),
  ex -> new StreamFailure(ex)
);

messages.runWith(sink, materializer);

Обратите внимание на использование Source.repeat, которое в этом случае постоянно генерирует сообщение MessageToTheActor. Использование Sink.actorRefWithAck гарантирует, что субъект не получит другого сообщения, пока не завершит обработку текущего сообщения.

Требуется следующий импорт (очевидно, так же, как и Akka Streams зависимость):

import akka.NotUsed;
import akka.actor.*;
import akka.stream.*;
import akka.stream.javadsl.*;
0 голосов
/ 29 июня 2018

Планировщик означает, что вы хотите что-то периодически, теперь, если ваш second прогон зависит от вашего first прогона, то почему вы даже хотите создать планировщик.

Просто создайте двух актеров, one manager actor и других child actor.

Когда заданием является success, child actor отправляет сообщение об успешном выполнении на parent actor, поэтому родительский субъект просит child actor запустить задание во второй раз. Это гарантирует, что задачи выполняются в периодическом порядке, а также, когда предыдущая была успешной.

Таким образом, вы должны реализовать соответствующий класс case в ваших методах получения вашего actors.

Надеюсь, это поможет!

...