dask dataframe из read_csv с использованием ненадежного поведения движка python - PullRequest
1 голос
/ 28 мая 2020

мои данные представляют собой файл размером 10 ГБ в следующем формате:

[ 1234567890 ][ 2020052701020201 ][ value1 ][ value2 ][ key3 = value3 ]...[ keyn = valuen ]

Примечание:

  1. Может быть любое количество блоков [key = value].
  2. символы [ и ] находятся в самих значениях, например: [ hello = wo[rld] ]
  3. У меня нет контроля над файлом abinput, за исключением того, что я могу изменить / обработать его в моем скрипте.
  4. Мне нужно только несколько столбцов, однако в них есть символы [ и ] в значениях.

В моей простой функции for line in f: я могу разделить на ' ][ ' узор. однако, учитывая размер файла, dask очень прибылен.

Я знаю, что с engine='c' у меня не может быть многозначного разделителя, но переход на engine='python' приводит к непредсказуемым результатам. вот пример:

def init_ddf(filename):
    return ddf.read_csv(
        filename,
        blocksize="1GB",
        sep="]",
        usecols=[1, 8],
        na_filter=False,
        names=["hello", World" ],
        engine="c",
    )

Приведенный выше код, как и ожидалось, приводит к ParserError: Too many columns specified: expected 25 and found 24. Эту ошибку очень сложно воспроизвести, так как она возникает только из-за некоторых c строк, которые мне сложно идентифицировать. Это не происходит каждый раз, когда появляются новые столбцы. Итак, в приведенной выше функции я изменил: engine="python" и sep=" \]\[ ". Это работает с небольшими выборками данных, с которыми я тестирую. но в файле 10G я получаю следующее непредсказуемое поведение:

def init_pyddf(filename, usecols, names):
    return ddf.read_csv(
        filename,
        blocksize="1GB",
        sep=" \]\[ ",
        usecols=usecols,
        na_filter=False,
        names=names,
        engine="python",
    )
In [50]: !head   /tmp/foo /tmp/bar
==> /tmp/foo <==
[ 1234567890 ][ 2020052701020201 ][ value1 ][ value2 ][ key3 = value3 ][ keyn = valuen ]
[ 1590471107 ][ 20200526T0731460 ][ THEOQQ ][ e = CL ][ Even = 175134 ][ rded = a12344 ][ blah = INVALID ][ N = T ][ ED = 13606 ]                       

==> /tmp/bar <==
[ 1234567890 ][ 2020052701020201 ][ value1 ][ value2 ][ key3 = value3 ][ keyn = valuen ]
[ 1590471107 ][ 20200526T0731460 ][ THEOQQ ][ e = CL ][ Even = 175134 ][ rded = a12344 ]

In [51]: init_pyddf("/tmp/foo", [1,2], ["time", "name"]).compute()
Out[51]: 
                                               time             name
[ 1234567890 2020052701020201 value1  key3 = value3  keyn = valuen ]
[ 1590471107 20200526T0731460 THEOQQ  Even = 175134    rded = a12344

In [52]: init_pyddf("/tmp/bar", [1,2], ["time", "name"]).compute()
Out[52]: 
               time    name
0  2020052701020201  value1
1  20200526T0731460  THEOQQ

Еще несколько примеров:

In [110]: !cat /tmp/dummy
[ 0 ][ 000000000000000000000000000 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ]
[ 1 ][ 20200526T073146.901861+0200 ][ T ][ E ][ E ][ F ][ W ][ N ][ E ][ E ][ 5 ]

In [111]: init_pyddf("/tmp/dummy", [1,7], ["time", "name"]).compute().head()
Out[111]: 
    time name
[ 0    0    0
[ 1    T    E

In [112]: !cat /tmp/dummy
[ 0 ][ 000000000000000000000000000 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ]
[ 1 ][ 20200526T073146.901861+0200 ][ T ][ E ][ E ][ F ][ W ][ N ][ E ][ E ]

In [113]: init_pyddf("/tmp/dummy", [1,7], ["time", "name"]).compute().head()
Out[113]: 
                          time name
0  000000000000000000000000000    0
1  20200526T073146.901861+0200    N

In [119]: !cat /tmp/dummy
[ 0 ][ 000000000000000 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ]
[ 1 ][ 20200526T073146 ][ T ][ D ][ F ][ W ][ e ][ E ][ E ][ I ][ T ][ T ][ S ][ S ][ B ][ A ][ E ][ F ][ S ][ P][ T = Y ][ 0 ]

In [120]: init_pyddf("/tmp/dummy", [1,7], ["time", "name"]).compute()
Out[120]: 
                                           time  name
[ 0 000000000000000 0 0 0 0 0 0 0 0 ] NaN  None  None
[ 1 20200526T073146 T D F W e E E I   T       S     S

1 Ответ

0 голосов
/ 13 июня 2020

Учитывая, что у вас есть более сложный текстовый формат файла, вы можете сначала начать с Dask Bag, использовать обычные Python функции для создания python словарей, а затем преобразовать этот Bag в Dask Dataframe с to_dataframe метод.

import dask.bag

b = dask.bag.read_text("my-files.*.txt")

def parse(line: str) -> dict:
    ...

records = b.map(parse)
df = b.to_dataframe()
...