Как запустить задание Dataflow с Apache Airflow и не блокировать другие задачи? - PullRequest
0 голосов
/ 25 января 2020

Задача

Задачи воздушного потока типа DataflowTemplateOperator занимают длительное время для выполнения . Это означает, что другие задачи могут быть заблокированы (правильно?).

Когда мы запускаем больше этих задач, это означает, что нам потребуется больший кластер Cloud Composer (в нашем случае) для выполнения задач, которые по существу блокируют, в то время как они должны не должно быть (они должны быть asyn c операции).

Опции

  • Опция 1 : просто запустите задание, и задание воздушного потока выполнено успешно
  • Опция 2 : запись обертку, как объяснено здесь и использовать режим перепланирования, как объяснено здесь

Вариант 1 не представляется возможным, так как DataflowTemplateOperator имеет только возможность укажите время ожидания между проверками завершения, которые называются poll_sleep ( source ).

Для DataflowCreateJavaJobOperator есть опция check_if_running для ожидания завершения предыдущего задания с тем же именем ( см. Этот код )

Кажется, что после запуска задания выполняется wait_for_finish (см. эта строка ), что сводится к «неполному» заданию (см. эта строка ).

Для варианта 2 мне нужен вариант 1.

Вопросы

  1. Правильно ли считать, что задачи потока данных будут блокировать другие в облаке Composer / Airflow?
  2. Есть ли способ запланировать работу без «ожидания до конца sh» с помощью встроенных операторов? (Я мог что-то упустить из виду)
  3. Есть ли простой способ написать это сам ? Я имею в виду просто выполнить сценарий запуска bash, а затем задачу, которая проверяет, правильно ли выполнено задание, но в режиме перепланирования.
  4. Есть ли другой способ избежать блокировки других задач во время выполнения заданий потока данных? По сути это асинхронная c операция, которая не должна занимать ресурсы.

1 Ответ

0 голосов
/ 27 января 2020

Ответы

  1. Правильно ли я предполагаю, что задачи потока данных будут блокировать другие в Cloud Composer / Airflow? A: Отчасти да. Воздушный поток имеет параметр параллелизма в конфигурации, который определяет количество задач, которые должны выполняться одновременно в системе. Наличие блока задач в этом слоте может замедлить выполнение в системе, но эта проблема неизбежно возникает при увеличении количества задач и групп доступности баз данных. Вы можете увеличить это в конфигурации в зависимости от ваших потребностей

Есть ли способ запланировать работу без "ожидания до конца sh", используя встроенные операторы? (Я мог что-то упустить из виду) A: Да. Вы можете использовать PythonOperator, а в python_callable вы можете использовать ловушку потока данных для запуска задания в асинхронном режиме c (запускать и не ждать).
Есть ли простой способ написать это сам? Я имею в виду просто выполнить сценарий запуска bash, а затем задачу, которая проверяет, правильно ли выполнено задание, но в режиме перепланирования. A: Когда вы говорите «перенести», я предполагаю, что вы собираетесь повторить задание, которое ищет задание, которое проверяет, правильно ли выполнено задание. Если я прав, вы можете установить задачу на режим повтора и задержку, с которой вы хотите, чтобы повтор произошел.
Есть ли другой способ избежать блокировки других задач при выполнении заданий потока данных? По сути это асинхронная c операция, которая не должна занимать ресурсы. A: Я думаю, что я ответил на это во втором вопросе.
...