Как определить AmqpSource для подписки на множественный обмен? - PullRequest
0 голосов
/ 04 мая 2018

Сейчас я подписываюсь на единый обмен, используя

AmqpSource.atMostOnceSource(
    NamedQueueSourceSettings(..))

Я хочу иметь возможность подписаться на несколько обменов. Кто-нибудь может мне помочь с этим?

1 Ответ

0 голосов
/ 04 мая 2018

Если для конкретного источника alpakka нет ничего конкретного, вы можете использовать Merge или MerbeHub.

Если вы знаете все источники заранее, вы можете объединить несколько источников в один, используя Merge

Если вы не знаете всех источников заранее, вы можете использовать MergeHub например,

// A simple consumer that will print to the console for now  
val consumer = Sink.foreach(println)

// Attach a MergeHub Source to the consumer. This will materialize to a
// corresponding Sink.
val runnableGraph: RunnableGraph[Sink[String, NotUsed]] =
  MergeHub.source[String](perProducerBufferSize = 16).to(consumer)

// By running/materializing the consumer we get back a Sink, and hence
// now have access to feed elements into it. This Sink can be materialized
// any number of times, and every element that enters the Sink will
// be consumed by our consumer.
val toConsumer: Sink[String, NotUsed] = runnableGraph.run()

// Feeding two independent sources into the hub.
AmqpSource.atMostOnceSource(
   NamedQueueSourceSettings(..)).runWith(toConsumer)
AmqpSource.atMostOnceSource(
   NamedQueueSourceSettings(..)).runWith(toConsumer)
...