Airflow неоднократно повторяет попытки с ошибкой задачи находится в «запланированном» состоянии, которое не является допустимым состоянием для выполнения - PullRequest
0 голосов
/ 06 ноября 2019

Когда мои суб-метки превышают ~ 1000 задач, Airflow начинает ползти вдоль. Я заметил, что проблема заключается в том, что воздушный поток неоднократно пытается запустить задачи, которые терпят неудачу с ошибкой «задача находится в запланированном состоянии» вместо того, чтобы успешно работать, как я ожидал. Эти задачи отображаются в пользовательском интерфейсе воздушного потока желтым цветом, пока они случайно не начнутся случайно через некоторое время. Я не пробовал ни одного нормального (не поддагса) такого большого размера.

Нет ничего, что могло бы остановить выполнение этих даг.

Это также может быть связано с попаданиеммаксимальный параллелизм, когда работа начинается. Я действительно не знаю, где искать.

Только с набором небольших заданий, кажется, что поток воздуха работает нормально.

Я вижу много процессов, запускаемых потоком воздуха с задачами, запущенными

/usr/local/bin/airflow tasks run <subdag ID> <task id> <execution date> ...

Эти задачи должны выполняться нормально, но в их журналах я вижу следующее (я отредактировал имя задачи):

cat /opt/airflow/logs/<subdag ID>/<task ID>/<execution date>/1.log
[2019-11-06 08:56:01,572] {taskinstance.py:618} INFO - Dependencies not met for <TaskInstance: <dag>.<subdag>.<task> 2019-11-06T03:25:02.889939+00:00 [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-11-06 08:56:01,578] {logging_mixin.py:89} INFO - [2019-11-06 08:56:01,578] {local_task_job.py:86} INFO - Task is not able to be run
[2019-11-06 15:33:31,196] {taskinstance.py:618} INFO - Dependencies not met for <TaskInstance: <dag>.<subdag>.<task> 2019-11-06T03:25:02.889939+00:00 [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-11-06 15:33:31,204] {logging_mixin.py:89} INFO - [2019-11-06 15:33:31,203] {local_task_job.py:86} INFO - Task is not able to be run
[2019-11-06 15:35:45,554] {taskinstance.py:618} INFO - Dependencies not met for <TaskInstance: <dag>.<subdag>.<task> 2019-11-06T03:25:02.889939+00:00 [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-11-06 15:35:45,562] {logging_mixin.py:89} INFO - [2019-11-06 15:35:45,562] {local_task_job.py:86} INFO - Task is not able to be run
[2019-11-06 15:36:53,001] {taskinstance.py:618} INFO - Dependencies not met for <TaskInstance: <dag>.<subdag>.<task> 2019-11-06T03:25:02.889939+00:00 [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-11-06 15:36:53,003] {logging_mixin.py:89} INFO - [2019-11-06 15:36:53,002] {local_task_job.py:86} INFO - Task is not able to be run

Это потребляет огромное количество ЦП, после запуска этих задачи выйти такЧерез несколько часов задача, как правило, будет выполнена.

Еще несколько подробностей:

Я использую LocalExecutor

То, что я пробовал:

У меня естьпопытался настроить потоки планировщика (max_threads) на 1, попытался изменить run_duration до 300 с -1, увеличил dagbag_import_timeout до 200, что намного дольше, чем загружают мои пакеты (они занимают менее 3 секунд), и попытался полностью удалить базу данных ипереинициализировав его

Редактировать:

Я прошелся по исходному коду и внес изменения, которые обеспечили бесперебойную работу воздушного потока. К сожалению, это имеет следствие того, что поток воздуха не обрабатывает отмененные задачи должным образом - если что-либо ставится в очередь, с этим изменением он все равно будет запускать эти задачи в очереди.

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index a6b42bc..0c79f46 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1097,7 +1097,7 @@ class SchedulerJob(BaseJob):
                 ignore_all_deps=False,
                 ignore_depends_on_past=False,
                 ignore_task_deps=False,
-                ignore_ti_state=False,
+                ignore_ti_state=True,
                 pool=simple_task_instance.pool,
                 file_path=simple_dag.full_filepath,
                 pickle_id=simple_dag.pickle_id)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...