Воздушный поток: получить идентификатор предыдущей задачи в следующей задаче - PullRequest
1 голос
/ 26 мая 2019

У меня 2 задания. В первом случае оператор python что-то вычисляет, а во втором я хочу использовать вывод оператора python в операторе Http. Вот мой код:

source_list = ['account', 'sales']

for source_type in source_list:
    t2 = PythonOperator(
                task_id='compute_next_gather_time_for_' + source_type,
                python_callable=compute_next_gather_time,
                provide_context=True,
                trigger_rule=TriggerRule.ALL_SUCCESS,
                op_args=[source_type],
                retries=3
            )

    t3 = SimpleHttpOperator(
                task_id='request_' + source_type + '_report',
                method='POST',
                http_conn_id='abc',
                endpoint=endpoint,
                data=json.dumps({
                    "query": {
                        "start": "{{ task_instance.xcom_pull(task_ids='prev_task_id') }}",
                        "stop": str(yesterday),
                        "fields": [
                            1
                        ]
                    }
                }),
                headers={"Content-Type": "application/json", "Authorization": 'abc'},
                response_check=lambda response: True if len(response.json()) == 0 else False,
                log_response=True,
                retries=3
            )

Запрос: я хочу передать предыдущий идентификатор задачи в t3 в ее переменную данных. Я не уверен, как это сделать, поскольку идентификатор задачи t2 не является постоянным. Это меняется с изменением source_type. Очевидно, когда я пытался это не сделать это.

1 Ответ

1 голос
/ 28 мая 2019

Раньше я не использовал шаблоны Jinja ни в одном из своих групп обеспечения доступности баз данных, но я столкнулся с похожими проблемами, когда мне нужно было извлечь значения XCOM из конкретной задачи, которая имеет динамически генерируемый task_id.

Вы можете определить task_ids в T3 так же, как вы определили task_id в T2. Например:

source_list = ['account', 'sales']

for source_type in source_list:

    task_id='compute_next_gather_time_for_' + source_type

    t2 = PythonOperator(
                task_id=task_id,
                python_callable=compute_next_gather_time,
                provide_context=True,
                trigger_rule=TriggerRule.ALL_SUCCESS,
                op_args=[source_type],
                retries=3
            )

    t3 = SimpleHttpOperator(
                task_id='request_' + source_type + '_report',
                method='POST',
                http_conn_id='abc',
                endpoint=endpoint,
                data=json.dumps({
                    "query": {
                        "start": "{{ task_instance.xcom_pull(task_ids=task_id) }}",
                        "stop": str(yesterday),
                        "fields": [
                            1
                        ]
                    }
                }),
                headers={"Content-Type": "application/json", "Authorization": 'abc'},
                response_check=lambda response: True if len(response.json()) == 0 else False,
                log_response=True,
                retries=3
            )
...