Я должен был решить эту проблему в прошлом. В Airflow 1.10.2 (или ниже) код вызывает конечную точку service.projects().templates().launch()
. Это было исправлено в 1.10.3 , где вместо этого используется региональный: service.projects().locations().templates().launch()
.
По состоянию на октябрь 2019 г. последняя версия Airflow, доступная для сред Composer, - 1.10.2. Если вам нужно решение немедленно, исправление можно перенести обратно в Composer.
Для этого мы можем переопределить DataflowTemplateOperator
для нашей собственной версии с именем RegionalDataflowTemplateOperator
:
class RegionalDataflowTemplateOperator(DataflowTemplateOperator):
def execute(self, context):
hook = RegionalDataFlowHook(gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
poll_sleep=self.poll_sleep)
hook.start_template_dataflow(self.task_id, self.dataflow_default_options,
self.parameters, self.template)
Теперь будет использоваться модифицированный RegionalDataFlowHook
, который переопределяет метод start_template_dataflow
оператора DataFlowHook
для вызова правильной конечной точки:
class RegionalDataFlowHook(DataFlowHook):
def _start_template_dataflow(self, name, variables, parameters,
dataflow_template):
...
request = service.projects().locations().templates().launch(
projectId=variables['project'],
location=variables['region'],
gcsPath=dataflow_template,
body=body
)
...
return response
Затем мы можем создать задачу, используя наш новый оператори предоставленный Google шаблон (для целей тестирования):
task = RegionalDataflowTemplateOperator(
task_id=JOB_NAME,
template=TEMPLATE_PATH,
parameters={
'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
'output': 'gs://{}/europe/output'.format(BUCKET)
},
dag=dag,
)
Полный рабочий DAG здесь . Для более чистой версии оператор может быть перемещен в отдельный модуль.