Примечание
У нас есть необычный мультиплексор -подобный пример использования в нашем рабочем процессе
+-----------------------+
| |
+------------>+ branch-1.begin-task |
| | |
| +-----------------------+
|
|
| +-----------------------+
| | |
+------------>+ branch-2.begin-task |
| | |
+------------+ | +-----------------------+
| | |
| MUX-task +----+ +
| | | |
+------------+ |
| |
+- -- -- -- ->
| |
|
| |
| +
|
| +-----------------------+
| | |
+------------>+ branch-n.begin-task |
| |
+-----------------------+
Ожидается, что поток будет работать следующим образом:
MUX-task
прослушивает события во внешней очереди (одиночная очередь) - каждое событие в очереди запускает выполнение одна из ветвей (branch-n.begin-task)
- одна за другой, по мере поступления событий MUX-задача должна инициировать выполнение соответствующей ветви
- после запуска всех веток MUX-задача завершает
Допущения
- Ровно
n
события поступают в очереди один для запуска каждой ветви n
равен динамически известен : его значение определено в Variable
Ограничения
- Внешняя очередь, куда поступают события, равна только одна
- , у нас не может быть
n
очередей (по одной на ветку), так как ветви растут со временем (n определяется динамически)
Мы не можем найти решение в наборе операторов и датчиков компании Airflow (или любой такой вещи, доступной вне - в Airflow
) для создания этого
Sensor
s можно использовать для прослушивания событий во внешней очереди; но мы должны прослушивать несколько событий, но не одно BranchPythonOperator
можно использовать для запуска выполнения одной ветви из множества, но оно немедленно помечает оставшиеся ветви как пропущено
Основное узкое место
Из-за вышеуказанного 2-го ограничения даже пользовательский оператор объединяет функции Sensor
и BranchPythonOperator
не сработает.
Мы пытались провести мозговой штурм вокруг причудливой комбинации Sensors
, DummyOperator
и trigger_rules
, чтобы достичь этого, но пока не добились успеха.
Это возможно в Airflow?
UPDATE-1
Вот некоторая справочная информация для понимания контекста рабочего процесса
- у нас есть конвейер ETL для синхронизации c
MySQL
таблиц (через несколько Aurora
баз данных) с нашим озером данных - , чтобы преодолеть влияние нашего конвейера syn c на Для рабочих баз данных мы решили сделать это
- для каждой базы данных, создать снимок t ( восстановить
AuroraDB
кластер из последней резервной копии) - выполнить
MySQL
syn c конвейер, используя этот снимок - затем в конце syn c завершите снимок (
AuroraDB
кластер)
- события жизненного цикла снимка из
Aurora
снимок процесс восстановления опубликован в SQS
очередь - единую очередь для всех баз данных
- эта настройка была сделана нашей командой DevOps (другой учетной записью AWS мы не имеют доступа к базовому
Lambda
s / SQS
/ infra)