org.springframework.context.ApplicationContextException: не удалось запустить bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry - PullRequest
0 голосов
/ 14 декабря 2018

Я разрабатываю Spring Boot + Apache Kafka + Apache Zookeeper пример.Я установил / setup Apache Zookeeper and Apache Kafka на мой локальный компьютер с Windows.Я взял ссылку из ссылки: https://www.tutorialspoint.com/spring_boot/spring_boot_apache_kafka.htm и выполнил код как:

Настройка: https://medium.com/@shaaslam/installing-apache-kafka-on-windows-495f6f2fd3c8

Ошибка:

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at com.tutorialspoint.SpringKafkaTutorialspointApplication.main(SpringKafkaTutorialspointApplication.java:21) [classes/:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

Журналы с сервера Zookeeper:

2018-12-14 22:16:35,352 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60197 (no session established for client)
2018-12-14 22:16:36,260 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60200
2018-12-14 22:16:36,260 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:36,262 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60200 (no session established for client)
2018-12-14 22:16:37,473 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60204
2018-12-14 22:16:37,473 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:37,476 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60204 (no session established for client)
2018-12-14 22:16:38,383 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60207
2018-12-14 22:16:38,384 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:38,388 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60207 (no session established for client)
2018-12-14 22:16:39,494 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60211
2018-12-14 22:16:39,494 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:39,497 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60211 (no session established for client)
2018-12-14 22:16:40,506 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60214
2018-12-14 22:16:40,507 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:40,509 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60214 (no session established for client)
2018-12-14 22:16:41,519 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60217
2018-12-14 22:16:41,519 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:41,525 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60217 (no session established for client)

KafkaProducerConfig.java

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory(){
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

KafkaConsumerConfig.java

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

SpringKafkaTutorialspointApplication.java

@SpringBootApplication
public class SpringKafkaTutorialspointApplication implements CommandLineRunner{

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("tutorialspoint", message);
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaTutorialspointApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        sendMessage("Hi Welcome to Spring For Apache Kafka @@@@@@@@@");
    }

    @KafkaListener(topics = "tutorialspoint", groupId = "group-id")
    public void listen(String message) {
        System.out.println("Received Messasge in group - group-id: " + message);
    }
}

Примечание: Пожалуйста, дайте мне знать, если вам нужны какие-либо другие данные.

Любая быстрая помощь?

Ответы [ 2 ]

0 голосов
/ 08 июля 2019

Возможно, проблема вызвана несоответствием загрузочной версии Spring и версии Kafka.Мне удалось разрешить с помощью следующей матрицы:

Версия загрузки Spring:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

Зависимость maven версии Kafka от версии загрузки Spring

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency> 

Версия Kafka: kafka_2.12-2.2.0
Версия Zookeeper: apache-zookeeper-3.5.5

Подробности файла Application.yml, как показано ниже:

server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers:
      - localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers:
      - localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
0 голосов
/ 15 декабря 2018

Проведя много часов, я обнаружил, что должен использовать ниже также и в KafkaConsumerConfig.

С приведенными ниже изменениями код работает нормально.

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
...