Как правильно подключить Airflow, настроенный на AWS EC2, к RDS? - PullRequest
0 голосов
/ 30 сентября 2019

У меня есть экземпляр воздушного потока на EC2, на котором работает веб-сервер / планировщик. Я хочу подключить экземпляр MySQL RDS в качестве базы данных метаданных бэкенда, а не собственный SQLite. Я заменил одну строку в Airflow.cfg, которая подключается к базе данных через sql_alchemy для подключения к RDS с драйвером pymysql:

#sql_alchemy_conn = sqlite:////home/cloud-user/airflow/airflow.db
sql_alchemy_conn = mysql+pymysql://admin:<PASSWORD>@airflow-db.xxxxxxxxxxxx.us-east-1.rds.amazonaws.com:3306/airflow

Кажется, что соединение работает нормально, и я могу попасть в экземпляр RDSи запрашивать таблицы через клиент MySQL, настроенный на моем экземпляре EC2.

Когда я включаю или выключаю DAG, я получаю эту неприятную трассировку стека Python в своей оболочке:

[2019-09-30 14:00:51,774] {app.py:1891} ERROR - Exception on /admin/airflow/paused [POST]
Traceback (most recent call last):
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 2446, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 1951, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 1820, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 1949, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 1935, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask_login/utils.py", line 258, in decorated_view
    return func(*args, **kwargs)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/airflow/www/utils.py", line 279, in wrapper
    session.commit()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1027, in commit
    self.transaction.commit()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 494, in commit
    self._prepare_impl()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 473, in _prepare_impl
    self.session.flush()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2459, in flush
    self._flush(objects)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2597, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2557, in _flush
    flush_context.execute()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    insert,
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 1138, in _emit_insert_statements
    statement, params
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 988, in execute
    return meth(self, multiparams, params)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1107, in _execute_clauseelement
    distilled_params,
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1253, in _execute_context
    e, statement, parameters, cursor, context
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1473, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1249, in _execute_context
    cursor, statement, parameters, context
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
    cursor.execute(statement, parameters)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/cursors.py", line 170, in execute
    result = self._query(query)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/cursors.py", line 328, in _query
    conn.query(q)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/connections.py", line 517, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/connections.py", line 732, in _read_query_result
    result.read()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/connections.py", line 1075, in read
    first_packet = self.connection._read_packet()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/connections.py", line 684, in _read_packet
    packet.check_error()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/protocol.py", line 220, in check_error
    err.raise_mysql_exception(self._data)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/err.py", line 109, in raise_mysql_exception
    raise errorclass(errno, errval)
ProgrammingError: (pymysql.err.ProgrammingError) (1064, u"You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '[(\\'is_paused\\', u\\'true\\'), (\\'dag_id\\', u\\'test\\')]'')' at line 1")
[SQL: INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) VALUES (%(dttm)s, %(dag_id)s, %(task_id)s, %(event)s, %(execution_date)s, %(owner)s, %(extra)s)]
[parameters: {'task_id': None, 'extra': "[('is_paused', u'true'), ('dag_id', u'test')]", 'execution_date': None, 'event': 'paused', 'owner': 'anonymous', 'dttm': datetime.datetime(2019, 9, 30, 18, 0, 51, 768073, tzinfo=<Timezone [UTC]>), 'dag_id': u'test'}]
(Background on this error at: http://sqlalche.me/e/f405) 

Отто, что я видел в документации по Airflow, я делаю изменения правильно, и команды «airflow initdb / resetdb» выполняются без ошибок.

Я уже потратил довольно много времени на поиск этой ошибки, но естьнет четких ответов на эту проблему. Я действительно не уверен, что мне не хватает предварительных условий или мне следует использовать другой разъем?

РЕДАКТИРОВАТЬ: Я использую Python 2.7, как видно из трассировки стека. Airflow заявляет о совместимости в краткосрочной перспективе, но я вижу, что проблемы других пользователей SO исчезли после обновления до python 3.6: Ссылка на другое решение . Я попробую это и обновлю, если это будет работать.

1 Ответ

0 голосов
/ 01 октября 2019

Похоже, что решение действительно заключается в обновлении до python 3.6 с использованием виртуальной среды из-за требуемой двойственности python 2.x и 3.y с системами Linux и Airflow. В частности, я следовал этому руководству , и мои группы обеспечения доступности баз данных, похоже, успешно выполняются.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...