КАК перенести / вытащить в / из Airflow X_COM с помощью spark_task и pythonOperator? - PullRequest
0 голосов
/ 23 марта 2019

У меня есть метка, которая создает искровое задание и выполняет определенный скрипт, расположенный в определенной директории. Есть две задачи, подобные этой. Обе эти задачи должны получить один и тот же идентификатор, сгенерированный в файле DAG, прежде чем эти задачи будут выполнены. Если я просто сохраню и передам значение только через скрипт python, идентификаторы будут другими, что нормально. Поэтому я пытаюсь передать значение в XCOM с помощью PythonOperator и задачи.

Мне нужно вытащить значения из XCOM и обновить словарь 'params' этой информацией, чтобы можно было передать ее моей задаче-искру.

Не могли бы вы мне помочь, я бью головой об стену и просто не могу понять.

Я попробовал следующее:

  • создать функцию только для извлечения данных из xcom и их возврата. Назначил эту функцию переменной params, но не работает. Я не могу вернуться из функции python внутри DAG, которая использует функцию xcom_pull
  • попытался присвоить пустой список и добавить к нему функцию python. а затем окончательный список, чтобы предоставить непосредственно к моей искровой задачи. Тоже не работает Пожалуйста, помогите!

Большое спасибо заранее за любую помощь, связанную с этим. Мне понадобится то же значение для этой и нескольких других задач запуска, которые могут входить в один и тот же файл DAG.

ФАЙЛ DAG

import..
from common.base_tasks import spark_task

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'email_on_failure': True,
    'email_on_retry': False,
}

dag = DAG(
    dag_id='dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)

log_level = "info"

id_info = {
        "id": str(uuid.uuid1()),
        "start_time": str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S,%f'))
    }

# this stores the value to XCOM successfully
def store_id(**kwargs):
    kwargs['ti'].xcom_push(key='id_info', value=id_info)

store_trace_task = PythonOperator(task_id='store_id', provide_context=True, python_callable=store_id, dag=dag)           

extra_config = {'log_level': log_level}

config = '''{"config":"data"}'''
params = {'config': config,'extra_config': json.dumps(extra_config}

# ---------- this doesn't work ----------
pars = []
pars.append(params)

def task1_pull_params(**kwargs):
    tracing = kwargs['ti'].xcom_pull(task_ids='store_trace_task')
    pars.append(tracing)
    # params = {
    #     'parsed_config': parsed_config,
    #     'extra_config': json.dumps(extra_config),
    #     'trace_data': tracing
    # }
    # return params     # return pushes to xcom, xcom_push does the same

task1_pull_params = PythonOperator(task_id='task1_pull_params', provide_context=True, python_callable=task1_pull_params, dag=dag)

store_trace_task >> task1_pull_params

# returning value from the function and calling it to assign res to the params variable below also doesn't work
# params = task1_pull_params

# this prints only what's outside of the function, i.e. params
print("===== pars =====> ", pars)

pipeline_task1 = spark_task(
    name='task1',
    script='app.py',
    params=params,
    dag=dag
    )

task1_pull_params >> pipeline_task1
...