Оператор GCP Composer (Airflow) - PullRequest
       47

Оператор GCP Composer (Airflow)

0 голосов
/ 12 декабря 2018

Я использую GCP Composer API (Airflow) и мою группу обеспечения доступности баз данных, чтобы увеличить количество рабочих, которые мне возвращаются, ошибка ниже:

Broken DAG: [/home/airflow/gcs/dags/cluster_scale_workers.py] 'module' object has no attribute 'DataProcClusterScaleOperator' 

Кажется, что-тоОднако, когда я смотрю на Поток воздуха. Прочитайте документы и перепроверьте мой код, кажется, что все в порядке.Чего мне не хватает?

Это связано с версией GCP Airflow?

Код:

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project'),
    'cluster_name': 'hive-cluster'
}

with models.DAG(
        'scale_workers',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    scale_to_6_workers = dataproc_operator.DataprocClusterScaleOperator(
        task_id='scale_dataproc_cluster_6',
        cluster_name='hive-cluster',
        num_workers=6,
        num_preemptible_workers=3,
        dag=dag
        )

1 Ответ

0 голосов
/ 13 декабря 2018

Мне удалось найти проблему и разобраться с ней.Комментарий Ашиша Кумара, приведенный выше, является правильным.

Проблема заключалась в том, что версия Airflow, которую я использовал (1.9.0), не поддерживала DataProcClusterScaleOperator.Я создал еще один экземпляр, активировав BETA и выбрав версию 1.10.0.

Что исправило мою проблему.

...