Мы пытаемся использовать spark_submit_operator
воздушного потока, чтобы запустить приложение-образец python, контейнеризованное с использованием следующего Spark guide и примера spark_submit_operaor из @CTiPKA в следующий поток
Мы можем запускать приложение вне воздушного потока и внутри дага с помощью BashOperator. Проблема с оператором spark_spark_submit возникает из-за «мягкой» ссылки на путь python упакованной среды через PYSPARK_PYTHON
, который используется перед вызовом spark-submit
.
Мы пробовали следующее:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta
import bdap
airflowConfig = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 12, 4),
}
dag = DAG(
'nltk_app', default_args=airflowConfig, schedule_interval=timedelta(1))
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
print_path_env_task = BashOperator(
task_id='print_path_env',
bash_command='echo $PATH',
dag=dag)
spark_submit_task = SparkSubmitOperator(
#spark_binary='spark-submit',
#driver_class_path='PYSPARK_PYTHON=./NLTK/nltk_env/bin/python',
env_vars={
'PYSPARK_PYTHON':'./NLTK/nltk_env/bin/python',
},
task_id='spark_submit_job',
conn_id='spark_default',
application='/somepath/airflow/workflows/nltk_app/spark_nltk_sample.py',
total_executor_cores='1',
executor_cores='1',
executor_memory='2g',
num_executors='2',
name='nltk_app_in_dag',
verbose=True,
driver_memory='1g',
conf={
'spark.yarn.appMasterEnv.PYSPARK_PYTHON':'./NLTK/nltk_env/bin/python',
'spark.yarn.appMasterEnv.NLTK_DATA':'./'
},
archives='/somepath/airflow/workflows/nltk_app/nltk_env.zip#NLTK,/somepath/airflow/workflows/nltk_app/tokenizers.zip#tokenizers,/somepath/airflow/workflows/nltk_app/taggers.zip#taggers',
dag=dag,
)
t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)
Вышеупомянутое приводит к ошибке, поскольку экспорт PYTHON_PATH в оболочке отличается от вызова относительного пути через PYSPARK_PYTHON=./NLTK/nltk_env/bin/python spark-submit
. По-видимому, spark-submit делает некоторые маги c с этим встроенным вызовом, который я все еще не могу взломать.
Я надеюсь, что кто-то еще попробовал это и успешно использовал spark_submit_operator и такую относительную ссылку и может дайте нам подсказку, как их правильно использовать. BashOperator просто кажется менее интуитивно понятным и немного «беспорядочным», когда есть отдельный оператор для spark_submit.