как просматривать заголовки кафки - PullRequest
2 голосов
/ 15 марта 2019

Мы отправляем сообщение с заголовками в Kafka, используя org.apache.kafka.clients.producer.ProducerRecord

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
    this(topic, partition, (Long)null, key, value, headers);
}

Как на самом деле я вижу эти заголовки, используя команду.kafka-console-consumer.sh показывает только полезную нагрузку и никаких заголовков.

Ответы [ 2 ]

7 голосов
/ 15 марта 2019

Вы можете использовать отличный инструмент kafkacat .

Пример команды:

kafkacat -b kafka-broker:9092 -t my_topic_name -C \
  -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'

Пример вывода:

Key (-1 bytes):
  Value (13 bytes): {foo:"bar 5"}
  Timestamp: 1548350164096
  Partition: 0
  Offset: 34
  Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALU
E_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Co
nverting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed
 due to serialization error:

Опция заголовка kafkacat доступна только в последних сборках kafkacat; вы можете собрать из основной ветки самостоятельно, если ваша текущая версия не включает его.

2 голосов
/ 15 марта 2019

С kafka-console-consumer.sh script:

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

src: https://github.com/apache/kafka/blob/2.1.1/bin/kafka-console-consumer.sh

В kafka.tools.ConsoleConsumer заголовок передается в средство форматирования, но ни один из существующих средств форматирования не используетit:

formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
                                     msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers),
                                     output)

src: https://github.com/apache/kafka/blob/2.1.1/core/src/main/scala/kafka/tools/ConsoleConsumer.scala

В нижней части вышеуказанной ссылки вы можете увидеть существующие средства форматирования.

Если вы хотите напечатать заголовки, вам нужнореализовать свой собственный kafka.common.MessageFormatter и, в частности, его метод записи:

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit

, а затем запустить потребителя консоли с --formatter, предоставляя свой собственный форматер (он также должен присутствовать в пути к классам).

Другой, более простой и быстрый способ - реализовать собственную мини-программу с использованием KafkaConsumer и проверять заголовки при отладке.

...