Я пытаюсь прочитать сообщение от Кафки.Сообщение в следующем формате (пример):
{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}
Также обратите внимание, что в теме есть сообщения из разных таблиц, а не только из одной таблицы.
Что яПопытка добиться - это прочитать вышеупомянутое сообщение из темы Kafka с помощью структурированной потоковой передачи Spark и создать информационный кадр с именами столбцов и значением, которое будет исходить из самого сообщения JSON.
Я не хочу явно определять схему, используя регистрclass или StructType.
Я пробовал это:
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", "topic1").option("startingOffsets", "earliest").load()
val y=df.select(get_json_object(($"value"), "$.payload").alias("payload")
Когда я просматриваю Y (который является фреймом данных), он появляется как 1 столбец со значением в полезной нагрузке как JSON в этом столбце.
Как получить отдельный столбец в кадре данных?Я не достигаю этого.
(Снова повторяю, я не могу использовать универсальный класс наблюдений или StructType для части схемы, поскольку сообщения, поступающие через сообщение Kafka, поступают из разных таблиц, поэтому я хочу, чтобы более динамическая схема создавалась из самого JSON вбег.)