Проблемы с печатью содержимого потоков кафки - PullRequest
0 голосов
/ 28 февраля 2019

Я пытаюсь реализовать потребителя Kafka на Java, чтобы получать и обрабатывать записи из тем Kafka.Прежде всего, я хотел бы начать с печати содержимого потока на стандартном выходе.

Моя проблема в том, что содержимое потока всегда пусто, даже если длина данных никогда не равна нулю.

Это мой код:

public class PrintData {

      private static Consumer<Long, String> createConsumer() {
          final Properties props = new Properties();
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());


          // Create the consumer using props.
          final Consumer<Long, String> consumer = new KafkaConsumer<>(props);

          // Subscribe to the topic.
          consumer.subscribe(Collections.singletonList("print_label"));
          return consumer;
      }

    static void runConsumer() throws InterruptedException {
        final Consumer<Long, String> consumer = createConsumer();

        final int giveUp = 100;   int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));

            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }

            consumerRecords.forEach(record -> {
                System.out.println(record.toString());              
                System.out.println("Record value: " + record.value());
                System.out.println("Value length: " + record.value().length());
                System.out.println("");

            });

            consumer.commitAsync();
        }
        consumer.close();
        System.out.println("DONE");
    }


    public static void main(String... args) throws Exception {
        runConsumer();
    }
}

И это мой вывод при вводе данных в Kafka:

ConsumerRecord(topic = print_label, partition = 0, leaderEpoch = 0, offset = 41, CreateTime = 1551367936305, serialized key size = -1, serialized value size = 144, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 
Record value:
Value length: 141

Что я делаю не так?

...