Я тестирую производительность Dask, используя "Распределенные панды в кластере с Dask DataFrames" в качестве руководства.
В примере с Мэтью у него есть файл 20 ГБ и 64 рабочих (8физические узлы).
В моем случае у меня есть файл объемом 82 ГБ и 288 рабочих (12 физических узлов; на каждом из них есть узел данных HDFS).
На всех 12 узлах я могу получить доступHDFS и выполните простой скрипт Python, который отображает информацию о файле:
import pyarrow as pa
fs = pa.hdfs.connect([url], 8022)
print(str(fs.info('/path/to/file.csv')))
Если я создаю кластер с одним узлом (всего 24 рабочих), используя только машину, на которой работает Dask Scheduler, я могу прочитать файл .csvиз HDFS и выведите длину:
import dask
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
dask.config.set(hdfs_backend='pyarrow')
df = dd.read_csv('hdfs://[url]:8022/path/to/file.csv')
df = client.persist(df)
print(str(len(df)))
Эта последняя строка дает 1046250873 (хорошо!) и требует 3m17s для запуска.
Однако, когда я использую полный кластер, эта последняя строка вызываетlen(df)
умирает, и я получаю эту ошибку:
KilledWorker: ("('pandas_read_text-read-block-from-delayed-9ad3beb62f0aea4a07005d5c98749d7e', 1201)", 'tcp: // [url]: 42866 ')
Это похоже на упомянутую проблему hВот , у которого есть решение здесь , включающее Dask Yarn и конфигурацию (?), которая выглядит следующим образом: worker_env={'ARROW_LIBHDFS_DIR': ...}
Однако я не использую Yarn, хотя я предполагаю, чточто Dask Workers каким-то образом не настроены с путями HDFS / Arrow, которые им необходимы для подключения.
Я не вижу никакой документации по этому вопросу, поэтому мой вопрос относительно того, что мне не хватает.
Редактировать:
Вот трассировка ошибок, которую я вижу в выводе Dask Workers:
distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95N\x05\x00\x00\x00\x00\x00\x00(\x8c\x14dask.dataframe.utils\x94\x8c\ncheck_meta\x94\x93\x94(\x8c\x12dask
.compatibility\x94\x8c\x05apply\x94\x93\x94\x8c\x15dask.dataframe.io.csv\x94\x8c\x10pandas_read_text\x94\x93\x94]\x94(\x8c\x11pandas.io.parsers\x94\x8c\x08read_csv\x94\x93\x94(
\x8c\x0fdask.bytes.core\x94\x8c\x14read_block_from_file\x94\x93\x94h\r\x8c\x08OpenFile\x94\x93\x94(\x8c\x12dask.bytes.pyarrow\x94\x8c\x17PyArrowHadoopFileSystem\x94\x93\x94)\x8
1\x94}\x94(\x8c\x02fs\x94\x8c\x0cpyarrow.hdfs\x94\x8c\x10HadoopFileSystem\x94\x93\x94(\x8c\r10.255.200.91\x94MV\x1fNN\x8c\x07libhdfs\x94Nt\x94R\x94\x8c\x08protocol\x94\x8c\x04h
dfs\x94ub\x8c\x1a/path/to/file.csv\x94\x8c\x02rb\x94NNNt\x94R\x94K\x00J\x00\x90\xd0\x03C\x01\n\x94t\x94C\x12animal,weight,age\n\x94\x8c\x08builtins\x94\x8c\x04dict\x94
\x93\x94]\x94\x86\x94h*]\x94(]\x94(\x8c\x06animal\x94\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02O8\x94K\x00K\x01\x87\x94R\x94(K\x03\x8c\x01|\x94NNNJ\xff\xff\xff\xffJ\xff
\xff\xff\xffK?t\x94be]\x94(\x8c\x06weight\x94h2\x8c\x02i8\x94K\x00K\x01\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94be]\x94(\x8c\x03age\x94h<e
e\x86\x94]\x94(h/h9h@eeh*]\x94(]\x94(\x8c\x0cwrite_header\x94\x89e]\x94(\x8c\x07enforce\x94\x89e]\x94(\x8c\x04path\x94Nee\x86\x94t\x94\x8c\x11pandas.core.frame\x94\x8c\tDataFra
me\x94\x93\x94)\x81\x94}\x94(\x8c\x05_data\x94\x8c\x15pandas.core.internals\x94\x8c\x0cBlockManager\x94\x93\x94)\x81\x94(]\x94(\x8c\x18pandas.core.indexes.base\x94\x8c\n_new_In
dex\x94\x93\x94hW\x8c\x05Index\x94\x93\x94}\x94(\x8c\x04data\x94\x8c\x15numpy.core.multiarray\x94\x8c\x0c_reconstruct\x94\x93\x94h0\x8c\x07ndarray\x94\x93\x94K\x00\x85\x94C\x01
b\x94\x87\x94R\x94(K\x01K\x03\x85\x94h5\x89]\x94(h/h9h@et\x94b\x8c\x04name\x94Nu\x86\x94R\x94hY\x8c\x19pandas.core.indexes.range\x94\x8c\nRangeIndex\x94\x93\x94}\x94(hjN\x8c\x0
5start\x94K\x00\x8c\x04stop\x94K\x00\x8c\x04step\x94K\x01u\x86\x94R\x94e]\x94(h`hbK\x00\x85\x94hd\x87\x94R\x94(K\x01K\x02K\x00\x86\x94h<\x89C\x00\x94t\x94bh`hbK\x00\x85\x94hd\x
87\x94R\x94(K\x01K\x01K\x00\x86\x94h5\x89]\x94t\x94be]\x94(hYh[}\x94(h]h`hbK\x00\x85\x94hd\x87\x94R\x94(K\x01K\x02\x85\x94h5\x89]\x94(h9h@et\x94bhjNu\x86\x94R\x94hYh[}\x94(h]h`
hbK\x00\x85\x94hd\x87\x94R\x94(K\x01K\x01\x85\x94h5\x89]\x94h/at\x94bhjNu\x86\x94R\x94e}\x94\x8c\x060.14.1\x94}\x94(\x8c\x04axes\x94hV\x8c\x06blocks\x94]\x94(}\x94(\x8c\x06valu
es\x94hy\x8c\x08mgr_locs\x94h(\x8c\x05slice\x94\x93\x94K\x01K\x03K\x01\x87\x94R\x94u}\x94(h\x9dh\x7fh\x9eh\xa0K\x00K\x01K\x01\x87\x94R\x94ueust\x94b\x8c\x04_typ\x94\x8c\tdatafr
ame\x94\x8c\t_metadata\x94]\x94ub\x8c\x0cfrom_delayed\x94t\x94.'
Traceback (most recent call last):
File "/usr/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 59, in loads
return pickle.loads(x)
File "/usr/lib64/python3.6/site-packages/pyarrow/hdfs.py", line 38, in __init__
self._connect(host, port, user, kerb_ticket, driver, extra_conf)
File "pyarrow/io-hdfs.pxi", line 89, in pyarrow.lib.HadoopFileSystem._connect
File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: Unable to load libjvm
Опять же, я могу использовать pyarrow дляуспешно прочитал файл из HDFS с любого из 12 узлов.