JSON файл данных в тему кафки - PullRequest
0 голосов
/ 13 февраля 2019

Как вставить данные файла json в тему kafka, используя kafka-console -roduction?Может ли каждый набор данных json быть сохранен как сообщение?

example-

{
  "id": 1,
  "first_name": "John",
  "last_name": "Lindt",
  "email": "jlindt@gmail.com",
  "gender": "Male",
  "ip_address": "1.2.3.4"
}

Если вы используете эту команду -

cat sampledata.json|kafka-console-producer --broker-list localhost:9092 --topic  stream-test-topic

Каждая строка рассматривается как отдельнаясообщение.

Как правильно это сделать?

Спасибо!

ps-

Тема читается с помощью Elastic search.Пример файла сообщений JSON -

[{
"id": 1,
  "first_name": "John",
  "last_name": "Lindt",
  "email": "jlindt@gmail.com",
  "gender": "Male",
  "ip_address": "1.2.3.4"
}, {
  "id": 2,
  "first_name": "Peter",
  "last_name": "Friz",
  "email": "Friz3@gmail.com",
  "gender": "Male",
  "ip_address": "4.5.6.7"
}, {
  "id": 3,
  "first_name": "Dell",
  "last_name": "Chang",
  "email": "Dellc@gmail.com",
  "gender": "Female",
  "ip_address": "8.9.10.11"
}, {
"id": 4,
  "first_name": "Lolita",
  "last_name": "John",
  "email": "LolitaJ@gmail.com",
  "gender": "Female",
  "ip_address": "12.13.14.15"
}, {
"id": 5,
  "first_name": "Pele",
  "last_name": "Wang",
  "email": "Pele@gmail.com",
  "gender": "Male",
  "ip_address": "16.17.18.19"
}, {
  "id": 6,
  "first_name": "Rene",
  "last_name": "Charm",
  "email": "Rene3@gmail.com",
  "gender": "Male",
  "ip_address": "20.21.22.23"

Ответы [ 2 ]

0 голосов
/ 13 февраля 2019

Если в файле есть сообщения JSON, вы можете использовать следующий способ записи в теме kafka:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user-timeline < samplerecords.json

Производители Kafka читают сообщения построчно, используя значение по умолчанию LineMessageReader.Ключи по умолчанию и сериализаторы значений StringSerializer.Он не будет проверять, есть ли правильный json или нет, вместо этого он будет рассматриваться как необработанный строковый объект как публикация в теме kafka.Но если вы хотите проверить, вы можете определить нижеприведенную конфигурацию в команде console-provider.

key.serializer
value.serializer

Пример:

kafka-console-producer --broker-list localhost:9092 --topic testTopic--property value.serializer=custom.class.serialization.JsonSerializer 

На стороне потребителя вы можете сделать аналогичный подход.Используйте JsonDeserializer для чтения данных.

0 голосов
/ 13 февраля 2019

С точки зрения Кафки каждое сообщение является массивом байтов.Это зависит от приложения клиента (производителя, потребителя и т. Д.), От того, как оно к нему относится.Kafka Producer, Consumer использует Deserializer, Serializer для преобразования из / в массив байтов в / из бизнес-объекта (String, POJO)

Проблема, с которой вы сталкиваетесь, заключается в том, что производитель Kafka Console читает сообщения из стандартного ввода.По умолчанию используется LineMessageReader, который обрабатывает каждую строку как новое сообщение.Вы можете реализовать свой собственный или перед отправкой перевести каждый символ новой строки в json в какой-либо другой пробел.

Например, вы можете использовать следующую команду:

jq -rc . sampledata.json | kafka-console-producer --broker-list localhost:9092 --topic stream-test-topic

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...