Как запускать задачи параллельно в apache Airflow - PullRequest
0 голосов
/ 07 сентября 2018

Я выполнил следующую метку в Airflow, enter image description here

При выполнении вышеупомянутого Дага, он будет последовательно запускать один из следующих порядков.

A -> B -> C1 -> C2 -> D1 -> D2

A -> B -> C2 -> C1 -> D2 -> D1

но мое требование - запускать задачи C1 и C2 параллельно. Часть моего airflow.cfg

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor
#executor = LocalExecutor

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Number of workers to refresh at a time. When set to 0, worker refresh is
# disabled. When nonzero, airflow periodically refreshes webserver workers by
# bringing up new ones and killing old ones.
worker_refresh_batch_size = 1

# Number of seconds to wait before refreshing a batch of workers.
worker_refresh_interval = 30

# Secret key used to run your flask app
secret_key = temporary_key

# Number of workers to run the Gunicorn web server
workers = 4

[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above

# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 16

# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run. However airflow will never
# use more threads than the amount of cpu cores available.
max_threads = 2

Ответы [ 3 ]

0 голосов
/ 07 сентября 2018

Кажется, это проблема конфигурации. Из вашей конфигурации я вижу, что исполнителем является CeleryExecutor. Проверьте базу данных и компоненты брокера сообщений.

Если они не настроены для параллельной работы, ваши задачи также не будут выполняться в parellel.

0 голосов
/ 19 июня 2019

добавить параллелизм = x (где x больше, чем 1) в свойствах dag.

max_active_runs - параллелизм dag. параллелизм - это параллелизм задач.

пример:

    dag = DAG(
    dag_id,
    default_args=default_args,
    schedule_interval='00 03 * * *',
    max_active_runs=2,
    concurrency=2)
0 голосов
/ 07 сентября 2018

Если вы просто тестируете его на одной машине, я предлагаю использовать LocalExecutor. SequentialExecutor запускает задачи последовательно, а CeleryExecutor потребуется кластер машин, на котором работает брокер сообщений.

Кроме того, когда вы используете LocalExecutor, вы должны использовать метабазу, отличную от sqlite, так как sqlite не поддерживает параллельное чтение. Таким образом, вы можете использовать Postgres или MySQL и соответственно изменять sql_alchemy_conn в airflow.cfg файле.

Читать это: https://airflow.apache.org/howto/initialize-database.html

«LocalExecutor», исполнитель, который может распараллеливать экземпляры задач локально.

...