У меня есть тема customer-avro в кластере kafka со списком брокеров kafka1.com:9092,kafka2.com:9092,kafka3.com:9092
.
Я могу произвести запись в тему из Java и извлечь ее из консоли, используя следующую команду.
$ sudo bin/kafka-avro-console-consumer --bootstrap-server kafka1.com:9092,kafka2.com:9092,kafka3.com:9092 --topic customer-avro --property schema.registry.url=http://schemaregistry1.com:8081 --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
{"first_name":"Mottu","last_name":"kalidasan","age":24,"height":5.4,"weight":6.4,"automated_email":false}
В приведенной выше команде очевидно, что в теме есть запись. Поэтому я написал ниже потребительскую логику для извлечения записей.
package com.example;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerV1 {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka1.com:9092,kafka2.com:9092,kafka3.com:9092");
props.setProperty("group.id", "my-avro-consumer");
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("key.deserializer", StringDeserializer.class.getName());
props.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
props.setProperty("schema.registry.url", "http://schemaregistry1.com:8081");
props.setProperty("specific.avro.reader", "true");
try(KafkaConsumer<String, Customer> consumer = new KafkaConsumer<String, Customer>(props)) {
String topic = "customer-avro";
consumer.subscribe(Collections.singleton(topic));
System.out.println("Waiting for data...");
while (true) {
ConsumerRecords<String, Customer> records = consumer.poll(500);
for (ConsumerRecord<String, Customer> record : records) {
record.key();
Customer customer = record.value();
System.out.println("customer "+ customer);
}
consumer.commitSync();
}
}
}
}
Но потребитель не извлекает записи и всегда отображает «ожидание данных».
В чем ошибка в моем коде Java. Как я могу получить данные, используя потребитель Java.