Мне было предложено построить конвейер ETL в Azure. Этот конвейер должен
- прочитать файл OR C, отправленный поставщиком в ADLS
- , проанализировать поле PARAMS, существующее в структуре OR C, где структура JSON сохранить и добавить его как два новых поля (KEY, VALUE) к выводу
- записать вывод в базу данных Azure SQL
Проблема в том, что там это разные типы структур JSON, используемые разными типами записей. Я не хочу писать пользовательские выражения для каждого класса JSON struct (их было бы как сотни). Скорее, я ищу общий механизм c, который сможет анализировать их по типу структуры ввода JSON.
На данный момент, чтобы выполнить это требование, я был используя встроенный разъем ADF для ИЛИ C. Процесс в его текущем дизайне:
- Используйте операцию копирования, которая читает ИЛИ C и перемещает данные в Azure SQL базу данных
Используйте следующие T SQL оператор как часть хранимой процедуры, выполняемой после 1. для анализа содержимого поля PARAMS
SELECT uuid,
AttrName = a1.[key] +
COALESCE('.' + a2.[key], '') +
COALESCE('.' + a3.[key], '') +
COALESCE('.' + a4.[key], ''),
AttrValue = COALESCE(a4.value, a3.value, a2.value, a1.value)
FROM ORC.EventsSnapshot_RawData
OUTER APPLY OPENJSON(params) a1
OUTER APPLY
(
SELECT [key],
value,
type
FROM OPENJSON(a1.value)
WHERE ISJSON(a1.value) = 1
) a2
OUTER APPLY
(
SELECT [key],
value,
type
FROM OPENJSON(a2.value)
WHERE ISJSON(a2.value) = 1
) a3
OUTER APPLY
(
SELECT [key],
value,
type
FROM OPENJSON(a3.value)
WHERE ISJSON(a3.value) = 1
) a4
Количество требуемых операторов OUTER APPLY определяется в начале путем подсчета вхождений «[» в значении поля PARAMS и затем используется для динамического генерирования SQL, выполняемого через sp_executesql
К сожалению, этот подход весьма неэффективен с точки зрения времени выполнения, так как для 11 ММ записей он занимает c .a. 3,5 часа до финиша sh
Кто-то предложил мне использовать Data Bricks. Итак, я:
создал блокнот со следующим python кодом для чтения ИЛИ C из ADLS и материализации его в таблицу Data Bricks
orcfile = "/mnt/adls/.../Input/*.orc"
eventDf = spark.read.orc(orcfile)
#spark.sql("drop table if exists ORC.Events_RawData")
eventDf.write.mode("overwrite").saveAsTable("ORC.Events_Raw")
- сейчас я пытаюсь найти код, который дал бы результат, который я получаю от T SQL OPENJSON. Я начал с кода Python, который использует рекурсию для анализа атрибута PARAMS, однако он даже более неэффективен с точки зрения скорости выполнения, чем T SQL.
Может пожалуйста, предложите мне правильный способ достижения цели, то есть преобразование атрибута PARAMS в атрибуты KEY, VALUE в общем c виде?