Когда мои суб-метки превышают ~ 1000 задач, Airflow начинает ползти вдоль. Я заметил, что проблема заключается в том, что воздушный поток неоднократно пытается запустить задачи, которые терпят неудачу с ошибкой «задача находится в запланированном состоянии» вместо того, чтобы успешно работать, как я ожидал. Эти задачи отображаются в пользовательском интерфейсе воздушного потока желтым цветом, пока они случайно не начнутся случайно через некоторое время. Я не пробовал ни одного нормального (не поддагса) такого большого размера.
Нет ничего, что могло бы остановить выполнение этих даг.
Это также может быть связано с попаданиеммаксимальный параллелизм, когда работа начинается. Я действительно не знаю, где искать.
Только с набором небольших заданий, кажется, что поток воздуха работает нормально.
Я вижу много процессов, запускаемых потоком воздуха с задачами, запущенными
/usr/local/bin/airflow tasks run <subdag ID> <task id> <execution date> ...
Эти задачи должны выполняться нормально, но в их журналах я вижу следующее (я отредактировал имя задачи):
cat /opt/airflow/logs/<subdag ID>/<task ID>/<execution date>/1.log
[2019-11-06 08:56:01,572] {taskinstance.py:618} INFO - Dependencies not met for <TaskInstance: <dag>.<subdag>.<task> 2019-11-06T03:25:02.889939+00:00 [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-11-06 08:56:01,578] {logging_mixin.py:89} INFO - [2019-11-06 08:56:01,578] {local_task_job.py:86} INFO - Task is not able to be run
[2019-11-06 15:33:31,196] {taskinstance.py:618} INFO - Dependencies not met for <TaskInstance: <dag>.<subdag>.<task> 2019-11-06T03:25:02.889939+00:00 [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-11-06 15:33:31,204] {logging_mixin.py:89} INFO - [2019-11-06 15:33:31,203] {local_task_job.py:86} INFO - Task is not able to be run
[2019-11-06 15:35:45,554] {taskinstance.py:618} INFO - Dependencies not met for <TaskInstance: <dag>.<subdag>.<task> 2019-11-06T03:25:02.889939+00:00 [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-11-06 15:35:45,562] {logging_mixin.py:89} INFO - [2019-11-06 15:35:45,562] {local_task_job.py:86} INFO - Task is not able to be run
[2019-11-06 15:36:53,001] {taskinstance.py:618} INFO - Dependencies not met for <TaskInstance: <dag>.<subdag>.<task> 2019-11-06T03:25:02.889939+00:00 [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-11-06 15:36:53,003] {logging_mixin.py:89} INFO - [2019-11-06 15:36:53,002] {local_task_job.py:86} INFO - Task is not able to be run
Это потребляет огромное количество ЦП, после запуска этих задачи выйти такЧерез несколько часов задача, как правило, будет выполнена.
Еще несколько подробностей:
Я использую LocalExecutor
То, что я пробовал:
У меня естьпопытался настроить потоки планировщика (max_threads) на 1, попытался изменить run_duration до 300 с -1, увеличил dagbag_import_timeout до 200, что намного дольше, чем загружают мои пакеты (они занимают менее 3 секунд), и попытался полностью удалить базу данных ипереинициализировав его
Редактировать:
Я прошелся по исходному коду и внес изменения, которые обеспечили бесперебойную работу воздушного потока. К сожалению, это имеет следствие того, что поток воздуха не обрабатывает отмененные задачи должным образом - если что-либо ставится в очередь, с этим изменением он все равно будет запускать эти задачи в очереди.
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index a6b42bc..0c79f46 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1097,7 +1097,7 @@ class SchedulerJob(BaseJob):
ignore_all_deps=False,
ignore_depends_on_past=False,
ignore_task_deps=False,
- ignore_ti_state=False,
+ ignore_ti_state=True,
pool=simple_task_instance.pool,
file_path=simple_dag.full_filepath,
pickle_id=simple_dag.pickle_id)