Самая новая версия Airflow, доступная в Composer, - это 1.10.2 или 1.10.3 (в зависимости от региона). К тому времени эти операторы уже были в разделе contrib
.
Сосредоточение на том, как запускать задания потока данных Python 3 с Composer, которые вам понадобятся для выпуска новой версии. Однако, если вам нужно немедленное решение, вы можете попытаться создать бэкпорт для fix .
. В этом случае я определил DataFlow3Hook
, который расширяет нормальный DataFlowHook
, но это делаетне жесткий код python2
в методе start_python_dataflow
:
class DataFlow3Hook(DataFlowHook):
def start_python_dataflow(
...
py_interpreter: str = "python3"
):
...
self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
label_formatter)
Тогда у нас будет свой пользовательский DataFlowPython3Operator
вызов нового хука:
class DataFlowPython3Operator(DataFlowPythonOperator):
def execute(self, context):
...
hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
poll_sleep=self.poll_sleep)
...
hook.start_python_dataflow(
self.job_name, formatted_options,
self.py_file, self.py_options, py_interpreter="python3")
Наконец, вВ нашей DAG мы просто используем новый оператор:
task = DataFlowPython3Operator(
py_file='/home/airflow/gcs/data/main.py',
task_id=JOB_NAME,
dag=dag)
Полный код здесь . Работа выполняется с Python 3.6:
Сведения об окружении и используемые зависимости (задание Beam было минимальным примером):
softwareConfig:
imageVersion: composer-1.8.0-airflow-1.10.3
pypiPackages:
apache-beam: ==2.15.0
google-api-core: ==1.14.3
google-apitools: ==0.5.28
google-cloud-core: ==1.0.3
pythonVersion: '3'
Дайте мне знать, если это работает для вас. Если это так, я бы порекомендовал перенести код в плагин для удобства чтения кода и повторного его использования в группах доступности баз данных.