не удалось выполнить искру воздушного потока - PullRequest
0 голосов
/ 15 апреля 2020

версия воздушного потока: 1.10.10

Я хочу запустить очень простой пример искры по потоку воздуха.

Я следовал этому посту: как запустить искровой код в потоке воздуха

python код:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta

default_args = {
    'owner': 'defy',
    'depends_on_past': False,
    'email': ['liyiheng@qiniu.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'start_date': datetime(2020, 4, 15),
    'end_date': datetime(2020, 5, 15),
}

dag = DAG('test_spark', default_args=default_args, schedule_interval=timedelta(minutes=1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

print_path_env_task = BashOperator(
    task_id='print_path_env',
    bash_command='echo $PATH',
    dag=dag)

spark_submit_task = SparkSubmitOperator(
    task_id='spark_submit_job',
    conn_id='spark_deploy_client',
    java_class='org.apache.spark.examples.SparkPi',
    application='local:///home/qiniu/platform/spark/examples/jars/spark-examples_2.11-2.4.4.jar',
    total_executor_cores='1',
    executor_cores='1',
    executor_memory='2g',
    num_executors='1',
    name='airflow-wordcount',
    verbose=True,
    driver_memory='1g',
    dag=dag,
)

t1 >> print_path_env_task >> spark_submit_task

Я создал новое соединение для спарка, config:

ConnId: spark_deploy_client
Host: yarn
Extra: {"queue": "root.default", "deploy_mode": "client", "spark_home": "", "spark_binary": "spark-submit", "namespace": "default"}

Но получил ошибку задачи спарка:

[2020-04-15 16:39:36,408] {logging_mixin.py:112} INFO - [2020-04-15 16:39:36,408] {spark_submit_hook.py:325} INFO - Spark-Submit cmd: spark-submit --master yarn --num-executors 1 --total-executor-cores 1 --executor-cores 1 --executor-memory 2g --driver-memory 1g --name airflow-wordcount --class org.apache.spark.examples.SparkPi --verbose --queue root.default local:///home/qiniu/platform/spark/examples/jars/spark-examples_2.11-2.4.4.jar
[2020-04-15 16:39:36,419] {taskinstance.py:1145} ERROR - [Errno 2] No such file or directory
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 187, in execute
    self._hook.submit(self._application)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 395, in submit
    **kwargs)
  File "/usr/lib/python2.7/subprocess.py", line 711, in __init__
    errread, errwrite)
  File "/usr/lib/python2.7/subprocess.py", line 1343, in _execute_child
    raise child_exception
OSError: [Errno 2] No such file or directory

Эта работа не может быть проще, я не могу заставить ее работать, и, кажется, не могу найти ответ от Google, Пожалуйста, помогите ...

1 Ответ

0 голосов
/ 22 апреля 2020

Я отказываюсь от использования SparkSubmitOperator, я не могу заставить его работать.

Использование BashOperator и запуск spark-submit с абсолютным путем работает нормально, или пишу прогон. sh для вызова spark-submit довольно легко.

...