Как структурированный поток динамически анализирует данные json Кафки - PullRequest
1 голос
/ 15 октября 2019

Я пытаюсь читать данные из Кафки, используя структурированную потоковую передачу. Данные, полученные от Кафки, представлены в формате json. Мой код выглядит следующим образом: в коде я использую функцию from_json для преобразования json в фрейм данных для дальнейшей обработки.

val **schema**: StructType = new StructType()
    .add("time", LongType)
    .add(id", LongType)
    .add("properties",new StructType()
      .add("$app_version", StringType)
      .
      .
    )
val df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","...")
    .option("subscribe","...")
    .load()
    .selectExpr("CAST(value AS STRING) as value")
    .select(from_json(col("value"), **schema**))

Моя проблема в том, что если поле увеличивается, я не могу остановить программу spark, чтобы вручную добавить эти поля, то как я могу динамически анализировать эти поля, я попытался schema_of_json () , для вывода типа поля может потребоваться только первая строка, и он не подходит для многоуровневых вложенных структур данных json.

1 Ответ

0 голосов
/ 15 октября 2019

Моя проблема в том, что если поле увеличивается, я не могу остановить программу spark, чтобы вручную добавить эти поля, то как я могу динамически анализировать эти поля

Это невозможно в Spark Структурированный Поток (или даже Spark SQL) из коробки. Однако есть несколько решений.

Изменение схемы в коде и возобновление потокового запроса

Вам просто нужно остановить потоковый запрос, изменить код в соответствии с текущей схемой и возобновить его. Это возможно в Spark Structured Streaming с источниками данных, которые поддерживают восстановление с контрольной точки. Источник данных Kafka его поддерживает.

Пользовательская функция (UDF)

Вы можете написать пользовательскую функцию (UDF), которая будет выполнять этот динамический анализ JSON для вас. Это также один из самых простых вариантов.

Новый источник данных (MicroBatchReader)

Другой вариант - создать расширение для встроенного источника данных Kafka, которое будет выполнять динамический анализ JSON (аналогичноДесериализаторы кафки). Это требует немного больше развития, но, безусловно, выполнимо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...