Ошибка памяти Dask при группировании DF из данных паркета - PullRequest
0 голосов
/ 27 апреля 2018

Я создал набор данных паркета, прочитав данные в pandas df, используя get_dummies() для данных и записав их в файл паркета:

df = pd.read_sql(query, engine)
encoded = pd.get_dummies(df, columns=['account'])
encoded.to_parquet('encoded.parquet')

У панд df было 2,7 млн ​​строк и 4000 столбцов. Затем я прочитал данные паркета в dask df и попытался сгруппировать их:

c = Client()
df = dd.read_parquet('encoded.parquet')
result = c.compute(df.groupby(df.journal_entry).max())

В результате df составляет 600 000 строк и 4000 столбцов. У меня 32 ГБ ОЗУ на моей машине. После небольшого промежутка времени попытки вычисления возникает ошибка памяти. Это обратная связь:

---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)
<ipython-input-29-083067d43616> in <module>()
----> 1 result.result()

~\_installed\anaconda\lib\site-packages\distributed\client.py in result(self, timeout)
    156                       self._result, raiseit=False, callback_timeout=timeout)
    157         if self.status == 'error':
--> 158             six.reraise(*result)
    159         elif self.status == 'cancelled':
    160             raise result

~\_installed\anaconda\lib\site-packages\six.py in reraise(tp, value, tb)
    683             value = tp()
    684         if value.__traceback__ is not tb:
--> 685             raise value.with_traceback(tb)
    686         raise value
    687 

~\_installed\anaconda\lib\site-packages\zict\buffer.py in __setitem__()
     80             self.fast[key] = value
     81         else:
---> 82             self.slow[key] = value
     83 
     84     def __delitem__(self, key):

~\_installed\anaconda\lib\site-packages\zict\func.py in __setitem__()
     40 
     41     def __setitem__(self, key, value):
---> 42         self.d[key] = self.dump(value)
     43 
     44     def __contains__(self, key):

~\_installed\anaconda\lib\site-packages\distributed\protocol\serialize.py in serialize_bytelist()
    342 
    343 def serialize_bytelist(x):
--> 344     header, frames = serialize(x)
    345     frames = frame_split_size(frames)
    346     if frames:

~\_installed\anaconda\lib\site-packages\distributed\protocol\serialize.py in serialize()
    136         if _find_lazy_registration(name):
    137             return serialize(x)  # recurse
--> 138         header, frames = {}, [pickle.dumps(x)]
    139 
    140     return header, frames

~\_installed\anaconda\lib\site-packages\distributed\protocol\pickle.py in dumps()
     49     except Exception:
     50         try:
---> 51             return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     52         except Exception as e:
     53             logger.info("Failed to serialize %s. Exception: %s", x, e)

~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in dumps()
    898 
    899     cp = CloudPickler(file,protocol)
--> 900     cp.dump(obj)
    901 
    902     return file.getvalue()

~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in dump()
    232         self.inject_addons()
    233         try:
--> 234             return Pickler.dump(self, obj)
    235         except RuntimeError as e:
    236             if 'recursion' in e.args[0]:

~\_installed\anaconda\lib\pickle.py in dump()
    407         if self.proto >= 4:
    408             self.framer.start_framing()
--> 409         self.save(obj)
    410         self.write(STOP)
    411         self.framer.end_framing()

~\_installed\anaconda\lib\pickle.py in save()
    519 
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522 
    523     def persistent_id(self, obj):

~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in save_reduce()
    784 
    785         if state is not None:
--> 786             save(state)
    787             write(pickle.BUILD)
    788 

~\_installed\anaconda\lib\pickle.py in save()
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~\_installed\anaconda\lib\pickle.py in save_dict()
    819 
    820         self.memoize(obj)
--> 821         self._batch_setitems(obj.items())
    822 
    823     dispatch[dict] = save_dict

~\_installed\anaconda\lib\pickle.py in _batch_setitems()
    845                 for k, v in tmp:
    846                     save(k)
--> 847                     save(v)
    848                 write(SETITEMS)
    849             elif n:

~\_installed\anaconda\lib\pickle.py in save()
    519 
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522 
    523     def persistent_id(self, obj):

~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in save_reduce()
    784 
    785         if state is not None:
--> 786             save(state)
    787             write(pickle.BUILD)
    788 

~\_installed\anaconda\lib\pickle.py in save()
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~\_installed\anaconda\lib\pickle.py in save_tuple()
    749         write(MARK)
    750         for element in obj:
--> 751             save(element)
    752 
    753         if id(obj) in memo:

~\_installed\anaconda\lib\pickle.py in save()
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~\_installed\anaconda\lib\pickle.py in save_list()
    779 
    780         self.memoize(obj)
--> 781         self._batch_appends(obj)
    782 
    783     dispatch[list] = save_list

~\_installed\anaconda\lib\pickle.py in _batch_appends()
    803                 write(MARK)
    804                 for x in tmp:
--> 805                     save(x)
    806                 write(APPENDS)
    807             elif n:

~\_installed\anaconda\lib\pickle.py in save()
    519 
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522 
    523     def persistent_id(self, obj):

~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in save_reduce()
    784 
    785         if state is not None:
--> 786             save(state)
    787             write(pickle.BUILD)
    788 

~\_installed\anaconda\lib\pickle.py in save()
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~\_installed\anaconda\lib\pickle.py in save_tuple()
    749         write(MARK)
    750         for element in obj:
--> 751             save(element)
    752 
    753         if id(obj) in memo:

~\_installed\anaconda\lib\pickle.py in save()
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~\_installed\anaconda\lib\pickle.py in save_bytes()
    697             self.write(SHORT_BINBYTES + pack("<B", n) + obj)
    698         elif n > 0xffffffff and self.proto >= 4:
--> 699             self.write(BINBYTES8 + pack("<Q", n) + obj)
    700         else:
    701             self.write(BINBYTES + pack("<I", n) + obj)

MemoryError: 

В дополнение к этому методу я попытался пропустить шаги паркета и напрямую преобразовать панд df в dask df. Это не привело к немедленной проблеме с памятью, но оно работало в течение часа с выполнением 0% (согласно диагностической диагностике). Что я могу сделать, чтобы избежать этой ошибки памяти?

1 Ответ

0 голосов
/ 27 апреля 2018

Dask должен иметь возможность загружать целые разделы в память, а также временную память для групповых операций. В вашем случае данные были записаны в один раздел.

Предполагая, что вы записываете свои данные с помощью fastparquet, вы можете указать пандам записать свои данные в разделы:

encoded.to_parquet('encoded.parquet', row_group_offsets=200000)

с pyarrow, синтаксис немного больше громоздкий :

with pyarrow.parquet.ParquetWriter('encoded.parquet', 
        pyarrow.Table.from_pandas(encoded).schema) as writer:
    for i in range(0, 47000000, 200000):
        writer.write_table(pyarrow.Table.from_pandas(df[i:i+200000]))

Правильное число будет зависеть от того, сколько рабочих у вас есть.

Это важно, потому что количество столбцов довольно велико, и разбиение по умолчанию (в fastparquet - в pyarrow нет вообще) выполняется по количеству строк.

...