Я пытаюсь подключить мой PythonOperator и поместить его в плагины $ AIRFLOW_HOME /, как показано ниже:
class MyPythonOperator(PythonOperator):
def my_callable(param1, param2, param3):
# do something
@apply_defaults
def __init__(self, task_id, *args, **kwargs):
super(MyPythonOperator, self).__init__(
task_id=task_id,
python_callable = self.my_callable,
provide_context = True,
*args, **kwargs)
Затем я определяю код прерывания воздушного потока, это очень просто, всего две задачи:
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='example_workflow',
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
)
task1 = MyPythonOperator(
task_id='task1',
params={'param1': 'param1_value',
'param2': 'param2_value',
'param3': 'param3_value'},
dag=dag
)
task2 = MyPythonOperator(
task_id='task2',
params={'param1': 'param1_value',
'param2': 'param2_value',
'param2': 'param3_value'},
dag=dag
)
task1 >> task2
но после запуска кода dag python выдается сообщение об ошибке:
<code>
$ python example_airflow_code.py
[2019-05-15 19:51:10,338] {<strong>init</strong>.py:51} INFO - Using executor SequentialExecutor
usage: example_airflow_code.py [-h]
{list_tasks,backfill,test,run,pause,unpause,list_dag_runs}
...
example_airflow_code.py: error: too few arguments</p>
<p>
Я попробовал отладку и вставил точку останова в эту строку:
<code>
super(MyPythonOperator, self).<strong>init</strong>(
Я нашел перед вызовом супер-конструктора, значения self.dag и self.dag_id ненормальные, значение:
<code>
str: Traceback (most recent call last):
File "/Applications/Eclipse.app/Contents/Eclipse/plugins/org.python.pydev.core_6.4.4.201807281807/pysrc/_pydevd_bundle/pydevd_resolver.py", line 166, in _getPyDictionary
attr = getattr(var, n)
File "/Users/zhuangxy/anaconda2/lib/python2.7/site-packages/airflow/models/<strong>init</strong>.py", line 2399, in dag_id
return 'adhoc_' + self.owner
AttributeError: 'MyPythonOperator' object has no attribute 'owner'</p>
<p>
Кто-нибудь знает, в чем проблема этого примера?
Большое спасибо!