невозможно передавать параметры между задачами simplehttpconector с xcom - PullRequest
0 голосов
/ 09 мая 2020

когда я пытаюсь отправить токен-носитель в заголовке в воздушный поток api / rest, он не обрабатывает шаблон и дает имя ошибки 'task_instance' не определено

Мой код:

    default_args = {
        "owner": "Airflow",
        "depends_on_past": False,
        "start_date": days_ago(2),
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "log_response":True,
        "retry_delay": timedelta(minutes=5),
    }

    dag = DAG("PruebaServicioLocalidadesV023",default_args=default_args,tags=["prueba"],description="Prueba DAG definido en anfitrion Recoger token en /token",schedule_interval=timedelta(days=1))


    token = ""


    def pull_function(**context):

        response_str = context["task_instance"].xcom_pull(task_ids="Get_TokenV23")
        token= json.loads(response_str)["access_token"]
        return token




    def check(response):
        if response == 200:
            print("return OK")
            return True
        else:
            print("return KO")
            return False

    output_token = PythonOperator(
        task_id="output_token",
        python_callable=pull_function,
        provide_context=True,
        xcom_push=True,
        dag=dag,  
    )


    t1 = SimpleHttpOperator(
        task_id="Get_TokenV23",
        method="POST",
        http_conn_id="http_netocio",     
        endpoint="/token",
        data={"grant_type":"password","username":"xxxxxxx@gmail.com","password":"xxxxxx"},
        headers={"Content-Type": "application/x-www-form-urlencoded"}, 
        xcom_push=True,
        response_check=lambda response: response.json()["token_type"]  == "bearer",
        dag=dag,
    )

    t2 = SimpleHttpOperator(
        task_id="Get_LOcalidadesMadridV23",
        method="GET",
        headers = {"Authorization": "Bearer" + {{task_instance.xcom_pull (task_ids = "output_token")}}},    
        http_conn_id="http_netocio",
        endpoint="/api/LocalidadesProvincia/28",
        trigger_rule = "all_done",
        xcom_push=True,  
        response_check=lambda response: True if check(response.status_code) else False,
        dag=dag,
    )

    t1 >> output_token >> t2

...