Google Dataflow: импортировать пользовательский модуль Python - PullRequest
1 голос
/ 13 января 2020

Я пытаюсь запустить Apache конвейер луча (Python) в облачном потоке данных Google, запускаемый группой DAG в Google Cloud Coomposer.

Структура моей папки dags в соответствующем сегменте GCS: следующим образом:

/dags/
  dataflow.py <- DAG
  dataflow/
    pipeline.py <- pipeline
    setup.py
    my_modules/
      __init__.py
      commons.py <- the module I want to import in the pipeline

setup.py очень базовый c, но в соответствии с документами Apache Beam и ответами на SO:

import setuptools

setuptools.setup(setuptools.find_packages())

В файле DAG (dataflow.py) Я установил параметр setup_file и передал его в поток данных:

default_dag_args = {
    ... ,
    'dataflow_default_options': {
        ... ,
        'runner': 'DataflowRunner',
        'setup_file': os.path.join(configuration.get('core', 'dags_folder'), 'dataflow', 'setup.py')
    }
}

В файле конвейера (pipe.py) я пытаюсь использовать

from my_modules import commons

, но это терпит неудачу. Журнал в Google Cloud Composer (Apache Airflow) гласит:

gcp_dataflow_hook.py:132} WARNING - b'  File "/home/airflow/gcs/dags/dataflow/dataflow.py", line 11\n    from my_modules import commons\n           ^\nSyntaxError: invalid syntax'

Основная идея c файла для файла setup.py задокументирована здесь

Также есть похожие вопросы по SO, которые мне помогли:

Google Dataflow - Не удалось импортировать пользовательские python модули

Dataflow / apache beam: управлять пользовательскими зависимостями модулей

Мне действительно интересно, почему мои конвейеры отказывают с Syntax Error, а не с module not found типом ошибки ...

1 Ответ

0 голосов
/ 15 января 2020

Я попытался воспроизвести вашу проблему, а затем попытался ее решить, поэтому я создал ту же структуру папок, что у вас уже есть:

/dags/
  dataflow.py
  dataflow/
     pipeline.py -> pipeline
     setup.py
     my_modules/
        __init__.py
        common.py

Поэтому, чтобы она работала, я сделал следующее изменение: эти папки в месте, где выполняется экземпляр, код может найти его, например, в папке /tmp/ экземпляра.

Итак, моя группа доступности баз данных будет выглядеть примерно так:

1 - Сначала я объявляю свои аргументы:

default_args = {
   'start_date': datetime(xxxx, x, x),
   'retries': 1,
   'retry_delay': timedelta(minutes=5),
   'dataflow_default_options': {
       'project': '<project>',
       'region': '<region>',
       'stagingLocation': 'gs://<bucket>/stage',
       'tempLocation': 'gs://<bucket>/temp',
       'setup_file': <setup.py>,
       'runner': 'DataflowRunner'
   }
} 

2 - После этого я создал группу обеспечения доступности баз данных и перед запуском задачи «Поток данных» скопировал весь каталог папки, созданный выше, в /tmp/ папка экземпляра Task t1, и после этого я запускаю конвейер из / tmp / directory Task t2:

with DAG(
    'composer_df',
     default_args=default_args,
     description='datflow dag',
     schedule_interval="xxxx") as dag:

     def copy_dependencies():
          process = subprocess.Popen(['gsutil','cp', '-r' ,'gs://<bucket>/dags/*', 
          '/tmp/'])
          process.communicate()


     t1 = python_operator.PythonOperator(
        task_id='copy_dependencies',
        python_callable=copy_dependencies,
        provide_context=False
     )


     t2 = DataFlowPythonOperator(task_id="composer_dataflow", 
          py_file='/tmp/dataflow/pipeline.py', job_name='job_composer')

     t1 >> t2

Вот так я и создал файл DAG dataflow.py, а затем в pipeline.py пакет для импорта будет выглядеть так:

from my_modules import commons

Он должен работать нормально, поскольку каталог папки понятен для виртуальной машины.

...