У меня есть метка, которая создает искровое задание и выполняет определенный скрипт, расположенный в определенной директории. Есть две задачи, подобные этой. Обе эти задачи должны получить один и тот же идентификатор, сгенерированный в файле DAG, прежде чем эти задачи будут выполнены. Если я просто сохраню и передам значение только через скрипт python, идентификаторы будут другими, что нормально. Поэтому я пытаюсь передать значение в XCOM с помощью PythonOperator и задачи.
Мне нужно вытащить значения из XCOM и обновить словарь 'params' этой информацией, чтобы можно было передать ее моей задаче-искру.
Не могли бы вы мне помочь, я бью головой об стену и просто не могу понять.
Я попробовал следующее:
- создать функцию только для извлечения данных из xcom и их возврата. Назначил эту функцию переменной params, но не работает. Я не могу вернуться из функции python внутри DAG, которая использует функцию xcom_pull
- попытался присвоить пустой список и добавить к нему функцию python. а затем окончательный список, чтобы предоставить непосредственно к моей искровой задачи. Тоже не работает Пожалуйста, помогите!
Большое спасибо заранее за любую помощь, связанную с этим. Мне понадобится то же значение для этой и нескольких других задач запуска, которые могут входить в один и тот же файл DAG.
ФАЙЛ DAG
import..
from common.base_tasks import spark_task
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'email_on_failure': True,
'email_on_retry': False,
}
dag = DAG(
dag_id='dag',
default_args=default_args,
schedule_interval=timedelta(days=1)
)
log_level = "info"
id_info = {
"id": str(uuid.uuid1()),
"start_time": str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S,%f'))
}
# this stores the value to XCOM successfully
def store_id(**kwargs):
kwargs['ti'].xcom_push(key='id_info', value=id_info)
store_trace_task = PythonOperator(task_id='store_id', provide_context=True, python_callable=store_id, dag=dag)
extra_config = {'log_level': log_level}
config = '''{"config":"data"}'''
params = {'config': config,'extra_config': json.dumps(extra_config}
# ---------- this doesn't work ----------
pars = []
pars.append(params)
def task1_pull_params(**kwargs):
tracing = kwargs['ti'].xcom_pull(task_ids='store_trace_task')
pars.append(tracing)
# params = {
# 'parsed_config': parsed_config,
# 'extra_config': json.dumps(extra_config),
# 'trace_data': tracing
# }
# return params # return pushes to xcom, xcom_push does the same
task1_pull_params = PythonOperator(task_id='task1_pull_params', provide_context=True, python_callable=task1_pull_params, dag=dag)
store_trace_task >> task1_pull_params
# returning value from the function and calling it to assign res to the params variable below also doesn't work
# params = task1_pull_params
# this prints only what's outside of the function, i.e. params
print("===== pars =====> ", pars)
pipeline_task1 = spark_task(
name='task1',
script='app.py',
params=params,
dag=dag
)
task1_pull_params >> pipeline_task1