Я использую записные книжки Databricks для чтения файлов avro, хранящихся в Azure Data Lake Gen2. Файлы avro создаются с помощью захвата концентратора событий и представляют собой определенную схему c. Из этих файлов мне нужно извлечь только поле Body, в котором фактически хранятся интересующие меня данные.
Я уже реализовал это в Python, и он работает, как ожидалось:
path = 'abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro'
df0 = spark.read.format('avro').load(path) # 1
df1 = df0.select(df0.Body.cast('string')) # 2
rdd1 = df1.rdd.map(lambda x: x[0]) # 3
data = spark.read.json(rdd1) # 4
Теперь мне нужно перевести это в необработанный SQL, чтобы фильтровать данные непосредственно в SQL запрос. Учитывая 4 шага выше, шаги 1 и 2 с SQL выглядят следующим образом:
CREATE TEMPORARY VIEW file_avro
USING avro
OPTIONS (path "abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro")
WITH body_array AS (SELECT cast(Body AS STRING) FROM file_avro)
SELECT * FROM body_array
С этим частичным запросом я получаю то же, что и df1 выше (шаг 2 с Python):
Body
[{"id":"a123","group":"0","value":1.0,"timestamp":"2020-01-01T00:00:00.0000000"},
{"id":"a123","group":"0","value":1.5,"timestamp":"2020-01-01T00:01:00.0000000"},
{"id":"a123","group":"0","value":2.3,"timestamp":"2020-01-01T00:02:00.0000000"},
{"id":"a123","group":"0","value":1.8,"timestamp":"2020-01-01T00:03:00.0000000"}]
[{"id":"b123","group":"0","value":2.0,"timestamp":"2020-01-01T00:00:01.0000000"},
{"id":"b123","group":"0","value":1.2,"timestamp":"2020-01-01T00:01:01.0000000"},
{"id":"b123","group":"0","value":2.1,"timestamp":"2020-01-01T00:02:01.0000000"},
{"id":"b123","group":"0","value":1.7,"timestamp":"2020-01-01T00:03:01.0000000"}]
...
Мне нужно знать, как ввести шаги 3 и 4 в запрос SQL, чтобы проанализировать строки на объекты json и, наконец, получить желаемый фрейм данных с идентификатором столбца, группой, значением и меткой времени. Спасибо.