Как передать токен на предъявителя в Airflow - PullRequest
0 голосов
/ 07 мая 2019

У меня есть работа с 3 задачами 1) Получить токен, используя запрос POST 2) Получить значение токена и сохранить в переменной 3) Сделайте запрос GET с помощью токена из шага 2 и передайте токен на предъявителя

Проблема в том, что шаг 3 не работает, и я получаю ошибку HTTP. Я смог напечатать значение токена на шаге 2 и подтвердил код

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
token ="mytoken" //defined with some value which will be updated later

get_token = SimpleHttpOperator(
        task_id='get_token',
        method='POST',
        headers={"Authorization": "Basic xxxxxxxxxxxxxxx=="},
        endpoint='/token?username=user&password=pass&grant_type=password',
        http_conn_id = 'test_http',
        trigger_rule="all_done",
        xcom_push=True,
        dag=dag
    )

def pull_function(**context):
    value = context['task_instance'].xcom_pull(task_ids='get_token')
    print("printing token")
    print value
    wjdata = json.loads(value)
    print(wjdata['access_token'])
    token=wjdata['access_token']
    print token


run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=pull_function,
    dag=dag,
)

get_config = SimpleHttpOperator(
        task_id='get_config',
        method='GET',
        headers={"Authorization": "Bearer " + token},
        endpoint='someendpoint',
        http_conn_id = 'test_conn',
        trigger_rule="all_done",
        xcom_push=True,
        dag=dag
    )

get_token >> run_this >> get_config

1 Ответ

1 голос
/ 08 мая 2019

То, как вы храните token как «глобальную» переменную, не будет работать.Файл определения Dag (сценарий, в котором вы определили задачи) - это не тот же контекст времени выполнения, что и для выполнения каждой задачи.Каждая задача может быть запущена в отдельном потоке, процессе или даже на другом компьютере, в зависимости от исполнителя.Вы передаете данные между задачами не с помощью глобальных переменных, а с помощью XCom, что вы уже частично делаете.Попробуйте выполнить следующее: - удаленная глобальная переменная token - в pull_function вместо print token do return token - это снова отправит значение в XCom, так что следующая задача сможет получить к нему доступ - получить доступ к токену из XComв вашем следующем задании.

Последний шаг немного сложен, так как вы используете SimpleHttpOperator, и это только шаблонные поля endpoint и data, но не headers.Например, если вы хотите передать data из XCom предыдущей задачи, вы бы сделали что-то вроде этого:

get_config = SimpleHttpOperator(
        task_id='get_config',
        endpoint='someendpoint',
        http_conn_id = 'test_conn',
        dag=dag,
        data='{{ task_instance.xcom_pull(task_ids="print_the_context", key="some_key") }}'
    )

Но, к сожалению, вы не можете сделать то же самое с заголовками,так что вы должны либо сделать это «вручную» через PythonOperator, либо вы можете унаследовать SimpleHttpOperator и создать свой собственный, что-то вроде:

class HeaderTemplatedHttpOperator(SimpleHttpOperator):
    template_fields = ('endpoint', 'data', 'headers')  # added 'headers' headers

, а затем использовать его, что-то вроде:

get_config = HeaderTemplatedHttpOperator(
        task_id='get_config',
        endpoint='someendpoint',
        http_conn_id = 'test_conn',
        dag=dag,
        headers='{{ task_instance.xcom_pull(task_ids="print_the_context") }}'
    )

Имейте в виду, я не проверял это, это просто для объяснения концепции.Поиграйте с подходом, и вы должны добраться туда.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...