Трудности с использованием Gcloud Composer DAG для запуска задания Spark - PullRequest
0 голосов
/ 20 февраля 2019

Я играю с Gcloud Composer, пытаюсь создать группу обеспечения доступности баз данных, которая создает кластер DataProc, запускает простое задание Spark, а затем разрушает кластер.Я пытаюсь запустить пример задания Spark PI.

Я понимаю, что при вызове DataProcSparkOperator я могу выбрать только определение либо свойства main_jar, либо свойства main_class.Когда я определяю main_class, задание завершается с ошибкой:

java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:239)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Когда я выбираю определение свойства main_jar, задание завершается с ошибкой:

Error: No main class set in JAR; please specify one with --class
Run with --help for usage help or --verbose for debug output

Я немного растерялся из-за того, как решить эту проблему, поскольку я новичок в Spark и DataProc.

Мой DAG:

import datetime as dt
from airflow import DAG, models
from airflow.contrib.operators import dataproc_operator as dpo
from airflow.utils import trigger_rule

MAIN_JAR = 'file:///usr/lib/spark/examples/jars/spark-examples.jar'
MAIN_CLASS = 'org.apache.spark.examples.SparkPi'
CLUSTER_NAME = 'quickspark-cluster-{{ ds_nodash }}'

yesterday = dt.datetime.combine(
    dt.datetime.today() - dt.timedelta(1),
    dt.datetime.min.time())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': dt.timedelta(seconds=30),
    'project_id': models.Variable.get('gcp_project')
}

with DAG('dataproc_spark_submit', schedule_interval='0 17 * * *',
    default_args=default_dag_args) as dag:

    create_dataproc_cluster = dpo.DataprocClusterCreateOperator(
        project_id = default_dag_args['project_id'],
        task_id = 'create_dataproc_cluster',
        cluster_name = CLUSTER_NAME,
        num_workers = 2,
        zone = models.Variable.get('gce_zone')
    )

    run_spark_job = dpo.DataProcSparkOperator(
        task_id = 'run_spark_job',
        #main_jar = MAIN_JAR,
        main_class = MAIN_CLASS,
        cluster_name = CLUSTER_NAME
    )

    delete_dataproc_cluster = dpo.DataprocClusterDeleteOperator(
        project_id = default_dag_args['project_id'],
        task_id = 'delete_dataproc_cluster',
        cluster_name = CLUSTER_NAME,
        trigger_rule = trigger_rule.TriggerRule.ALL_DONE
    )

    create_dataproc_cluster >> run_spark_job >> delete_dataproc_cluster

1 Ответ

0 голосов
/ 20 февраля 2019

Я сравнил его с успешной работой с использованием интерфейса командной строки и увидел, что даже когда класс заполнял поле Main class or jar, путь к банке был указан в Jar files:

enter image description here

Проверка оператора Я заметил, что есть также dataproc_spark_jars параметр , который не является взаимоисключающим для main_class:

run_spark_job = dpo.DataProcSparkOperator(
    task_id = 'run_spark_job',
    dataproc_spark_jars = [MAIN_JAR],
    main_class = MAIN_CLASS,
    cluster_name = CLUSTER_NAME
)

Добавление это сделало трюк:

enter image description here

enter image description here

...