apache airflow initdb терпит неудачу в точке kubernetes_resource_checking для mysql - PullRequest
1 голос
/ 11 июля 2019

Я хочу использовать MySQL в качестве внутренней базы данных для воздушного потока apache после установки зависимостей при запуске

airflow initdb

Airflow начинает настройку базы данных, но затем происходит сбой со следующим журналом

shahbaz@OpenSource:~$ airflow initdb
[2019-07-11 12:01:13,726] {settings.py:182} INFO - 
settings.configure_orm(): Using pool settings. pool_size=5, 
pool_recycle=1800, pid=17492
[2019-07-11 12:01:13,917] {__init__.py:51} INFO - Using executor 
LocalExecutor
DB: mysql+mysqldb://airflow:***@localhost:3306/airflow
[2019-07-11 12:01:14,276] {db.py:350} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl MySQLImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, 
current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 
1507a7289a2f, create is_encrypted
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 
13eb55f81627, maintain history for compatibility with earlier 
migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 
338e90f54d61, More logging into task_instance
INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 
52d714495f0, job_id indices
INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 
502898887f84, Adding extra to Log
INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 
1b38cef5b76e, add dagrun
INFO  [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 
2e541a1dcfed, task_duration
INFO  [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 
40e67319e3a9, dagrun_config
INFO  [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 
561833c1c74b, add password column to user
INFO  [alembic.runtime.migration] Running upgrade 561833c1c74b -> 
4446e08588, dagrun start end
INFO  [alembic.runtime.migration] Running upgrade 4446e08588 -> 
bbc73705a13e, Add notification_sent column to sla_miss
INFO  [alembic.runtime.migration] Running upgrade bbc73705a13e -> 
bba5a7cfc896, Add a column to track the encryption state of the 
'Extra' field in connection
INFO  [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 
1968acfc09e3, add is_encrypted column to variable table
INFO  [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 
2e82aab8ef20, rename user table
INFO  [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 
211e584da130, add TI state index
INFO  [alembic.runtime.migration] Running upgrade 211e584da130 -> 
64de9cddf6c9, add task fails journal table
INFO  [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> 
f2ca10b85618, add dag_stats table
INFO  [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 
4addfa1236f1, Add fractional seconds to mysql tables
INFO  [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 
8504051e801b, xcom dag task indices
INFO  [alembic.runtime.migration] Running upgrade 8504051e801b -> 
5e7d17757c7a, add pid field to TaskInstance
INFO  [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 
127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> 
cc1e65623dc7, add max tries column to task instance
INFO  [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> 
bdaa763e6c56, Make xcom value column a large binary
INFO  [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 
947454bf1dff, add ti job_id index
INFO  [alembic.runtime.migration] Running upgrade 947454bf1dff -> 
d2ae31099d61, Increase text size for MySQL (not relevant for other 
DBs' text types)
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 
0e2a74e0fc9f, Add time zone awareness
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 
33ae817a1ff4, kubernetes_resource_checkpointing
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist- 
packages/sqlalchemy/engine/base.py", line 1236, in _execute_context
cursor, statement, parameters, context


File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 536, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 255, in execute
    self.errorhandler(self, exc, value)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 252, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 378, in _query
    db.query(q)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
_mysql_exceptions.OperationalError: (3812, "An expression of non-boolean type specified to a check constraint 'kube_resource_version_one_row_id'.")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/shahbaz/.local/bin/airflow", line 32, in <module>
    args.func(args)
  File "/usr/local/lib/python3.6/dist-packages/airflow/bin/cli.py", line 1096, in initdb
    db.initdb(settings.RBAC)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 91, in initdb
    upgradedb()
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 358, in upgradedb
    command.upgrade(config, 'heads')
  File "/usr/local/lib/python3.6/dist-packages/alembic/command.py", line 254, in upgrade
    script.run_env()
  File "/usr/local/lib/python3.6/dist-packages/alembic/script/base.py", line 427, in run_env
    util.load_python_file(self.dir, 'env.py')
  File "/usr/local/lib/python3.6/dist-packages/alembic/util/pyfiles.py", line 81, in load_python_file
    module = load_module_py(module_id, path)
  File "/usr/local/lib/python3.6/dist-packages/alembic/util/compat.py", line 83, in load_module_py
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/env.py", line 92, in <module>
    run_migrations_online()
  File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/env.py", line 86, in run_migrations_online
    context.run_migrations()
  File "<string>", line 8, in run_migrations
  File "/usr/local/lib/python3.6/dist-packages/alembic/runtime/environment.py", line 836, in run_migrations
    self.get_context().run_migrations(**kw)
  File "/usr/local/lib/python3.6/dist-packages/alembic/runtime/migration.py", line 330, in run_migrations
    step.migration_fn(**kw)
  File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py", line 55, in upgrade
    *columns_and_constraints
  File "<string>", line 8, in create_table
  File "<string>", line 3, in create_table
  File "/usr/local/lib/python3.6/dist-packages/alembic/operations/ops.py", line 1120, in create_table
    return operations.invoke(op)
  File "/usr/local/lib/python3.6/dist-packages/alembic/operations/base.py", line 319, in invoke
    return fn(self, operation)
  File "/usr/local/lib/python3.6/dist-packages/alembic/operations/toimpl.py", line 101, in create_table
    operations.impl.create_table(table)
  File "/usr/local/lib/python3.6/dist-packages/alembic/ddl/impl.py", line 194, in create_table
    self._exec(schema.CreateTable(table))
  File "/usr/local/lib/python3.6/dist-packages/alembic/ddl/impl.py", line 118, in _exec
    return conn.execute(construct, *multiparams, **params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 980, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/sql/ddl.py", line 72, in _execute_on_connection
    return connection._execute_ddl(self, multiparams, params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1042, in _execute_ddl
    compiled,
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1240, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1458, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 296, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 276, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 536, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 255, in execute
    self.errorhandler(self, exc, value)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 252, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 378, in _query
    db.query(q)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (3812, "An expression of non-boolean type specified to a check constraint 'kube_resource_version_one_row_id'.") [SQL: '\nCREATE TABLE kube_resource_version (\n\tone_row_id BOOL NOT NULL DEFAULT true, \n\tresource_version VARCHAR(255), \n\tPRIMARY KEY (one_row_id), \n\tCONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id), \n\tCHECK (one_row_id IN (0, 1))\n)\n\n'] (Background on this error at: http://sqlalche.me/e/e3q8)

Вы видите, что команда initdb завершается неудачно для kubernetes_resource_checkpointing

, а последняя запись журнала указывает, что это произошло из-за OperationalError в sqlalchemy.

sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) 
(3812, "An expression of non-boolean type specified to a check 
constraint 'kube_resource_version_one_row_id'.") [SQL: '\nCREATE TABLE 
kube_resource_version (\n\tone_row_id BOOL NOT NULL DEFAULT true, 
\n\tresource_version VARCHAR(255), \n\tPRIMARY KEY (one_row_id), 
\n\tCONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id), 
\n\tCHECK (one_row_id IN (0, 1))\n)\n\n'] (Background on this error 
at: http://sqlalche.me/e/e3q8)

Я хотел быздесь утверждают, что я могу запустить apache-airflow с помощью базы данных Postgres, и я использую airflow с Postgres только потому, что это странно для MySQL.

Я использую

apache-airflow версия 1.10.3

mysql версия 8.0.16 (MySQL Community Server - GPL)

Также я попытался установить SQL_MODE для MYSQL с 'ANSI', как было указано в документах airflow, но все это было вТщетно.

Любая помощь будет оцененато, что я обнаружил, я проверил файлы кода, указанные Ши Ченом, два файла ответственны за это поведение.

33ae817a1ff4_add_kubernetes_resource_checkpointing.py
86770d1215c0_add_kubernetes_scheduler_uniqueness.py

Оба файла являются файлами миграции с использованием библиотек alembic и sqlalchemy. Я обнаружил, что следующий код sqlalchemy, написанный в файле 33ae817a1ff4_add_kubernetes_resource_checkpointing.py

def upgrade():

    columns_and_constraints = [
        sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), primary_key=True),
        sa.Column("resource_version", sa.String(255))
    ]

    conn = op.get_bind()

    # alembic creates an invalid SQL for mssql dialect
    if conn.dialect.name not in ('mssql'):
        columns_and_constraints.append(sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id"))

    table = op.create_table(
        RESOURCE_TABLE,
        *columns_and_constraints
    )

    op.bulk_insert(table, [
        {"resource_version": ""}
    ])

, интерпретируется как следующий SQL-запрос, который не являетсяправильный

CREATE TABLE 
kube_resource_version (one_row_id BOOL NOT NULL DEFAULT true, 
resource_version VARCHAR(255), PRIMARY KEY (one_row_id), 
CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id), 
CHECK (one_row_id IN (0, 1))

Вместо этого SQL-запрос должен быть примерно таким:

CREATE TABLE 
kube_resource_version (one_row_id BOOL NOT NULL DEFAULT true, 
resource_version VARCHAR(255), PRIMARY KEY (one_row_id), 
CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id IN (0, 
1)))

Ссылка, предоставленная 'skadya', была полезна. Я заставил систему работать после внесения изменений вкод двух вышеупомянутых файлов.

вам просто нужно изменить следующий код с

if conn.dialect.name not in ('mssql'):
        columns_and_constraints.append(
            sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id")
        )

на

if conn.dialect.name not in ('mssql', 'mysql'):
    columns_and_constraints.append(
        sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id")
    )

Ответы [ 2 ]

2 голосов
/ 19 июля 2019

Существует открытая ошибка в системе отслеживания ошибок воздушного потока.

https://issues.apache.org/jira/browse/AIRFLOW-4995.

В качестве обходного пути вы можете применить предложенные изменения в запрос на получение вручную.

1 голос
/ 14 июля 2019

Я столкнулся с точно такой же проблемой. Кто-то знает, что делать?

Кстати, я столкнулся с другой проблемой, жалуясь, что таблица dag_stats уже существует при сбросе БД. Мне пришлось вручную отбросить dag_stats, чтобы сброс прошел этот шаг. Но все же заблокирован на это ограничение.

CREATE TABLE kube_resource_version (
    one_row_id BOOL NOT NULL DEFAULT true,
    resource_version VARCHAR(255),
    PRIMARY KEY (one_row_id),
    CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id),
    CHECK (one_row_id IN (0, 1))
)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...