Как предотвратить потерю записей при использовании Akka BroadcastHub - PullRequest
0 голосов
/ 09 апреля 2019

Я нашел следующий пример на https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html:

// A simple producer that publishes a new "message" every second
Source<String, Cancellable> producer =
    Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(1), "New message");

// Attach a BroadcastHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
RunnableGraph<Source<String, NotUsed>> runnableGraph =
    producer.toMat(BroadcastHub.of(String.class, 256), Keep.right());

// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
Source<String, NotUsed> fromProducer = runnableGraph.run(materializer);

// Print out messages from the producer in two independent consumers
fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);
fromProducer.runForeach(msg -> System.out.println("consumer2: " + msg), materializer).toCompletableFuture().get();

Он в основном делает то, что должен.

Я сменил тогда "производителя" следующим образом

// A simple producer that publishes a new "message" every second
Source<String, NotUsed> producer =
    Source.range(1, 100)
          .map(i -> "" + i)
          .wireTap(param -> System.out.println("param: " + param));

// Attach a BroadcastHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
RunnableGraph<Source<String, NotUsed>> runnableGraph =
    producer
        .initialDelay(Duration.ofSeconds(1))
        .toMat(BroadcastHub.of(String.class, 256), Keep.right());

// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
Source<String, NotUsed> fromProducer = runnableGraph.run(materializer);

// Print out messages from the producer in two independent consumers
fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);
fromProducer.runForeach(msg -> System.out.println("consumer2: " + msg), materializer)
            .toCompletableFuture().get();

Код abover работает как положено. Но когда я удаляю строку «initialDelay (Duration.ofSeconds (1))», она больше не работает.

Из-за WireTap я вижу, что сообщения действительно генерируются, но не потребляются потребителями. Мне кажется, что сообщения производятся до того, как потребители присоединяются.

Я довольно новичок в akka-streams, поэтому я могу кое-что здесь упустить.

Как я могу предотвратить это без использования «initialDelay»? Есть ли способ «заблокировать» источник, пока потребители не начнут «вытягивать» сообщения?

Спасибо Hansjörg

...