Как настроить DaskExecuter в Apache Airflow - PullRequest
0 голосов
/ 10 октября 2018

Я хочу настроить Dask для распространения DAG в Airflow.Я прочитал https://airflow.apache.org/howto/executor/use-dask.html и https://distributed.readthedocs.io/en/latest/,, но я не понимаю, как это работает.У меня есть два сервера Apache Aiflow, где запускать dask-shedulers и dask-worker, чтобы при падении первого сервера все автоматически работало на втором?Как я понял Airflow ставил задачи в даске-шедулере.Я не понимаю, как подружиться с dask-планировщиками на двух серверах.Я не понимаю, зачем нужны работники dask и что это делает, я думаю, что это какой-то ненужный компонент.

Я не хочу использовать CeleryExecutor и настраивать RabbitMQ или Redis для Celery

Ответы [ 3 ]

0 голосов
/ 13 мая 2019

Вы можете найти два лучших описания для запуска и запуска Airflow + Dask: https://www.alibabacloud.com/blog/schedule-data-lake-analytics-tasks-by-using-airflow_594183 или https://tech.marksblogg.com/install-and-configure-apache-airflow.html. В частности, первая ссылка мне очень помогла.И работает планировщик Airflow + веб-сервер + DASK-планировщик в порядке.Однако, как только я запускаю Airflow worker + dask-worker, работник Airflow выходит и жалуется на то, что не активирован Celery:

ModuleNotFoundError: No module named 'celery'

И когда я запускаю dask-worker без Airflow worker, все кажется работающимхорошо, пока я не вызову DAG:

worker_1     | [2019-05-12 20:47:05,527] {__init__.py:51} INFO - Using executor DaskExecutor
worker_1     | usage: airflow [-h]
worker_1     |                {backfill,list_dag_runs,list_tasks,clear,pause,unpause,trigger_dag,delete_dag,pool,variables,kerberos,render,run,initdb,list_dags,dag_state,task_failed_deps,task_state,serve_logs,test,webserver,resetdb,upgradedb,scheduler,worker,flower,version,connections,create_user,delete_user,list_users,sync_perm,next_execution,rotate_fernet_key}
worker_1     |                ...
worker_1     | airflow: error: the following arguments are required: subcommand
worker_1     | distributed.worker - WARNING -  Compute Failed
worker_1     | Function:  airflow_run
worker_1     | args:      ()
worker_1     | kwargs:    {}
worker_1     | Exception: CalledProcessError(2, ['airflow', 'run', 'example_python_operator', 'print_the_context', '2019-05-12T20:47:02.111022+00:00', '--pickle', '13', '--local', '-sd', '/opt/airflow/dags/python_exec.py'])
worker_1     |
webserver_1  | [2019-05-12 20:47:06 +0000] [37] [INFO] Handling signal: ttin
webserver_1  | [2019-05-12 20:47:06 +0000] [744] [INFO] Booting worker with pid: 744
webserver_1  | [2019-05-12 20:47:06,299] {dask_executor.py:77} ERROR - Failed to execute task: CalledProcessError(2, ['airflow', 'run', 'example_python_operator', 'print_the_context', '2019-05-12T20:47:02.111022+00:00', '--pickle', '13', '--local', '-sd', '/opt/airflow/dags/python_exec.py'])

Любой намек, чтобы это исправить?

0 голосов
/ 21 августа 2019

Я понял это после многих исследований.Оказывается, это проблема с Dask Executor (https://issues.apache.org/jira/browse/AIRFLOW-4494). Я применил исправление к образу докера, который я использую для запуска воздушного потока, и исправил его! Исправление запланировано на следующий выпуск.

RUN cd /usr/local/lib/python3.6/site-packages/airflow/executors && \
    sed -i "s@return subprocess.check_call(command, shell=True, close_fds=True)@return subprocess.check_call(command, close_fds=True)@g" dask_executor.py
0 голосов
/ 06 декабря 2018

Планировщик запускается на одном сервере, а не на двух.У меня на одной машине просто планировщик воздушного потока и планировщик даска.В конфиге воздушного потока для dask scheduler у меня localhost: 8786.Затем на других машинах вы запускаете dask worker и даете ему ip-адрес и порт вашего планировщика.Отправьте задание через поток воздуха, и рабочие подберут его, если вы все сделали правильно.

...