Airflow version: 1.10.10
I want to run a very simple Spark example from Airflow.
I followed this post: How to run Spark code in Airflow
Python code:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta
default_args = {
    'owner': 'defy',
    'depends_on_past': False,
    'email': ['liyiheng@qiniu.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'start_date': datetime(2020, 4, 15),
    'end_date': datetime(2020, 5, 15),
}

dag = DAG('test_spark', default_args=default_args, schedule_interval=timedelta(minutes=1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

print_path_env_task = BashOperator(
    task_id='print_path_env',
    bash_command='echo $PATH',
    dag=dag)

spark_submit_task = SparkSubmitOperator(
    task_id='spark_submit_job',
    conn_id='spark_deploy_client',
    java_class='org.apache.spark.examples.SparkPi',
    application='local:///home/qiniu/platform/spark/examples/jars/spark-examples_2.11-2.4.4.jar',
    total_executor_cores='1',
    executor_cores='1',
    executor_memory='2g',
    num_executors='1',
    name='airflow-wordcount',
    verbose=True,
    driver_memory='1g',
    dag=dag,
)
t1 >> print_path_env_task >> spark_submit_task
I created a new connection for Spark with this config:
ConnId: spark_deploy_client
Host: yarn
Extra: {"queue": "root.default", "deploy_mode": "client", "spark_home": "", "spark_binary": "spark-submit", "namespace": "default"}
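Since "spark_home" is left empty in Extra, the SparkSubmitHook invokes the bare "spark_binary" name, so spark-submit has to resolve on the worker's PATH. A minimal diagnostic sketch, assuming it is run in the same environment (host and user) as the Airflow worker:

```python
import shutil

# Check whether spark-submit is resolvable from PATH, the same lookup
# subprocess performs when given a bare command name. (On the Python 2.7
# worker shown in the traceback below, distutils.spawn.find_executable
# plays the same role as shutil.which.)
path = shutil.which('spark-submit')
print(path or 'spark-submit NOT on PATH')
```

If this prints nothing useful, either add the Spark bin directory to the worker's PATH or set "spark_home" in the connection's Extra so the hook can build an absolute path to the binary.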
But the Spark task failed with this error:
[2020-04-15 16:39:36,408] {logging_mixin.py:112} INFO - [2020-04-15 16:39:36,408] {spark_submit_hook.py:325} INFO - Spark-Submit cmd: spark-submit --master yarn --num-executors 1 --total-executor-cores 1 --executor-cores 1 --executor-memory 2g --driver-memory 1g --name airflow-wordcount --class org.apache.spark.examples.SparkPi --verbose --queue root.default local:///home/qiniu/platform/spark/examples/jars/spark-examples_2.11-2.4.4.jar
[2020-04-15 16:39:36,419] {taskinstance.py:1145} ERROR - [Errno 2] No such file or directory
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 187, in execute
    self._hook.submit(self._application)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 395, in submit
    **kwargs)
  File "/usr/lib/python2.7/subprocess.py", line 711, in __init__
    errread, errwrite)
  File "/usr/lib/python2.7/subprocess.py", line 1343, in _execute_child
    raise child_exception
OSError: [Errno 2] No such file or directory
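For reference, the OSError comes from subprocess itself, not from Spark: Popen raises errno 2 (ENOENT) when the executable it is told to launch cannot be found. A small sketch reproducing the same failure mode (the binary name here is deliberately fake):

```python
import errno
import subprocess

# Popen raises OSError with errno 2 (ENOENT) when the command name does not
# resolve to an executable -- the same error as in the traceback above.
# (In Python 3 the exception is FileNotFoundError, an OSError subclass.)
try:
    subprocess.Popen(['definitely-not-a-real-binary'])
except OSError as e:
    print(e.errno == errno.ENOENT)
```

So the error means the spark-submit command itself was not found by the worker process, not that the application jar is missing.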
This job couldn't be simpler, yet I can't get it to work, and I can't seem to find an answer on Google. Please help...