Как запустить конвейер данных Apache Beam через Apache Airflow с использованием DataflowPythonOperator - PullRequest
0 голосов
/ 13 июня 2019

Я реализовал DataflowPythonOperator в моей Airflow DAG, и во время выполнения я получаю сообщение об ошибке ниже:

2019-06-12 07:04:27,988] {{models.py:1595}} INFO - Executing <Task(DataFlowPythonOperator): task_run_pipeline> on 2019-05-01T04:10:00+00:00
[2019-06-12 07:04:27,989] {{base_task_runner.py:118}} INFO - Running: ['bash', '-c', 'airflow run example_with_dataflow task_run_pipeline 2019-05-01T04:10:00+00:00 --job_id 57 --raw -sd DAGS_FOLDER/example_with_dataflow.py --cfg_path /tmp/tmp1fjmyili']
[2019-06-12 07:04:32,437] {{base_task_runner.py:101}} INFO - Job 57: Subtask task_run_pipeline [2019-06-12 07:04:32,436] {{settings.py:174}} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2019-06-12 07:04:35,107] {{base_task_runner.py:101}} INFO - Job 57: Subtask task_run_pipeline [2019-06-12 07:04:35,105] {{__init__.py:51}} INFO - Using executor LocalExecutor
[2019-06-12 07:04:39,188] {{base_task_runner.py:101}} INFO - Job 57: Subtask task_run_pipeline [2019-06-12 07:04:39,186] {{models.py:271}} INFO - Filling up the DagBag from /usr/local/airflow/dags/example_with_dataflow.py
[2019-06-12 07:04:39,861] {{base_task_runner.py:101}} INFO - Job 57: Subtask task_run_pipeline [2019-06-12 07:04:39,860] {{cli.py:484}} INFO - Running <TaskInstance: example_with_dataflow.task_run_pipeline 2019-05-01T04:10:00+00:00 [running]> on host 10b352a0858c
[2019-06-12 07:04:40,086] {{logging_mixin.py:95}} INFO - [2019-06-12 07:04:40,086] {{discovery.py:272}} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/storage/v1/rest
[2019-06-12 07:04:40,895] {{logging_mixin.py:95}} INFO - [2019-06-12 07:04:40,894] {{discovery.py:873}} INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/example-airflow-test/o/example%2Fprocess_details.py?alt=media
[2019-06-12 07:04:42,079] {{logging_mixin.py:95}} INFO - [2019-06-12 07:04:42,079] {{gcp_dataflow_hook.py:120}} INFO - Running command: python /tmp/dataflow58bcc900-process_details.py --runner=DataflowRunner --project=data_example --zone=us-central1 --temp_location=gs://path/temp --staging_location=gs://path/staging --labels=airflow-version=v1-10-1 --job_name=task-run-pipeline-59ed310c --region=us-central1
[2019-06-12 07:04:42,130] {{logging_mixin.py:95}} INFO - [2019-06-12 07:04:42,130] {{gcp_dataflow_hook.py:151}} INFO - Start waiting for DataFlow process to complete.
[2019-06-12 07:04:42,391] {{logging_mixin.py:95}} INFO - [2019-06-12 07:04:42,391] {{gcp_dataflow_hook.py:132}} WARNING - b'Traceback (most recent call last):\n  File "/tmp/dataflow58bcc900-process_details.py", line 7, in <module>\n    import apache_beam as beam\nModuleNotFoundError: No module named \'apache_beam\''
[2019-06-12 07:04:42,392] {{models.py:1760}} ERROR - DataFlow failed with return code 1

Похоже, что есть проблема с установкой зависимостей.Если я запускаю конвейер данных Beam по отдельности, он работает правильно, используя DataflowRunner на GCP.Для запуска веб-сервера Apache Airflow я использую файл Puckel Docker-compose.Ваши идеи / предложения были бы очень благодарны!Спасибо:)

1 Ответ

0 голосов
/ 14 июня 2019

Вы можете создавать шаблоны потока данных, а затем запускать эти шаблоны из Airflow / Cloud composer.Эти проблемы, связанные с зависимостями, не создадут никаких проблем.

...