Как получить значение xcom в обычной функции или как использовать pubsubOperator - PullRequest
0 голосов
/ 28 июня 2019

Я пытаюсь передать значение xcom в нормальную функцию, но оно передает фактическое значение

Я пытался использовать ниже, используя пример кода

def getArgsForPractice(practice, messageId, status_result):
    practice_args = dict()
    practice_args['practice_id'] = practice
    practice_args['message_id'] = messageId
    practice_args['status'] = status_result
    practice_args_json = json.dumps(practice_args)
    message = {'data': base64.b64encode(practice_args_json.encode('utf-8')).decode()}
    return message

PubSubPublishSuccess = PubSubPublishOperator(task_id='publish-messages_success',
    topic=PUB_SUB_TOPIC,
    project=PROJECT_ID,
    messages=[
        getArgsForPractice(
            "{{ task_instance.xcom_pull('get_practice_id_task', key='return_value')[0]}}",
            "{{ task_instance.xcom_pull('get_measure_id_task', key='return_value')[0]}}",
            "SUCCESS"
        )
    ],
    dag=dag)

Я видел значение ниже, как этовведите код здесь:

{"practice_id": "{{ task_instance.xcom_pull('get_practice_id_task',     key='return_value')[0]}}", "message_id": "{{ task_instance.xcom_pull('get_measure_id_task', key='return_value')[0]}}", "status": "SUCCESS"} │ 599454601822320 │

1 Ответ

0 голосов
/ 03 июля 2019

Проблема возникает потому, что сначала вызывается функция getArgsForPractice с любым передаваемым аргументом. Вывод отправляется как шаблонное поле оператору. Поскольку ваш вывод из функции закодирован в base64, оператор не находит в нем никаких переменных шаблона.

Решение состоит в том, чтобы использовать PythonOperator между ними для анализа данных и получения их в требуемом формате. См. Код ниже.

def getArgsForPractice(**context):
    practice = context.get('ti').xcom_pull('get_practice_id_task')[0]
    messageId = context.get('ti').xcom_pull('get_measure_id_task')[0]
    practice_args = dict()
    practice_args['practice_id'] = practice
    practice_args['message_id'] = messageId
    practice_args['status'] = "SUCCESS"
    practice_args_json = json.dumps(practice_args)
    message = base64.b64encode(practice_args_json.encode('utf-8')).decode()
    return message

middle_task = PythonOperator(
    dag=dag,
    task_id='middle_task',
    python_callable=getArgsForPractice,
    provide_context=True
)

PubSubPublishSuccess = PubSubPublishOperator(task_id='publish-messages_success',
    topic=PUB_SUB_TOPIC,
    project=PROJECT_ID,
    messages=[{"data": "{{task_instance.xcom_pull('middle_task')}}"}],
    dag=dag)

middle_task >> PubSubPublishSuccess

Обратите внимание, что я не использовал key='return_value' ни для какого xcom_pull, потому что это значение по умолчанию.

...