Чтение сообщений JSONConverter Kafka Connect со схемой с использованием Spark Structured Streaming - PullRequest
0 голосов
/ 04 января 2019

Я пытаюсь прочитать сообщение от Кафки.Сообщение в следующем формате (пример):

{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}

Также обратите внимание, что в теме есть сообщения из разных таблиц, а не только из одной таблицы.

Что яПопытка добиться - это прочитать вышеупомянутое сообщение из темы Kafka с помощью структурированной потоковой передачи Spark и создать информационный кадр с именами столбцов и значением, которое будет исходить из самого сообщения JSON.

Я не хочу явно определять схему, используя регистрclass или StructType.

Я пробовал это:

val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", "topic1").option("startingOffsets", "earliest").load()

val y=df.select(get_json_object(($"value"), "$.payload").alias("payload")

Когда я просматриваю Y (который является фреймом данных), он появляется как 1 столбец со значением в полезной нагрузке как JSON в этом столбце.

Как получить отдельный столбец в кадре данных?Я не достигаю этого.

(Снова повторяю, я не могу использовать универсальный класс наблюдений или StructType для части схемы, поскольку сообщения, поступающие через сообщение Kafka, поступают из разных таблиц, поэтому я хочу, чтобы более динамическая схема создавалась из самого JSON вбег.)

1 Ответ

0 голосов
/ 05 июня 2019

Вариант 1: измените источник Kafka Connect на value.converter.schemas.enable=false.Это даст вам только (развернутую полезную нагрузку для начала), затем вы можете перейти к следующему сообщению.

В противном случае после удаления схемы Connect с использованием get_json_object(($"value"), "$.payload").alias("payload") необходимо создать объект схемы, а затем использовать y.get_json($"payload", schema)

Как читать записив формате JSON от Kafka с использованием структурированной потоковой передачи?

Все ваши поля являются строками, поэтому будут выглядеть как

val schema: StructType = StructType(Seq(
  StructField("emp_id", StringType()),
  StructField("emp_name", StringType()),
  StructField("city", StringType()),
  StructField("emp_sal", StringType()),
  StructField("manager_name", StringType())
))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...