Мы перешли на puckel / Airflow-1.10.2, чтобы попытаться устранить низкую производительность, которую мы имели в разных средах.Мы работаем на ECS Airflow 1.10.2 на AWS ECS.Интересно, что CPU / mem никогда не превышают 80%.Метадоб Airflow также остается недостаточно используемым.
Ниже я перечислил используемую нами конфигурацию, время разбора DagBag плюс подробное время выполнения из вывода cProfile
просто запуска DagBag()
в чистом видеPython.
Некоторые из наших групп DAG импортируют функцию из create_subdag_functions.py
, которая возвращает группу DAG, которую мы используем в 12 группах DAG.Большинство из этих групп доступности баз данных и соответствующих им подкадров выполняется только в час, но 1 подгруппы DAG / 3 запускаются каждые 10 минут.
max_threads = 2
dag_dir_list_interval = 300
dag_concurrency = 16
worker_concurrency = 16
max_active_runs_per_dag = 16
parallelism = 32
executor = CeleryExecutor
Некоторые наблюдения:
airflow list_dags -r
занимаеттоже очень долго и запускает примеры DAG, даже если они отключены.Время разбора каждого DAG будет прыгать. - Длительность каждого DAG является непоследовательной (но она применяется только к нашим DAG, а не к примерам)
- Обычно в это время происходит большой скачок.например, 5 дагов будут иметь длительность <1, а затем следующие 4 будут иметь длительности 20+ </li>
- Когда мы профилировали функцию
DagBag()
с помощью cProfile, мы обнаружили, что DagBag () большую часть своего времени проводил в airflow.utils.dag_processing.list_py_paths
Эта функция, вероятно, из-за 50+ файлов sql в нашей папке / usr / local / airflow / dags - При просмотре времен посадки время выполнения задачи между двумя конкретными прогонами увеличилось на порядок.Я пытался просматривать журналы и т. Д., И между двумя прогонами нет ничего примечательного.Я прикрепил изображение внизу.Эта потеря производительности была в Airflow 1.10.0
Решения, которые я пробовал:
- увеличение / уменьшение
max_threads
- увеличение / устранение
min_file_process_interval
- очистка базы данных воздушного потока от всех групп обеспечения доступности баз данных и перезагрузка
- выключение и повторное развертывание среды
DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 42
Total task number: 311
DagBag parsing time: 189.77048399999995
--------------------------------------------+--------------------+---------+----------+------------------------------------------------------------------------------------------------------------
--------------------------------------------+--------------------+---------+----------+------------------------------------------------------------------------------------------------------------
/dag1.py | 60.576728 | 1 | 21 | ['dag1']
/dag2.py | 55.092603999999994 | 1 | 28 | ['dag2']
/dag3.py | 47.997972000000004 | 1 | 17 | ['dag3']
/dag4.py | 22.99313 | 3 | 16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/dag5.py | 0.67 | 1 | 21 | ['dag5']
/dag6.py | 0.652114 | 1 | 9 | ['dag6']
/dag7.py | 0.45368 | 1 | 26 | ['dag7']
/dag8.py | 0.396908 | 5 | 40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/dag9.py | 0.242012 | 6 | 38 | ['dag9', 'dag9.subdag1', 'dag9.subdag2', 'dag9.subdag3', 'dag9.subdag4', 'dag9.subdag5']
/dag10.py | 0.134342 | 1 | 1 | ['dag10']
/dag11.py | 0.13325 | 2 | 8 | ['dag11', 'dag12.subdag1']
/dag12.py | 0.10562 | 1 | 6 | ['dag12']
/create_subdag_functions.py | 0.105292 | 0 | 0 | []
example_http_operator.py | 0.040636 | 1 | 6 | ['example_http_operator']
example_subdag_operator.py | 0.005328 | 3 | 15 | ['example_subdag_operator', 'example_subdag_operator.section-1', 'example_subdag_operator.section-2']
example_bash_operator.py | 0.004052 | 1 | 6 | ['example_bash_operator']
example_branch_operator.py | 0.003444 | 1 | 11 | ['example_branch_operator']
example_branch_python_dop_operator_3.py | 0.003418 | 1 | 3 | ['example_branch_dop_operator_v3']
example_passing_params_via_test_command.py | 0.003222 | 1 | 2 | ['example_passing_params_via_test_command']
example_skip_dag.py | 0.002386 | 1 | 8 | ['example_skip_dag']
example_trigger_controller_dag.py | 0.002386 | 1 | 1 | ['example_trigger_controller_dag']
example_short_circuit_operator.py | 0.002344 | 1 | 6 | ['example_short_circuit_operator']
example_python_operator.py | 0.002218 | 1 | 6 | ['example_python_operator']
example_latest_only.py | 0.002196 | 1 | 2 | ['latest_only']
example_latest_only_with_trigger.py | 0.001848 | 1 | 5 | ['latest_only_with_trigger']
example_xcom.py | 0.001722 | 1 | 3 | ['example_xcom']
docker_copy_data.py | 0.001718 | 0 | 0 | []
example_trigger_target_dag.py | 0.001704 | 1 | 2 | ['example_trigger_target_dag']
tutorial.py | 0.00165 | 1 | 3 | ['tutorial']
test_utils.py | 0.001376 | 1 | 1 | ['test_utils']
example_docker_operator.py | 0.00103 | 0 | 0 | []
subdags/subdag.py | 0.001016 | 0 | 0 | []
-------------------------------------------------------------------------------------------------------+--------------------+---------+----------+--------------------------------------------------
-------------------------------------------------------------------
DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 42
Total task number: 311
DagBag parsing time: 296.5826819999999
------------------------------+--------------------+---------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
file | duration | dag_num | task_num | dags
------------------------------+--------------------+---------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/dag1.py | 74.819988 | 1 | 21 | ['dag1']
/dag3.py | 53.193430000000006 | 1 | 17 | ['dag3']
/dag8.py | 34.535742 | 5 | 40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/dag4.py | 21.543944000000003 | 6 | 38 | ['dag9', 'dag9.subdag1', 'dag9.subdag2', 'dag9.subdag3', 'dag9.subdag4', 'dag9.subdag5']
/dag5.py | 18.458316000000003 | 3 | 16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/create_subdag_functions.py | 14.652806000000002 | 0 | 0 | []
/dag7.py | 13.051984000000001 | 2 | 8 | ['dag11', 'dag11.subdag1']
/dag8.py | 10.02703 | 1 | 21 | ['dag5']
/dag9.py | 9.834226000000001 | 1 | 1 | ['dag10']
/dag10.py | 9.575258000000002 | 1 | 28 | ['dag2']
/dag11.py | 9.418897999999999 | 1 | 9 | ['dag6']
/dag12.py | 9.319210000000002 | 1 | 6 | ['dag12']
/dag13.py | 8.686964 | 1 | 26 | ['dag7']
Примечание: для краткости извлечены примеры DAG из второго выхода
Выход cProfile from airflow.models import DagBag; DagBag()
:
{{settings.py:174}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=6740
{{__init__.py:51}} INFO - Using executor SequentialExecutor
{{models.py:273}} INFO - Filling up the DagBag from
ncalls tottime percall cumtime percall filename:lineno(function)
997 443.441 0.445 443.441 0.445 {built-in method io.open}
198 186.978 0.944 483.629 2.443 zipfile.py:198(is_zipfile)
642 65.069 0.101 65.069 0.101 {method 'close' of '_io.BufferedReader' objects}
1351 45.924 0.034 45.946 0.034 <frozen importlib._bootstrap_external>:830(get_data)
7916 39.403 0.005 39.403 0.005 {built-in method posix.stat}
2/1 22.927 11.464 544.419 544.419 dag_processing.py:220(list_py_file_paths)
33 18.992 0.576 289.797 8.782 models.py:321(process_file)
22 8.723 0.397 8.723 0.397 {built-in method posix.scandir}
412 2.379 0.006 2.379 0.006 {built-in method posix.listdir}
9 1.301 0.145 3.058 0.340 linecache.py:82(updatecache)
1682/355 0.186 0.000 0.731 0.002 sre_parse.py:470(_parse)
1255 0.183 0.000 0.183 0.000 {built-in method marshal.loads}
3092/325 0.143 0.000 0.647 0.002 sre_compile.py:64(_compile)
59 0.139 0.002 0.139 0.002 {built-in method builtins.compile}
25270 0.134 0.000 0.210 0.000 sre_parse.py:253(get)
52266 0.132 0.000 0.132 0.000 {method 'append' of 'list' objects}
4210/4145 0.131 0.000 1.760 0.000 {built-in method builtins.__build_class__}
Падение производительности воздушного потока: