Импортировать переменную воздушного потока в PySpark - PullRequest
0 голосов
/ 12 декабря 2018

В последнее время я играю с Airflow и PySpark.Я видел, что Airflow имеет ряд переменных.Моя цель - проанализировать одну из этих переменных и импортировать ее в мой скрипт pySpark.До сих пор я пытался отобразить значение переменной (сработало), но затем я не смог найти способ импортировать в pySpark (я хочу передать значение этой переменной другой переменной в моем скрипте pyspark).Я также прилагаю свой код (job_id - это переменная, о которой я говорю).

test_bash = """
export un_id={{ti.job_id}}
echo $un_id
"""

bash_task = BashOperator(
    task_id='test',
    bash_command=test_bash,
    xcom_push=True,
    provide_context=True,
    dag=dag)

def pull_function(**kwargs):
    ti = kwargs['ti']
    rt = ti.xcom_pull(task_ids='test')
    print(rt)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag
)

#############
bash_task >> pull_task

Есть идеи, как мне продолжать или я делаю что-то не так?

Ответы [ 2 ]

0 голосов
/ 13 декабря 2018

Я не пробовал то, что предложил @kaxil, но если я правильно понял ваш вопрос, вы хотите получить переменную run_id из Airflow и использовать ее в своем скрипте python (pySpark).Если это так, я предполагаю, что вы используете свою работу от BashOperator до spark-submit.При отправке искальной работы вам разрешено (вместе с вашей работой) предоставить некоторые аргументы .Эти аргументы отображаются как системные аргументы, которые вы можете увидеть, если выполните print(sys.argv) (полезно узнать, в какой позиции находится ваша переменная).Поскольку вы уже нажали на переменную bash_task, вам придется ее вытянуть.Поэтому, когда вы отправляете свое искровое задание, вы также должны добавить дополнительный аргумент, подобный этому:

cmd=""spark-submit your-pyspark-file.py {{ ti.xcom_pull("test") }}

retrieval = BashOperator(
    namespace='randomname',
    arguments=[cmd],
    name='example-dag1',
    task_id='name-you-desire',
    provide_context=True,
    get_logs=True, 
    dag=dag)

Затем, если вы выполнили print(sys.argv), вы сможете увидеть свою переменную в качестве аргумента и в вашемСценарий, к которому вы можете обратиться, используя sys.argv[1] (если он находится во второй позиции, 0, если он находится в первой и т. д.).

0 голосов
/ 12 декабря 2018

Это значение на самом деле называется run_id и может быть доступно через контекст или макросы.

В Pythonoperator это доступно через контекст, а в BashOperator это доступно через шаблонизацию jinjaв поле bash_command.

Подробнее о том, что доступно в макросах:

https://airflow.incubator.apache.org/code.html#macros

Подробнее о jinja:

https://airflow.incubator.apache.org/concepts.html#jinja-templating

from airflow.models import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


dag = DAG(
    dag_id='run_id',
    schedule_interval=None,
    start_date=datetime(2017, 2, 26)
)

def my_func(**kwargs):
    context = kwargs
    print(context['dag_run'].run_id)

t1 = PythonOperator(
    task_id='python_run_id',
    python_callable=my_func,
    provide_context=True,
    dag=dag
    )

t2 = BashOperator(
    task_id='bash_run_id',
    bash_command='echo {{run_id}}',
    dag=dag)

t1.set_downstream(t2)

Используйте этот знак в качестве примера и проверьте журнал для каждого оператора, вы должны увидеть run_id, напечатанный в журнале.

...