Невозможно запустить планировщик воздушного потока - PullRequest
1 голос
/ 30 мая 2019

Я недавно установил поток воздуха на сервере AWS, используя это руководство для Ubuntu 16.04.После мучительной и успешной установки запустился веб-сервер.Я попробовал образец дампа следующим образом

from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import timedelta
from airflow import DAG
import airflow


# DEFAULT ARGS
default_args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'depends_on_past': False}


dag = DAG('init_run', default_args=default_args, description='DAG SAMPLE',
schedule_interval='@daily')


def print_something():
        print("HELLO AIRFLOW!")


with dag:
        task_1 = PythonOperator(task_id='do_it', python_callable=print_something)
        task_2 = DummyOperator(task_id='dummy')

        task_1 << task_2

Но когда я открываю пользовательский интерфейс, задачи в дамбе все еще находятся в «Нет статуса» независимо от того, сколькораз я запускаю вручную или обновляю страницу.

Позже я обнаружил, что планировщик воздушного потока не работает и показывает следующую ошибку:

{celery_executor.py:228} ERROR - Error sending Celery task:No module named 'MySQLdb'
Celery Task ID: ('init_run', 'dummy', datetime.datetime(2019, 5, 30, 18, 0, 24, 902499, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1)
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 118, in send_task_to_executor
    result = task.apply_async(args=[command], queue=queue)
  File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 535, in apply_async
    **options
  File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 728, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py", line 552, in send_task_message
    **properties
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 510, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 194, in _publish
    [maybe_declare(entity) for entity in declare]
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 194, in <listcomp>
    [maybe_declare(entity) for entity in declare]
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 102, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 121, in maybe_declare
    return _maybe_declare(entity, channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 145, in _maybe_declare
    entity.declare(channel=channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 608, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 617, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 652, in queue_declare
    nowait=nowait,
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 531, in queue_declare
    self._new_queue(queue, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 82, in _new_queue
    self._get_or_create(queue)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 70, in _get_or_create
    obj = self.session.query(self.queue_cls) \
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 65, in session
    _, Session = self._open()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 56, in _open
    engine = self._engine_from_config()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 51, in _engine_from_config
    return create_engine(conninfo.hostname, **transport_options)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/__init__.py", line 443, in create_engine
    return strategy.create(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/strategies.py", line 87, in create
    dbapi = dialect_cls.dbapi(**dbapi_args)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 104, in dbapi
    return __import__("MySQLdb")
ModuleNotFoundError: No module named 'MySQLdb'

Вот настройка в файле конфигурации (воздушный поток.cfg):

sql_alchemy_conn = postgresql+psycopg2://airflow@localhost:5432/airflow
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
result_backend =  db+postgresql://airflow:airflow@localhost/airflow

Я боролся с этой проблемой уже два дня, пожалуйста, помогите

Ответы [ 2 ]

0 голосов
/ 31 мая 2019

Я думаю, что пример, который вы используете, не говорил вам установить mysql, и кажется, что вы используете его в URL брокера.

Вы можете установить MySQL и настроить его.(для python 3.5+)

pip install mysqlclient

В качестве альтернативы , для быстрого исправления.Вы также можете использовать rabbit MQ (Rabbitmq - брокер сообщений, который вам потребуется для повторного запуска пакетов с воздушным потоком с сельдереем). Вход для гостевого пользователя

, и тогда ваш broker_url будет

broker_url = amqp://guest:guest@localhost:5672//

, если нетуже установлен, Rabbitmq можно установить с помощью следующей команды.

sudo apt install rabbitmq-server

Изменить конфигурацию NODE_IP_ADDRESS = 0.0.0.0 в файле конфигурации, расположенном по адресу

/etc/rabbitmq/rabbitmq-env.conf

Запустить службу RabbitMQ

sudo service rabbitmq-server start
0 голосов
/ 31 мая 2019

В вашем airflow.cfg также должна быть опция конфигурации для celery_result_backend. Можете ли вы сообщить нам, что это значение установлено? Если он отсутствует в вашей конфигурации, установите для него то же значение, что и result_backend

т.е:

celery_result_backend =  db+postgresql://airflow:airflow@localhost/airflow

А затем перезапустите стек воздушного потока, чтобы убедиться, что изменения конфигурации применяются.

(Я хотел оставить это как комментарий, но у меня недостаточно представителей для этого)

...