HttpError 400 при попытке запустить задачу DataProcSparkOperator из локального потока - PullRequest
1 голос
/ 05 марта 2019

Я тестирую группу обеспечения доступности баз данных, которая раньше работала в Google Composer без ошибок при локальной установке Airflow.DAG раскручивает кластер Google Dataproc, запускает задание Spark (файл JAR, расположенный в корзине GS), а затем раскручивает кластер.

Задача DataProcSparkOperator сразу каждый раз завершается с ошибкой:

googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataproc.googleapis.com/v1beta2/projects//regions/global/jobs:submit?alt=json returned "Invalid resource field value in the request.">

Похоже, что URI неправильный / неполный, но я не уверен, что его вызывает.Ниже мясо моего DAG.Все остальные задачи выполняются без ошибок, и единственное отличие состоит в том, что DAG больше не работает на Composer:

default_dag_args = {
    'start_date': yesterday,
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 0,
    'retry_delay': dt.timedelta(seconds=30),
    'project_id': models.Variable.get('gcp_project'),
    'cluster_name': 'susi-bsm-cluster-{{ ds_nodash }}'
}

def slack():
    '''Posts to Slack if the Spark job fails'''
    text = ':x: The DAG *{}* broke and I am not smart enough to fix it. Check the StackDriver and DataProc logs.'.format(DAG_NAME)
    s.post_slack(SLACK_URI, text)

with DAG(DAG_NAME, schedule_interval='@once',
    default_args=default_dag_args) as dag:
    # pylint: disable=no-value-for-parameter

    delete_existing_parquet = bo.BashOperator(
        task_id = 'delete_existing_parquet',
        bash_command = 'gsutil rm -r {}/susi/bsm/bsm.parquet'.format(GCS_BUCKET)
    )

    create_dataproc_cluster = dpo.DataprocClusterCreateOperator(
        task_id = 'create_dataproc_cluster',
        num_workers = num_workers_override or models.Variable.get('default_dataproc_workers'),
        zone = models.Variable.get('gce_zone'),
        init_actions_uris = ['gs://cjones-composer-test/susi/susi-bsm-dataproc-init.sh'],
        trigger_rule = trigger_rule.TriggerRule.ALL_DONE
    )

    run_spark_job = dpo.DataProcSparkOperator(
       task_id = 'run_spark_job',
       main_class = MAIN_CLASS,
       dataproc_spark_jars = [MAIN_JAR],
       arguments=['{}/susi.conf'.format(CONF_DEST), DATE_CONST]
    )

    notify_on_fail = po.PythonOperator(
        task_id = 'output_to_slack',
        python_callable = slack,
        trigger_rule = trigger_rule.TriggerRule.ONE_FAILED
    )

    delete_dataproc_cluster = dpo.DataprocClusterDeleteOperator(
       task_id = 'delete_dataproc_cluster',
       trigger_rule = trigger_rule.TriggerRule.ALL_DONE
    )

    delete_existing_parquet >> create_dataproc_cluster >> run_spark_job >> delete_dataproc_cluster >> notify_on_fail

Любая помощь с этим будет очень признательна!

1 Ответ

3 голосов
/ 09 марта 2019

В отличие от DataprocClusterCreateOperator, DataProcSparkOperator не принимает project_id в качестве параметра.Он получает его из соединения с Airflow (если вы не укажете параметр gcp_conn_id, по умолчанию будет google_cloud_default).Вы должны настроить соединение.

Причина, по которой вы не видите этого при запуске DAG в Composer, заключается в том, что Composer настраивает соединение google_cloud_default.

...