Ошибка выполнения SparkSubmitOperator из задания Airflow - PullRequest
1 голос
/ 18 марта 2020

Справочная информация. Я создал новую группу обеспечения доступности заданий / задач Airflow, в которой я использую SparkSubmitOperator. Я использую Spark и Airflow на своем рабочем столе (версии и c. Ниже). DAG работает нормально, пока не достигнет части Submit для задания Spark. Я пытался изменить соединение, используя следующие параметры. Независимо от того, что я пытаюсь сделать, я получаю следующее сообщение в журнале воздушного потока.

Airflow распознает соединение и пытается использовать его, но не удается.

Если я отправляю целевой DataPipelineExample.py из команды подсказка, он запускается без проблем.

Вопрос: Что мешает Airflow распознать и использовать подключение к локальному искру для выполнения искровой отправки?

Airflow.exceptions. AirflowException: Невозможно выполнить: spark-submit --master http://localhost: 4040 --name mySparkSubmitJob

Рабочий стол: Linux Mint VERSION = "19.3 (Tricia)" Spark: версия 2.4 .5 Pyspark: версия 2.4.5 Airflow: Версия: 1.10.9 Python 3.7.4 (по умолчанию, 13 августа 2019, 20:35:49) java версия "1.8.0_241"

Airflow Используемые или пробные подключения localhost 4040 spark: // localhost 4040 http://localhost: 4040 http://specific IP-адрес: 4040 Хост: localhost Порт: 4040 / Доп., Без доп. Et c. Дополнительно: {"root .default" ,: "spark_home": "", "spark_binary": "spark-submit", "namespace": "default"}

Информация о пути

export SCALA_HOME=~/anaconda3/share/scala-2.11.1
export SPARK_HOME=/usr/local/spark
export PYSPARK_SUBMIT_ARGS="--master local[2] pyspark-shell"
export PATH=$PATH:/usr/local/spark/bin

Ниже полного DAG. Это компилируется и полностью распознается Python и Airflow.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta

default_args = {
    'owner': 'me@gmail.com',
    'depends_on_past': False,
    'start_date': datetime(2020, 3, 17),
    'email': ['me@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    'end_date': datetime(2030, 3, 17),
}

dag = DAG(dag_id = 'a_data_pipelne_job', default_args=default_args, schedule_interval='*/45 * * * *')

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

print_path_env_task = BashOperator(
    task_id='print_path_env',
    bash_command='echo $PATH',
    dag=dag)

spark_submit_task = SparkSubmitOperator(
    task_id='spark_submit_job_02',
    conn_id='spark_local',
    application = "/home/me/.config/spyder-py3/DataPipelineExample.py",
    name='airflowspark-DataLoaderMongo',
    verbose=True,
    dag=dag,
)

t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)

1 Ответ

0 голосов
/ 17 апреля 2020

Мне удалось обойти эту проблему с помощью SSHOperator. Он менее уязвим к проблемам конфигурации среды, чем SparkSubmitOperator. SparkSubmit вызывается через S SH в контексте локального дома pyspark. Добавьте аргумент пути для вашего python сценария, и вы готовы к go.

dag = DAG(dag_id = 'a_pjm_data_pipelne__ssh_job', 
                  default_args=default_args, 
                  schedule_interval='*/60 * * * *',
                  params={'project_source': '/home/me/.config/spyder-py3',
                  'spark_submit': '/usr/local/spark/bin/spark-submit DataPipelineExample.py'})

templated_bash_command = """
    echo 'HOSTNAME: localhost' #To check that you are properly connected to the host
    cd {{ params.project_source }}
    {{ params.spark_submit }}
"""

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

print_path_env_task = BashOperator(
    task_id='print_path_env',
    bash_command='echo $PATH',
    dag=dag)

submit_spark_task = SSHOperator(
    task_id="SSH_task",
    ssh_conn_id='ssh_default',
    command=templated_bash_command,
    dag=dag
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...