проблема с импортом зависимостей при запуске Dataflow из Google Cloud Composer - PullRequest
0 голосов
/ 04 марта 2019

Я использую Dataflow из облачного композитора Google, скрипт потока данных содержит некоторые нестандартные зависимости, такие как zeep, googleads.которые должны быть установлены на рабочих узлах потока данных, поэтому я упаковал их с помощью setup.py.когда я пытаюсь запустить это в dag, composer проверяет файлы потока данных и жалуется на No module names Zeep , googleads.Поэтому я создал pythonvirtualenvoperator и установил все необходимые нестандартные зависимости и попытался запустить задание потока данных, и он все еще жаловался на добавление zeep и googleads.

Вот моя кодовая база:

PULL_DATA = PythonVirtualenvOperator(
    task_id=PROCESS_TASK_ID,
    python_callable=execute_dataflow,
    op_kwargs={
        'main': 'main.py',
        'project': PROJECT,
        'temp_location': 'gs://bucket/temp',
        'setup_file': 'setup.py',
        'max_num_workers': 2,
        'output': 'gs://bucket/output',
        'project_id': PROJECT_ID},
    requirements=['google-cloud-storage==1.10.0', 'zeep==3.2.0',
                  'argparse==1.4.0', 'google-cloud-kms==0.2.1',
                  'googleads==15.0.2', 'dill'],
    python_version='2.7',
    use_dill=True,
    system_site_packages=True,
    on_failure_callback=on_failure_handler,
    on_success_callback=on_success_handler,
    dag='my-dag')

иКод, вызываемый моим питоном:

def execute_dataflow(**kwargs):
        import subprocess
        TEMPLATED_COMMAND = """
                          python main.py \
                                 --runner DataflowRunner \
                                 --project {project} \
                                 --region us-central1 \
                                 --temp_location {temp_location} \
                                 --setup_file {setup_file} \
                                 --output {output} \
                                 --project_id {project_id} 
                          """.format(**kwargs)
        process = subprocess.Popen(['/bin/bash', '-c', TEMPLATED_COMMAND])
        process.wait()
        return process.returncode

Мой файл main.py

import zeep
import googleads

{Apache-beam-code to construct dataflow pipeline}

Есть предложения?

Ответы [ 2 ]

0 голосов
/ 19 марта 2019

Используя пример сценария конвейера потока данных с import googleads, zeep, я настроил тестовую среду Composer.DAG такой же, как ваш, и я получаю ту же ошибку.Затем я делаю пару изменений, чтобы убедиться, что зависимости можно найти на рабочих машинах.

В группе обеспечения доступности баз данных я использую простой PythonOperator, а не PythonVirtualenvOperator.У меня есть конвейер потока данных и файл установки (main.py и setup.py) в корзине Google Cloud Storage , так что Composer может их найти.В установочном файле есть список требований, где мне нужно иметь, например, zeep и googleads.Я адаптировал пример файла настройки из здесь , изменив это:

REQUIRED_PACKAGES = [
    'google-cloud-storage==1.10.0', 'zeep==3.2.0',
'argparse==1.4.0', 'google-cloud-kms==0.2.1',
'googleads==15.0.2', 'dill'
    ]

setuptools.setup(
    name='Imports test',
    version='1',
    description='Imports test workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        # Command class instantiated and run during pip install scenarios.
        'build': build,
        'CustomCommands': CustomCommands,
        }
    )

Мой DAG

with models.DAG(  'composer_sample',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    PULL_DATA = PythonOperator(
    task_id='PULL_DATA',
    python_callable=execute_dataflow,
    op_kwargs={
        'main': '/home/airflow/gcs/data/main.py',
        'project': PROJECT,
        'temp_location': 'gs://dataflow-imports-test/temp',
        'setup_file': '/home/airflow/gcs/data/setup.py',
        'max_num_workers': 2,
        'output': 'gs://dataflow-imports-test/output',
        'project_id': PROJECT_ID})
    PULL_DATA

без изменений в вызываемом Python.Тем не менее, с этой конфигурацией я все еще получаю сообщение об ошибке.

Следующий шаг , в консоли Google Cloud Platform (GCP), я иду в «Composer» через меню навигации и затем нажимаюна имя окружающей среды.На вкладке «Пакеты PyPI» я добавляю zeep и googleads и нажимаю «отправить».Обновление среды занимает некоторое время, но оно работает.

После этого шага мой конвейер может импортировать зависимости и успешно работать.Я также попытался запустить DAG с зависимостями, указанными на консоли GCP, но не в соответствии с требованиями setup.py.И рабочий процесс снова прерывается, но в разных местах.Поэтому обязательно укажите их в обоих местах.


0 голосов
/ 05 марта 2019

Моя работа имеет requirements.txt.Вместо использования параметра --setup_file, как у вас, он указывает следующее:

--requirements_file prod_requirements.txt

Это говорит DataFlow об установке библиотек в requirements.txt до запуска задания.

Ссылка: https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

...