Я пытаюсь читать данные из Кафки, используя структурированную потоковую передачу. Данные, полученные от Кафки, представлены в формате json. Мой код выглядит следующим образом: в коде я использую функцию from_json для преобразования json в фрейм данных для дальнейшей обработки.
val **schema**: StructType = new StructType()
.add("time", LongType)
.add(id", LongType)
.add("properties",new StructType()
.add("$app_version", StringType)
.
.
)
val df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers","...")
.option("subscribe","...")
.load()
.selectExpr("CAST(value AS STRING) as value")
.select(from_json(col("value"), **schema**))
Моя проблема в том, что если поле увеличивается, я не могу остановить программу spark, чтобы вручную добавить эти поля, то как я могу динамически анализировать эти поля, я попытался schema_of_json () , для вывода типа поля может потребоваться только первая строка, и он не подходит для многоуровневых вложенных структур данных json.