Условно выполнить несколько веток одну за другой - PullRequest
0 голосов
/ 19 апреля 2020

Примечание


У нас есть необычный мультиплексор -подобный пример использования в нашем рабочем процессе

                                +-----------------------+
                                |                       |
                  +------------>+  branch-1.begin-task  |
                  |             |                       |
                  |             +-----------------------+
                  |
                  |
                  |             +-----------------------+
                  |             |                       |
                  +------------>+  branch-2.begin-task  |
                  |             |                       |
+------------+    |             +-----------------------+
|            |    |
|  MUX-task  +----+                         +
|            |    |                         |
+------------+    |
                  |                         |
                  +- -- -- -- ->
                  |                         |
                  |
                  |                         |
                  |                         +
                  |
                  |             +-----------------------+
                  |             |                       |
                  +------------>+  branch-n.begin-task  |
                                |                       |
                                +-----------------------+

Ожидается, что поток будет работать следующим образом:

  • MUX-task прослушивает события во внешней очереди (одиночная очередь)
  • каждое событие в очереди запускает выполнение одна из ветвей (branch-n.begin-task)
  • одна за другой, по мере поступления событий MUX-задача должна инициировать выполнение соответствующей ветви
  • после запуска всех веток MUX-задача завершает

Допущения

  • Ровно n события поступают в очереди один для запуска каждой ветви
  • n равен динамически известен : его значение определено в Variable

Ограничения

  • Внешняя очередь, куда поступают события, равна только одна
  • , у нас не может быть n очередей (по одной на ветку), так как ветви растут со временем (n определяется динамически)

Мы не можем найти решение в наборе операторов и датчиков компании Airflow (или любой такой вещи, доступной вне - в Airflow) для создания этого

  1. Sensor s можно использовать для прослушивания событий во внешней очереди; но мы должны прослушивать несколько событий, но не одно
  2. BranchPythonOperator можно использовать для запуска выполнения одной ветви из множества, но оно немедленно помечает оставшиеся ветви как пропущено

Основное узкое место

Из-за вышеуказанного 2-го ограничения даже пользовательский оператор объединяет функции Sensor и BranchPythonOperator не сработает.

Мы пытались провести мозговой штурм вокруг причудливой комбинации Sensors, DummyOperator и trigger_rules, чтобы достичь этого, но пока не добились успеха.

Это возможно в Airflow?


UPDATE-1

Вот некоторая справочная информация для понимания контекста рабочего процесса

  • у нас есть конвейер ETL для синхронизации c MySQL таблиц (через несколько Aurora баз данных) с нашим озером данных
  • , чтобы преодолеть влияние нашего конвейера syn c на Для рабочих баз данных мы решили сделать это
    • для каждой базы данных, создать снимок t ( восстановить AuroraDB кластер из последней резервной копии)
    • выполнить MySQL syn c конвейер, используя этот снимок
    • затем в конце syn c завершите снимок (AuroraDB кластер)
  • события жизненного цикла снимка из Aurora снимок процесс восстановления опубликован в SQS очередь
    • единую очередь для всех баз данных
    • эта настройка была сделана нашей командой DevOps (другой учетной записью AWS мы не имеют доступа к базовому Lambda s / SQS / infra)

1 Ответ

0 голосов
/ 19 апреля 2020

XCOM с на помощь!


Мы решили смоделировать задачи следующим образом (обе задачи пользовательские operator с)

  • MUX-task больше похож на итеративный - sensor: он продолжает прослушивать события в очереди и предпринимает некоторые действия против каждого события, прибывающего в очередь
  • Все branch-x.begin-task простые датчики : они прослушивают публикацию XCOM (имя которого в предварительно определенном формате c )

Рабочий процесс выполняется следующим образом

  • MUX-task прослушивает события в очереди (прослушивающая часть заключена в for -l oop со многими итерации как число ветвей)
  • Когда приходит событие, MUX-task забирает его; определяет, какая «ветка» должна быть запущена, и публикует XCOM для соответствующей ветви
  • Соответствующая ветка sensor обнаруживает, что XCOM на это следующее тыкание, и ветвь начинает выполняться. Фактически, ветка sensor просто действует как шлюз , который открывается внешним событием (XCOM) и позволяет выполнять ветку

Поскольку датчиков слишком много (по одному на каждую ветвь), мы, скорее всего, будем использовать mode='reschedule' для преодоления тупиков


  • Поскольку Описанный подход в значительной степени опирается на опрос , мы не считаем его суперэффективным.
  • A реактивный запуск, основанный на подход был бы более желательным, но у нас нет я не смог разобраться

UPDATE-1

  • Похоже, что "реактивный" подход возможен, если мы могли бы смоделировать каждая ветвь как отдельная DAG и вместо публикации XCOM с для каждой ветки, запускает DAG ветки, как TriggerDagRunOperator делает
  • Но поскольку наша монолитная c DAG генерируется программно через сложные логи c, это изменение будет изменено Было бы довольно сложно (много кода переписать). Поэтому мы решили продолжить основанный на опросе подход и жить с несколькими минутами дополнительной задержки в конвейере, для завершения которого уже требуется несколько часов

ОБНОВЛЕНИЕ-2

[со ссылкой на ОБНОВЛЕНИЕ-1 раздел вопроса]

Поскольку наша фактическая реализация потребовала от нас просто подождать для создания базы данных мы решили упростить рабочий процесс следующим образом:

  • конечные точки базы данных были исправлены с помощью DNS (они не меняли каждый раз, когда Aurora снимок восстанавливался
  • мы покончили с MUX-task (а также с очередью SQS для событий жизненного цикла восстановления Aurora )
  • начальная задача каждой ветви branch-x.begin-task был смоделирован как простой sensor, который пытался запустить запрос dummy SQL (SELECT 1), чтобы проверить, стала ли конечная точка базы данных активной или нет
...