Я хочу использовать MySQL в качестве внутренней базы данных для воздушного потока apache после установки зависимостей при запуске
airflow initdb
Airflow начинает настройку базы данных, но затем происходит сбой со следующим журналом
shahbaz@OpenSource:~$ airflow initdb
[2019-07-11 12:01:13,726] {settings.py:182} INFO -
settings.configure_orm(): Using pool settings. pool_size=5,
pool_recycle=1800, pid=17492
[2019-07-11 12:01:13,917] {__init__.py:51} INFO - Using executor
LocalExecutor
DB: mysql+mysqldb://airflow:***@localhost:3306/airflow
[2019-07-11 12:01:14,276] {db.py:350} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl MySQLImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> e3a246e0dc1,
current schema
INFO [alembic.runtime.migration] Running upgrade e3a246e0dc1 ->
1507a7289a2f, create is_encrypted
INFO [alembic.runtime.migration] Running upgrade 1507a7289a2f ->
13eb55f81627, maintain history for compatibility with earlier
migrations
INFO [alembic.runtime.migration] Running upgrade 13eb55f81627 ->
338e90f54d61, More logging into task_instance
INFO [alembic.runtime.migration] Running upgrade 338e90f54d61 ->
52d714495f0, job_id indices
INFO [alembic.runtime.migration] Running upgrade 52d714495f0 ->
502898887f84, Adding extra to Log
INFO [alembic.runtime.migration] Running upgrade 502898887f84 ->
1b38cef5b76e, add dagrun
INFO [alembic.runtime.migration] Running upgrade 1b38cef5b76e ->
2e541a1dcfed, task_duration
INFO [alembic.runtime.migration] Running upgrade 2e541a1dcfed ->
40e67319e3a9, dagrun_config
INFO [alembic.runtime.migration] Running upgrade 40e67319e3a9 ->
561833c1c74b, add password column to user
INFO [alembic.runtime.migration] Running upgrade 561833c1c74b ->
4446e08588, dagrun start end
INFO [alembic.runtime.migration] Running upgrade 4446e08588 ->
bbc73705a13e, Add notification_sent column to sla_miss
INFO [alembic.runtime.migration] Running upgrade bbc73705a13e ->
bba5a7cfc896, Add a column to track the encryption state of the
'Extra' field in connection
INFO [alembic.runtime.migration] Running upgrade bba5a7cfc896 ->
1968acfc09e3, add is_encrypted column to variable table
INFO [alembic.runtime.migration] Running upgrade 1968acfc09e3 ->
2e82aab8ef20, rename user table
INFO [alembic.runtime.migration] Running upgrade 2e82aab8ef20 ->
211e584da130, add TI state index
INFO [alembic.runtime.migration] Running upgrade 211e584da130 ->
64de9cddf6c9, add task fails journal table
INFO [alembic.runtime.migration] Running upgrade 64de9cddf6c9 ->
f2ca10b85618, add dag_stats table
INFO [alembic.runtime.migration] Running upgrade f2ca10b85618 ->
4addfa1236f1, Add fractional seconds to mysql tables
INFO [alembic.runtime.migration] Running upgrade 4addfa1236f1 ->
8504051e801b, xcom dag task indices
INFO [alembic.runtime.migration] Running upgrade 8504051e801b ->
5e7d17757c7a, add pid field to TaskInstance
INFO [alembic.runtime.migration] Running upgrade 5e7d17757c7a ->
127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 ->
cc1e65623dc7, add max tries column to task instance
INFO [alembic.runtime.migration] Running upgrade cc1e65623dc7 ->
bdaa763e6c56, Make xcom value column a large binary
INFO [alembic.runtime.migration] Running upgrade bdaa763e6c56 ->
947454bf1dff, add ti job_id index
INFO [alembic.runtime.migration] Running upgrade 947454bf1dff ->
d2ae31099d61, Increase text size for MySQL (not relevant for other
DBs' text types)
INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 ->
0e2a74e0fc9f, Add time zone awareness
INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 ->
33ae817a1ff4, kubernetes_resource_checkpointing
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-
packages/sqlalchemy/engine/base.py", line 1236, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 536, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 255, in execute
self.errorhandler(self, exc, value)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
raise errorvalue
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 252, in execute
res = self._query(query)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 378, in _query
db.query(q)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)
_mysql_exceptions.OperationalError: (3812, "An expression of non-boolean type specified to a check constraint 'kube_resource_version_one_row_id'.")
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/shahbaz/.local/bin/airflow", line 32, in <module>
args.func(args)
File "/usr/local/lib/python3.6/dist-packages/airflow/bin/cli.py", line 1096, in initdb
db.initdb(settings.RBAC)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 91, in initdb
upgradedb()
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 358, in upgradedb
command.upgrade(config, 'heads')
File "/usr/local/lib/python3.6/dist-packages/alembic/command.py", line 254, in upgrade
script.run_env()
File "/usr/local/lib/python3.6/dist-packages/alembic/script/base.py", line 427, in run_env
util.load_python_file(self.dir, 'env.py')
File "/usr/local/lib/python3.6/dist-packages/alembic/util/pyfiles.py", line 81, in load_python_file
module = load_module_py(module_id, path)
File "/usr/local/lib/python3.6/dist-packages/alembic/util/compat.py", line 83, in load_module_py
spec.loader.exec_module(module)
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/env.py", line 92, in <module>
run_migrations_online()
File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/env.py", line 86, in run_migrations_online
context.run_migrations()
File "<string>", line 8, in run_migrations
File "/usr/local/lib/python3.6/dist-packages/alembic/runtime/environment.py", line 836, in run_migrations
self.get_context().run_migrations(**kw)
File "/usr/local/lib/python3.6/dist-packages/alembic/runtime/migration.py", line 330, in run_migrations
step.migration_fn(**kw)
File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py", line 55, in upgrade
*columns_and_constraints
File "<string>", line 8, in create_table
File "<string>", line 3, in create_table
File "/usr/local/lib/python3.6/dist-packages/alembic/operations/ops.py", line 1120, in create_table
return operations.invoke(op)
File "/usr/local/lib/python3.6/dist-packages/alembic/operations/base.py", line 319, in invoke
return fn(self, operation)
File "/usr/local/lib/python3.6/dist-packages/alembic/operations/toimpl.py", line 101, in create_table
operations.impl.create_table(table)
File "/usr/local/lib/python3.6/dist-packages/alembic/ddl/impl.py", line 194, in create_table
self._exec(schema.CreateTable(table))
File "/usr/local/lib/python3.6/dist-packages/alembic/ddl/impl.py", line 118, in _exec
return conn.execute(construct, *multiparams, **params)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 980, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/sql/ddl.py", line 72, in _execute_on_connection
return connection._execute_ddl(self, multiparams, params)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1042, in _execute_ddl
compiled,
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1240, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1458, in _handle_dbapi_exception
util.raise_from_cause(sqlalchemy_exception, exc_info)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 296, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 276, in reraise
raise value.with_traceback(tb)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 536, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 255, in execute
self.errorhandler(self, exc, value)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
raise errorvalue
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 252, in execute
res = self._query(query)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 378, in _query
db.query(q)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (3812, "An expression of non-boolean type specified to a check constraint 'kube_resource_version_one_row_id'.") [SQL: '\nCREATE TABLE kube_resource_version (\n\tone_row_id BOOL NOT NULL DEFAULT true, \n\tresource_version VARCHAR(255), \n\tPRIMARY KEY (one_row_id), \n\tCONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id), \n\tCHECK (one_row_id IN (0, 1))\n)\n\n'] (Background on this error at: http://sqlalche.me/e/e3q8)
Вы видите, что команда initdb завершается неудачно для kubernetes_resource_checkpointing
, а последняя запись журнала указывает, что это произошло из-за OperationalError в sqlalchemy.
sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError)
(3812, "An expression of non-boolean type specified to a check
constraint 'kube_resource_version_one_row_id'.") [SQL: '\nCREATE TABLE
kube_resource_version (\n\tone_row_id BOOL NOT NULL DEFAULT true,
\n\tresource_version VARCHAR(255), \n\tPRIMARY KEY (one_row_id),
\n\tCONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id),
\n\tCHECK (one_row_id IN (0, 1))\n)\n\n'] (Background on this error
at: http://sqlalche.me/e/e3q8)
Я хотел быздесь утверждают, что я могу запустить apache-airflow с помощью базы данных Postgres, и я использую airflow с Postgres только потому, что это странно для MySQL.
Я использую
apache-airflow версия 1.10.3
mysql версия 8.0.16 (MySQL Community Server - GPL)
Также я попытался установить SQL_MODE для MYSQL с 'ANSI', как было указано в документах airflow, но все это было вТщетно.
Любая помощь будет оцененато, что я обнаружил, я проверил файлы кода, указанные Ши Ченом, два файла ответственны за это поведение.
33ae817a1ff4_add_kubernetes_resource_checkpointing.py
86770d1215c0_add_kubernetes_scheduler_uniqueness.py
Оба файла являются файлами миграции с использованием библиотек alembic и sqlalchemy. Я обнаружил, что следующий код sqlalchemy, написанный в файле 33ae817a1ff4_add_kubernetes_resource_checkpointing.py
def upgrade():
columns_and_constraints = [
sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), primary_key=True),
sa.Column("resource_version", sa.String(255))
]
conn = op.get_bind()
# alembic creates an invalid SQL for mssql dialect
if conn.dialect.name not in ('mssql'):
columns_and_constraints.append(sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id"))
table = op.create_table(
RESOURCE_TABLE,
*columns_and_constraints
)
op.bulk_insert(table, [
{"resource_version": ""}
])
, интерпретируется как следующий SQL-запрос, который не являетсяправильный
CREATE TABLE
kube_resource_version (one_row_id BOOL NOT NULL DEFAULT true,
resource_version VARCHAR(255), PRIMARY KEY (one_row_id),
CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id),
CHECK (one_row_id IN (0, 1))
Вместо этого SQL-запрос должен быть примерно таким:
CREATE TABLE
kube_resource_version (one_row_id BOOL NOT NULL DEFAULT true,
resource_version VARCHAR(255), PRIMARY KEY (one_row_id),
CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id IN (0,
1)))
Ссылка, предоставленная 'skadya', была полезна. Я заставил систему работать после внесения изменений вкод двух вышеупомянутых файлов.
вам просто нужно изменить следующий код с
if conn.dialect.name not in ('mssql'):
columns_and_constraints.append(
sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id")
)
на
if conn.dialect.name not in ('mssql', 'mysql'):
columns_and_constraints.append(
sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id")
)