Весенняя загрузка Kafka не работает - потребитель не получает сообщения - PullRequest
1 голос
/ 01 апреля 2019

Я пытаюсь запустить простое приложение Spring Boot Kafka, но не могу заставить его работать. Я следовал различным учебникам, сейчас я реализую этот , но когда я запускаю приложение, вот что происходит:

enter image description here

Я могу написать в консоли, но потребитель не получает никаких сообщений.
Это мой класс SpringApplication:

@SpringBootApplication(scanBasePackages = "com.springmiddleware")
@ComponentScan("com.springmiddleware")
@EnableAutoConfiguration
@EntityScan("com.springmiddleware")
public class SpringMiddlewareApplication implements CommandLineRunner{



    public static void main(String[] args) throws Exception {

        SpringApplication.run(SpringMiddlewareApplication.class, args);

    }

    @Autowired
    private Producer sender;

    @Override 
    public void run (String... strings) {
        sender.send("Hello world");
    }

}

application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:8080

app:
  topic:
    foo: foo.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.memorynotfound: DEBUG

Класс Consumer, класс Producer и класс их конфигураций совпадают с написанными в руководстве.
В моем файле server.properties у меня есть:

zookeeper.connect=localhost:8080

и в zookeeper.properties:

clientPort=8080

Тот же порт, указанный в application.yml. Перед запуском приложения я запускаю

.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties

и

.\bin\windows\kafka-server-start.bat config\server.properties

UPDATE

Это ReceiverConfig класс:

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

Это класс SenderConfig:

    @Configuration
public class SenderConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

И это метод listen, который находится в классе Consumer

@KafkaListener(topics = "${app.topic.foo}")
    public void listen(@Payload String message) {
        System.out.println("Received " + message);
    }

Класс производителя:

@Service
public class Producer {

     @Autowired
     private KafkaTemplate<String, String> kafkaTemplate;

     @Value("${app.topic.foo}")
        private String topic;

     public void send(String message){
            kafkaTemplate.send(topic, message);
        }
}

ОБНОВЛЕНИЕ 2

[2019-04-01 17: 23: 52,492] ИНФОРМАЦИЯ Установленный сеанс 0x100435950880000 с согласованным временем ожидания 6000 для клиента / 0: 0: 0: 0: 0: 0: 0: 1: 60079 (org.apache.zookeeper. server.ZooKeeperServer) [2019-04-01 17: 23: 52,539] ИНФОРМАЦИЯ Получено исключение KeeperEx уровня пользователя при обработке sessionid: 0x100435950880000 тип: create cxid: 0x1 zxid: 0xef txntype: -1 reqpath: n / a Путь ошибки: / consumer Ошибка: KeeperErrorCode = NodeExists для / consumer (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17: 23: 52,555] ИНФОРМАЦИЯ Получено исключение уровня пользователя KeeperException при обработке sessionid: 0x100435950880000 тип: create cxid: 0x2 zxid: 0xf0 txntype: -1 reqpath: n / a Путь ошибки: / brokers / ids Ошибка: KeeperErrorCode = NodeExists для / brokers / ids (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17: 23: 52,555] ИНФОРМАЦИЯ Получено исключение уровня пользователя KeeperException при обработке sessionid: 0x100435950880000 тип: create cxid: 0x3 zxid: 0xf1 txntype: -1 reqpath: n / a Путь ошибки: / brokers / themes Ошибка: KeeperErrorCode = NodeExists для / brokers / themes (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17: 23: 52,555] ИНФОРМАЦИЯ Получено KeeperException уровня пользователя при обработке sessionid: 0x100435950880000 тип: create cxid: 0x4 zxid: 0xf2 txntype: -1 reqpath: n / a Путь ошибки: / config / changes Error: KeeperErrorCode = NodeExists для / config / changes (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17: 23: 52,570] ИНФОРМАЦИЯ Получено исключение уровня пользователя KeeperException при обработке sessionid: 0x100435950880000 тип: create cxid: 0x5 zxid: 0xf3 txntype: -1 reqpath: n / a Путь ошибки: / admin / delete_topics Ошибка: KeeperErrorCode = NodeExists для / admin / delete_topics (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17: 23: 52,570] ИНФОРМАЦИЯ Получено KeeperException уровня пользователя при обработке sessionid: 0x100435950880000 тип: create cxid: 0x6 zxid: 0xf4 txntype: -1 reqpath: n / a Путь ошибки: / brokers / seqid Ошибка: KeeperErrorCode = NodeExists для / brokers / seqid (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17: 23: 52,586] ИНФОРМАЦИЯ Получено исключение KeeperEx уровня пользователя при обработке sessionid: 0x100435950880000 тип: create cxid: 0x7 zxid: 0xf5 txntype: -1 reqpath: n / a Путь ошибки: / isr_change_notification Error: KeeperErrorCode = NodeExists для / isr_change_notification (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17: 23: 52,586] ИНФОРМАЦИЯ При обработке sessionid: 0x100435950880000 тип: создайте cxid: 0x8 zxid: 0xf6 txntype: -1 reqpath: n / a Ошибка: путь: / latest_producer_id_block Ошибка: / KeeperErrorCode = получена информация об уровне пользователя KeeperException на уровне пользователя. NodeExists для / latest_producer_id_block (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17: 23: 52,586] ИНФОРМАЦИЯ Получена исключительная ситуация KeeperExser уровня пользователя при обработке sessionid: 0x100435950880000 тип: create cxid: 0x9 zxid: 0xf7 txntype: -1 reqpath: n / a Путь ошибки: / log_dir_event_notification Error: KeeperErrorCode = NodeExists для / log_dir_event_notification (org.apache.zookeeper.server.PrepRequestProcessor)[2019-04-01 17: 23: 52,602] ИНФОРМАЦИЯ Получено KeeperException уровня пользователя при обработке sessionid: 0x100435950880000 тип: create cxid: 0xa zxid: 0xf8 txntype: -1 reqpath: n / a Путь ошибки: / config / themes Ошибка:KeeperErrorCode = NodeExists для / config / themes (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17: 23: 52,602] INFO Получено исключение уровня пользователя KeeperException при обработке sessionid: 0x100435950880000 тип: create cxid: 0xb zxid:0xf9 txntype: -1 reqpath: n / a Путь ошибки: / config / clients Ошибка: KeeperErrorCode = NodeExists для / config / clients (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17: 23: 52,617]ИНФОРМАЦИЯ Получил KeeperException уровня пользователя при обработке sessionid: 0x100435950880000 тип: create cxid: 0xc zxid: 0xfa txntype: -1 reqpath: n / a Путь ошибки: / config / users Ошибка: KeeperErrorCode = NodeExists для / config / users (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17: 23: 52,617] ИНФОРМАЦИЯ Получено исключение уровня пользователя KeeperException при обработке sessionid: 0x100435950880000 тип: create cxid: 0xdzxid: 0xfb txntype: -1 reqpath: n / a Путь ошибки: / config / brokers Ошибка: KeeperErrorCode = NodeExists для / config / brokers (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:53,564] ИНФОРМАЦИЯ Получил пользовательское KeeperException на уровне обработки при обработке sessionid: 0x100435950880000 тип: multi cxid: 0x3a zxid: 0xff txntype: -1 reqpath: н / д прерывание оставшихся мультиопераций.Путь ошибки: / admin / предпочитаемый_replica_election Ошибка: KeeperErrorCode = NoNode для / admin / предпочитаемый_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)

1 Ответ

3 голосов
/ 01 апреля 2019

В вашем application.yml вы указали порт zookeeper вместо порта брокера kafka

spring:
  kafka:
    bootstrap-servers: localhost:8080

Ввыше вы должны определить порт брокера kafka , то есть значение port= файла server.properties .

Запускается загрузочное приложение Springпо умолчанию для порта 8080 , поэтому, пожалуйста, не используйте его для порта Zookeeper, если только вы не изменили порт по умолчанию для загрузочного приложения Spring.

Итак, на сервере .properties , имеют port=9092 и zookeeper.connect=localhost:2181, а в application.yml имеют следующие значения:

spring:
  kafka:
    bootstrap-servers: localhost:9092

Затем в zookeeper.properties, есть clientPort=2181.Затем перезапустите Zookeeper, сервер Kafka и загрузочное приложение Spring в том же порядке.

Обновление:

В более новых версиях Kafka вместо * 1040 используется listeners=PLAINTEXT://localhost:9092* в файле server.properties.Так что попробуйте заменить это.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...