Для многих ситуаций в больших данных предпочтительно работать с небольшим буфером записей на ходу, а не с одной записью за раз.
Естественным примером является вызов некоторого внешнего API, который поддерживает пакетную обработку дляэффективность.
Как мы можем сделать это в Кафка-Стримс?В API я не могу найти ничего похожего на то, что я хочу.
Пока у меня есть:
builder.stream[String, String]("my-input-topic")
.mapValues(externalApiCall).to("my-output-topic")
Что я хочу:
builder.stream[String, String]("my-input-topic")
.batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")
В Scalaи Akka Streams функция называется grouped
или batch
.В Spark Structured Streaming мы можем сделать mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))
.