Airflow DAG генерирует RecursionError, запускаемое через веб-консоль - PullRequest
0 голосов
/ 19 февраля 2020

Я получаю следующую ошибку, когда пытаюсь вызвать группу обеспечения доступности баз данных, использующую пользовательский BaseOperator.

вот ошибка,

UnicodeDecodeError: код 'utf-8' c can не декодировать байт 0x80 в позиции 0: недопустимый начальный байт

Вышеуказанное исключение было прямой причиной следующего исключения:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2446, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1951, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1820, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1949, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1935, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/usr/local/lib/python3.7/site-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/flask_login/utils.py", line 258, in decorated_view
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/www/utils.py", line 290, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/www/utils.py", line 337, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/www/views.py", line 1213, in trigger
    external_trigger=True
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1659, in create_dagrun
    session=session)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1346, in create_dagrun
    run.refresh_from_db()
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 109, in refresh_from_db
    DR.run_id == self.run_id
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3347, in one
    ret = self.one_or_none()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3316, in one_or_none
    ret = list(self)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3389, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3414, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 982, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 293, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1478, in _handle_dbapi_exception
    util.reraise(*exc_info)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 588, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.7/site-packages/mysql/connector/cursor_cext.py", line 272, in execute
    self._handle_result(result)
  File "/usr/local/lib/python3.7/site-packages/mysql/connector/cursor_cext.py", line 163, in _handle_result
    self._handle_resultset()
  File "/usr/local/lib/python3.7/site-packages/mysql/connector/cursor_cext.py", line 651, in _handle_resultset
    self._rows = self._cnx.get_rows()[0]
  File "/usr/local/lib/python3.7/site-packages/mysql/connector/connection_cext.py", line 318, in get_rows
    else self._cmysql.fetch_row()
SystemError: <method 'fetch_row' of '_mysql_connector.MySQL' objects> returned a result with an error set

Мой код выглядит следующим образом:

из airflow.plugins_manager import AirflowPlugin из airflow.utils.decorators import apply_defaults

class TestOperator(BaseOperator):
    template_fields = ('param1')
    ui_color = '#A7E6A7'

    @apply_defaults
    def __init__(self,param1,*args, **kwargs):
        self.param1 = param1
        super(TestOperator, self).__init__(*args, **kwargs)

    def execute(self):
        print ('welcome to airflow')


class TestOperatorPlugin(AirflowPlugin):
    name = "TestOperator_plugin"
    operators = [TestOperator]

--here is Dag,
from TestOperator import TestOperator
from airflow import DAG
from datetime import datetime

prog_args = {
'depends_on_past': False, 'param1' : 'testOne'
}

testMYDAG = DAG('TestMYDAG',  start_date = datetime(2020, 2, 18) , description='TestMYDAG', default_args = prog_args, schedule_interval=None)

testOp = TestOperator(task_id='test_dag', dag=testMYDAG )

testOp

1 Ответ

0 голосов
/ 20 марта 2020

Команда, я решил проблему, добавив схему кодирования в utf-8 в файле airflow.cfg. Кроме того, его необходимо добавить в строку sql_alchemy_connection.

...