SpringBoot Kafka Consumer: не удалось запустить внутренний компонент KafkaListenerEndpointRegistry TimeoutException - PullRequest
0 голосов
/ 02 октября 2019

У меня есть приложение весенней загрузки, которое использует kafka, но я не могу его запустить, потому что я только что реализовал потребителя kafka, который прослушивает сервер, который находится в автономном режиме. Когда я запускаю его, я получаю:

org.springframework.context.ApplicationContextException: не удалось запустить bean-компонент 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';вложенное исключение: org.apache.kafka.common.errors.TimeoutException: истекло время ожидания при извлечении метаданных темы

, так как kafka не работает.

Как настроить приложение весенней загрузки вкак он запускается, даже если сервер kafka не работает?

ниже моей конфигурации Kafka Consumer:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${app.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${HOSTNAME:NO_HOSTNAME}")
    private String groupId;

    @Value(value = "${spring.profiles.active}")
    private String activeSpringProfile;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,String.format("RANDOM_GROUP_ID_%s_%s", groupId, RandomUtils.nextInt()));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Спасибо!

1 Ответ

1 голос
/ 02 октября 2019

В последних версиях свойство контейнера missingTopicsFatal равно true, что и вызывает эту проблему. Вы можете отключить его ...

@Component
class ContainerFactoryConfigurer {

    ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        factory.getContainerProperties().setMissingTopicsFatal(false);
    }

}
...