Как назначить одну задачу одному работнику воздушного потока? - PullRequest
0 голосов
/ 28 апреля 2019

Я новичок в Airflow и GCP.

Я использую GCP composer с воздушным потоком.У меня есть кластер kubernetes с 150 виртуальными ЦП.У меня есть DAG, которая содержит 100 задач, которые будут читать некоторые файлы из корзины GCP.Я ожидаю, что 140 задач можно назначить на 140 виртуальных ЦП, но на самом деле некоторые виртуальные ЦП занимают более 1 задачи.Что я должен сделать, чтобы заставить airflow распределить мои задачи более равномерно?

Я пытался редактировать airflow.cfg.Я изменил worker_concurrency на 1, но, похоже, бесполезно.Ниже приведены образцы для моего dag и my airflow.cfg

nodes = 140
for i in range(nodes):

    process_bash = bash_operator.BashOperator(
        task_id='140_batch_' + str(i+1),
        bash_command='python /home/airflow/gcs/data/... xxx.py)



[core]
parallelism = 300
dag_concurrency = 150
max_active_runs_per_dag = 150

[celery]
celery_app_name = airflow.executors.celery_executor
worker_concurrency = 1
...