Один источник
Для указанного вами конкретного варианта использования вы можете использовать BroadcastHub
, чтобы "разветвить" каждый элемент данных от kafka
до каждого из Sink
значений, которые вы перечислили:
type Data = ???
val kafkaSource : Source[Data, _] = ???
val runnableGraph: RunnableGraph[Source[Data, NotUsed]] =
kafkaSource.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
val kafkaHub : Source[Data, NotUsed] = runnableGraph.run()
val s3Sink : Sink[Data, _] = ???
val postgresSink : Sink[Data, _] = ???
kafkaHub.to(s3Sink).run()
kafkaHub.to(postgresSink).run()
Несколько источников
Один важный недостаток вышеупомянутой реализации заключается в том, что «ставка производителя будет автоматически адаптирована для самого медленного потребителя».
Поэтому, если вы сможете сделать несколько подключений к первичному источнику, это, вероятно, будет более производительным за счет максимального параллелизма:
val kafkaSource : () => Source[Data,_] = ???
//stream 1
kafkaSource().to(s3Sink).run()
//stream 2
kafkaSource().to(postgresSink).run()