Я оцениваю Airflow 1.9.0 для наших потребностей распределенной оркестровки (используя CeleryExecutor и RabbitMQ), и я вижу что-то странное.
Я сделал даг, состоящий из трех этапов:
- начать
- одновременно разворачивает и запускает N задач
- отделка
N может быть большим, может быть до 10K. Я ожидаю увидеть N задач, сброшенных в очередь Rabbit, когда начнется этап 2. Вместо этого я вижу только несколько сотен , добавленных за один раз. По мере того, как рабочие обрабатывают задачи, а очередь уменьшается, в сельдерей / кролик добавляется больше. В конце концов, это действительно завершается, однако я бы предпочел, чтобы он сразу же сбрасывал ВСЕ работы (все задачи по 10 КБ) в Celery по двум причинам:
Текущий способ делает планировщик долгоживущим и сохраняющим состояние. Планировщик может умереть только после завершения 5K, в этом случае оставшиеся 5K задачи никогда не будут добавлены (я проверял это)
Я хочу использовать размер очереди Кролика в качестве метрики для запуска событий автомасштабирования, чтобы добавить больше работников. Поэтому мне нужна точная картина того, сколько выдающихся работ осталось (10 тысяч, а не несколько сотен)
Я предполагаю, что у планировщика есть какой-то дроссель, который удерживает его от выгрузки всех 10K-сообщений одновременно? Если это так, это настраивается?
К вашему сведению Я уже установил «параллелизм» на 10K в потоке воздуха. Cfg
Вот мой тестовый даг:
# This dag tests how well airflow fans out
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('fan_out', default_args=default_args, schedule_interval=None)
num_tasks = 10000
starting = BashOperator(
task_id='starting',
bash_command='echo starting',
dag=dag
)
all_done = BashOperator(
task_id='all_done',
bash_command='echo all done',
dag=dag)
for i in range(0, num_tasks):
task = BashOperator(
task_id='say_hello_' + str(i),
bash_command='echo hello world',
dag=dag)
task.set_upstream(starting)
task.set_downstream(all_done)