Проблемы производительности ECS Airflow 1.10.2.Операторы и задачи занимают в 10 раз больше времени - PullRequest
2 голосов
/ 03 апреля 2019

Мы перешли на puckel / Airflow-1.10.2, чтобы попытаться устранить низкую производительность, которую мы имели в разных средах.Мы работаем на ECS Airflow 1.10.2 на AWS ECS.Интересно, что CPU / mem никогда не превышают 80%.Метадоб Airflow также остается недостаточно используемым.

Ниже я перечислил используемую нами конфигурацию, время разбора DagBag плюс подробное время выполнения из вывода cProfile просто запуска DagBag() в чистом видеPython.

Некоторые из наших групп DAG импортируют функцию из create_subdag_functions.py, которая возвращает группу DAG, которую мы используем в 12 группах DAG.Большинство из этих групп доступности баз данных и соответствующих им подкадров выполняется только в час, но 1 подгруппы DAG / 3 запускаются каждые 10 минут.

max_threads = 2
dag_dir_list_interval = 300 
dag_concurrency = 16
worker_concurrency = 16
max_active_runs_per_dag = 16
parallelism = 32
executor = CeleryExecutor

Некоторые наблюдения:

  • airflow list_dags -r занимаеттоже очень долго и запускает примеры DAG, даже если они отключены.Время разбора каждого DAG будет прыгать.
  • Длительность каждого DAG является непоследовательной (но она применяется только к нашим DAG, а не к примерам)
  • Обычно в это время происходит большой скачок.например, 5 дагов будут иметь длительность <1, а затем следующие 4 будут иметь длительности 20+ </li>
  • Когда мы профилировали функцию DagBag() с помощью cProfile, мы обнаружили, что DagBag () большую часть своего времени проводил в airflow.utils.dag_processing.list_py_pathsЭта функция, вероятно, из-за 50+ файлов sql в нашей папке / usr / local / airflow / dags
  • При просмотре времен посадки время выполнения задачи между двумя конкретными прогонами увеличилось на порядок.Я пытался просматривать журналы и т. Д., И между двумя прогонами нет ничего примечательного.Я прикрепил изображение внизу.Эта потеря производительности была в Airflow 1.10.0

Решения, которые я пробовал:

  • увеличение / уменьшение max_threads
  • увеличение / устранение min_file_process_interval
  • очистка базы данных воздушного потока от всех групп обеспечения доступности баз данных и перезагрузка
  • выключение и повторное развертывание среды

DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 42
Total task number: 311
DagBag parsing time: 189.77048399999995
--------------------------------------------+--------------------+---------+----------+------------------------------------------------------------------------------------------------------------
--------------------------------------------+--------------------+---------+----------+------------------------------------------------------------------------------------------------------------
/dag1.py                                    | 60.576728          |       1 |       21 | ['dag1']
/dag2.py                                    | 55.092603999999994 |       1 |       28 | ['dag2']
/dag3.py                                    | 47.997972000000004 |       1 |       17 | ['dag3']
/dag4.py                                    | 22.99313           |       3 |       16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/dag5.py                                    | 0.67               |       1 |       21 | ['dag5']
/dag6.py                                    | 0.652114           |       1 |        9 | ['dag6']
/dag7.py                                    | 0.45368            |       1 |       26 | ['dag7']
/dag8.py                                    | 0.396908           |       5 |       40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/dag9.py                                    | 0.242012           |       6 |       38 | ['dag9', 'dag9.subdag1', 'dag9.subdag2', 'dag9.subdag3', 'dag9.subdag4', 'dag9.subdag5']
/dag10.py                                   | 0.134342           |       1 |        1 | ['dag10']
/dag11.py                                   | 0.13325            |       2 |        8 | ['dag11', 'dag12.subdag1']
/dag12.py                                   | 0.10562            |       1 |        6 | ['dag12']
/create_subdag_functions.py                 | 0.105292           |       0 |        0 | []
 example_http_operator.py                   | 0.040636           |       1 |        6 | ['example_http_operator']
 example_subdag_operator.py                 | 0.005328           |       3 |       15 | ['example_subdag_operator', 'example_subdag_operator.section-1', 'example_subdag_operator.section-2']
 example_bash_operator.py                   | 0.004052           |       1 |        6 | ['example_bash_operator']
 example_branch_operator.py                 | 0.003444           |       1 |       11 | ['example_branch_operator']
 example_branch_python_dop_operator_3.py    | 0.003418           |       1 |        3 | ['example_branch_dop_operator_v3']
 example_passing_params_via_test_command.py | 0.003222           |       1 |        2 | ['example_passing_params_via_test_command']
 example_skip_dag.py                        | 0.002386           |       1 |        8 | ['example_skip_dag']
 example_trigger_controller_dag.py          | 0.002386           |       1 |        1 | ['example_trigger_controller_dag']
 example_short_circuit_operator.py          | 0.002344           |       1 |        6 | ['example_short_circuit_operator']
 example_python_operator.py                 | 0.002218           |       1 |        6 | ['example_python_operator']
 example_latest_only.py                     | 0.002196           |       1 |        2 | ['latest_only']
 example_latest_only_with_trigger.py        | 0.001848           |       1 |        5 | ['latest_only_with_trigger']
 example_xcom.py                            | 0.001722           |       1 |        3 | ['example_xcom']
 docker_copy_data.py                        | 0.001718           |       0 |        0 | []
 example_trigger_target_dag.py              | 0.001704           |       1 |        2 | ['example_trigger_target_dag']
 tutorial.py                                | 0.00165            |       1 |        3 | ['tutorial']
 test_utils.py                              | 0.001376           |       1 |        1 | ['test_utils']
 example_docker_operator.py                 | 0.00103            |       0 |        0 | []
 subdags/subdag.py                          | 0.001016           |       0 |        0 | []
-------------------------------------------------------------------------------------------------------+--------------------+---------+----------+--------------------------------------------------
-------------------------------------------------------------------
DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 42
Total task number: 311
DagBag parsing time: 296.5826819999999
------------------------------+--------------------+---------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
file                          | duration           | dag_num | task_num | dags
------------------------------+--------------------+---------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/dag1.py                      | 74.819988          |       1 |       21 | ['dag1']
/dag3.py                      | 53.193430000000006 |       1 |       17 | ['dag3']
/dag8.py                      | 34.535742          |       5 |       40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/dag4.py                      | 21.543944000000003 |       6 |       38 | ['dag9', 'dag9.subdag1', 'dag9.subdag2', 'dag9.subdag3', 'dag9.subdag4', 'dag9.subdag5']
/dag5.py                      | 18.458316000000003 |       3 |       16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/create_subdag_functions.py   | 14.652806000000002 |       0 |        0 | []
/dag7.py                      | 13.051984000000001 |       2 |        8 | ['dag11', 'dag11.subdag1']
/dag8.py                      | 10.02703           |       1 |       21 | ['dag5']
/dag9.py                      | 9.834226000000001  |       1 |        1 | ['dag10']
/dag10.py                     | 9.575258000000002  |       1 |       28 | ['dag2']
/dag11.py                     | 9.418897999999999  |       1 |        9 | ['dag6']
/dag12.py                     | 9.319210000000002  |       1 |        6 | ['dag12']
/dag13.py                     | 8.686964           |       1 |       26 | ['dag7']

Примечание: для краткости извлечены примеры DAG из второго выхода

Выход cProfile from airflow.models import DagBag; DagBag():

{{settings.py:174}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=6740
{{__init__.py:51}} INFO - Using executor SequentialExecutor
{{models.py:273}} INFO - Filling up the DagBag from 

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      997  443.441    0.445  443.441    0.445 {built-in method io.open}
      198  186.978    0.944  483.629    2.443 zipfile.py:198(is_zipfile)
      642   65.069    0.101   65.069    0.101 {method 'close' of '_io.BufferedReader' objects}
     1351   45.924    0.034   45.946    0.034 <frozen importlib._bootstrap_external>:830(get_data)
     7916   39.403    0.005   39.403    0.005 {built-in method posix.stat}
      2/1   22.927   11.464  544.419  544.419 dag_processing.py:220(list_py_file_paths)
       33   18.992    0.576  289.797    8.782 models.py:321(process_file)
       22    8.723    0.397    8.723    0.397 {built-in method posix.scandir}
      412    2.379    0.006    2.379    0.006 {built-in method posix.listdir}
        9    1.301    0.145    3.058    0.340 linecache.py:82(updatecache)
 1682/355    0.186    0.000    0.731    0.002 sre_parse.py:470(_parse)
     1255    0.183    0.000    0.183    0.000 {built-in method marshal.loads}
 3092/325    0.143    0.000    0.647    0.002 sre_compile.py:64(_compile)
       59    0.139    0.002    0.139    0.002 {built-in method builtins.compile}
    25270    0.134    0.000    0.210    0.000 sre_parse.py:253(get)
    52266    0.132    0.000    0.132    0.000 {method 'append' of 'list' objects}
4210/4145    0.131    0.000    1.760    0.000 {built-in method builtins.__build_class__}

Падение производительности воздушного потока:

Airflow performance drop

...