Как сказал Лиор Чага в своем комментарии, вы вручную назначаете разделы тем вашему потребителю.Это не рекомендуемый способ сделать это.Кроме того, кажется, что все ваши потребители используют один и тот же точный идентификатор группы.В этой конфигурации с двумя потребляющими потоками, если хотя бы один из потребителей получил конкретное сообщение, none других потоков не получит это.Если вы хотите, чтобы все потребительские потоки каждый получали свой «набор» сообщений, не прерывая друг друга, то вам нужно дать им разные group.id
s.
Чтобы подписаться на тему, чтобы онаобработайте для вас автобалансировку, а затем потребляйте, вы должны сделать что-то вроде этого (взято из javadoc KafkaConsumer, связанного ниже):
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Официальные javadoc Kafka имеют гораздо более подробные объяснения: https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html