когда я пытаюсь отправить токен-носитель в заголовке в воздушный поток api / rest, он не обрабатывает шаблон и дает имя ошибки 'task_instance' не определено
Мой код:
default_args = {
"owner": "Airflow",
"depends_on_past": False,
"start_date": days_ago(2),
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"log_response":True,
"retry_delay": timedelta(minutes=5),
}
dag = DAG("PruebaServicioLocalidadesV023",default_args=default_args,tags=["prueba"],description="Prueba DAG definido en anfitrion Recoger token en /token",schedule_interval=timedelta(days=1))
token = ""
def pull_function(**context):
response_str = context["task_instance"].xcom_pull(task_ids="Get_TokenV23")
token= json.loads(response_str)["access_token"]
return token
def check(response):
if response == 200:
print("return OK")
return True
else:
print("return KO")
return False
output_token = PythonOperator(
task_id="output_token",
python_callable=pull_function,
provide_context=True,
xcom_push=True,
dag=dag,
)
t1 = SimpleHttpOperator(
task_id="Get_TokenV23",
method="POST",
http_conn_id="http_netocio",
endpoint="/token",
data={"grant_type":"password","username":"xxxxxxx@gmail.com","password":"xxxxxx"},
headers={"Content-Type": "application/x-www-form-urlencoded"},
xcom_push=True,
response_check=lambda response: response.json()["token_type"] == "bearer",
dag=dag,
)
t2 = SimpleHttpOperator(
task_id="Get_LOcalidadesMadridV23",
method="GET",
headers = {"Authorization": "Bearer" + {{task_instance.xcom_pull (task_ids = "output_token")}}},
http_conn_id="http_netocio",
endpoint="/api/LocalidadesProvincia/28",
trigger_rule = "all_done",
xcom_push=True,
response_check=lambda response: True if check(response.status_code) else False,
dag=dag,
)
t1 >> output_token >> t2