Как я могу запустить конвейер Dataflow с установочным файлом, используя Cloud Composer / Apache Airflow? - PullRequest
0 голосов
/ 13 сентября 2018

У меня есть работающий конвейер потока данных, который в первый раз запускает setup.py для установки некоторых локальных вспомогательных модулей.Теперь я хочу использовать Cloud Composer / Apache Airflow для планирования конвейера.Я создал свой файл DAG и поместил его в назначенную папку Google Storage DAG вместе с моим конвейерным проектом.Структура папок выглядит следующим образом:

{Composer-Bucket}/
    dags/
       --DAG.py
       Pipeline-Project/
           --Pipeline.py
           --setup.py
           Module1/
              --__init__.py
           Module2/
              --__init__.py
           Module3/
              --__init__.py

Часть моей группы обеспечения доступности баз данных, в которой указан файл setup.py, выглядит следующим образом:

resumeparserop = dataflow_operator.DataFlowPythonOperator(
    task_id="resumeparsertask",
    py_file="gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/Pipeline.py",
    dataflow_default_options={
        "project": {PROJECT-NAME},    
        "setup_file": "gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py"})

Однако, когда я смотрю на журналы ввеб-интерфейс Airflow, я получаю сообщение об ошибке:

RuntimeError: The file gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py cannot be found. It was specified in the --setup_file command line option.

Я не уверен, почему он не может найти установочный файл.Как мне запустить конвейер Dataflow с установочным файлом / модулями?

Ответы [ 2 ]

0 голосов
/ 14 сентября 2018

Если вы посмотрите на код для DataflowPythonOperator , похоже, что основной py_file может быть файлом внутри корзины GCS и локализован оператором до выполнения конвейера.Тем не менее, я не вижу ничего подобного для dataflow_default_options.Похоже, что параметры просто копируются и форматируются.

Поскольку папка GCS dag монтируется на экземплярах Airflow с помощью Cloud Storage Fuse , вы сможете получить доступ к файлу локально, используя "dags_folder "env var.то есть вы можете сделать что-то вроде этого:

from airflow import configuration
....
LOCAL_SETUP_FILE = os.path.join(
configuration.get('core', 'dags_folder'), 'Pipeline-Project', 'setup.py')

Затем вы можете использовать переменную LOCAL_SETUP_FILE для свойства setup_file в dataflow_default_options.

0 голосов
/ 14 сентября 2018

Запускаете ли вы Composer и Dataflow с одной и той же учетной записью службы, или они разделены?В последнем случае вы проверяли, имеет ли учетная запись службы Dataflow доступ для чтения к корзине и объекту?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...