Используя пример сценария конвейера потока данных с import googleads, zeep
, я настроил тестовую среду Composer.DAG такой же, как ваш, и я получаю ту же ошибку.Затем я делаю пару изменений, чтобы убедиться, что зависимости можно найти на рабочих машинах.
В группе обеспечения доступности баз данных я использую простой PythonOperator
, а не PythonVirtualenvOperator
.У меня есть конвейер потока данных и файл установки (main.py
и setup.py
) в корзине Google Cloud Storage , так что Composer может их найти.В установочном файле есть список требований, где мне нужно иметь, например, zeep и googleads.Я адаптировал пример файла настройки из здесь , изменив это:
REQUIRED_PACKAGES = [
'google-cloud-storage==1.10.0', 'zeep==3.2.0',
'argparse==1.4.0', 'google-cloud-kms==0.2.1',
'googleads==15.0.2', 'dill'
]
setuptools.setup(
name='Imports test',
version='1',
description='Imports test workflow package.',
install_requires=REQUIRED_PACKAGES,
packages=setuptools.find_packages(),
cmdclass={
# Command class instantiated and run during pip install scenarios.
'build': build,
'CustomCommands': CustomCommands,
}
)
Мой DAG
with models.DAG( 'composer_sample',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
PULL_DATA = PythonOperator(
task_id='PULL_DATA',
python_callable=execute_dataflow,
op_kwargs={
'main': '/home/airflow/gcs/data/main.py',
'project': PROJECT,
'temp_location': 'gs://dataflow-imports-test/temp',
'setup_file': '/home/airflow/gcs/data/setup.py',
'max_num_workers': 2,
'output': 'gs://dataflow-imports-test/output',
'project_id': PROJECT_ID})
PULL_DATA
без изменений в вызываемом Python.Тем не менее, с этой конфигурацией я все еще получаю сообщение об ошибке.
Следующий шаг , в консоли Google Cloud Platform (GCP), я иду в «Composer» через меню навигации и затем нажимаюна имя окружающей среды.На вкладке «Пакеты PyPI» я добавляю zeep и googleads и нажимаю «отправить».Обновление среды занимает некоторое время, но оно работает.
После этого шага мой конвейер может импортировать зависимости и успешно работать.Я также попытался запустить DAG с зависимостями, указанными на консоли GCP, но не в соответствии с требованиями setup.py
.И рабочий процесс снова прерывается, но в разных местах.Поэтому обязательно укажите их в обоих местах.