То, как вы храните token
как «глобальную» переменную, не будет работать.Файл определения Dag (сценарий, в котором вы определили задачи) - это не тот же контекст времени выполнения, что и для выполнения каждой задачи.Каждая задача может быть запущена в отдельном потоке, процессе или даже на другом компьютере, в зависимости от исполнителя.Вы передаете данные между задачами не с помощью глобальных переменных, а с помощью XCom, что вы уже частично делаете.Попробуйте выполнить следующее: - удаленная глобальная переменная token
- в pull_function
вместо print token
do return token
- это снова отправит значение в XCom, так что следующая задача сможет получить к нему доступ - получить доступ к токену из XComв вашем следующем задании.
Последний шаг немного сложен, так как вы используете SimpleHttpOperator
, и это только шаблонные поля endpoint
и data
, но не headers
.Например, если вы хотите передать data
из XCom предыдущей задачи, вы бы сделали что-то вроде этого:
get_config = SimpleHttpOperator(
task_id='get_config',
endpoint='someendpoint',
http_conn_id = 'test_conn',
dag=dag,
data='{{ task_instance.xcom_pull(task_ids="print_the_context", key="some_key") }}'
)
Но, к сожалению, вы не можете сделать то же самое с заголовками,так что вы должны либо сделать это «вручную» через PythonOperator, либо вы можете унаследовать SimpleHttpOperator
и создать свой собственный, что-то вроде:
class HeaderTemplatedHttpOperator(SimpleHttpOperator):
template_fields = ('endpoint', 'data', 'headers') # added 'headers' headers
, а затем использовать его, что-то вроде:
get_config = HeaderTemplatedHttpOperator(
task_id='get_config',
endpoint='someendpoint',
http_conn_id = 'test_conn',
dag=dag,
headers='{{ task_instance.xcom_pull(task_ids="print_the_context") }}'
)
Имейте в виду, я не проверял это, это просто для объяснения концепции.Поиграйте с подходом, и вы должны добраться туда.