Я использую Kafka в системе чтения-процесса-записи и хочу использовать только один раз обработку.
Для каждого сообщения, которое читается получателем poll
Я получаю экземпляр кэшированного производителя на основе темы и раздела сообщения.Для генерации идентификатора транзакции для производителей я использую {consumer group}-{topic}-{partition}
.Это должно гарантировать, что при возникновении события перебалансировки группы потребителей во время обработки старый экземпляр будет отгорожен, таким образом гарантируя, что его вывод игнорируется.В настоящее время это делается для каждого сообщения, что является неоптимальным.
Очевидное быстрое ускорение состоит в том, чтобы собрать все выходные данные из сообщений из одного и того же тематического раздела и отправить их в виде одной транзакции.Во многих случаях мы читаем из нескольких тем, поэтому мой вопрос таков: есть ли лучший способ обеспечить один раз в этом сценарии, кроме:
- Группировать сообщения отметод потребителя
poll
в их теме / разделе - Доступ к кэшированному производителю с помощью приведенной выше схемы идентификатора транзакции
- Итерация по группам и обработка всех сообщений, отправка всех записей и смещение обновлений всингл
commitTransaction
через продюсера группы 'section'