Composer не видит, что задание потока данных выполнено успешно - PullRequest
0 голосов
/ 24 октября 2019

Я использую Gcloud Composer для запуска заданий потока данных.

Моя группа обеспечения доступности баз данных состоит из двух заданий потока данных, которые должны выполняться одно за другим.

import datetime

from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow import models


default_dag_args = {

    'start_date': datetime.datetime(2019, 10, 23),
    'dataflow_default_options': {
               'project': 'myproject',
               'region': 'europe-west1',
               'zone': 'europe-west1-c',
               'tempLocation': 'gs://somebucket/',
               }
}

with models.DAG(
        'some_name',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    parameters = {'params': "param1"}

    t1 = DataflowTemplateOperator(
        task_id='dataflow_example_01',
        template='gs://path/to/template/template_001',
        parameters=parameters,
        dag=dag)

    parameters2 = {'params':"param2"}

    t2 = DataflowTemplateOperator(
        task_id='dataflow_example_02',
        template='gs://path/to/templates/template_002',
        parameters=parameters2,
        dag=dag
    )

    t1 >> t2

Когда я проверяю поток данных,задание выполнено успешно, все файлы, которые он должен создать, созданы, но, похоже, он работал в регионе США, среда облачного композитора находится на западе Европы.

В потоке воздуха я вижу, что первое задание ещезапущен, поэтому второй не запущен

enter image description here

Что мне добавить в DAG, чтобы он был успешным? Как мне бегать в Европе?

Любой совет или решение о том, как действовать, будет наиболее ценно. Спасибо!

1 Ответ

3 голосов
/ 24 октября 2019

Я должен был решить эту проблему в прошлом. В Airflow 1.10.2 (или ниже) код вызывает конечную точку service.projects().templates().launch(). Это было исправлено в 1.10.3 , где вместо этого используется региональный: service.projects().locations().templates().launch().

По состоянию на октябрь 2019 г. последняя версия Airflow, доступная для сред Composer, - 1.10.2. Если вам нужно решение немедленно, исправление можно перенести обратно в Composer.

Для этого мы можем переопределить DataflowTemplateOperator для нашей собственной версии с именем RegionalDataflowTemplateOperator:

class RegionalDataflowTemplateOperator(DataflowTemplateOperator):
  def execute(self, context):
    hook = RegionalDataFlowHook(gcp_conn_id=self.gcp_conn_id,
                        delegate_to=self.delegate_to,
                        poll_sleep=self.poll_sleep)

    hook.start_template_dataflow(self.task_id, self.dataflow_default_options,
                                 self.parameters, self.template)

Теперь будет использоваться модифицированный RegionalDataFlowHook, который переопределяет метод start_template_dataflow оператора DataFlowHook для вызова правильной конечной точки:

class RegionalDataFlowHook(DataFlowHook):
  def _start_template_dataflow(self, name, variables, parameters,
                               dataflow_template):
      ...
      request = service.projects().locations().templates().launch(
          projectId=variables['project'],
          location=variables['region'],
          gcsPath=dataflow_template,
          body=body
      )
      ...
      return response

Затем мы можем создать задачу, используя наш новый оператори предоставленный Google шаблон (для целей тестирования):

task = RegionalDataflowTemplateOperator(
    task_id=JOB_NAME,
    template=TEMPLATE_PATH,
    parameters={
        'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
        'output': 'gs://{}/europe/output'.format(BUCKET)
    },
    dag=dag,
)

Полный рабочий DAG здесь . Для более чистой версии оператор может быть перемещен в отдельный модуль.

...