Я настроил Airflow на сервере AWS EC2 с Ubuntu 18.04, Python 3.6.9. Бэкэнд БД - 5.7.26 MySQL, я использую LocalExecutor. Настройка не более чем:
apt-get install python3 python3-pip python3-venv libmysqlclient-dev
Я устанавливаю Airflow с pip3 install apache-airflow[mysql]==1.10.9
, подключаю init DB, запускаю веб-сервер, и он работает как нормально, так и с -D. Планировщик, однако, работает только при запуске на переднем плане. Попытка запустить его как демон завершается неудачно со следующей трассировкой:
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 37, in <module> args.func(args)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/cli.py", line 75, in wrapper return f(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/airflow/bin/cli.py", line 1032, in schedulerjob.run()
File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/base_job.py", line 215, in run session.commit()
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 1036, in commit self.transaction.commit()
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 503, in commitself._prepare_impl()
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 482, in _prepare_impl self.session.flush()
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 2496, in flushself._flush(objects)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 2637, in _flush transaction.rollback(_capture_exception=True)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/langhelpers.py", line 69, in __exit__exc_value, with_traceback=exc_tb,
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 178, in raise_raise exception
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 2597, in _flush flush_context.execute()
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute rec.execute(self)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute uow,
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/persistence.py", line 213, in save_obj) in _organize_states_for_save(base_mapper, states, uowtransaction):
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/persistence.py", line 374, in _organize_states_for_save base_mapper, uowtransaction, states
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/persistence.py", line 1602, in _connections_for_states connection = uowtransaction.transaction.connection(base_mapper)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 313, in connection return self._connection_for_bind(bind, execution_options)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 420, in _connection_for_bind conn = self._parent._connection_for_bind(bind, execution_options)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 432, in _connection_for_bind conn = bind._contextual_connect()
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 2251, in _contextual_connect self._wrap_pool_connect(self.pool.connect, None),
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 2285, in _wrap_pool_connect return fn()
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 363, in connect return _ConnectionFairy._checkout(self)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 804, in _checkout result = pool._dialect.do_ping(fairy.connection)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 138, in do_pingdbapi_connection.ping(False)
File "/usr/local/lib/python3.6/dist-packages/pymysql/connections.py", line 546, in ping self._execute_command(COMMAND.COM_PING, "")
File "/usr/local/lib/python3.6/dist-packages/pymysql/connections.py", line 771, in _execute_command self._write_bytes(packet)
File "/usr/local/lib/python3.6/dist-packages/pymysql/connections.py", line 711, in _write_bytesself._sock.settimeout(self._write_timeout)
OSError: [Errno 9] Bad file descriptor
Exception ignored in: <function _ConnectionRecord.checkout.<locals>.<lambda> at 0x7f2eb025e268>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 503, in <lambda>
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 702, in _finalize_fairy
File "/usr/lib/python3.6/logging/__init__.py", line 1337, in error
File "/usr/lib/python3.6/logging/__init__.py", line 1444, in _log
File "/usr/lib/python3.6/logging/__init__.py", line 1454, in handle
File "/usr/lib/python3.6/logging/__init__.py", line 1516, in callHandlers
File "/usr/lib/python3.6/logging/__init__.py", line 865, in handle
File "/usr/lib/python3.6/logging/__init__.py", line 1071, in emit
File "/usr/lib/python3.6/logging/__init__.py", line 1061, in _open
NameError: name 'open' is not defined
Я пытаюсь настроить сервер для полупроизводственных целей, так что на самом деле это блокировщик для меня. Буду признателен за любые советы.
РЕДАКТИРОВАТЬ
Я пытался использовать mysql-connector-python==8.0.18
, и планировщик работал как демон, но попытка открыть даг не удалась со следующей трассировкой:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 2447, in wsgi_app
response = self.full_dispatch_request()
File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1952, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1821, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/usr/local/lib/python3.6/dist-packages/flask/_compat.py", line 39, in reraise
raise value
File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1950, in full_dispatch_request
rv = self.dispatch_request()
File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1936, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/usr/local/lib/python3.6/dist-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/flask_login/utils.py", line 258, in decorated_view
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/airflow/www/utils.py", line 384, in view_func
return f(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/airflow/www/utils.py", line 290, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/airflow/www/views.py", line 1559, in tree
start_date=min_date, end_date=base_date, session=session)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 70, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/airflow/models/dag.py", line 837, in get_task_instances
tis = tis.order_by(TaskInstance.execution_date).all()
File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3233, in all
return list(self)
File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3389, in __iter__
return self._execute_and_instances(context)
File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3414, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 982, in execute
return meth(self, multiparams, params)
File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 293, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
distilled_params,
File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context
e, statement, parameters, cursor, context
File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1478, in _handle_dbapi_exception
util.reraise(*exc_info)
File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
raise value
File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context
cursor, statement, parameters, context
File "/home/ubuntu/.local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 588, in do_execute
cursor.execute(statement, parameters)
File "/home/ubuntu/.local/lib/python3.6/site-packages/mysql/connector/cursor_cext.py", line 272, in execute
self._handle_result(result)
File "/home/ubuntu/.local/lib/python3.6/site-packages/mysql/connector/cursor_cext.py", line 163, in _handle_result
self._handle_resultset()
File "/home/ubuntu/.local/lib/python3.6/site-packages/mysql/connector/cursor_cext.py", line 651, in _handle_resultset
self._rows = self._cnx.get_rows()[0]
File "/home/ubuntu/.local/lib/python3.6/site-packages/mysql/connector/connection_cext.py", line 301, in get_rows
else self._cmysql.fetch_row()
SystemError: <built-in method fetch_row of _mysql_connector.MySQL object at 0x3cdaa10> returned a result with an error set
РЕДАКТИРОВАТЬ 2
В конечном итоге сработало использование mysqlclient
, указав строку подключения SQLAlchemy для начала с mysql+mysqldb
. Я не даю это как ответ, потому что первоначальная проблема сохраняется.