У меня есть структурированное потоковое задание, которое берет сообщения json из очереди kafka и сохраняет их в parquet / hdfs. Оттуда сообщения принимаются и анализируются по расписанию и сохраняются в базе данных mysql.
Теперь я хочу реализовать потоковую передачу, где я беру сообщение json от kafka, анализирую его и сохраняю в mysql.
Моя проблема сейчас заключается в том, что структурированная потоковая передача Spark не дает мне возможности использовать pivot.
Мой код сейчас выглядит так:
val df = readFromHdfsParquet
val df_exploded= df.select($"device", $"owner", $"timestamp", explode($"channels").as("exploded")
.withColumn("channelName", $"exploded.channelName")
.withColumn($"value", $"exploded.value")
.drop("exploded")
val df_grouped = df_exploded
.groupBy($"device, $"owner", $"timestamp")
.pivot("channelName")
.agg(first($"value", false)
, что приводит к требуемой структуре вывода, содержащей все доступные каналы.
Мой Json выглядит так:
{
"device": "TypeA",
"owner": "me",
"timestamp": "2019-05-12 17:27:59",
"channels": [
{
"channelName": "temperature",
"state": 0,
"value": "27"
},
{
"channelName": "humidity",
"state": 0,
"value": "10"
}
]
}
Длина массива каналов не установлена и может изменяться от устройства к устройству.
То, что я хочу, это информационный фрейм со следующей структурой и сохранить его в mysql.
|device|owner|timestamp |temperature|humidity|
|TypeA |me |2019-05-12 17:27:59|27 |10 |
Как нечто подобное возможно при структурированной потоковой передаче?
Также было бы достаточно получить несколько каналов вместо всех, явно выбрав их. (например, температура только без влажности)