Я пытаюсь контролировать, на каком работнике Airflow выполняется задание, однако планировщик не выбирает параметр очереди в определении DAG.
Я определил очередь в своем операторе subdag:
xdata_run_etl = sub_dag_operator_with_celery_executor(
subdag = build_xdata_etl_dag(dag, 'xdata_run_etl'),
task_id = 'xdata_run_etl',
dag = dag,
trigger_rule='none_failed',
queue='subdag'
)
И я вижу, что настройки очереди были подобраны. В разделе «Атрибуты задачи» в пользовательском интерфейсе queue
установлено значение subdag
.
Однако, когда я запускаю группу обеспечения доступности баз данных, планировщик все еще отправляет задачу в очередь по умолчанию. Как отмечается в журналах планировщика:
: [2020-04-02 20:38:49,581] {scheduler_job.py:1168} INFO - Sending ('run_etl', 'xdata_run_etl', datetime.datetime(2020, 4, 2, 17, 27, 38, 368220, tzininfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 10) to executor with priority 2 and queue default
Ожидаемое поведение состоит в том, что эта задача будет отправлена в очередь subdag
и запущена на работнике Airflow, который прослушивает эту очередь. (airflow worker -q subdag
). Фактическое поведение заключается в том, что все задачи отправляются в очередь по умолчанию независимо от определяемого параметра очереди.
Версия воздушного потока: 1.10.9