Я пытаюсь использовать информацию из темы __consumer_offsets, так как казалось, что это может быть самый простой способ получить метрики kafka о потребителях, таких как задержка сообщений и т. Д. Идеальный способ - получить к нему доступ из jmx, но сначала я хотел попробовать это, а также сообщения, которые возвращаться в зашифрованном виде или в нечитаемой форме. Попытался также добавить свойство stringDeserializer. У кого-нибудь есть предложения как это исправить? Снова ссылка на то, что это дубликат
дубликат потребителя_смещение
бесполезен, поскольку он не ссылается на мою проблему, которая заключается в чтении сообщения в виде строки в Java. Также обновил код, чтобы попытаться выполнить потребитель Record с помощью потребителя kafka.client.
consumerProps.put("exclude.internal.topics", false);
consumerProps.put("group.id" , groupId);
consumerProps.put("zookeeper.connect", zooKeeper);
consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer =
kafka.consumer.Consumer.createJavaConsumerConnector(
consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
for ( KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
//errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");
while (it.hasNext())
{
try{
String mesg = new String(it.next().message());
System.out.println( mesg);
изменения кода:
try
{
// errorReporting("CONSUMER-KAFKA CONNECTION INITIATING...");
Properties consumerProps = new Properties();
consumerProps.put("exclude.internal.topics", false);
consumerProps.put("group.id" , "test");
consumerProps.put("bootstrap.servers", servers);
consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
//ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
// consumerConfig);
//Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
//topicCountMap.put(topic, new Integer(1));
//Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
//List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
KafkaConsumer<String, String> kconsumer = new KafkaConsumer<>(consumerProps);
kconsumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<String, String> records = kconsumer.poll(10);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
}
} finally {
kconsumer.close();
}
И снимок того, как выглядит сообщение ниже; внизу изображения:
![consumer offset](https://i.stack.imgur.com/yPCkN.png)