Я пытаюсь создать SimpleHttpOperator. Я впервые попробовал
task1 = SimpleHttpOperator(
task_id="task1"
, http_conn_id=http_connection_for_task_1 #Pre-created HTTP connection with an HTTPS host, no extra info.
, endpoint="v1/dataset"
, method="GET"
, headers={"Content-Type": "application/json", "x-api-key": "MyKey"}
, xcom_push=True
, dag = dag
)
Это прекрасно работает. Чтобы избежать жесткого кодирования ключа API в коде, я решил поместить заголовки в дополнительное поле определения соединения.
Сначала я попытался поставить необработанный JSON
{"Content-Type": "application/json", "x-api-key": "MyKey"}
Задача завершилась неудачно и пожаловалась
[2019-10-28 03:30:20,446] {{taskinstance.py:1058}} ERROR - Value for header {headers: {'cache-control': 'no-cache', 'content-type': 'application/json', 'x-api-key': 'MyKey'}} must be of type str or bytes, not <class 'dict'>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/requests/utils.py", line 941, in check_header_validity
if not pat.match(value):
TypeError: expected string or bytes-like object
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/operators/http_operator.py", line 92, in execute
self.extra_options)
File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/hooks/http_hook.py", line 130, in run
prepped_request = session.prepare_request(req)
File "/usr/local/lib/python3.6/site-packages/requests/sessions.py", line 462, in prepare_request
hooks=merge_hooks(request.hooks, self.hooks),
File "/usr/local/lib/python3.6/site-packages/requests/models.py", line 314, in prepare
self.prepare_headers(headers)
File "/usr/local/lib/python3.6/site-packages/requests/models.py", line 448, in prepare_headers
check_header_validity(header)
File "/usr/local/lib/python3.6/site-packages/requests/utils.py", line 945, in check_header_validity
"bytes, not %s" % (name, value, type(value)))
requests.exceptions.InvalidHeader: Value for header {headers: {'cache-control': 'no-cache', 'content-type': 'application/json', 'x-api-key': 'MyKey'}} must be of type str or bytes, not <class 'dict'>
Затем я попытался:
'{"cache-control": "no-cache", "content-type": "application/json", "x-api-key": "Ig9LMThPu7azYxTcCONF07kQJJFQrURI1hQr8xmj"}'
и получил
[2019-10-28 03:22:16,741] {{logging_mixin.py:112}} INFO - [2019-10-28 03:22:16,740] {connection.py:296} ERROR - Expecting value: line 1 column 1 (char 0)
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/models/connection.py", line 294, in extra_dejson
obj = json.loads(self.extra)
File "/usr/local/lib/python3.6/json/__init__.py", line 354, in loads
return _default_decoder.decode(s)
File "/usr/local/lib/python3.6/json/decoder.py", line 339, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/local/lib/python3.6/json/decoder.py", line 357, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Я в последний раз пытался:
"{'cache-control': 'no-cache', 'content-type': 'application/json', 'x-api-key': 'Ig9LMThPu7azYxTcCONF07kQJJFQrURI1hQr8xmj'}"
и получил
[2019-10-28 04:31:54,760] {{taskinstance.py:1058}} ERROR - not enough values to unpack (expected 2, got 1)
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/operators/http_operator.py", line 92, in execute
self.extra_options)
File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/hooks/http_hook.py", line 103, in run
session = self.get_conn(headers)
File "/usr/local/airflow/.local/lib/python3.6/site-packages/airflow/hooks/http_hook.py", line 78, in get_conn
session.headers.update(conn.extra_dejson)
File "/usr/local/lib/python3.6/_collections_abc.py", line 846, in update
for key, value in other:
ValueError: not enough values to unpack (expected 2, got 1)
Копая код SimpleHttpOperator
и HttpHook
, я могусм. первый и второй подходы: оба не смогли проанализировать дополнительный JSON, а последний подход сумел проанализировать json, но не смог отобразить свойства json в заголовки HTTP.
Может кто-нибудь сообщить, как правильно установить дополнительную информацию JSON?
Спасибо.