Искривление потоковой структуризации JSON (взрыв + поворот невозможен) - PullRequest
0 голосов
/ 17 июня 2019

У меня есть структурированное потоковое задание, которое берет сообщения json из очереди kafka и сохраняет их в parquet / hdfs. Оттуда сообщения принимаются и анализируются по расписанию и сохраняются в базе данных mysql.

Теперь я хочу реализовать потоковую передачу, где я беру сообщение json от kafka, анализирую его и сохраняю в mysql.

Моя проблема сейчас заключается в том, что структурированная потоковая передача Spark не дает мне возможности использовать pivot.

Мой код сейчас выглядит так:

val df = readFromHdfsParquet
val df_exploded= df.select($"device", $"owner", $"timestamp", explode($"channels").as("exploded")
.withColumn("channelName", $"exploded.channelName")
.withColumn($"value", $"exploded.value")
.drop("exploded")

val df_grouped = df_exploded
.groupBy($"device, $"owner", $"timestamp")
.pivot("channelName")
.agg(first($"value", false)

, что приводит к требуемой структуре вывода, содержащей все доступные каналы.

Мой Json выглядит так:

{
  "device": "TypeA",
  "owner": "me",
  "timestamp": "2019-05-12 17:27:59",
  "channels": [
    {
      "channelName": "temperature",
      "state": 0,
      "value": "27"
    },
    {
      "channelName": "humidity",
      "state": 0,
      "value": "10"
    }
  ]
}

Длина массива каналов не установлена ​​и может изменяться от устройства к устройству.

То, что я хочу, это информационный фрейм со следующей структурой и сохранить его в mysql.

|device|owner|timestamp          |temperature|humidity|
|TypeA |me   |2019-05-12 17:27:59|27         |10      |

Как нечто подобное возможно при структурированной потоковой передаче? Также было бы достаточно получить несколько каналов вместо всех, явно выбрав их. (например, температура только без влажности)

...