Фабрика данных Azure: вывод информации о скопированных файлах и папках из операции копирования - PullRequest
0 голосов
/ 31 декабря 2018

Я использую среду автономной интеграции в фабрике данных Azure для копирования данных из локального источника (обычная файловая система) в место назначения хранилища BLOB-объектов Azure.После передачи я хочу обработать файлы автоматически, подключив Блокнот, работающий в кластере Databricks.Конвейер работает нормально, но мой вопрос касается вывода операции копирования.

Есть ли способ получить информацию о переданных файлах и папках для каждого прогона? Я бы передал эту информациюв качестве параметров для ноутбука.

Глядя на документацию, кажется, что доступна только агрегированная информация:

https://docs.microsoft.com/en-us/azure/data-factory/copy-activity-overview

Какой тип имеет смысл, если вы переводите огромныйколичество файлов.Если это невозможно, я полагаю, что альтернативным подходом было бы просто оставить процесс копирования себе и создать еще один конвейер на основе событий учетной записи хранения?Или, может быть, сохранить информацию о новом файле и папке для каждого прогона в фиксированном текстовом файле, перенести его также и прочитать в записной книжке?

Ответы [ 2 ]

0 голосов
/ 04 января 2019

Решение на самом деле было довольно простым в этом случае.Я только что создал еще один конвейер в фабрике данных Azure, который был вызван событием Blob Created , а папка и filename были переданы в качестве параметров в мою записную книжку.Кажется, работает хорошо, и требуется минимальное количество конфигурации или кода.С помощью события можно выполнить базовую фильтрацию, а остальное - за ноутбуком.

Для всех, кто сталкивается с этим сценарием, подробности приведены ниже:

https://docs.microsoft.com/en-us/azure/data-factory/how-to-create-event-trigger

0 голосов
/ 03 января 2019

Если вы хотите получить информацию о файлах или каталогах, считываемых из фабрики данных, это можно сделать с помощью действия Get Metadata Activity, см. Следующий пример answer .

Другой подходдля обнаружения новых файлов в вашей записной книжке будет использоваться структурированная потоковая передача с файловыми источниками.Это работает довольно хорошо, и вы просто вызываете блокнот после операции копирования.

Для этого вы определяете потоковый фрейм входных данных:

streamingInputDF = (
   spark
     .readStream                     
     .schema(pqtSchema)               
     .parquet(inputPath) 
 )

с inputPath , указывающим на входной каталог в хранилище BLOB-объектов.Поддерживаемые форматы файлов: текстовый, csv, json, orc, parquet, поэтому это будет зависеть от вашего конкретного сценария, будет ли это работать для вас.

Важно, чтобы на цели вы использовали опцию триггер один раз, поэтому записная книжкане нужно запускать постоянно, например:

streamingOutputDF \
    .repartition(1) \
    .writeStream \
    .format("parquet") \
    .partitionBy('Id') \
    .option("checkpointLocation", adlpath +  "spark/checkpointlocation/data/trusted/sensorreadingsdelta") \
    .option("path", targetPath + "delta") \
    .trigger(once=True) \
    .start()

Другой подход может заключаться в использовании хранилища очередей Azure (AQS), см. следующую документацию .

...