Я создал набор данных Parquet: прочитал данные в pandas df, применил к ним get_dummies()
и записал результат в файл Parquet:
df = pd.read_sql(query, engine)
encoded = pd.get_dummies(df, columns=['account'])
encoded.to_parquet('encoded.parquet')
Pandas df содержал 2,7 млн строк и 4000 столбцов. Затем я прочитал данные Parquet в dask df и попытался сгруппировать их:
c = Client()
df = dd.read_parquet('encoded.parquet')
result = c.compute(df.groupby(df.journal_entry).max())
Результирующий df содержит 600 000 строк и 4000 столбцов. На моей машине 32 ГБ ОЗУ. Спустя короткое время после запуска вычисления возникает ошибка MemoryError. Вот трассировка стека (traceback):
---------------------------------------------------------------------------
MemoryError Traceback (most recent call last)
<ipython-input-29-083067d43616> in <module>()
----> 1 result.result()
~\_installed\anaconda\lib\site-packages\distributed\client.py in result(self, timeout)
156 self._result, raiseit=False, callback_timeout=timeout)
157 if self.status == 'error':
--> 158 six.reraise(*result)
159 elif self.status == 'cancelled':
160 raise result
~\_installed\anaconda\lib\site-packages\six.py in reraise(tp, value, tb)
683 value = tp()
684 if value.__traceback__ is not tb:
--> 685 raise value.with_traceback(tb)
686 raise value
687
~\_installed\anaconda\lib\site-packages\zict\buffer.py in __setitem__()
80 self.fast[key] = value
81 else:
---> 82 self.slow[key] = value
83
84 def __delitem__(self, key):
~\_installed\anaconda\lib\site-packages\zict\func.py in __setitem__()
40
41 def __setitem__(self, key, value):
---> 42 self.d[key] = self.dump(value)
43
44 def __contains__(self, key):
~\_installed\anaconda\lib\site-packages\distributed\protocol\serialize.py in serialize_bytelist()
342
343 def serialize_bytelist(x):
--> 344 header, frames = serialize(x)
345 frames = frame_split_size(frames)
346 if frames:
~\_installed\anaconda\lib\site-packages\distributed\protocol\serialize.py in serialize()
136 if _find_lazy_registration(name):
137 return serialize(x) # recurse
--> 138 header, frames = {}, [pickle.dumps(x)]
139
140 return header, frames
~\_installed\anaconda\lib\site-packages\distributed\protocol\pickle.py in dumps()
49 except Exception:
50 try:
---> 51 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
52 except Exception as e:
53 logger.info("Failed to serialize %s. Exception: %s", x, e)
~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in dumps()
898
899 cp = CloudPickler(file,protocol)
--> 900 cp.dump(obj)
901
902 return file.getvalue()
~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in dump()
232 self.inject_addons()
233 try:
--> 234 return Pickler.dump(self, obj)
235 except RuntimeError as e:
236 if 'recursion' in e.args[0]:
~\_installed\anaconda\lib\pickle.py in dump()
407 if self.proto >= 4:
408 self.framer.start_framing()
--> 409 self.save(obj)
410 self.write(STOP)
411 self.framer.end_framing()
~\_installed\anaconda\lib\pickle.py in save()
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in save_reduce()
784
785 if state is not None:
--> 786 save(state)
787 write(pickle.BUILD)
788
~\_installed\anaconda\lib\pickle.py in save()
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~\_installed\anaconda\lib\pickle.py in save_dict()
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
~\_installed\anaconda\lib\pickle.py in _batch_setitems()
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
~\_installed\anaconda\lib\pickle.py in save()
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in save_reduce()
784
785 if state is not None:
--> 786 save(state)
787 write(pickle.BUILD)
788
~\_installed\anaconda\lib\pickle.py in save()
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~\_installed\anaconda\lib\pickle.py in save_tuple()
749 write(MARK)
750 for element in obj:
--> 751 save(element)
752
753 if id(obj) in memo:
~\_installed\anaconda\lib\pickle.py in save()
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~\_installed\anaconda\lib\pickle.py in save_list()
779
780 self.memoize(obj)
--> 781 self._batch_appends(obj)
782
783 dispatch[list] = save_list
~\_installed\anaconda\lib\pickle.py in _batch_appends()
803 write(MARK)
804 for x in tmp:
--> 805 save(x)
806 write(APPENDS)
807 elif n:
~\_installed\anaconda\lib\pickle.py in save()
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in save_reduce()
784
785 if state is not None:
--> 786 save(state)
787 write(pickle.BUILD)
788
~\_installed\anaconda\lib\pickle.py in save()
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~\_installed\anaconda\lib\pickle.py in save_tuple()
749 write(MARK)
750 for element in obj:
--> 751 save(element)
752
753 if id(obj) in memo:
~\_installed\anaconda\lib\pickle.py in save()
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~\_installed\anaconda\lib\pickle.py in save_bytes()
697 self.write(SHORT_BINBYTES + pack("<B", n) + obj)
698 elif n > 0xffffffff and self.proto >= 4:
--> 699 self.write(BINBYTES8 + pack("<Q", n) + obj)
700 else:
701 self.write(BINBYTES + pack("<I", n) + obj)
MemoryError:
В дополнение к этому методу я попытался пропустить шаги с Parquet и напрямую преобразовать pandas df в dask df. Это не привело к немедленной ошибке памяти, но вычисление работало в течение часа, оставаясь на 0% выполнения (согласно диагностической панели dask). Что я могу сделать, чтобы избежать этой ошибки памяти?