У меня есть рабочий в весенней загрузке, который слушает kafka topi c с 20 разделами. Я создал следующий прослушиватель:
@KafkaListener(topics = "mytopic")
public void listen(@Payload(required = true) IncomingMessage msg,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
В этом прослушивателе я распечатываю раздел и обнаруживаю, что все мои сообщения приходят с одного номера раздела - 17. Это означает, что сторона-производитель записывает все сообщения в тот же раздел.
Моя фабрика контейнеров ConcurrentKafkaListenerContainerFactory , и поэтому я хочу иметь возможность обрабатывать несколько событий одновременно, это означает, что мне нужны разные разделы для также есть события в них ..
Сторона производителя в Linux машине kafkacat , которая производит события в этой топи c. Кажется, что у kafkacat нет возможности отправлять на разделы в стиле циклического перебора.
Проблема в том, что я должен использовать какой-то инструмент CLI для создания событий, а не службы. Есть ли способ преодолеть эту проблему с помощью инструмента CLI? Я не смог найти ни одного инструмента cli, который бы это делал.
Я думал о том, чтобы сохранить номер раздела в файле, прочитать его, а затем указать номер раздела в команде kafkacat, а затем увеличить ( partition-number)% разделов, но это не потокобезопасно.
Примечание: topi c не имеет ключа, я создаю только сообщения Value и не заботится о ключе.