Чтение пользовательского формата файла в Dask dataframe - PullRequest
2 голосов
/ 24 января 2020

У меня есть огромный пользовательский текстовый файл (не могу загрузить все данные в один pandas dataframe), который я хочу прочитать в Dask dataframe. Я написал генератор для чтения и разбора данных по частям и создания pandas данных. Я хочу загрузить эти pandas фреймы данных в dask-фрейм данных и выполнить операции с результирующим фреймом данных (такие как создание вычисляемых столбцов, извлечение частей фрейма данных, построение графиков и т. Д. c). Я пытался использовать сумку Dask, но не смог добиться успеха. Поэтому я решил записать полученный фрейм данных в хранилище HDFS, а затем использовать Dask для чтения из файла HDFStore. Это работало хорошо, когда я делал это со своего компьютера. Код ниже.

cc = read_custom("demo.xyz", chunks=1000) # Generator of pandas dataframes
from pandas import HDFStore
s = HDFStore("demo.h5")
for c in cc:
    s.append("data", c, format='t', append=True)
s.close()

import dask.dataframe as dd
ddf = dd.read_hdf("demo.h5", "data", chunksize=100000)
seqv = (
    (
        (ddf.sxx - ddf.syy) ** 2
        + (ddf.syy - ddf.szz) ** 2
        + (ddf.szz - ddf.sxx) ** 2
        + 6 * (ddf.sxy ** 2 + ddf.syz ** 2 + ddf.sxz ** 2)
    )
    / 2
) ** 0.5
seqv.compute()

Поскольку последнее вычисление было медленным, я решил распределить его по нескольким системам в моей локальной сети и запустил планировщик на своем компьютере и несколько рабочих в других системах. И запустил Client, как показано ниже.

from dask.distributed import Client
client = Client('mysystemip:8786') #Establishing connection with the scheduler all fine.

И затем прочитал в кадре данных Dask. Тем не менее, я получил ошибку ниже, когда я выполнил seqv.compute().

HDF5ExtError: HDF5 error back trace

  File "H5F.c", line 509, in H5Fopen
    unable to open file
  File "H5Fint.c", line 1400, in H5F__open
    unable to open file
  File "H5Fint.c", line 1615, in H5F_open
    unable to lock the file
  File "H5FD.c", line 1640, in H5FD_lock
    driver lock request failed
  File "H5FDsec2.c", line 941, in H5FD_sec2_lock
    unable to lock file, errno = 11, error message = 'Resource temporarily unavailable'

End of HDF5 error back trace

Unable to open/create file 'demo.h5'

Я убедился, что все работники имеют доступ к файлу demo.h5. Я попытался передать в lock=False в read_hdf. Получена та же ошибка.

Разве это невозможно сделать? Может быть, попробовать другой формат файла? Я думаю, что запись каждого pandas кадра данных в отдельные файлы может работать, но я пытаюсь избежать этого (я даже не хочу промежуточный файл HDFS). Но прежде чем я доберусь до этого пути, я хотел бы узнать, есть ли какой-нибудь другой лучший способ решить проблему.

Спасибо за любые предложения!

1 Ответ

1 голос
/ 25 января 2020

Если вы хотите прочитать данные из пользовательского формата в текстовом файле, я рекомендую использовать функцию dask.bytes.read_bytes, которая возвращает список отложенных объектов, каждый из которых указывает на блок байтов из вашего файла. По умолчанию эти блоки будут четко разделены разделителем строк.

Примерно так может работать:

def parse_bytes(b: bytes) -> pandas.DataFrame:
    ...

blocks = dask.bytes.read_bytes("my-file.txt", delimiter=b"\n")
dataframes = [dask.delayed(parse_bytes)(block) for block in blocks]
df = dask.dataframe.from_delayed(dataframes)
...