Запросы к файлам данных avro, хранящимся в Azure Data Lake, напрямую с помощью raw SQL из Databricks - PullRequest
0 голосов
/ 19 июня 2020

Я использую записные книжки Databricks для чтения файлов avro, хранящихся в Azure Data Lake Gen2. Файлы avro создаются с помощью захвата концентратора событий и представляют собой определенную схему c. Из этих файлов мне нужно извлечь только поле Body, в котором фактически хранятся интересующие меня данные.

Я уже реализовал это в Python, и он работает, как ожидалось:

path = 'abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro'
df0 = spark.read.format('avro').load(path) # 1
df1 = df0.select(df0.Body.cast('string')) # 2
rdd1 = df1.rdd.map(lambda x: x[0]) # 3
data = spark.read.json(rdd1) # 4

Теперь мне нужно перевести это в необработанный SQL, чтобы фильтровать данные непосредственно в SQL запрос. Учитывая 4 шага выше, шаги 1 и 2 с SQL выглядят следующим образом:

CREATE TEMPORARY VIEW file_avro
USING avro
OPTIONS (path "abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro")

WITH body_array AS (SELECT cast(Body AS STRING) FROM file_avro)

SELECT * FROM body_array

С этим частичным запросом я получаю то же, что и df1 выше (шаг 2 с Python):

Body
[{"id":"a123","group":"0","value":1.0,"timestamp":"2020-01-01T00:00:00.0000000"},
{"id":"a123","group":"0","value":1.5,"timestamp":"2020-01-01T00:01:00.0000000"},
{"id":"a123","group":"0","value":2.3,"timestamp":"2020-01-01T00:02:00.0000000"},
{"id":"a123","group":"0","value":1.8,"timestamp":"2020-01-01T00:03:00.0000000"}]
[{"id":"b123","group":"0","value":2.0,"timestamp":"2020-01-01T00:00:01.0000000"},
{"id":"b123","group":"0","value":1.2,"timestamp":"2020-01-01T00:01:01.0000000"},
{"id":"b123","group":"0","value":2.1,"timestamp":"2020-01-01T00:02:01.0000000"},
{"id":"b123","group":"0","value":1.7,"timestamp":"2020-01-01T00:03:01.0000000"}]
...

Мне нужно знать, как ввести шаги 3 и 4 в запрос SQL, чтобы проанализировать строки на объекты json и, наконец, получить желаемый фрейм данных с идентификатором столбца, группой, значением и меткой времени. Спасибо.

1 Ответ

0 голосов
/ 27 июня 2020

Один из способов, которым я нашел это с raw SQL, заключается в следующем: с использованием встроенной функции from_ json Spark SQL и схемы поля Body:

CREATE TEMPORARY VIEW file_avro
USING avro
OPTIONS (path "abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro")

WITH body_array AS (SELECT cast(Body AS STRING) FROM file_avro),
data1 AS (SELECT from_json(Body, 'array<struct<id:string,group:string,value:double,timestamp:timestamp>>') FROM body_array),
data2 AS (SELECT explode(*) FROM data1),
data3 AS (SELECT col.* FROM data2)
SELECT * FROM data3 WHERE id = "a123"     --FILTERING BY CHANNEL ID

It работает быстрее, чем код Python, который я разместил в вопросе, наверняка из-за использования from_ json и схемы Body для извлечения данных внутри него. Моя версия этого подхода в PySpark выглядит следующим образом:

path = 'abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro'
df0 = spark.read.format('avro').load(path)
df1 = df0.selectExpr("cast(Body as string) as json_data")
df2 = df1.selectExpr("from_json(json_data, 'array<struct<id:string,group:string,value:double,timestamp:timestamp>>') as parsed_json")
data = df2.selectExpr("explode(parsed_json) as json").select("json.*")

...