Планировщик воздушного потока работает нормально, с ошибкой -D - PullRequest
2 голосов
/ 05 апреля 2020

Я настроил Airflow на сервере AWS EC2 с Ubuntu 18.04, Python 3.6.9. Бэкэнд БД - 5.7.26 MySQL, я использую LocalExecutor. Настройка не более чем:

apt-get install python3 python3-pip python3-venv libmysqlclient-dev

Я устанавливаю Airflow с pip3 install apache-airflow[mysql]==1.10.9, подключаю init DB, запускаю веб-сервер, и он работает как нормально, так и с -D. Планировщик, однако, работает только при запуске на переднем плане. Попытка запустить его как демон завершается неудачно со следующей трассировкой:

Traceback (most recent call last):
    File "/usr/local/bin/airflow", line 37, in <module> args.func(args)
    File "/usr/local/lib/python3.6/dist-packages/airflow/utils/cli.py", line 75, in wrapper  return f(*args, **kwargs)
    File "/usr/local/lib/python3.6/dist-packages/airflow/bin/cli.py", line 1032, in schedulerjob.run()
    File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/base_job.py", line 215, in run session.commit()
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 1036, in commit  self.transaction.commit()
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 503, in commitself._prepare_impl()
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 482, in _prepare_impl  self.session.flush()
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 2496, in flushself._flush(objects)
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 2637, in _flush  transaction.rollback(_capture_exception=True)
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/langhelpers.py", line 69, in __exit__exc_value, with_traceback=exc_tb,
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 178, in raise_raise exception
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 2597, in _flush  flush_context.execute()
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute  rec.execute(self)
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute  uow,
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/persistence.py", line 213, in save_obj) in _organize_states_for_save(base_mapper, states, uowtransaction):  
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/persistence.py", line 374, in _organize_states_for_save base_mapper, uowtransaction, states
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/persistence.py", line 1602, in _connections_for_states  connection = uowtransaction.transaction.connection(base_mapper)
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 313, in connection  return self._connection_for_bind(bind, execution_options)
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 420, in _connection_for_bind conn = self._parent._connection_for_bind(bind, execution_options)
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 432, in _connection_for_bind conn = bind._contextual_connect()
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 2251, in _contextual_connect self._wrap_pool_connect(self.pool.connect, None),
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 2285, in _wrap_pool_connect  return fn()
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 363, in connect return _ConnectionFairy._checkout(self)
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 804, in _checkout  result = pool._dialect.do_ping(fairy.connection)
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 138, in do_pingdbapi_connection.ping(False)
    File "/usr/local/lib/python3.6/dist-packages/pymysql/connections.py", line 546, in ping  self._execute_command(COMMAND.COM_PING, "")
    File "/usr/local/lib/python3.6/dist-packages/pymysql/connections.py", line 771, in _execute_command  self._write_bytes(packet)
    File "/usr/local/lib/python3.6/dist-packages/pymysql/connections.py", line 711, in _write_bytesself._sock.settimeout(self._write_timeout)
OSError: [Errno 9] Bad file descriptor
Exception ignored in: <function _ConnectionRecord.checkout.<locals>.<lambda> at 0x7f2eb025e268>
Traceback (most recent call last):
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 503, in <lambda> 
    File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 702, in _finalize_fairy
    File "/usr/lib/python3.6/logging/__init__.py", line 1337, in error
    File "/usr/lib/python3.6/logging/__init__.py", line 1444, in _log
    File "/usr/lib/python3.6/logging/__init__.py", line 1454, in handle
    File "/usr/lib/python3.6/logging/__init__.py", line 1516, in callHandlers  
    File "/usr/lib/python3.6/logging/__init__.py", line 865, in handle
    File "/usr/lib/python3.6/logging/__init__.py", line 1071, in emit
    File "/usr/lib/python3.6/logging/__init__.py", line 1061, in _open
NameError: name 'open' is not defined

Я пытаюсь настроить сервер для полупроизводственных целей, так что на самом деле это блокировщик для меня. Буду признателен за любые советы.

РЕДАКТИРОВАТЬ

Я пытался использовать mysql-connector-python==8.0.18, и планировщик работал как демон, но попытка открыть даг не удалась со следующей трассировкой:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.6/dist-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/usr/local/lib/python3.6/dist-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/flask_login/utils.py", line 258, in decorated_view
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/airflow/www/utils.py", line 384, in view_func
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/airflow/www/utils.py", line 290, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/airflow/www/views.py", line 1559, in tree
    start_date=min_date, end_date=base_date, session=session)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/airflow/models/dag.py", line 837, in get_task_instances
    tis = tis.order_by(TaskInstance.execution_date).all()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3233, in all
    return list(self)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3389, in __iter__
    return self._execute_and_instances(context)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3414, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 982, in execute
    return meth(self, multiparams, params)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 293, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
    distilled_params,
  File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context
    e, statement, parameters, cursor, context
  File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1478, in _handle_dbapi_exception
    util.reraise(*exc_info)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value
  File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context
    cursor, statement, parameters, context
  File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 588, in do_execute
    cursor.execute(statement, parameters)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/mysql/connector/cursor_cext.py", line 272, in execute
    self._handle_result(result)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/mysql/connector/cursor_cext.py", line 163, in _handle_result
    self._handle_resultset()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/mysql/connector/cursor_cext.py", line 651, in _handle_resultset
    self._rows = self._cnx.get_rows()[0]
  File "/home/ubuntu/.local/lib/python3.6/site-packages/mysql/connector/connection_cext.py", line 301, in get_rows
    else self._cmysql.fetch_row()
SystemError: <built-in method fetch_row of _mysql_connector.MySQL object at 0x3cdaa10> returned a result with an error set

РЕДАКТИРОВАТЬ 2

В конечном итоге сработало использование mysqlclient, указав строку подключения SQLAlchemy для начала с mysql+mysqldb. Я не даю это как ответ, потому что первоначальная проблема сохраняется.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...