Java-клиент KafkaConsumer не получает записи из темы, но командная строка загружается - PullRequest
0 голосов
/ 11 ноября 2018

У меня есть тема customer-avro в кластере kafka со списком брокеров kafka1.com:9092,kafka2.com:9092,kafka3.com:9092.

Я могу произвести запись в тему из Java и извлечь ее из консоли, используя следующую команду.

    $ sudo bin/kafka-avro-console-consumer     --bootstrap-server kafka1.com:9092,kafka2.com:9092,kafka3.com:9092     --topic customer-avro     --property              schema.registry.url=http://schemaregistry1.com:8081     --from-beginning
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    {"first_name":"Mottu","last_name":"kalidasan","age":24,"height":5.4,"weight":6.4,"automated_email":false}

В приведенной выше команде очевидно, что в теме есть запись. Поэтому я написал ниже потребительскую логику для извлечения записей.

    package com.example;

    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.util.Collections;
    import java.util.Properties;

    public class KafkaConsumerV1 {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka1.com:9092,kafka2.com:9092,kafka3.com:9092");
            props.setProperty("group.id", "my-avro-consumer");
            props.setProperty("enable.auto.commit", "false");
            props.setProperty("auto.offset.reset", "earliest");
            props.setProperty("key.deserializer", StringDeserializer.class.getName());
            props.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
            props.setProperty("schema.registry.url", "http://schemaregistry1.com:8081");
            props.setProperty("specific.avro.reader", "true");

            try(KafkaConsumer<String, Customer> consumer = new KafkaConsumer<String, Customer>(props)) {
                String topic = "customer-avro";
                consumer.subscribe(Collections.singleton(topic));
                System.out.println("Waiting for data...");
                while (true) {
                    ConsumerRecords<String, Customer> records = consumer.poll(500);
                    for (ConsumerRecord<String, Customer> record : records) {
                        record.key();
                        Customer customer = record.value();
                        System.out.println("customer "+ customer);
                    }
                    consumer.commitSync();

                }
            }
        }
    }

Но потребитель не извлекает записи и всегда отображает «ожидание данных».

В чем ошибка в моем коде Java. Как я могу получить данные, используя потребитель Java.

...