airflow_version = 1.10.2;python_version = 3.6.8
У меня возникают проблемы с пониманием того, как сделать вызываемый Python более пригодным для повторного использования в PythonOperator airflow, так как работает та же функция, объявленная в самом файле dag, но импортирующая его из вспомогательной библиотекизавершается неудачей.
Итак, следующее работает :
def my_function(temp_file, task_id, **kwargs):
xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)
if not xcom_vals:
return 'Xcom message not retrieved'
ack_messages = []
for item in xcom_vals:
ack_messages += <do stuff>
return ack_messages
with DAG(<dag args>):
process_messages = PythonOperator(
task_id='get_messages',
python_callable=my_function,
op_kwargs={'task_id': 'previous_task_id',
'temp_file': temp_file},
provide_context=True,
)
Но, перемещая my_function в модуль lib / helpers.py и затем импортируяпроисходит сбой с ошибкой.
Broken DAG: [path to dag] cannot import my_function
ПРИМЕЧАНИЕ : lib / helpers.py содержит другие функции (хотя и более простые), которые успешно импортируются и используются в текущей и других группах обеспечения доступности баз данных.
Как реализовать my_function , чтобы ее могли вызывать другие даг?