Справочная информация. Я создал новую группу обеспечения доступности заданий / задач Airflow, в которой я использую SparkSubmitOperator. Я использую Spark и Airflow на своем рабочем столе (версии и c. Ниже). DAG работает нормально, пока не достигнет части Submit для задания Spark. Я пытался изменить соединение, используя следующие параметры. Независимо от того, что я пытаюсь сделать, я получаю следующее сообщение в журнале воздушного потока.
Airflow распознает соединение и пытается использовать его, но не удается.
Если я отправляю целевой DataPipelineExample.py из команды подсказка, он запускается без проблем.
Вопрос: Что мешает Airflow распознать и использовать подключение к локальному искру для выполнения искровой отправки?
Airflow.exceptions. AirflowException: Невозможно выполнить: spark-submit --master http://localhost: 4040 --name mySparkSubmitJob
Рабочий стол: Linux Mint VERSION = "19.3 (Tricia)" Spark: версия 2.4 .5 Pyspark: версия 2.4.5 Airflow: Версия: 1.10.9 Python 3.7.4 (по умолчанию, 13 августа 2019, 20:35:49) java версия "1.8.0_241"
Airflow Используемые или пробные подключения localhost 4040 spark: // localhost 4040 http://localhost: 4040 http://specific IP-адрес: 4040 Хост: localhost Порт: 4040 / Доп., Без доп. Et c. Дополнительно: {"root .default" ,: "spark_home": "", "spark_binary": "spark-submit", "namespace": "default"}
Информация о пути
export SCALA_HOME=~/anaconda3/share/scala-2.11.1
export SPARK_HOME=/usr/local/spark
export PYSPARK_SUBMIT_ARGS="--master local[2] pyspark-shell"
export PATH=$PATH:/usr/local/spark/bin
Ниже полного DAG. Это компилируется и полностью распознается Python и Airflow.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta
default_args = {
'owner': 'me@gmail.com',
'depends_on_past': False,
'start_date': datetime(2020, 3, 17),
'email': ['me@gmail.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
'end_date': datetime(2030, 3, 17),
}
dag = DAG(dag_id = 'a_data_pipelne_job', default_args=default_args, schedule_interval='*/45 * * * *')
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
print_path_env_task = BashOperator(
task_id='print_path_env',
bash_command='echo $PATH',
dag=dag)
spark_submit_task = SparkSubmitOperator(
task_id='spark_submit_job_02',
conn_id='spark_local',
application = "/home/me/.config/spyder-py3/DataPipelineExample.py",
name='airflowspark-DataLoaderMongo',
verbose=True,
dag=dag,
)
t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)