Я получаю следующую ошибку, когда пытаюсь вызвать группу обеспечения доступности баз данных, использующую пользовательский BaseOperator.
вот ошибка,
UnicodeDecodeError: код 'utf-8' c can не декодировать байт 0x80 в позиции 0: недопустимый начальный байт
Вышеуказанное исключение было прямой причиной следующего исключения:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2446, in wsgi_app
response = self.full_dispatch_request()
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1951, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1820, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/usr/local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
raise value
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1949, in full_dispatch_request
rv = self.dispatch_request()
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1935, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/usr/local/lib/python3.7/site-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/flask_login/utils.py", line 258, in decorated_view
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/www/utils.py", line 290, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/www/utils.py", line 337, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/www/views.py", line 1213, in trigger
external_trigger=True
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1659, in create_dagrun
session=session)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 70, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1346, in create_dagrun
run.refresh_from_db()
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 109, in refresh_from_db
DR.run_id == self.run_id
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3347, in one
ret = self.one_or_none()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3316, in one_or_none
ret = list(self)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3389, in __iter__
return self._execute_and_instances(context)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3414, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 982, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 293, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
distilled_params,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1478, in _handle_dbapi_exception
util.reraise(*exc_info)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
raise value
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 588, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.7/site-packages/mysql/connector/cursor_cext.py", line 272, in execute
self._handle_result(result)
File "/usr/local/lib/python3.7/site-packages/mysql/connector/cursor_cext.py", line 163, in _handle_result
self._handle_resultset()
File "/usr/local/lib/python3.7/site-packages/mysql/connector/cursor_cext.py", line 651, in _handle_resultset
self._rows = self._cnx.get_rows()[0]
File "/usr/local/lib/python3.7/site-packages/mysql/connector/connection_cext.py", line 318, in get_rows
else self._cmysql.fetch_row()
SystemError: <method 'fetch_row' of '_mysql_connector.MySQL' objects> returned a result with an error set
Мой код выглядит следующим образом:
из airflow.plugins_manager import AirflowPlugin из airflow.utils.decorators import apply_defaults
class TestOperator(BaseOperator):
template_fields = ('param1')
ui_color = '#A7E6A7'
@apply_defaults
def __init__(self,param1,*args, **kwargs):
self.param1 = param1
super(TestOperator, self).__init__(*args, **kwargs)
def execute(self):
print ('welcome to airflow')
class TestOperatorPlugin(AirflowPlugin):
name = "TestOperator_plugin"
operators = [TestOperator]
--here is Dag,
from TestOperator import TestOperator
from airflow import DAG
from datetime import datetime
prog_args = {
'depends_on_past': False, 'param1' : 'testOne'
}
testMYDAG = DAG('TestMYDAG', start_date = datetime(2020, 2, 18) , description='TestMYDAG', default_args = prog_args, schedule_interval=None)
testOp = TestOperator(task_id='test_dag', dag=testMYDAG )
testOp