Apache Airflow: after pointing airflow.cfg at Postgres it still tries to use MySQL
0 votes
/ 30 December 2018

I am using apache-airflow2. So far my DAGs have run on the LocalExecutor without any problems. Now I want to scale out and use the CeleryExecutor (still on my local Mac). I configured it for the CeleryExecutor, and on server startup the log shows CeleryExecutor. But whenever I start an airflow worker (i.e., the same machine acting as a worker) or airflow flower, I hit an error: it tries to connect to MySQL and fails because it cannot find the module.

I have RabbitMQ set up locally and Airflow in a virtualenv. Here are the updated lines in the airflow.cfg file:

broker_url = amqp://myuser:mypassword@localhost/myvhost
result_backend = db+postgresql://localhost:5433/celery_space?user=celery_user&password=celery_user
sql_alchemy_conn = postgresql://localhost:5433/postgres?user=postgres&password=root
executor = CeleryExecutor
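Note that the transport the worker reports below, sqla+mysql://airflow:airflow@localhost:3306/airflow, is the stock default from the shipped airflow.cfg template, which suggests the worker is not picking up these values at all: either it is loading a different airflow.cfg (e.g., a different AIRFLOW_HOME), or the keys are not under the section Airflow reads them from ([celery] for broker_url/result_backend). The snippet below is a simplified model of that lookup, not Airflow's actual config loader; the fallback URL is the old default from the log:

```python
import configparser

# Simplified sketch: Airflow reads broker_url from the [celery] section of
# airflow.cfg. If the key sits in another section (or the worker loads a
# different file), the shipped MySQL default wins. This mimics, not reuses,
# Airflow's real configuration machinery.
OLD_DEFAULT = "sqla+mysql://airflow:airflow@localhost:3306/airflow"

def effective_broker_url(cfg_text: str) -> str:
    cp = configparser.ConfigParser()
    cp.read_string(cfg_text)
    # fallback covers both a missing [celery] section and a missing key
    return cp.get("celery", "broker_url", fallback=OLD_DEFAULT)

good = "[celery]\nbroker_url = amqp://myuser:mypassword@localhost/myvhost\n"
bad = "[core]\nbroker_url = amqp://myuser:mypassword@localhost/myvhost\n"

print(effective_broker_url(good))  # the amqp:// URL from [celery]
print(effective_broker_url(bad))   # falls back to the MySQL default
```

So the first thing to check is that broker_url and result_backend live under [celery] in the airflow.cfg that the worker's AIRFLOW_HOME actually points to.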

Below is the worker error:

 -------------- celery@superadmins-MacBook-Pro.local v4.2.1 (windowlicker)
---- **** ----- 
--- * ***  * -- Darwin-18.2.0-x86_64-i386-64bit 2018-12-30 09:15:00
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor:0x10c682fd0
- ** ---------- .> transport:   sqla+mysql://airflow:airflow@localhost:3306/airflow
- ** ---------- .> results:     mysql://airflow:**@localhost:3306/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> default          exchange=default(direct) key=default


[tasks]
  . airflow.executors.celery_executor.execute_command

[2018-12-30 09:15:00,290: INFO/MainProcess] Connected to sqla+mysql://airflow:airflow@localhost:3306/airflow
[2018-12-30 09:15:00,304: CRITICAL/MainProcess] Unrecoverable error: ModuleNotFoundError("No module named 'MySQLdb'")
Traceback (most recent call last):
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 317, in start
    blueprint.start(self)
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/celery/worker/consumer/tasks.py", line 41, in start
    c.connection, on_decode_error=c.on_decode_error,
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/celery/app/amqp.py", line 297, in TaskConsumer
    **kw
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/kombu/messaging.py", line 386, in __init__
    self.revive(self.channel)
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/kombu/messaging.py", line 408, in revive
    self.declare()
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/kombu/messaging.py", line 421, in declare
    queue.declare()
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/kombu/entity.py", line 608, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/kombu/entity.py", line 617, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/kombu/entity.py", line 652, in queue_declare
    nowait=nowait,
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 531, in queue_declare
    self._new_queue(queue, **kwargs)
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 82, in _new_queue
    self._get_or_create(queue)
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 70, in _get_or_create
    obj = self.session.query(self.queue_cls) \
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 65, in session
    _, Session = self._open()
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 56, in _open
    engine = self._engine_from_config()
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 51, in _engine_from_config
    return create_engine(conninfo.hostname, **transport_options)
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/sqlalchemy/engine/__init__.py", line 425, in create_engine
    return strategy.create(*args, **kwargs)
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/sqlalchemy/engine/strategies.py", line 81, in create
    dbapi = dialect_cls.dbapi(**dbapi_args)
  File "/Users/deepaksaroha/Desktop/apache_2.0/nb-atom-airflow/lib/python3.7/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 102, in dbapi
    return __import__('MySQLdb')
ModuleNotFoundError: No module named 'MySQLdb'
[2018-12-30 09:15:00,599] {__init__.py:51} INFO - Using executor SequentialExecutor
Starting flask

Please let me know how to get the worker to run with the airflow.cfg settings. Thanks for any help; let me know if you need anything else from the logs or the config file.
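One debugging avenue (hedged: based on Airflow's documented AIRFLOW__{SECTION}__{KEY} environment-variable override convention): export the values as environment variables in the shell that starts the worker, so they take precedence over whatever airflow.cfg the worker happens to load. The helper below just builds those variable names; the URLs are the ones from the question:

```python
import os

# Airflow lets environment variables of the form AIRFLOW__{SECTION}__{KEY}
# override airflow.cfg values. This helper builds such a name; exporting the
# result in the worker's shell bypasses any stale config file.
def airflow_env_var(section: str, key: str) -> str:
    return f"AIRFLOW__{section.upper()}__{key.upper()}"

os.environ[airflow_env_var("celery", "broker_url")] = (
    "amqp://myuser:mypassword@localhost/myvhost"
)
os.environ[airflow_env_var("core", "sql_alchemy_conn")] = (
    "postgresql://localhost:5433/postgres?user=postgres&password=root"
)

print(os.environ["AIRFLOW__CELERY__BROKER_URL"])
```

If the worker still reports a mysql:// transport after this, the installed Airflow version may be reading a different section name, which would narrow the problem down further.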
