Вызываемая функция воздушного потока Python многоразового использования - PullRequest
3 голосов
/ 08 марта 2019

airflow_version = 1.10.2;python_version = 3.6.8

У меня возникают проблемы с пониманием того, как сделать вызываемый Python более пригодным для повторного использования в PythonOperator airflow, так как работает та же функция, объявленная в самом файле dag, но импортирующая его из вспомогательной библиотекизавершается неудачей.

Итак, следующее работает :

def my_function(temp_file, task_id, **kwargs):

    xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)

    if not xcom_vals:
        return 'Xcom message not retrieved'

    ack_messages = []

    for item in xcom_vals:

        ack_messages += <do stuff>

    return ack_messages

with DAG(<dag args>):

    process_messages = PythonOperator(
        task_id='get_messages',
        python_callable=my_function,
        op_kwargs={'task_id': 'previous_task_id',
                    'temp_file': temp_file},
        provide_context=True,
    )

Но, перемещая my_function в модуль lib / helpers.py и затем импортируяпроисходит сбой с ошибкой.

Broken DAG: [path to dag] cannot import my_function

ПРИМЕЧАНИЕ : lib / helpers.py содержит другие функции (хотя и более простые), которые успешно импортируются и используются в текущей и других группах обеспечения доступности баз данных.

Как реализовать my_function , чтобы ее могли вызывать другие даг?

1 Ответ

0 голосов
/ 29 марта 2019

Выяснил, что это связано с пользовательским интерфейсом воздушного потока и планировщиком, который неправильно анализирует папки libs, какое-то запаздывание после git-синхронизации? Таким образом, пользовательский интерфейс и планировщик правильно анализировали файл dag, но не папку lib.

В конце концов это поведение было решено перезапуском как пользовательского интерфейса, так и модуля планировщика (мы запускаем поток воздуха в kubernetes), и на этом все.

...