Задержка / задержка параллельного выполнения задачи DAG воздушного потока на 60 секунд - PullRequest
1 голос
/ 09 марта 2019

Мы перешли на AirFlow 1.10.2 , чтобы разрешить использование процессора. Хорошо, что проблема, с которой мы столкнулись, была исправлена ​​в нашей среде.Тем не менее, мы заметили, что задачи группы обеспечения доступности баз данных, хотя и отправляются, и показывают, что они выполняются на панели мониторинга AirFlow, но они как бы выдерживают фактическую обработку, а затем остаются в очереди в течение примерно 60 секунд после того, как происходит фактическое выполнение.Обратите внимание, что для реализации нашего варианта использования

  • DAG AirFlow не зависят от времени, т. Е. Они не '** Запланированные DAG ' **, а запускаются через код Python.
  • AirFlow v1.10.2 используется как отдельная автономная установка [executor = LocalExecutor].

Код Python просматривает каталог для всех поступающих файлов.Он отмечает, что для любого файла код запускает AirFlow DAG.Мы получаем пачки прибывающих файлов, и поэтому в каждом конкретном случае у нас есть сценарии, в которых вызывается несколько экземпляров одних и тех же групп доступности данных [ фрагмент кода, приведенный ниже ].Триггеры DAG запускаются, что, в свою очередь, имеет задачу, которая вызывает код Python для запуска модуля Kubernetes, где происходит некоторая обработка, связанная с файлами.Ниже приведена выдержка из кода DAG

positional_to_ascii = BashOperator(
                    task_id="uncompress_the_file",
                    bash_command='python3.6 ' + os.path.join(cons.CODE_REPO, 'app/Code/k8Job/create_kubernetes_job.py') + ' POS-PREPROCESSING {{ dag_run.conf["inputfilepath"] }} {{ dag_run.conf["frt_id"]}}',
                    execution_timeout=None,
                    dag=dag)

После выполнения этой задачи запускается еще одна группа DAG, в которой есть задача обработки данных из выходных данных предыдущей группы DAG.

Ниже приведено несколько подробностей о параметрах нашего конфигурационного файла, которые могут помочь в оценке основной причины.

min_file_process_interval = 60 
dag_dir_list_interval = 300 
max_threads = 2
dag_concurrency = 16
worker_concurrency = 16
max_active_runs_per_dag = 16
parallelism = 32
sql_alchemy_conn = mysql://airflow:fewfw324$gG@someXserver:3306/airflow
executor = LocalExecutor

Время разбора DagBag: 1.305286.Ниже также приведен снимок команды airflow list_dags -r ниже

-------------------------------------------------------------------
DagBag loading stats for /root/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 7
Total task number: 23
DagBag parsing time: 1.305286
------------------------------+----------+---------+----------+------------------------------
file                          | duration | dag_num | task_num | dags
------------------------------+----------+---------+----------+------------------------------
/trigger_cleansing.py         | 0.876388 |       1 |        5 | ['trigger_cleansing']
/processing_ebcdic_trigger.py | 0.383038 |       1 |        1 | ['processing_ebcdic_trigger']
/prep_preprocess_dag.py       | 0.015474 |       1 |        6 | ['prep_preprocess_dag']
/prep_scale_dag.py            | 0.012098 |       1 |        6 | ['dataprep_scale_dag']
/mvp.py                       | 0.010832 |       1 |        2 | ['dg_a']
/prep_uncompress_dag.py       | 0.004142 |       1 |        2 | ['dataprep_unzip_decrypt_dag']
/prep_positional_trigger.py   | 0.003314 |       1 |        1 | ['prep_positional_trigger']
------------------------------+----------+---------+----------+------------------------------

Ниже приведен статус службы планировщика воздушного потока, которая показывает несколько процессов

systemctl status airflow-scheduler
● airflow-scheduler.service - Airflow scheduler daemon
   Loaded: loaded (/etc/systemd/system/airflow-scheduler.service; enabled; vendor preset: disabled)
   Active: active (running) since Sat 2019-03-09 04:44:29 EST; 33min ago
 Main PID: 37409 (airflow)
   CGroup: /system.slice/airflow-scheduler.service
           ├─37409 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37684 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37685 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37686 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37687 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37688 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37689 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37690 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37691 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37692 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37693 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37694 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37695 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37696 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37697 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37699 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37700 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37701 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37702 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37703 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37704 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37705 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37706 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37707 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37708 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37709 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37710 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37712 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37713 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37714 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37715 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37717 /usr/bin/python3.6 /bin/airflow scheduler
           ├─37718 /usr/bin/python3.6 /bin/airflow scheduler
           └─37722 /usr/bin/python3.6 /bin/airflow scheduler

Теперь фактчто у нас есть несколько файлов, поступающих в группы DAG, которые постоянно запускаются, и у нас есть достаточно задач DAG, которые переходят в стадию ожидания.Странно, но у нас не было этой проблемы, когда мы использовали v1.9, пожалуйста, сообщите.

1 Ответ

0 голосов
/ 11 марта 2019

Я понял, что в файле ' airflow.cfg ' значение min_file_process_interval равно 60. Установка этого значения в ноль решает проблему, о которой я здесь писал.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...