Мы перешли на AirFlow 1.10.2 , чтобы разрешить использование процессора. Хорошо, что проблема, с которой мы столкнулись, была исправлена в нашей среде.Тем не менее, мы заметили, что задачи группы обеспечения доступности баз данных, хотя и отправляются, и показывают, что они выполняются на панели мониторинга AirFlow, но они как бы выдерживают фактическую обработку, а затем остаются в очереди в течение примерно 60 секунд после того, как происходит фактическое выполнение.Обратите внимание, что для реализации нашего варианта использования
- DAG AirFlow не зависят от времени, т. Е. Они не '** Запланированные DAG ' **, а запускаются через код Python.
- AirFlow v1.10.2 используется как отдельная автономная установка [
executor = LocalExecutor
].
Код Python просматривает каталог для всех поступающих файлов.Он отмечает, что для любого файла код запускает AirFlow DAG.Мы получаем пачки прибывающих файлов, и поэтому в каждом конкретном случае у нас есть сценарии, в которых вызывается несколько экземпляров одних и тех же групп доступности данных [ фрагмент кода, приведенный ниже ].Триггеры DAG запускаются, что, в свою очередь, имеет задачу, которая вызывает код Python для запуска модуля Kubernetes, где происходит некоторая обработка, связанная с файлами.Ниже приведена выдержка из кода DAG
positional_to_ascii = BashOperator(
task_id="uncompress_the_file",
bash_command='python3.6 ' + os.path.join(cons.CODE_REPO, 'app/Code/k8Job/create_kubernetes_job.py') + ' POS-PREPROCESSING {{ dag_run.conf["inputfilepath"] }} {{ dag_run.conf["frt_id"]}}',
execution_timeout=None,
dag=dag)
После выполнения этой задачи запускается еще одна группа DAG, в которой есть задача обработки данных из выходных данных предыдущей группы DAG.
Ниже приведено несколько подробностей о параметрах нашего конфигурационного файла, которые могут помочь в оценке основной причины.
min_file_process_interval = 60
dag_dir_list_interval = 300
max_threads = 2
dag_concurrency = 16
worker_concurrency = 16
max_active_runs_per_dag = 16
parallelism = 32
sql_alchemy_conn = mysql://airflow:fewfw324$gG@someXserver:3306/airflow
executor = LocalExecutor
Время разбора DagBag: 1.305286.Ниже также приведен снимок команды airflow list_dags -r
ниже
-------------------------------------------------------------------
DagBag loading stats for /root/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 7
Total task number: 23
DagBag parsing time: 1.305286
------------------------------+----------+---------+----------+------------------------------
file | duration | dag_num | task_num | dags
------------------------------+----------+---------+----------+------------------------------
/trigger_cleansing.py | 0.876388 | 1 | 5 | ['trigger_cleansing']
/processing_ebcdic_trigger.py | 0.383038 | 1 | 1 | ['processing_ebcdic_trigger']
/prep_preprocess_dag.py | 0.015474 | 1 | 6 | ['prep_preprocess_dag']
/prep_scale_dag.py | 0.012098 | 1 | 6 | ['dataprep_scale_dag']
/mvp.py | 0.010832 | 1 | 2 | ['dg_a']
/prep_uncompress_dag.py | 0.004142 | 1 | 2 | ['dataprep_unzip_decrypt_dag']
/prep_positional_trigger.py | 0.003314 | 1 | 1 | ['prep_positional_trigger']
------------------------------+----------+---------+----------+------------------------------
Ниже приведен статус службы планировщика воздушного потока, которая показывает несколько процессов
systemctl status airflow-scheduler
● airflow-scheduler.service - Airflow scheduler daemon
Loaded: loaded (/etc/systemd/system/airflow-scheduler.service; enabled; vendor preset: disabled)
Active: active (running) since Sat 2019-03-09 04:44:29 EST; 33min ago
Main PID: 37409 (airflow)
CGroup: /system.slice/airflow-scheduler.service
├─37409 /usr/bin/python3.6 /bin/airflow scheduler
├─37684 /usr/bin/python3.6 /bin/airflow scheduler
├─37685 /usr/bin/python3.6 /bin/airflow scheduler
├─37686 /usr/bin/python3.6 /bin/airflow scheduler
├─37687 /usr/bin/python3.6 /bin/airflow scheduler
├─37688 /usr/bin/python3.6 /bin/airflow scheduler
├─37689 /usr/bin/python3.6 /bin/airflow scheduler
├─37690 /usr/bin/python3.6 /bin/airflow scheduler
├─37691 /usr/bin/python3.6 /bin/airflow scheduler
├─37692 /usr/bin/python3.6 /bin/airflow scheduler
├─37693 /usr/bin/python3.6 /bin/airflow scheduler
├─37694 /usr/bin/python3.6 /bin/airflow scheduler
├─37695 /usr/bin/python3.6 /bin/airflow scheduler
├─37696 /usr/bin/python3.6 /bin/airflow scheduler
├─37697 /usr/bin/python3.6 /bin/airflow scheduler
├─37699 /usr/bin/python3.6 /bin/airflow scheduler
├─37700 /usr/bin/python3.6 /bin/airflow scheduler
├─37701 /usr/bin/python3.6 /bin/airflow scheduler
├─37702 /usr/bin/python3.6 /bin/airflow scheduler
├─37703 /usr/bin/python3.6 /bin/airflow scheduler
├─37704 /usr/bin/python3.6 /bin/airflow scheduler
├─37705 /usr/bin/python3.6 /bin/airflow scheduler
├─37706 /usr/bin/python3.6 /bin/airflow scheduler
├─37707 /usr/bin/python3.6 /bin/airflow scheduler
├─37708 /usr/bin/python3.6 /bin/airflow scheduler
├─37709 /usr/bin/python3.6 /bin/airflow scheduler
├─37710 /usr/bin/python3.6 /bin/airflow scheduler
├─37712 /usr/bin/python3.6 /bin/airflow scheduler
├─37713 /usr/bin/python3.6 /bin/airflow scheduler
├─37714 /usr/bin/python3.6 /bin/airflow scheduler
├─37715 /usr/bin/python3.6 /bin/airflow scheduler
├─37717 /usr/bin/python3.6 /bin/airflow scheduler
├─37718 /usr/bin/python3.6 /bin/airflow scheduler
└─37722 /usr/bin/python3.6 /bin/airflow scheduler
Теперь фактчто у нас есть несколько файлов, поступающих в группы DAG, которые постоянно запускаются, и у нас есть достаточно задач DAG, которые переходят в стадию ожидания.Странно, но у нас не было этой проблемы, когда мы использовали v1.9, пожалуйста, сообщите.