Airflow SparkSubmitOperator - Как инициировать отправку на другом сервере - PullRequest
0 голосов
/ 14 декабря 2018

Я новичок в Airflow и Spark, и я борюсь с SparkSubmitOperator .

Наш планировщик воздушного потока и наш кластер hadoop не настроены на одном компьютере ( первый вопрос: это хорошая практика? ).

У нас много автоматических процедурчто нужно вызывать скрипты pyspark.Эти сценарии pyspark хранятся в кластере hadoop (10.70.1.35).Мешки с воздушным потоком хранятся в машине с воздушным потоком (10.70.1.22).

В настоящее время, когда мы хотим инициировать сценарий pyspark с воздушным потоком, мы используем простой BashOperator следующим образом:

cmd = "ssh hadoop@10.70.1.35 spark-submit \
   --master yarn \
   --deploy-mode cluster \
   --executor-memory 2g \
   --executor-cores 2 \
   /home/hadoop/pyspark_script/script.py"
t = BashOperator(task_id='Spark_datamodel',bash_command=cmd,dag=dag)

Работает отлично.Но мы хотели бы начать с использованием SparkSubmitOperator для запуска отправки наших скриптов pyspark .

Я попробовал это:

from airflow import DAG
from datetime import timedelta, datetime
from airflow.contrib.operators.spark_submit_operator import 
SparkSubmitOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

dag = DAG('SPARK_SUBMIT_TEST',start_date=datetime(2018,12,10), 
schedule_interval='@daily')


sleep = BashOperator(task_id='sleep', bash_command='sleep 10',dag=dag)

_config ={'application':'hadoop@10.70.1.35:/home/hadoop/pyspark_script/test_spark_submit.py',
    'master' : 'yarn',
    'deploy-mode' : 'cluster',
    'executor_cores': 1,
    'EXECUTORS_MEM': '2G'
}

spark_submit_operator = SparkSubmitOperator(
    task_id='spark_submit_job',
    dag=dag,
    **_config)

sleep.set_downstream(spark_submit_operator) 

Синтаксис должен быть в порядке, поскольку dag непоказать как сломанныйНо когда он запускается, он выдает мне следующую ошибку:

[2018-12-14 03:26:42,600] {logging_mixin.py:95} INFO - [2018-12-14 
03:26:42,600] {base_hook.py:83} INFO - Using connection to: yarn
[2018-12-14 03:26:42,974] {logging_mixin.py:95} INFO - [2018-12-14 
03:26:42,973] {spark_submit_hook.py:283} INFO - Spark-Submit cmd: 
['spark-submit', '--master', 'yarn', '--executor-cores', '1', '--name', 
'airflow-spark', '--queue', 'root.default', 
'hadoop@10.70.1.35:/home/hadoop/pyspark_script/test_spark_submit.py']
[2018-12-14 03:26:42,977] {models.py:1760} ERROR - [Errno 2] No such 
file or directory: 'spark-submit'
Traceback (most recent call last):
      File "/home/dataetl/anaconda3/lib/python3.6/site- 
   packages/airflow/models.py", line 1659, in _run_raw_task    
    result = task_copy.execute(context=context)
      File "/home/dataetl/anaconda3/lib/python3.6/site- 
   packages/airflow/contrib/operators/spark_submit_operator.py", line 
168, 
    in execute
        self._hook.submit(self._application)
      File "/home/dataetl/anaconda3/lib/python3.6/site- 
   packages/airflow/contrib/hooks/spark_submit_hook.py", line 330, in 
submit
        **kwargs)
      File "/home/dataetl/anaconda3/lib/python3.6/subprocess.py", line 
707, 
    in __init__
        restore_signals, start_new_session)
      File "/home/dataetl/anaconda3/lib/python3.6/subprocess.py", line 
    1326, in _execute_child
        raise child_exception_type(errno_num, err_msg)
    FileNotFoundError: [Errno 2] No such file or directory: 'spark-submit'

Вот мои вопросы:

  1. Должен ли я установить искривление на моем воздушном потоке? Я спрашиваю, потому что в этой теме я прочитал, что мне нужно скопировать hdfs-site.xml и hive-site.xml.Но, как вы можете себе представить, у меня нет ни /etc/hadoop/, ни /etc/hive/ каталогов на моей машине воздушного потока.

  2. a) Если нет , куда именно я должен скопироватьhdfs-site.xml и hive-site.xml на моей машине с воздушным потоком?

  3. b) Если да , значит ли это, что мне нужно настроить мою машину с воздушным потоком в качестве клиента?Вид пограничного узла, который не участвует в заданиях, но может использоваться для отправки действий?

  4. Тогда смогу ли я spark-submit с моего воздушного потока? Если да, тогда мне не нужно создавать соединение на Airflow, как янапример, для базы данных mysql, верно?

  5. Ох и вишня на торте: смогу ли я хранить свои скрипты pyspark в моей машине воздушного потока и spark-submit их из этоготот же воздушный поток машины.Это было бы удивительно!

Любой комментарий был бы очень полезен, даже если вы не смогли ответить на все мои вопросы ...

В любом случае заранее спасибо!:)

1 Ответ

0 голосов
/ 14 декабря 2018

Чтобы ответить на ваш первый вопрос, да, это хорошая практика.

Как вы можете использовать SparkSubmitOperator, пожалуйста, обратитесь к моему ответу на https://stackoverflow.com/a/53344713/5691525

  1. Да , вам нужны искровые разрядники на машине с воздушным потоком.
  2. -
  3. Да
  4. Нет -> Вам все еще нужно соединение, чтобы сообщить Airflow, где вы установили свои искровые двоичные файлы.Аналогично https://stackoverflow.com/a/50541640/5691525
  5. Должно работать
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...