подача имен файлов в действиях блока данных Azure по одному в конвейер данных Azure. - PullRequest
1 голос
/ 17 июня 2019

Я пытаюсь создать конвейер фабрики данных, в котором одно действие вставляет имена файлов (из контейнера или другой папки) в действие блоков данных один за другим для обработки в порядке поступления.Как мне этого добиться?

Ответы [ 2 ]

0 голосов
/ 18 июня 2019

В зависимости от типа источника и частоты, с которой принимаются файлы, может быть также вариант использования потоковой структурированной искры. Для потоковых источников данных также поддерживаются файлы в качестве источника. Считывает файлы, записанные в каталоге, в виде потока данных. Поддерживаемые форматы файлов: текст, csv, json, orc, parquet (для получения более актуального списка и поддерживаемых параметров для каждого формата файла см. Документацию интерфейса DataStreamReader). Обратите внимание, что файлы должны быть атомарно размещены в заданном каталоге, что в большинстве файловых систем может быть достигнуто с помощью операций перемещения файлов.

streamingInputDF = (
  spark
    .readStream           # Similar to Batch just using `readStream` instead of `read`
    .schema(jsonSchema)               
    .json(inputPath)
)

Если вы не хотите запускать ноутбук постоянно, используйте опцию триггер один раз. При использовании триггера опция вывода выводится один раз для доступных данных, без этой опции поток вывода будет работать постоянно:

streamingOutputDF \
    .coalesce(1) \
    .writeStream \
    .format("parquet") \
    .partitionBy('ingest_date') \
    .option("checkpointLocation", checkPointPath) \
    .option("path", targetPath) \
    .trigger(once=True) \
    .start()

В этом сценарии вы можете использовать фабрику данных для запуска записной книжки Databricks без параметров.

0 голосов
/ 18 июня 2019

Здравствуйте, Сурбхи Тайал, и спасибо за ваш запрос.По порядку входящих я предполагаю, что вы имеете в виду, что первый, отправленный в блоки данных, должен быть первым законченным, в отличие от параллельной обработки.

Для этого вам потребуется следующее:

  • конвейерная переменная типа array.
  • механизм для заполнения переменной массива именами файлов.Это может быть действие GetMetadata или параметры конвейера или что-то еще.Добавьте дополнительные сведения, чтобы узнать, нужна ли вам помощь в этом.
  • a Ресурс Databricks и Linked Service

В своем конвейере создайте действие ForEach.В настройках отметьте опцию «Последовательный».В противном случае ваши действия будут отправляться параллельно, а не по одному.В настройках, ссылка на переменную массива в «Предметы».Выражение выглядит как @variables('myVariableName').Внутри действия ForEach поместите тип действия Databricks.Варианты выбора: «Записная книжка», «Баночка» и «Питон».Для себя я использовал ноутбук.Записную книжку было проще настроить из-за пользовательского интерфейса «Обзор».Установите действие, чтобы сначала использовать соответствующую связанную службу.Установите «Файл Python» / «Путь к ноутбуку» / «Имя основного класса».Разверните раздел «Параметры» и добавьте новый параметр.Дайте параметру (ам) то же имя, что и в скрипте Databricks.Значение должно быть @string(item()) (может отличаться, если ваш перечисляемый не является простым массивом примитивов).Это захватывает элемент из действия ForEach и гарантирует, что это строка.При необходимости установите библиотеки.

Когда вы попытаетесь запустить / отладить, помните, что для раскрутки кластера Databricks может потребоваться много времени.Это увеличивает время работы трубопровода.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...