Привет. У меня странное поведение от SimpleHttpOperator. Я расширил этот оператор следующим образом:
class EPOHttpOperator(SimpleHttpOperator):
"""
Operator for retrieving data from EPO API, performs token validity check,
gets a new one, if old one close to not valid.
"""
@apply_defaults
def __init__(self, entity_code, *args, **kwargs):
super().__init__(*args, **kwargs)
self.entity_code = entity_code
self.endpoint = self.endpoint + self.entity_code
def execute(self, context):
try:
token_data = json.loads(Variable.get(key="access_token_data", deserialize_json=False))
if (datetime.now() - datetime.strptime(token_data["created_at"],
'%Y-%m-%d %H:%M:%S.%f')).seconds >= 19 * 60:
Variable.set(value=json.dumps(get_EPO_access_token(), default=str), key="access_token_data")
self.headers = {
"Authorization": f"Bearer {token_data['token']}",
"Accept": "application/json"
}
super(EPOHttpOperator, self).execute(context)
except HTTPError as http_err:
logging.error(f'HTTP error occurred during getting EPO data: {http_err}')
raise http_err
except Exception as e:
logging.error(e)
raise e
И я написал простой модульный тест:
def test_get_EPO_data(requests_mock):
requests_mock.get('http://ops.epo.org/rest-services/published-data/publication/epodoc/EP1522668',
text='{"text": "test"}')
requests_mock.post('https://ops.epo.org/3.2/auth/accesstoken',
text='{"access_token":"test", "status": "we just testing"}')
dag = DAG(dag_id='test_data', start_date=datetime.now())
task = EPOHttpOperator(
xcom_push=True,
do_xcom_push=True,
http_conn_id='http_EPO',
endpoint='published-data/publication/epodoc/',
entity_code='EP1522668',
method='GET',
task_id='get_data_task',
dag=dag,
)
ti = TaskInstance(task=task, execution_date=datetime.now(), )
task.execute(ti.get_template_context())
assert ti.xcom_pull(task_ids='get_data_task') == {"text": "test"}
Тест не проходит, хотя значение XCOM из HttpHook никогда не отправляется как XCOM, я проверил, что код, отвечающий за pu sh logi c в классе ловушек, вызывается:
....
if self.response_check:
if not self.response_check(response):
raise AirflowException("Response check returned False.")
if self.xcom_push_flag:
return response.text
Что я сделал не так? Это ошибка?