Kafka Streams: как получить размер пакета для опроса внутри кода Streamer - потребитель высокого уровня - PullRequest
0 голосов
/ 13 мая 2019

У меня есть требование сделать размер пакета опроса равным 500 и выполнить пакетную фиксацию после обработки 500 сообщений.Таким образом, если последний набор содержит менее 500 сообщений, мне нужно зафиксировать его после обработки последнего сообщения в пакете.Есть ли способ узнать, сколько сообщений было выбрано в опросе, если количество сообщений, оставшихся для обработки в теме, оказалось меньше размера опроса.

1 Ответ

1 голос
/ 13 мая 2019

Потоки на самом деле не настроены для поддержки подобного варианта использования, хотя это часто обсуждается под заголовком «асинхронная обработка», и мы хотели бы разработать его в будущем.

ПравильноТеперь, если вы действительно хотите использовать Streams, лучше всего было бы обернуть логику сохранения БД внутри собственного процессора или преобразователя, который также буферизует записи и отправляет пакеты, когда этого достаточно.

ОднакоЕсли вам действительно нужно просто «скопировать» данные из темы в БД, вы можете получить больше пользы от использования Connector или даже непосредственно от Kafka Consumer.

Надеюсь, это поможет!

...