Airflow SimpleHttpOperator - PullRequest
       71

Airflow SimpleHttpOperator

1 голос
/ 29 января 2020

Привет. У меня странное поведение от SimpleHttpOperator. Я расширил этот оператор следующим образом:

class EPOHttpOperator(SimpleHttpOperator):
    """
    Operator for retrieving data from EPO API, performs token validity check,
    gets a new one, if old one close to not valid.
    """

    @apply_defaults
    def __init__(self, entity_code, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.entity_code = entity_code
        self.endpoint = self.endpoint + self.entity_code

    def execute(self, context):
        try:
            token_data = json.loads(Variable.get(key="access_token_data", deserialize_json=False))
            if (datetime.now() - datetime.strptime(token_data["created_at"],
                                                   '%Y-%m-%d %H:%M:%S.%f')).seconds >= 19 * 60:

                Variable.set(value=json.dumps(get_EPO_access_token(), default=str), key="access_token_data")

            self.headers = {
                "Authorization": f"Bearer {token_data['token']}",
                "Accept": "application/json"
            }

            super(EPOHttpOperator, self).execute(context)

        except HTTPError as http_err:
            logging.error(f'HTTP error occurred during getting EPO data: {http_err}')
            raise http_err

        except Exception as e:
            logging.error(e)
            raise e

И я написал простой модульный тест:

def test_get_EPO_data(requests_mock):
    requests_mock.get('http://ops.epo.org/rest-services/published-data/publication/epodoc/EP1522668',
                      text='{"text": "test"}')
    requests_mock.post('https://ops.epo.org/3.2/auth/accesstoken',
                       text='{"access_token":"test", "status": "we just testing"}')

    dag = DAG(dag_id='test_data', start_date=datetime.now())
    task = EPOHttpOperator(
        xcom_push=True,
        do_xcom_push=True,
        http_conn_id='http_EPO',
        endpoint='published-data/publication/epodoc/',
        entity_code='EP1522668',
        method='GET',
        task_id='get_data_task',
        dag=dag,
    )
    ti = TaskInstance(task=task, execution_date=datetime.now(), )
    task.execute(ti.get_template_context())
    assert ti.xcom_pull(task_ids='get_data_task') == {"text": "test"}

Тест не проходит, хотя значение XCOM из HttpHook никогда не отправляется как XCOM, я проверил, что код, отвечающий за pu sh logi c в классе ловушек, вызывается:

....
  if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False.")
  if self.xcom_push_flag:
     return response.text

Что я сделал не так? Это ошибка?

1 Ответ

0 голосов
/ 15 февраля 2020

Так что мне действительно удалось заставить его работать, установив значение xcom для результата super(EPOHttpOperator, self).execute(context).

def execute(self, context):
        try:
             .
             .
             .
            self.headers = {
                "Authorization": f"Bearer {token_data['token']}",
                "Accept": "application/json"
            }

            super(EPOHttpOperator, self).execute(context) -> Variable.set(value=super(EPOHttpOperator, self).execute(context),key='foo')

Документация вводит в заблуждение; или я все-таки что-то не так делаю?

...