Как Airflow может создать задание потока данных из оператора Python? - PullRequest
0 голосов
/ 28 января 2019

Когда я запускаю свой конвейер Beam из командной строки, используя прямой бегун или бегун потока данных, он работает нормально ...

Пример:

$ python my_pipeline.py --key /path/to/gcp/service/key.json --project gcp_project_name

Но когда яя пытаюсь использовать поток воздуха, у меня есть два варианта, оператор bash или оператор python.

Использование оператора bash приведет к успеху, но ограничит мою возможность использовать функции воздушного потока.

Но я такойПопытка сделать это запустить его как оператор Python.Поэтому я импортирую модуль из файла airflow dg, а затем запускаю его как оператор python.

Он также работает нормально, если я использую локальный бегун, но когда я изменил его на бегун потока данных, происходит сбой после создания задания в потоке данных GCP с этой ошибкой

ImportError: No module named airflow.bin.cli

Чего мне не хватает, чтобы Airflow создать поток данных работа оператора python ?

1 Ответ

0 голосов
/ 01 февраля 2019

ОК, это не идеальное решение, но вы можете использовать

DataFlowPythonOperator()

, который будет выполнять ту же команду bash, которую мы упоминали ранее.Это обходной путь и не равен PythonOperator, но больше похож на запуск BashOperator ... Все еще не может использовать силу функций Airflow в текущем случае (например, xcom) ... Документы

...