читатель __consumer_offsets читает нечитаемое сообщение - PullRequest
0 голосов
/ 11 сентября 2018

Я пытаюсь использовать информацию из темы __consumer_offsets, так как казалось, что это может быть самый простой способ получить метрики kafka о потребителях, таких как задержка сообщений и т. Д. Идеальный способ - получить к нему доступ из jmx, но сначала я хотел попробовать это, а также сообщения, которые возвращаться в зашифрованном виде или в нечитаемой форме. Попытался также добавить свойство stringDeserializer. У кого-нибудь есть предложения как это исправить? Снова ссылка на то, что это дубликат

дубликат потребителя_смещение

бесполезен, поскольку он не ссылается на мою проблему, которая заключается в чтении сообщения в виде строки в Java. Также обновил код, чтобы попытаться выполнить потребитель Record с помощью потребителя kafka.client.

consumerProps.put("exclude.internal.topics",  false);
consumerProps.put("group.id" , groupId);
consumerProps.put("zookeeper.connect", zooKeeper);


consumerProps.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");  
consumerProps.put("value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");

ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer = 
kafka.consumer.Consumer.createJavaConsumerConnector(
       consumerConfig);

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
   consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

for ( KafkaStream stream : streams) {

     ConsumerIterator<byte[], byte[]> it = stream.iterator();

     //errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");


   while (it.hasNext())
     {

         try{

                 String mesg = new String(it.next().message());
                 System.out.println( mesg);

изменения кода:

 try
    {


   // errorReporting("CONSUMER-KAFKA CONNECTION INITIATING...");    
    Properties consumerProps = new Properties();
    consumerProps.put("exclude.internal.topics",  false);
    consumerProps.put("group.id" , "test");
    consumerProps.put("bootstrap.servers", servers);
    consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");  
    consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    //ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    //ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
    //       consumerConfig);

    //Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    //topicCountMap.put(topic, new Integer(1));
    //Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    //List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);



    KafkaConsumer<String, String> kconsumer = new KafkaConsumer<>(consumerProps); 
    kconsumer.subscribe(Arrays.asList(topic)); 


    try {
          while (true) {
            ConsumerRecords<String, String> records = kconsumer.poll(10);

            for (ConsumerRecord<String, String> record : records)

              System.out.println(record.offset() + ": " + record.value());
          }
        } finally {
          kconsumer.close();
    }    

И снимок того, как выглядит сообщение ниже; внизу изображения:

consumer offset

1 Ответ

0 голосов
/ 11 сентября 2018

Хотя можно напрямую читать из темы __consumer_offsets, это не рекомендуемый или самый простой способ.

Если вы можете использовать Kafka 2.0, лучше всего использовать API AdminClient для описания групп:


В случае, если вам абсолютно необходимо прочитать непосредственно форму __consumer_offset, вам нужно расшифровать записи, чтобы сделать их читаемыми человеком.Это можно сделать с помощью класса GroupMetadataManager:

  • GroupMetadataManager.readMessageKey () можно использовать для декодирования ключа сообщения и получения раздела темы этой записиотносится к.Это может вернуть 2 типа объектов, для потребительских позиций вас интересуют только OffsetKey объекты.

  • GroupMetadataManager.readOffsetMessageValue () может использоваться для декодированиязначения сообщений (для ключей, которые были OffsetKey) и найти информацию о смещениях.

Этот ответ на вопрос, который вы связали, содержит код скелета для выполнения всего этого.

Также обратите внимание, что не следует десериализовать записи как строки, а вместо этого хранить их как необработанные байты, чтобы эти методы могли правильно их декодировать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...