Ibis создала таблицу impala с pandas фреймом данных и получила [Ошибка 61] Соединение отклонено - PullRequest
0 голосов
/ 30 апреля 2020

После выполнения выражения sql, я преобразовываю результаты в формат pandas. Но теперь я хочу автоматически создать временную таблицу на импале, используя Apache Ibis для создания таблицы и загрузки в нее кадра данных. Следующие коды делятся на 3 фазы:

  1. фаза 1 создает пустую таблицу с определенной пользователем схемой
  2. фаза 2 создает таблицу, схема и данные которой получены из другой таблицы
  3. фаза 3 - это основная часть, которую я хочу сделать (предположим, что указан кадр данных)

и код ошибки показан ниже. Кто-нибудь знает, что случилось? Потому что конфигурации одинаковы, я не знаю, почему он получит исключение подключения при выполнении этапа 3.

Большое спасибо!

import ibis
import os
import pandas as pd

hdfs_host = os.environ.get('IBIS_TEST_NN_HOST', 'xx.xx.xx.xx')
hdfs_port = int(os.environ.get('IBIS_TEST_NN_PORT', 50070))
impala_host = os.environ.get('IBIS_TEST_IMPALA_HOST', 'xx.xx.xx.xx')
impala_port = int(os.environ.get('IBIS_TEST_IMPALA_PORT', 21050))
hdfs = ibis.hdfs_connect(host=hdfs_host, port=hdfs_port)


client = ibis.impala.connect(
    host=impala_host,
    port=impala_port,
    hdfs_client=hdfs,
)
print(hdfs)  # doctest: +ELLIPSIS
print(client)  # doctest: +ELLIPSIS

db=client.database('tmp')

table = db.test_table
schema = table.schema()
--------------------------------------------------------------------------phase 1
table_name = 'tmp.test1'
if not client.exists_table(table_name):
    db.create_table(table_name, schema=schema)
--------------------------------------------------------------------------phase 2
table_name = 'tmp.test2'
expr = table.group_by('id').size()
if not client.exists_table(table_name): 
    db.create_table(table_name,expr,format='parquet')
--------------------------------------------------------------------------phase 3
data = pd.DataFrame({'foo': [1, 2, 3, 4], 'bar': ['a', 'b', 'c', 'd']})
table_name = 'tmp.test3'
if not client.exists_table(table_name): 
    db.create_table(table_name, data)

И результаты показывают, что:

<ibis.filesystems.WebHDFS object at 0x10f2195f8>
<ibis.impala.client.ImpalaClient object at 0x10f201860>
Traceback (most recent call last):
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/urllib3/connection.py", line 160, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 61] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/urllib3/connectionpool.py", line 392, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1239, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1285, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1234, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1026, in _send_output
    self.send(msg)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 964, in send
    self.connect()
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/urllib3/connection.py", line 187, in connect
    conn = self._new_conn()
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/urllib3/connection.py", line 172, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x10f26c2b0>: Failed to establish a new connection: [Errno 61] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/urllib3/connectionpool.py", line 725, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/urllib3/util/retry.py", line 439, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='xx.xx.xx.xx', port=50070): Max retries exceeded with url: /webhdfs/v1/tmp/ibis/pandas_600666daa0c247daba610fd5b640525e?user.name=xy.l.&op=MKDIRS (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x10f26c2b0>: Failed to establish a new connection: [Errno 61] Connection refused',))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/xy.l./Import_table/test.py", line 37, in <module>
    db.create_table(table_name, data)
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/ibis/impala/client.py", line 39, in create_table
    table_name, obj=obj, database=self.name, **kwargs
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/ibis/impala/client.py", line 1171, in create_table
    writer, to_insert = write_temp_dataframe(self, obj)
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/ibis/impala/pandas_interop.py", line 117, in write_temp_dataframe
    path = writer.write_temp_csv()
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/ibis/impala/pandas_interop.py", line 45, in write_temp_csv
    self.hdfs.mkdir(temp_hdfs_dir)
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/ibis/filesystems.py", line 511, in mkdir
    self.client.makedirs(dir_path)
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/hdfs/client.py", line 986, in makedirs
    self._mkdirs(hdfs_path, permission=permission)
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/hdfs/client.py", line 125, in api_handler
    raise err
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/hdfs/client.py", line 107, in api_handler
    **self.kwargs
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/hdfs/client.py", line 214, in _request
    **kwargs
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/requests/sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/requests/sessions.py", line 643, in send
    r = adapter.send(request, **kwargs)
  File "/Users/xy.l./.local/share/virtualenvs/Import_table-UzARfE0y/lib/python3.6/site-packages/requests/adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='xx.xx.xx.xx', port=50070): Max retries exceeded with url: /webhdfs/v1/tmp/ibis/pandas_600666daa0c247daba610fd5b640525e?user.name=xy.l.&op=MKDIRS (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x10f26c2b0>: Failed to establish a new connection: [Errno 61] Connection refused',))

С уважением, XY.Ltw

1 Ответ

0 голосов
/ 04 мая 2020

решаемая. Наша среда - CDH6.3.2, и я проверяю порт веб-интерфейса Namenode dfs.namenode.http-адрес на CM: 9870 вместо 50070. Измените подключение клиента hdfs conf. по приведенному выше коду, и он будет хорошо работать

...