Я нашел следующий пример на https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html:
// A simple producer that publishes a new "message" every second
Source<String, Cancellable> producer =
Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(1), "New message");
// Attach a BroadcastHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
RunnableGraph<Source<String, NotUsed>> runnableGraph =
producer.toMat(BroadcastHub.of(String.class, 256), Keep.right());
// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
Source<String, NotUsed> fromProducer = runnableGraph.run(materializer);
// Print out messages from the producer in two independent consumers
fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);
fromProducer.runForeach(msg -> System.out.println("consumer2: " + msg), materializer).toCompletableFuture().get();
Он в основном делает то, что должен.
Я сменил тогда "производителя" следующим образом
// A simple producer that publishes a new "message" every second
Source<String, NotUsed> producer =
Source.range(1, 100)
.map(i -> "" + i)
.wireTap(param -> System.out.println("param: " + param));
// Attach a BroadcastHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
RunnableGraph<Source<String, NotUsed>> runnableGraph =
producer
.initialDelay(Duration.ofSeconds(1))
.toMat(BroadcastHub.of(String.class, 256), Keep.right());
// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
Source<String, NotUsed> fromProducer = runnableGraph.run(materializer);
// Print out messages from the producer in two independent consumers
fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);
fromProducer.runForeach(msg -> System.out.println("consumer2: " + msg), materializer)
.toCompletableFuture().get();
Код abover работает как положено. Но когда я удаляю строку «initialDelay (Duration.ofSeconds (1))», она больше не работает.
Из-за WireTap я вижу, что сообщения действительно генерируются, но не потребляются потребителями. Мне кажется, что сообщения производятся до того, как потребители присоединяются.
Я довольно новичок в akka-streams, поэтому я могу кое-что здесь упустить.
Как я могу предотвратить это без использования «initialDelay»? Есть ли способ «заблокировать» источник, пока потребители не начнут «вытягивать» сообщения?
Спасибо
Hansjörg