Планировщик - неправильный инструмент для вашего случая использования.
Альтернативой является Sink.actorRefWithAck
от Akka Stream (приведенный ниже код адаптирован из примера в связанной документации и заимствует определенные здесь служебные классы). Вам необходимо настроить рабочего субъекта так, чтобы он обрабатывал несколько сообщений, связанных со статусом потока, и отвечал сообщением подтверждения. Сообщение подтверждения действует как сигнал противодавления и указывает, что субъект готов обработать следующее сообщение MessageToTheActor
. Рабочий актер будет выглядеть примерно так:
enum Ack {
INSTANCE;
}
static class StreamInitialized {}
static class StreamCompleted {}
static class StreamFailure {
private final Throwable cause;
public StreamFailure(Throwable cause) { this.cause = cause; }
public Throwable getCause() { return cause; }
}
public class MyWorker extends AbstractLoggingActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(StreamInitialized.class, init -> {
log().info("Stream initialized");
sender().tell(Ack.INSTANCE, self());
})
.match(MessageToTheActor.class, msg -> {
log().info("Received message: {}", msg);
// do something with the message...
sender().tell(Ack.INSTANCE, self());
})
.match(StreamCompleted.class, completed -> {
log().info("Stream completed");
})
.match(StreamFailure.class, failed -> {
log().error(failed.getCause(),"Stream failed!");
})
.build();
}
}
Для использования Sink.actorRefWithAck
с вышеуказанным актером:
final ActorSystem system = ActorSystem.create("MySystem");
final Materializer materializer = ActorMaterializer.create(system);
ActorRef workerActor = system.actorOf(Props.create(MyWorker.class, "worker"));
Source<MessageToTheActor, NotUsed> messages = Source.repeat(new MessageToTheActor());
Sink<String, NotUsed> sink = Sink.<String>actorRefWithAck(
workerActor,
new StreamInitialized(),
Ack.INSTANCE,
new StreamCompleted(),
ex -> new StreamFailure(ex)
);
messages.runWith(sink, materializer);
Обратите внимание на использование Source.repeat
, которое в этом случае постоянно генерирует сообщение MessageToTheActor
. Использование Sink.actorRefWithAck
гарантирует, что субъект не получит другого сообщения, пока не завершит обработку текущего сообщения.
Требуется следующий импорт (очевидно, так же, как и Akka Streams зависимость):
import akka.NotUsed;
import akka.actor.*;
import akka.stream.*;
import akka.stream.javadsl.*;