Airflow sparkSubmitOperator и приложение python в контейнере - PullRequest
0 голосов
/ 07 мая 2020

Мы пытаемся использовать spark_submit_operator

воздушного потока, чтобы запустить приложение-образец python, контейнеризованное с использованием следующего Spark guide и примера spark_submit_operaor из @CTiPKA в следующий поток

Мы можем запускать приложение вне воздушного потока и внутри дага с помощью BashOperator. Проблема с оператором spark_spark_submit возникает из-за «мягкой» ссылки на путь python упакованной среды через PYSPARK_PYTHON, который используется перед вызовом spark-submit.

Мы пробовали следующее:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta
import bdap

airflowConfig = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2019, 12, 4),
    }

dag = DAG(
    'nltk_app', default_args=airflowConfig, schedule_interval=timedelta(1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

print_path_env_task = BashOperator(
    task_id='print_path_env',
    bash_command='echo $PATH',
    dag=dag)

spark_submit_task = SparkSubmitOperator(
    #spark_binary='spark-submit',
    #driver_class_path='PYSPARK_PYTHON=./NLTK/nltk_env/bin/python',
    env_vars={
            'PYSPARK_PYTHON':'./NLTK/nltk_env/bin/python',
    },
    task_id='spark_submit_job',
    conn_id='spark_default',
    application='/somepath/airflow/workflows/nltk_app/spark_nltk_sample.py',
    total_executor_cores='1',
    executor_cores='1',
    executor_memory='2g',
    num_executors='2',
    name='nltk_app_in_dag',
    verbose=True,
    driver_memory='1g',
    conf={
        'spark.yarn.appMasterEnv.PYSPARK_PYTHON':'./NLTK/nltk_env/bin/python',
        'spark.yarn.appMasterEnv.NLTK_DATA':'./'
    },
    archives='/somepath/airflow/workflows/nltk_app/nltk_env.zip#NLTK,/somepath/airflow/workflows/nltk_app/tokenizers.zip#tokenizers,/somepath/airflow/workflows/nltk_app/taggers.zip#taggers',
    dag=dag,
)

t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)

Вышеупомянутое приводит к ошибке, поскольку экспорт PYTHON_PATH в оболочке отличается от вызова относительного пути через PYSPARK_PYTHON=./NLTK/nltk_env/bin/python spark-submit. По-видимому, spark-submit делает некоторые маги c с этим встроенным вызовом, который я все еще не могу взломать.

Я надеюсь, что кто-то еще попробовал это и успешно использовал spark_submit_operator и такую ​​относительную ссылку и может дайте нам подсказку, как их правильно использовать. BashOperator просто кажется менее интуитивно понятным и немного «беспорядочным», когда есть отдельный оператор для spark_submit.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...