setConsumerRebalanceListener как получить потребителя - PullRequest
0 голосов
/ 01 сентября 2018

Я использую spring-kafka 1.1.3.RELEASE и kafka-clients 0.10.0.0, и я хочу установить setConsumerRebalanceListener на заводе, как это, но я не знаю, как заставить потребителя сохранить потребительский раздел. Спасибо за любые предложения!

@Bean   

KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerBatchContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(3);
            factory.getContainerProperties().setPollTimeout(3000);
            factory.getContainerProperties().setConsumerTaskExecutor(execD());
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

            factory.getContainerProperties().setSyncCommits(true);

            factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    for (TopicPartition partition:collection){
                    //TODO how to get consumer?    saveOffsetInExternalStore(consumer,partition.partition());
                    }
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                     for (TopicPartition partition:collection){
                    //TODO how to get consumer?      
                       consumer.seek();
                    }
                }
            });
            factory.setBatchListener(true);

            return factory;
        }

Я использую фабрику вот так:

 @KafkaListener(group = "CID_alikafka_B024",topicPattern = "data_.*",containerFactory = "kafkaListenerBatchContainerFactory")
    public void receive2(List<String> data,Acknowledgment acknowledgment,
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                         @Header(KafkaHeaders.OFFSET) List<Long> offsets,
                         @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topicName,
                         @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> messageKeys) {
        logger.info("start of batch receive");

    }

Я знаю, что у spring kafka 2.1.9 есть ConsumerAwareRebalanceListener, как это, но я хочу использовать spring kafka 1.1.3.RELEASE для совместимого kafka 0.10.0.0, у нас kafka - версия 0.10.0.0

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
            store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
            consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});

Мой пом:

 <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.3.RELEASE</version>
            <exclusions>
                <!-- exclude kafka version problem-->
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>

1 Ответ

0 голосов
/ 01 сентября 2018

1.1.3 чрезвычайно устарел в быстро меняющемся мире Apache Kafka. Вы не можете получить доступ к потребителю там.

Spring для Apache Kafka добавил ConsumerAwareRebalanceListener в 2.0. Текущая версия 2.1.8.

Более новые версии клиентов Kafka могут общаться с более старыми брокерами, если вы не можете обновить брокера (но вам следует, 0.10.0.0 также очень старый).

См. документацию .

Также см. Страницу проекта , которая объясняет совместимость. Со страницы слияния:

Брокер 0.10.0 Базовая совместимость клиента: Java: клиенты <= 0.10.0 или> = 0.10.2

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...