Я новичок в написании потребителя Kafka, у меня есть сценарий на случай, если у меня два потребителя работают под одним и тем же идентификатором группы, и у меня есть два раздела.
Предположим, что;
Потребитель 1 ===> Связан с ====> Разделом 1
Потребитель 2 ===> Связан с ====> Разделом 2
Если мой потребитель-2 не работает, как я могу убедиться, что мой потребитель-1 перечитал все событие, которое снова пришло в раздел 2, я просто наткнулся на кое-что относительно setConsumerRebalanceListener, поэтому я для этого я установил свойство контейнера и для метода onPartitionsAssigned я задаю consumer.seekToBeginning(consumer.assignment())
Это правильно, означает ли эта строка, что мой потребитель-1 будет читать все события из раздела-2, когда потребитель-2 не работает, а раздел-2 переназначен на потребителя?
Я также буду спрашивать, может ли кто-нибудь поделиться хорошими ссылками, где я могу прочитать основы о ConsumerRebalanceListener
.
public ConcurrentKafkaListenerContainerFactory<String, MultiTenancyOrgDataMessage> kafkaListenerContainerFactory() {
LOG.debug("ConcurrentKafkaListenerContainerFactory executing");
ConcurrentKafkaListenerContainerFactory<String, MultiTenancyOrgDataMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
consumer.seekToBeginning(consumer.assignment()); // read topic from beginning on service restart
}
});