Итак, я новичок в kafka, и я следовал некоторым учебникам и искал по inte rnet, и мне удалось установить скрипт производителя и потребителя в Java. Но вместо того, чтобы просто напечатать сообщение (значение), я получаю следующий результат в консоли eclipse:
ConsumerRecord(topic = MyTopic, partition = 0, leaderEpoch = 0, offset = 131, CreateTime = 1579178094212, serialized key size = 10, serialized value size = 31, headers = RecordHeaders(headers = [], isReadOnly = false), key = Simple Key, value = Custom test message)
Вместо этого, если я запускаю потребителя в консоли windows, я получаю только значение, то есть я получаю «Пользовательское тестовое сообщение».
Как выбрать и распечатать только свойство «value» из записи потребителя kafka на консоли eclipse? Или любое другое свойство по этому вопросу.
РЕДАКТИРОВАТЬ :
Для человека, который спросил, как я распечатал все эти данные, это код в моем потребительском скрипте:
package com.okta.javakafka.kafkajava;
import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleConsumer {
public static void main(String[] args) throws ClassNotFoundException{
ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) org.slf4j.LoggerFactory.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);
root.setLevel(ch.qos.logback.classic.Level.INFO);
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id","test");
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms","1000");
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("key.deserializer",Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));
props.put("value.deserializer", Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList("MyTopic"));
try {
while(true) {
ConsumerRecords<String,String> records = consumer.poll(1000);
for(ConsumerRecord<String,String> record : records)
System.out.println(record.toString());
}
}finally {
consumer.close();
}
}
}