Получил пустую тему разделов при использовании весенней кафки ConsumerAwareRebalanceListener - PullRequest
0 голосов
/ 03 июля 2019

проблема

использование Spring Kafka версия: org.springframework.kafka: spring-kafka: 2.0.0.RELEASE.

поиск смещения с помощью ConsumerAwareRebalanceListener.onPartitionsAssigned, но получил пустую TopicPartitions.

пробовал ConsumerSeekAware, но также получил пустое назначение.

код

Map<String, Object> configs = new HashMap<String, Object>(); 
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getConsumerBootstrapServers());
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, config.getConsumerGroupId());
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getConsumerMaxPollRecords()); 
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

        DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<Object, Object>(configs);

        ContainerProperties containerProperties = new ContainerProperties(config.getTopic());
        containerProperties.setAckMode(AckMode.RECORD);
        containerProperties.setMessageListener(messageListener);
        containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

            @Override
            public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
                    Collection<TopicPartition> partitions) {
            }

            @Override
            public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            }

            @Override
            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                log.info("onPartitionsAssigned_start,topic:{},partitionOffset:{},"
                        + "consumer:{},topicPartitions:{},assignment:{}",
                        config.getTopic(),JSON.toJSONString(partitionOffset),
                        JSON.toJSONString(consumer),JSON.toJSONString(partitions),JSON.toJSONString(consumer.assignment()));
                for(TopicPartition tp:partitions) {
                    if(!tp.topic().equals(config.getTopic())) {
                        continue;
                    }
                    Long offsetStart = partitionOffset.get(tp.partition());
                    if(offsetStart==null) {
                        continue;
                    }
                    consumer.seek(tp, offsetStart);
                    log.info("consumer_seek,TopicPartition:{},offsetStart:{}",
                            JSON.toJSONString(tp),offsetStart);
                }

            }


        });


        container = new ConcurrentMessageListenerContainer<>(factory,containerProperties);
        container.setConcurrency(config.getConcurrency());

        container.start();

я использую API неправильно?Любые предложения будут с благодарностью!

...