Я пытаюсь реализовать потребителя Kafka на Java, чтобы получать и обрабатывать записи из тем Kafka.Прежде всего, я хотел бы начать с печати содержимого потока на стандартном выходе.
Моя проблема в том, что содержимое потока всегда пусто, даже если длина данных никогда не равна нулю.
Это мой код:
public class PrintData {
private static Consumer<Long, String> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Create the consumer using props.
final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
// Subscribe to the topic.
consumer.subscribe(Collections.singletonList("print_label"));
return consumer;
}
static void runConsumer() throws InterruptedException {
final Consumer<Long, String> consumer = createConsumer();
final int giveUp = 100; int noRecordsCount = 0;
while (true) {
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
if (consumerRecords.count()==0) {
noRecordsCount++;
if (noRecordsCount > giveUp) break;
else continue;
}
consumerRecords.forEach(record -> {
System.out.println(record.toString());
System.out.println("Record value: " + record.value());
System.out.println("Value length: " + record.value().length());
System.out.println("");
});
consumer.commitAsync();
}
consumer.close();
System.out.println("DONE");
}
public static void main(String... args) throws Exception {
runConsumer();
}
}
И это мой вывод при вводе данных в Kafka:
ConsumerRecord(topic = print_label, partition = 0, leaderEpoch = 0, offset = 41, CreateTime = 1551367936305, serialized key size = -1, serialized value size = 144, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value =
Record value:
Value length: 141
Что я делаю не так?