Airflow с CeleryExecutor не выбирает набор конфигурации для Celery из airflow.cfg - PullRequest
0 голосов
/ 11 мая 2019

Я на некоторое время застрял с этой проблемой.

Я следовал шагам и успешно справлялся с заданиями из сельдерея, и когда я запускал только сельдерей, он работал, как и ожидалось:

-------------- celery@xxxxcxxxxx.zzzz.org v4.3.0 (rhubarb)
---- **** -----
--- * ***  * -- Linux-2.6.32-573.22.1.el6.x86_64-x86_64-with-redhat-6.7-Santiago 2019-05-11 11:34:25
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7fcb942e93d0
- ** ---------- .> transport:   amqp://guest:**@xx.xx.xx.xx:5672//
- ** ---------- .> results:     redis://localhost:6379/
- *** --- * --- .> concurrency: 5 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
[tasks]
  . tasks.add
  . tasks.rehi

Однако, когда я настроил то же самое в airflow.cfg, используя следующую команду: Проверьте это

Это мой airflow.cfg:

executor=CeleryExecutor

CELERY_BROKER_URL = 'amqp://guest:guest@xx.xx.xx:5672/'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/'

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
#sql_alchemy_conn = sqlite:////home/apservices/airflow/airflow.db
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@yy.yy.yy.yy:5432/airflow

Однако, когдаЯ запускаю airflow worker Его просто не выбирают конфигурации, указанные в файле airflow.cfg:

-------------- celery@xxx.xx.org v4.3.0 (rhubarb)
---- **** -----
--- * ***  * -- Linux-2.6.32-573.22.1.el6.x86_64-x86_64-with-redhat-6.7-Santiago 2019-05-11 12:00:57
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor:0x7fa16cf99750
- ** ---------- .> transport:   sqla+mysql://airflow:airflow@localhost:3306/airflow
- ** ---------- .> results:     mysql://airflow:**@localhost:3306/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> default          exchange=default(direct) key=default

Complete Stacktrace:

[2019-05-11 12:00:56,936] {configuration.py:206} ПРЕДУПРЕЖДЕНИЕ - раздел / ключ [celery / celery_ssl_active] не найден в конфигурации [2019-05-11 12: 00: 56,936] {default_celery.py:41} ПРЕДУПРЕЖДЕНИЕ - Celery Executor будет работать без SSL[2019-05-11 12: 00: 56,938] { init .py: 45} INFO - Использование исполнителя CeleryExecutor

--------------celery@xx.xx.org v4.3.0 (ревень) ---- **** ----- --- * * * - Linux-2.6.32-573.22.1.el6.x86_64-x86_64-с-redhat-6,7-Сантьяго 2019-05-11 12:00:57 - * - **** -- - ** ---------- [config] - ** ----------.> app: airflow.executors.celery_executor: 0x7fa16cf99750 - ** ----------.> транспорт: sqla + mysql: // воздушный поток: воздушный поток @ localhost: 3306 / воздушный поток - ** ----------.> результаты: mysql: // воздушный поток: @localhost: 3306 / airflow - *** --- * ---.> параллелизм: 16 (предварительная обработка) - ******* ----.> события задачи: ВЫКЛ (включить -E для мониторингазадачи в этом работнике) --- ***** ----- -------------- [queues].> default exchange = default (direct) ключ = default

[2019-05-11 12: 00: 57,160: CRITICAL / MainProcess] Неустранимая ошибка: TypeError («Недопустимые аргументы» visibility_timeout »отправлены в create_engine () с использованием конфигурации MySQLDialect_mysqldb / QueuePool / Engine.Убедитесь, что ключевые аргументы соответствуют этой комбинации компонентов. ",) Traceback (последний вызов был последним): файл" /opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/celery / worker / worker.py ", строка 205, в стартовом файле self.blueprint.start (self)" /opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/celery/bootsteps.py ", строка 119, в стартовом файле step.start (parent) File" /opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/celery/bootsteps.py ", строка 369,в начале верните файл self.obj.start () "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", строка 318, взапустите файл blueprint.start (self) File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/celery/bootsteps.py", строка 119, в start step.start (parent)Файл "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/celery/worker/consumer/tasks.py", строка 41, в начале c.connection, on_decode_error = c.on_decode_errorФайл "/ opt / cloudera / parcels / Anaconda-2.5.0 / lib / python2.7 / site-packages / celery / app / amqp.py ", строка 297, в файле TaskConsumer ** kw" / opt / cloudera / parcels / Anaconda-2.5.0 / lib / python2.7 / site-packages / kombu / messaging.py ", строка 386, в init файле self.revive (self.channel)" / opt / cloudera / parcels / Anaconda-2.5.0 / lib / python2.7 / site-packages / kombu / messaging.py ", строка 408, в файле revive self.declare ()" /opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7 / site-packages / kombu / messaging.py ", строка 421, в файле Declare queue.declare ()" /opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/entity.py ", строка 608, в объявлении self._create_queue (nowait = nowait, channel = channel) Файл" /opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/entity.py ", строка 617, в _create_queue self.queue_declare (nowait = nowait, passive = false, channel = channel) Файл" /opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/entity.py ", строка 652, в queue_declarenowait = nowait, Файл "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/transport/virtual/base.py", строка 531, в queue_declare self._new_queue (queue, ** kwargs) Файл "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/transport/sqlalchemy/init.py", строка 82,в файле _new_queue self._get_or_create (queue) "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/transport/sqlalchemy/init.py",строка 70, в _get_or_create obj = self.session.query (self.queue_cls) \ File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/transport/sqlalchemy/ init .py ", строка 65, в сеансе _, файл Session = self._open ()" /opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/transport/sqlalchemy/init.py ", строка 56, в _open engine = self._engine_from_config () Файл" /opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages / комбу / транспорт / SQLAlchemy / INIT .py»,строка 51, в _engine_from_config возвращает файл create_engine (conninfo.hostname, ** transport_options) "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/sqlalchemy/engine/init .py ", строка 386, в файле create_engine return Strategy.create (* args, ** kwargs) Файл" /opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/sqlalchemy/engine / стратегия.py ", строка 144, в create engineclass. name )) TypeError: Недопустимые аргументы (аргументы) 'visibility_timeout' отправлены в create_engine () с использованием конфигурации MySQLDialect_mysqldb / QueuePool / Engine.Пожалуйста, убедитесь, что ключевые аргументы подходят для этой комбинации компонентов.[2019-05-11 12: 00: 57,898] {configuration.py:206} ПРЕДУПРЕЖДЕНИЕ - раздел / ключ [celery / celery_ssl_active] не найден в конфигурации [2019-05-11 12: 00: 57,898] {default_celery.py:41} ПРЕДУПРЕЖДЕНИЕ - Celery Executor будет работать без SSL [2019-05-11 12: 00: 57,900] { init .py: 45} ИНФОРМАЦИЯ - Использование исполнителя CeleryExecutor Стартовая фляга [2019-05-11 12:00: 58,083] {_internal.py:122} INFO - * Работает на http://0.0.0.0:8793/ (нажмите CTRL + C, чтобы выйти)

Я не беспокоюсь о TypeError: Invalid argument(s) 'visibility_timeout' сТеперь, когда я уверен, что конфигурации, переданные в airflow.conf, только поднимаются.

Дайте мне знать, что мне не хватает, любая помощь очень ценится!

Ура!

...