В локальном коде работает нормально. Я могу использовать сообщение kafka, но в dev, получаю сообщение об ошибке:
2020-01-21T10:49:48.373749251Z [21-01-2020 10:49:48.373] WARN [] org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext: Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2020-01-21T10:49:48.433030131Z [21-01-2020 10:49:48.428] ERROR [] org.springframework.boot.SpringApplication: Application run failed
2020-01-21T10:49:48.433064831Z org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2020-01-21T10:49:48.433071331Z Wrapped by: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2020-01-21T10:49:48.433076931Z at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
2020-01-21T10:49:48.433082131Z at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
Azure devops, используя в качестве хоста dev
Код:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> kafkaConfig = new HashMap<>();
kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,config.getEwok().getBootstrapServers());
kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "work");
kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(kafkaConfig);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@KafkaListener(topics = "attribute")
public void consume(String message) {
//Logic
}
Не могли бы вы предложить. Спасибо