Use an Airflow HTTP connection with Extra
October 28, 2019

I am trying to create a SimpleHttpOperator. I first tried:

    from airflow.operators.http_operator import SimpleHttpOperator  # module path as in the tracebacks below

    task1 = SimpleHttpOperator(
        task_id="task1",
        # Pre-created HTTP connection with an HTTPS host and no Extra info.
        http_conn_id=http_connection_for_task_1,
        endpoint="v1/dataset",
        method="GET",
        headers={"Content-Type": "application/json", "x-api-key": "MyKey"},
        xcom_push=True,  # push the response to XCom
        dag=dag,
    )

This works fine. To avoid hard-coding the API key in the code, I decided to move the headers into the Extra field of the connection definition.

First I tried entering raw JSON:

    {"Content-Type": "application/json", "x-api-key": "MyKey"}

The task failed with this error:

    [2019-10-28 03:30:20,446] {{taskinstance.py:1058}} ERROR - Value for header {headers: {'cache-control': 'no-cache', 'content-type': 'application/json', 'x-api-key': 'MyKey'}} must be of type str or bytes, not <class 'dict'>
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/requests/utils.py", line 941, in check_header_validity
        if not pat.match(value):
    TypeError: expected string or bytes-like object

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
        result = task_copy.execute(context=context)
      File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/operators/http_operator.py", line 92, in execute
        self.extra_options)
      File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/hooks/http_hook.py", line 130, in run
        prepped_request = session.prepare_request(req)
      File "/usr/local/lib/python3.6/site-packages/requests/sessions.py", line 462, in prepare_request
        hooks=merge_hooks(request.hooks, self.hooks),
      File "/usr/local/lib/python3.6/site-packages/requests/models.py", line 314, in prepare
        self.prepare_headers(headers)
      File "/usr/local/lib/python3.6/site-packages/requests/models.py", line 448, in prepare_headers
        check_header_validity(header)
      File "/usr/local/lib/python3.6/site-packages/requests/utils.py", line 945, in check_header_validity
        "bytes, not %s" % (name, value, type(value)))
    requests.exceptions.InvalidHeader: Value for header {headers: {'cache-control': 'no-cache', 'content-type': 'application/json', 'x-api-key': 'MyKey'}} must be of type str or bytes, not <class 'dict'>
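
In the message above, the header name is `headers` and its value is an entire dict, which suggests the stored Extra was nested under a `headers` key rather than being a flat object. A minimal sketch of that effect, assuming the parsed Extra is merged into the session headers as-is; `find_invalid_headers` is a hypothetical helper, not part of Airflow or requests, and both Extra strings are assumed values reconstructed from the error text:

```python
import json


def find_invalid_headers(headers):
    """Return the names of headers whose values are not str or bytes."""
    return [name for name, value in headers.items()
            if not isinstance(value, (str, bytes))]


# Assumed Extra contents: one nested under "headers", one flat.
nested_extra = '{"headers": {"cache-control": "no-cache", "x-api-key": "MyKey"}}'
flat_extra = '{"cache-control": "no-cache", "x-api-key": "MyKey"}'

nested_headers = json.loads(nested_extra)
flat_headers = json.loads(flat_extra)

# The nested form yields a single "headers" entry whose value is a dict.
print(find_invalid_headers(nested_headers))
# The flat form yields only string values.
print(find_invalid_headers(flat_headers))
```

With the nested form, the only top-level key is `headers`, so that key itself would be sent as a header name with a dict as its value.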

Then I tried wrapping the JSON in single quotes:

    '{"cache-control": "no-cache", "content-type": "application/json", "x-api-key": "Ig9LMThPu7azYxTcCONF07kQJJFQrURI1hQr8xmj"}'

and got:

    [2019-10-28 03:22:16,741] {{logging_mixin.py:112}} INFO - [2019-10-28 03:22:16,740] {connection.py:296} ERROR - Expecting value: line 1 column 1 (char 0)
    Traceback (most recent call last):
      File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/models/connection.py", line 294, in extra_dejson
        obj = json.loads(self.extra)
      File "/usr/local/lib/python3.6/json/__init__.py", line 354, in loads
        return _default_decoder.decode(s)
      File "/usr/local/lib/python3.6/json/decoder.py", line 339, in decode
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
      File "/usr/local/lib/python3.6/json/decoder.py", line 357, in raw_decode
        raise JSONDecodeError("Expecting value", s, err.value) from None
    json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
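
This failure can be reproduced with the standard `json` module alone: a single quote is not a valid start of any JSON value, so parsing stops at the first character. A small sketch, where `try_parse` is a hypothetical helper and the key value is a placeholder:

```python
import json

plain_extra = '{"x-api-key": "MyKey"}'
quoted_extra = "'" + plain_extra + "'"  # same text wrapped in single quotes


def try_parse(text):
    """Parse text as JSON, returning the value or a description of the error."""
    try:
        return json.loads(text)
    except json.JSONDecodeError as exc:
        return "JSONDecodeError: {}".format(exc)


print(try_parse(quoted_extra))  # fails at line 1 column 1, as in the log above
print(try_parse(plain_extra))   # parses to a dict
```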

Finally, I tried double quotes outside and single quotes inside:

    "{'cache-control': 'no-cache', 'content-type': 'application/json', 'x-api-key': 'Ig9LMThPu7azYxTcCONF07kQJJFQrURI1hQr8xmj'}"

and got:

    [2019-10-28 04:31:54,760] {{taskinstance.py:1058}} ERROR - not enough values to unpack (expected 2, got 1)
    Traceback (most recent call last):
      File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
        result = task_copy.execute(context=context)
      File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/operators/http_operator.py", line 92, in execute
        self.extra_options)
      File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/hooks/http_hook.py", line 103, in run
        session = self.get_conn(headers)
      File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/hooks/http_hook.py", line 78, in get_conn
        session.headers.update(conn.extra_dejson)
      File "/usr/local/lib/python3.6/_collections_abc.py", line 846, in update
        for key, value in other:
    ValueError: not enough values to unpack (expected 2, got 1)
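
A likely reading of this traceback: text that starts and ends with double quotes is a valid JSON string literal, so parsing succeeds but returns a `str`, and `update` then iterates over it one character at a time. The sketch below reproduces that with the standard library only; `HeaderMap` is a hypothetical stand-in for the session's header mapping, chosen because the traceback shows `update` coming from `_collections_abc.py`:

```python
import json
from collections.abc import MutableMapping


class HeaderMap(MutableMapping):
    """Hypothetical stand-in for a session's header mapping."""

    def __init__(self):
        self._data = {}

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)


# The whole value is one JSON string literal, so json.loads returns a str.
double_quoted_extra = "\"{'x-api-key': 'MyKey'}\""
parsed = json.loads(double_quoted_extra)
print(type(parsed).__name__)

try:
    # A str is not a mapping, so update() tries to unpack each character
    # into a (key, value) pair and fails on the first one.
    HeaderMap().update(parsed)
    error = None
except ValueError as exc:
    error = str(exc)
print(error)
```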

Digging through the code of SimpleHttpOperator and HttpHook, I can see that the first and second approaches both failed to parse the Extra JSON, while the last approach managed to parse the JSON but failed to map its properties to HTTP headers.
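
Taken together, the three tracebacks suggest one condition to test before saving the connection: the Extra text should parse to a JSON object whose values are all strings. A minimal sketch of such a check, assuming (from the `session.headers.update(conn.extra_dejson)` frame) that every top-level key becomes a header; `check_extra` is a hypothetical helper and the sample values are placeholders:

```python
import json


def check_extra(extra_text):
    """Return a list of problems that would keep extra_text from becoming headers."""
    try:
        parsed = json.loads(extra_text)
    except json.JSONDecodeError as exc:
        return ["not valid JSON: {}".format(exc)]
    if not isinstance(parsed, dict):
        return ["parsed to {}, expected a JSON object".format(type(parsed).__name__)]
    return ["header {!r} has a non-string value".format(name)
            for name, value in parsed.items()
            if not isinstance(value, str)]


candidates = [
    '{"x-api-key": "MyKey"}',               # flat object, no outer quotes
    '{"headers": {"x-api-key": "MyKey"}}',  # nested under "headers"
    "'{\"x-api-key\": \"MyKey\"}'",         # wrapped in single quotes
    "\"{'x-api-key': 'MyKey'}\"",           # wrapped in double quotes
]
for text in candidates:
    print(text, "->", check_extra(text))
```

Only the first candidate comes back with an empty problem list under these assumptions.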

Can anyone tell me the correct way to set the Extra JSON?

Thanks.
