Параллелизм в потоках Акки по сравнению с Аккой - PullRequest
0 голосов
/ 16 ноября 2018

Я пытался узнать больше о потоках akka, но мне не удалось понять, как мы можем добиться подобного параллелизма в способе, которым мы достигаем с помощью Akka. Предположим, Actor A потребляет данные из kafka и записывает их в s3 и другоеАктер B потребляет из kafka и записывает его в postgres, а другой Actor C читает из БД и создает другую тему kafka.Все 3 актера могут находиться в разных системах акторов и не должны зависеть от других.Но как мне добиться того же, используя потоки Akka?Я полагаю, что в потоках Акки есть фазы, где А что-то делает и направляет это к В и так далее, пока мы не достигнем раковины.Я действительно понимаю, что есть mapAsync, который можно использовать для паралеллизации вещей, но я не уверен, как он будет играть в этом контексте, а также с точки зрения упорядочивания гарантий.

1 Ответ

0 голосов
/ 19 ноября 2018

Один источник

Для указанного вами конкретного варианта использования вы можете использовать BroadcastHub, чтобы "разветвить" каждый элемент данных от kafka до каждого из Sink значений, которые вы перечислили:

type Data = ???

val kafkaSource : Source[Data, _] = ???

val runnableGraph: RunnableGraph[Source[Data, NotUsed]] =
  kafkaSource.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)

val kafkaHub : Source[Data, NotUsed] = runnableGraph.run()

val s3Sink : Sink[Data, _] = ???

val postgresSink : Sink[Data, _] = ???

kafkaHub.to(s3Sink).run()
kafkaHub.to(postgresSink).run()

Несколько источников

Один важный недостаток вышеупомянутой реализации заключается в том, что «ставка производителя будет автоматически адаптирована для самого медленного потребителя».

Поэтому, если вы сможете сделать несколько подключений к первичному источнику, это, вероятно, будет более производительным за счет максимального параллелизма:

val kafkaSource : () => Source[Data,_] = ???

//stream 1
kafkaSource().to(s3Sink).run()

//stream 2
kafkaSource().to(postgresSink).run()
...