Apche Airflow с MySQL backend - UnicodeDecodeError: код «utf-8» c не может декодировать байт 0x80 в позиции 0: недопустимый начальный байт - PullRequest
0 голосов
/ 25 марта 2020

Когда я щелкаю по имени группы DAG или в виде дерева или графического представления для активных групп DAG, я получаю следующую ошибку:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/flask/app.py", line 1988, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/flask/app.py", line 1641, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/flask/app.py", line 1544, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/flask/_compat.py", line 33, in reraise
    raise value
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/flask/app.py", line 1639, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/flask/app.py", line 1625, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/flask_login.py", line 755, in decorated_view
    return func(*args, **kwargs)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/airflow/www/utils.py", line 219, in view_func
    return f(*args, **kwargs)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/airflow/www/utils.py", line 125, in wrapper
    return f(*args, **kwargs)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/airflow/www/views.py", line 1162, in tree
    DR.execution_date>=min_date)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3233, in all
    return list(self)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3389, in __iter__
    return self._execute_and_instances(context)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3414, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 982, in execute
    return meth(self, multiparams, params)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/sqlalchemy/sql/elements.py", line 293, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
    distilled_params,
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context
    e, statement, parameters, cursor, context
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1478, in _handle_dbapi_exception
    util.reraise(*exc_info)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context
    cursor, statement, parameters, context
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 588, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/mysql/connector/cursor_cext.py", line 272, in execute
    self._handle_result(result)
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/mysql/connector/cursor_cext.py", line 163, in _handle_result
    self._handle_resultset()
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/mysql/connector/cursor_cext.py", line 651, in _handle_resultset
    self._rows = self._cnx.get_rows()[0]
  File "/home/airflowenv/airflow1/lib64/python3.6/site-packages/mysql/connector/connection_cext.py", line 318, in get_rows
    else self._cmysql.fetch_row()
SystemError: <built-in method fetch_row of _mysql_connector.MySQL object at 0x1e9ce60> returned a result with an error set

Я использую Airflow (версия 1.8.1) с MySQL 8 в качестве базы данных. Эти пакеты работали, когда я использовал sqlite и Oracle.

После некоторого исследования стека переполнения я внес эти изменения в файл конфигурации воздушного потока, но это не решило проблему.

sql_alchemy_conn = mysql+mysqlconnector://<user>:<password>i@localhost/MIAF?charset=utf8mb4

sql_engine_encoding = utf8mb4

Как мне решить эту проблему?

1 Ответ

0 голосов
/ 25 марта 2020

Проблема была вызвана столбцом conf в таблице dag_run, который имеет тип данных blob. Эта проблема была похожа на эта , и ответ, который мне помог, заключался в использовании опции use_pure.

Я исправил проблему, отредактировав sql_alchemy_conn в файле airflow.cfg.

sql_alchemy_conn = mysql+mysqlconnector://<user>:<password>i@localhost/MIAF?use_pure=True

PS Я решил, записав все запросы в файл журнала, отредактировав файл /mysql/connector/cursor_cext.py

        f = open("/home/airflow/mysql_utf8_error.log","a")
        f.write(operation)
        f.write("\n")
        f.close()

Я посмотрел на запрос, который был выполнен, когда произошел сбой и это было извлечение из dag_run.

...