Нужна помощь в настройке BROKER_URL в настройках Airflow и Celery Executor - PullRequest
0 голосов
/ 15 февраля 2019

Сводка

Я впервые использую Apache-Airflow.Я заставил работать веб-сервер, SequentialExecutor и LocalExecutor, но у меня возникают проблемы при использовании CeleryExecutor с rabbitmq-сервером.В настоящее время у меня есть два экземпляра AWS EC2.

Ошибка

Подводя итог: Мой работник не может подключиться к rabbitmq-серверу на моем узле планировщика.Всякий раз, когда я запускаю airflow worker на рабочем экземпляре, он выдает:

- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor:0x7f53a8dce400
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> default          exchange=default(direct) key=default


[2019-02-15 02:26:23,742: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.

Конфигурация

Я следовал всем указаниям, которые мог найти в Интернете.Оба экземпляра имеют один и тот же файл airflow.cfg, где

[core]
executor = CeleryExecutor

[celery]
broker_url = pyamqp://username:password@hostname:port/virtual_host

и result_backend указывают на одну и ту же базу данных MySQL на RDS, с которой работает воздушный поток.

Из того, что я могу сказать, нетнесмотря ни на что, рабочий узел всегда пытался подключиться к локальному rabbitmq-серверу и полностью игнорировал этот broker_url в моем файле airflow.cfg.

Что я пробовал

Я запутался в исходном коде и заметил в celery/app/base.py, если я выхожу из системы с ошибками, он попадает в _get_config() при созданиисоединение, на самом деле в словаре возвращено ДВА значения.

BROKER_URL = None
broker_url = pyamqp://username:password@hostname:port/virtual_host

и вся логика соединения, кажется, указывает на клавишу BROKER_URL.

Я попытался установить BROKER_URL и CELERY_BROKER_URL в airflow.cfg, но, похоже, он не учитывает регистр и игнорирует последнее.Чтобы посмотреть, сработает ли это, я изменил метод _get_config() и взломал:

s['BROKER_URL'] = s['broker_url']
return s

И, как я и ожидал, все начало работать.

Я что-то не так делаю?Я бы действительно не хотел использовать этот хак, но я не могу понять, почему он игнорирует значения конфигурации.

Спасибо!

...