Протестировать Apache Airflow DAG, когда он уже запланирован и работает? - PullRequest
0 голосов
/ 18 января 2019

Я выполнил следующую тестовую команду:

airflow test events {task_name_redacted} 2018-12-12

... и получил следующий вывод:

Dependencies not met for <TaskInstance: events.{redacted} 2018-12-12T00:00:00+00:00 [None]>, dependency 'Task Instance Slots Available' FAILED: The maximum number of running tasks (16) for this task's DAG 'events' has been reached.
[2019-01-17 19:47:48,978] {models.py:1556} WARNING - 
--------------------------------------------------------------------------------
FIXME: Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 6. State set to NONE.
--------------------------------------------------------------------------------

[2019-01-17 19:47:48,978] {models.py:1559} INFO - Queuing into pool None

Мой Airflow настроен с максимальным параллелизмом 16. Означает ли это, что я не могу проверить задачу, когда группа обеспечения доступности баз данных работает в данный момент и использовала все ее слоты для задач?

Кроме того, это было немного неясно из документации, но действительно ли airflow test выполняет задачу, как если бы это был SparkSubmitOperator, он фактически отправил бы задание?

1 Ответ

0 голосов
/ 18 января 2019

Хотя я еще не достиг той фазы развертывания, где параллелизм будет иметь значение, документы дают довольно хорошее представление о проблеме под рукой

  1. Так как в любой момент времени работает только один scheduler вы не должны запускать несколько раз в любом случае ), действительно, похоже, что независимо от того, DAG - выполняется , работает - выполняется или тест - выполняется , это ограничение будет применяться к ним коллективно. Так что это, безусловно, препятствие.

    # Количество экземпляров задач, которые могут запускаться одновременно планировщиком

    dag_concurrency = 16


  1. Но учтите, что просто увеличивая это число (при условии, что у вас достаточно большие ящики для здоровенных worker с / несколько worker с), некоторые другие конфигурации должны быть настроены как ну, чтобы достичь такого параллелизма, я чувствую, что вы хотите.

    Все они перечислены в [core] разделе

    # Количество параллелизма в качестве параметра для исполнителя. это определяет максимальное количество экземпляров задач, которые должны быть запущены одновременно на этой установке воздушного потока

    параллелизм = 32

    # Когда пулы не используются, задачи запускаются в «пуле по умолчанию», чьи Размер руководствуется этим элементом конфигурации

    non_pooled_task_slot_count = 128

    # Максимальное количество активных прогонов DAG на одну DAG

    max_active_runs_per_dag = 16


  1. Но мы все еще не там, потому что, как только вы создадите столько задач одновременно, бэкэнд metadata -db начнет задыхаться. Хотя это, скорее всего, незначительная проблема (и может не повлиять, если у вас нет действительно огромных DAG с / очень больших * Variable взаимодействий в ваших задачах), ее все же стоит отметить как потенциальную контрольно-пропускной пункт

    # Размер пула SqlAlchemy - это максимальное количество базы данных. соединения в бассейне. 0 означает отсутствие ограничений.

    sql_alchemy_pool_size = 5

    # Пул SqlAlchemy recycle - это количество секунд соединения может быть простаивающим в пуле до его аннулирования. Этот конфиг не применить к sqlite. Если количество подключений к БД превышено, меньшее значение конфигурации позволит системе быстрее восстановиться.

    sql_alchemy_pool_recycle = 1800

    # Сколько секунд повторять попытку восстановления соединения с БД после отсоединяется. Установка этого значения в 0 отключает повторные попытки.

    sql_alchemy_reconnect_timeout = 300


  1. Само собой разумеется, все это в значительной степени бесполезно, если вы не выберете правильное executor; SequentialExecutor, в частности, предназначен только для тестирования

    # Класс исполнителя, который должен использовать поток воздуха. Выбор включает SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor

    executor = SequentialExecutor


  1. Но тогда params для BaseOperator вроде depends_on_past, wait_for_downstream также могут испортить партию

  1. Наконец, я оставляю вас с этой ссылкой, связанной с комбинацией Airflow + Spark: Как отправить задания Spark в кластер EMR из Airflow?

(Простите, если ответ смутил вас больше, чем вы уже были, но ..)

...