Медленный запуск Airflow 1.10.2 ETL при использовании ExternalTaskSensor для зависимости задачи DAG? - PullRequest
2 голосов
/ 21 мая 2019

У меня есть две группы доступности баз данных, которые мне нужно запустить с Airflow 1.10.2 + CeleryExecutor.Первый DAG (DAG1) - это длительная загрузка данных из s3 в Redshift (более 3 часов).Мой второй DAG (DAG2) выполняет вычисления для данных, загруженных DAG1.Я хочу включить ExternalTaskSensor в DAG2, чтобы вычисления надежно выполнялись после загрузки данных.Теоретически это так просто!

Я могу успешно заставить DAG2 дождаться завершения DAG1, обеспечив одновременный запуск обеих групп DAG (schedule = "0 8 * * *" для обеих групп DAG), и DAG2зависит от конечного задания в DAG1.Но я вижу огромную задержку в нашем ETL на DAG1, когда я представляю датчик.Я сначала, хотя это было, потому что моя оригинальная реализация использовала mode="poke", который я понимаю, блокирует рабочий.Однако даже когда я изменил это значение на mode="reschedule", как я читал в документах https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/base_sensor_operator.html, я все еще вижу значительную задержку ETL.

Я использую приведенный ниже код ExternalTaskSensor в DAG2:

wait_for_data_load = ExternalTaskSensor(
    dag=dag,
    task_id="wait_for_data_load",
    external_dag_id="dag1",
    external_task_id="dag1_final_task_id",
    mode="reschedule",
    poke_interval=1800,  # check every 30 min
    timeout=43200,  # timeout after 12 hours (catch delayed data load runs)
    soft_fail=False  # if the task fails, we assume a failure
)

Если код работал правильно, я ожидал бы, что датчик выполнит быструю проверку, завершил ли DAG1, и, если нет, перенесет на 30 минут время, как определено poke_interval, не вызывая задержки для DAG1.ETL.Если DAG1 не завершится через 12 часов, тогда DAG2 перестанет тыкать и завершится неудачей.

Вместо этого я получаю частые ошибки для каждой из задач в DAG1, говоря (например) Executor reports task instance <TaskInstance: dag1.data_table_temp_redshift_load 2019-05-20 08:00:00+00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?, даже если задачизавершаются успешно (с некоторой задержкой).Непосредственно перед отправкой этой ошибки я вижу строку в наших журналах Sentry, в которой говорится Executor reports dag1.data_table_temp_redshift_load execution_date=2019-05-20 08:00:00+00:00 as failed for try_number 1, хотя (снова) я вижу, что задача выполнена успешно.

Журналы на DAG2 также выглядят немного странно.Я вижу повторные попытки, зарегистрированные в те же промежутки времени, как в приведенной ниже выдержке:

--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------

[2019-05-21 08:01:48,417] {{models.py:1593}} INFO - Executing <Task(ExternalTaskSensor): wait_for_data_load> on 2019-05-20T08:00:00+00:00
[2019-05-21 08:01:48,419] {{base_task_runner.py:118}} INFO - Running: ['bash', '-c', 'airflow run dag2 wait_for_data_load 2019-05-20T08:00:00+00:00 --job_id 572075 --raw -sd DAGS_FOLDER/dag2.py --cfg_path /tmp/tmp4g2_27c7']
[2019-05-21 08:02:02,543] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:02,542] {{settings.py:174}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=28219
[2019-05-21 08:02:12,000] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:11,996] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-05-21 08:02:15,840] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:15,827] {{models.py:273}} INFO - Filling up the DagBag from /usr/local/airflow/dags/dag2.py
[2019-05-21 08:02:16,746] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:16,745] {{dag2.py:40}} INFO - Waiting for the dag1_final_task_id operator to complete in the dag1 DAG
[2019-05-21 08:02:17,199] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:17,198] {{cli.py:520}} INFO - Running <TaskInstance: dag1. wait_for_data_load 2019-05-20T08:00:00+00:00 [running]> on host 11d93b0b0c2d
[2019-05-21 08:02:17,708] {{external_task_sensor.py:91}} INFO - Poking for dag1. dag1_final_task_id on 2019-05-20T08:00:00+00:00 ... 
[2019-05-21 08:02:17,890] {{models.py:1784}} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2019-05-21 08:02:17,892] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load /usr/local/lib/python3.6/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.25.2) or chardet (3.0.4) doesn't match a supported version!
[2019-05-21 08:02:17,893] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load   RequestsDependencyWarning)
[2019-05-21 08:02:17,893] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load /usr/local/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2019-05-21 08:02:17,894] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load   """)
[2019-05-21 08:02:22,597] {{logging_mixin.py:95}} INFO - [2019-05-21 08:02:22,589] {{jobs.py:2527}} INFO - Task exited with return code 0

[2019-05-21 08:01:48,125] {{models.py:1359}} INFO - Dependencies all met for <TaskInstance: dag2. wait_for_data_load 2019-05-20T08:00:00+00:00 [queued]>
[2019-05-21 08:01:48,311] {{models.py:1359}} INFO - Dependencies all met for <TaskInstance: dag2. wait_for_data_load 2019-05-20T08:00:00+00:00 [queued]>
[2019-05-21 08:01:48,311] {{models.py:1571}} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------

[2019-05-21 08:01:48,417] {{models.py:1593}} INFO - Executing <Task(ExternalTaskSensor): wait_for_data_load> on 2019-05-20T08:00:00+00:00
[2019-05-21 08:01:48,419] {{base_task_runner.py:118}} INFO - Running: ['bash', '-c', 'airflow run dag2 wait_for_data_load 2019-05-20T08:00:00+00:00 --job_id 572075 --raw -sd DAGS_FOLDER/dag2.py --cfg_path /tmp/tmp4g2_27c7']
[2019-05-21 08:02:02,543] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:02,542] {{settings.py:174}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=28219
[2019-05-21 08:02:12,000] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:11,996] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-05-21 08:02:15,840] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:15,827] {{models.py:273}} INFO - Filling up the DagBag from /usr/local/airflow/dags/dag2.py
[2019-05-21 08:02:16,746] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:16,745] {{dag2.py:40}} INFO - Waiting for the dag1_final_task_id operator to complete in the dag1 DAG
[2019-05-21 08:02:17,199] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:17,198] {{cli.py:520}} INFO - Running <TaskInstance: dag2.wait_for_data_load 2019-05-20T08:00:00+00:00 [running]> on host 11d93b0b0c2d
[2019-05-21 08:02:17,708] {{external_task_sensor.py:91}} INFO - Poking for dag1.dag1_final_task_id on 2019-05-20T08:00:00+00:00 ... 
[2019-05-21 08:02:17,890] {{models.py:1784}} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2019-05-21 08:02:17,892] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load /usr/local/lib/python3.6/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.25.2) or chardet (3.0.4) doesn't match a supported version!
[2019-05-21 08:02:17,893] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load   RequestsDependencyWarning)
[2019-05-21 08:02:17,893] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load /usr/local/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2019-05-21 08:02:17,894] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load   """)
[2019-05-21 08:02:22,597] {{logging_mixin.py:95}} INFO - [2019-05-21 08:02:22,589] {{jobs.py:2527}} INFO - Task exited with return code 0
[2019-05-21 08:33:31,875] {{models.py:1359}} INFO - Dependencies all met for <TaskInstance: dag2.wait_for_data_load 2019-05-20T08:00:00+00:00 [queued]>
[2019-05-21 08:33:31,903] {{models.py:1359}} INFO - Dependencies all met for <TaskInstance: dag2.wait_for_data_load 2019-05-20T08:00:00+00:00 [queued]>
[2019-05-21 08:33:31,903] {{models.py:1571}} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------

Хотя во всех журналах указано Starting attempt 1 of 4, я вижу записи попыток примерно каждые 30 минут, но я вижу несколько журналов длякаждый временной интервал (10+ одинаковых журналов, напечатанных для каждого 30-минутного интервала).

Из поисков вокруг я вижу, что другие люди используют датчики в производственных процессах https://eng.lyft.com/running-apache-airflow-at-lyft-6e53bb8fccff,, что заставляет меня думать, что есть способ обойти это, или я что-то неправильно понимаю.Но я также вижу открытые проблемы в проекте воздушного потока, связанные с этой проблемой, так что, возможно, есть более глубокая проблема в проекте?Я также нашел связанный, но оставшийся без ответа пост здесь Apache Airflow 1.10.3: Executor сообщает об экземпляре задачи ???выполнено (не выполнено), хотя задание говорит о своей очереди.Была ли задача убита извне?

Кроме того, мы используем следующие параметры конфигурации:

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# When not using pools, tasks are run in the "default pool",
# whose size is guided by this config element
non_pooled_task_slot_count = 128

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

1 Ответ

0 голосов
/ 22 мая 2019

Эти симптомы были вызваны вызовом Variable.set() в теле DAG1, которое DAG2 затем использовал для получения динамически сгенерированных DAG1 dag_id.Variable.set() all вызывало ошибку (обнаружена в рабочих журналах).Как описано здесь , планировщик опрашивает определения DAG с каждым тактом, чтобы обновлять DAG в актуальном состоянии.Это означало ошибку с каждым пульсом, которая вызывала большую задержку ETL.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...