Показывает недопустимые символы при использовании при использовании консоли kafka - PullRequest
0 голосов
/ 04 февраля 2019

При использовании из темы Kafka с использованием консоли Kafka Consumer или kt (инструмент GoLang CLI для Kafka) я получаю недопустимые символы.

...
\u0000\ufffd?\u0006app\u0000\u0000\u0000\u0000\u0000\u0000\u003e@\u0001
\u0000\u000cSec-39\u001aSome Actual Value Text\ufffd\ufffd\ufffd\ufffd\ufffd
\ufffd\u0015@\ufffd\ufffd\ufffd\ufffd\ufffd\ufff
...

Даже при том, что подключение Kafka может фактически поглотить нужноеданные в базу данных SQL .

Ответы [ 2 ]

0 голосов
/ 04 февраля 2019

Учитывая, что вы говорите,

Kafka connect может фактически передавать нужные данные в базу данных SQL.

Я предполагаю, что вы используете сериализацию Avro дляданные по теме.Правильно настроенный Kafka Connect будет принимать данные Avro и десериализовывать их.

Однако , такие консольные инструменты, как kafka-console-consumer, kt, kafkacat и др., Не поддерживают Avro, и поэтому вы получите кучу странных символов, если будете использовать их длячитать данные из темы, которая закодирована в Avro.

Для чтения данных Avro в командную строку вы можете использовать kafka-avro-console-consumer:

kafka-avro-console-consumer
         --bootstrap-server kafka:29092\
         --topic test_topic_avro \
         --property schema.registry.url=http://schema-registry:8081

Редактировать: Добавление предложения из @CodeGeas тоже :

В качестве альтернативы, чтение данных с использованием REST Proxy может быть выполнено с помощью следующего:

# Create a consumer for JSON data
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
         -H "Accept: application/vnd.kafka.v2+json" \
         --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \

# Subscribe the consumer to a topic
         http://kafka-rest-instance:8082/consumers/my_json_consumer
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
         --data '{"topics":["YOUR-TOPIC-NAME"]}' \
         http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription

# Then consume some data from a topic using the base URL in the first response.
curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
         http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

Позже, чтобы впоследствии удалить потребителя:

curl -X DELETE -H "Accept: application/vnd.kafka.avro.v2+json" \
         http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance
0 голосов
/ 04 февраля 2019

По умолчанию консольные потребительские инструменты десериализуют и ключ сообщения, и значение, используя ByteArrayDeserializer, но затем, очевидно, пытаются печатать данные в командной строке с использованием средства форматирования по умолчанию.

Этот инструмент, однако, позволяет настроитьИспользуются десериализаторы и форматеры.См. Следующий фрагмент справочного вывода:

--formatter <String: class>              The name of a class to use for
                                           formatting kafka messages for
                                           display. (default: kafka.tools.
                                           DefaultMessageFormatter)
--property <String: prop>                The properties to initialize the
                                           message formatter. Default
                                           properties include:
                                            print.timestamp=true|false
                                            print.key=true|false
                                            print.value=true|false
                                            key.separator=<key.separator>
                                            line.separator=<line.separator>
                                            key.deserializer=<key.deserializer>
                                            value.deserializer=<value.
                                           deserializer>
                                         Users can also pass in customized
                                           properties for their formatter; more
                                           specifically, users can pass in
                                           properties keyed with 'key.
                                           deserializer.' and 'value.
                                           deserializer.' prefixes to configure
                                           their deserializers.
--key-deserializer <String:
  deserializer for key>
--value-deserializer <String:
  deserializer for values>

Используя эти настройки, вы сможете изменить вывод на то, что вам нужно.

...