Я получаю потоковый набор данных из концентраторов событий Azure. Данные поступают в следующем формате:
[
[
{
"data": "sampledata1",
"addressdata": {
"isTest": false,
"address": "washington",
"zipcode": 98119,
"city": "seattle",
"randomstring": "abcdabcd:ghkjnkasd:asdasdasd"
},
"profession": "engineer",
"party": "democrat"
},
{
"data": "sampledata2",
"addressdata": {
"isTest": false,
"address": "virginia",
"zipcode": 20120,
"city": "Centreville",
"randomstring": "zabcdabcd:tghkjnkasd:gasdasdasd"
},
"profession": "teacher",
"party": "republican"
}
]
]
Из следующей статьи я могу получить json в виде необработанной строки.
https://docs.databricks.com/spark/latest/structured-streaming/streaming-event-hubs.html
Но я не могу извлечь отдельный элемент из строки, используя get_jon_object. Я считаю, что проблема заключается в том, что строка не является одним объектом JSON, это массив массива JSON. Таким образом, get_json_object не может его проанализировать.
val outputDf = streamingInputDf.select(
get_json_object(($"body").cast("string"), "$.data").alias("data"),
get_json_object(($"body").cast("string"), "$.addressdata").alias("addressdata"),
get_json_object(($"body").cast("string"), "$.profession").alias("profession"),
get_json_object(($"body").cast("string"), "$.party").alias("party"),
date_format($"enqueuedTime", "dd.MM.yyyy").alias("day"),
date_format($"enqueuedTime", "HH").cast("int").alias("hour") ,
when(date_format($"enqueuedTime", "mm").cast("int")<=15,1)
.when(date_format($"enqueuedTime", "mm").cast("int")>15 && date_format($"enqueuedTime", "mm").cast("int")<=30,2)
.when(date_format($"enqueuedTime", "mm").cast("int")>30 && date_format($"enqueuedTime", "mm").cast("int")<=45,3)
.otherwise(4).alias("minute")
)
У кого-нибудь есть предложения, как изящно разобрать данные и извлечь индивидуальную информацию из строки? Любой метод, похожий на get_json_object, который может извлечь данные из массива json?
PS: Я получаю массив JSON в одну строку. не так, как указано выше.