Я пытаюсь использовать шаблонное значение в заголовках моего http-запроса, используя мой пользовательский http-оператор (расширяет simpleHttpOperator). Кажется, что шаблоны поддерживаются только в поле данных. Как можно реализовать то же самое в поле заголовков. Я хотел передать шаблон авторизации заголовка. Пожалуйста, найдите мой код ниже.
import airflow
from airflow import DAG
from airflow.configuration import conf
from airflow.operators.python_operator import PythonOperator
from airflow.operators.auth_plugins import SipAuthOperator
from airflow.operators.util_plugins import AuthUtility
DEFAULT_VERSION = '2.0'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False
}
DAG_ID = 'test_dag'
dag = DAG(DAG_ID, default_args=default_args,
schedule_interval=None,
catchup=False)
dag.doc_md = __doc__
auth_endpoint = conf['auth_apis']['authenticate_end_point']
def inspect_params(**context):
token = context['task_instance'].xcom_push(key='JWT_TOKEN',value='helloooo'
)
print(token)
test_operator = PythonOperator(dag=dag,task_id='init_api',
python_callable=inspect_params,
provide_context=True, )
# data={'token':'{{task_instance.xcom_pull(key=\'JWT_TOKEN\')}}'}
# {'Authorization':'Bearer '+'{{task_instance.xcom_pull(key=\'JWT_TOKEN\')}}'
http_operator = SipAuthOperator( dag=dag,task_id='authenticate_api',http_conn_id='auth_api',endpoint=auth_endpoint,headers={'token':'{{task_instance.xcom_pull(key=\'JWT_TOKEN\')}}'})
test_operator >> http_operator
Значение заголовка, полученное как {'token': "{{task_instance.xcom_pull(key='JWT_TOKEN')}}"}
, которое не является таким же, как desred. Если я введу то же значение в поле данных, оно будет работать нормально, как и ожидалось. Поддерживается ли использование шаблонов jinja в заголовках? Любой способ обойти эту проблему?