У меня есть работающий конвейер потока данных, который в первый раз запускает setup.py
для установки некоторых локальных вспомогательных модулей.Теперь я хочу использовать Cloud Composer / Apache Airflow для планирования конвейера.Я создал свой файл DAG и поместил его в назначенную папку Google Storage DAG вместе с моим конвейерным проектом.Структура папок выглядит следующим образом:
{Composer-Bucket}/
dags/
--DAG.py
Pipeline-Project/
--Pipeline.py
--setup.py
Module1/
--__init__.py
Module2/
--__init__.py
Module3/
--__init__.py
Часть моей группы обеспечения доступности баз данных, в которой указан файл setup.py, выглядит следующим образом:
resumeparserop = dataflow_operator.DataFlowPythonOperator(
task_id="resumeparsertask",
py_file="gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/Pipeline.py",
dataflow_default_options={
"project": {PROJECT-NAME},
"setup_file": "gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py"})
Однако, когда я смотрю на журналы ввеб-интерфейс Airflow, я получаю сообщение об ошибке:
RuntimeError: The file gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py cannot be found. It was specified in the --setup_file command line option.
Я не уверен, почему он не может найти установочный файл.Как мне запустить конвейер Dataflow с установочным файлом / модулями?