Python Dask Dataframe пишет в CSV не работает - PullRequest
0 голосов
/ 03 октября 2019

Мне нужно сравнить два больших CSV-файла и вывести результат в CSV. Я использовал pandas, но получал предупреждение о нехватке памяти. Теперь использую Dask DataFrame для чтения и объединения, а затем для записи результата в CSV. Но выполнение застревает на 15%, и ничего не происходит. Вот мой код

import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

# Show a progress bar for every dask computation in this script.
pbar = ProgressBar()
pbar.register()

# FIX: the nullable extension dtype 'Int64' stores data in a pandas
# IntegerArray, which partd (used by dask to spill shuffle partitions to
# disk during the merge) cannot serialize — the write dies with
# "AttributeError: 'IntegerArray' object has no attribute 'tobytes'"
# (see the traceback below).  Reading Real_Length as a plain float64
# (NaN marks missing values) avoids the extension array entirely.
dtypes = {'B_Number': 'float64', 'Real_Length': 'float64'}

# Columns 0 and 1 of each file are parsed together into one 'datetime' column.
df = dd.read_csv("./docs/Turk_CDR.csv", parse_dates={'datetime': [0, 1]}, dtype=dtypes)

df1 = dd.read_csv("./docs/Test.csv", parse_dates={'datetime': [0, 1]}, dtype=dtypes)

import numpy as np  # kept: may be used by later cells in the session

# NOTE: the original astype(np.float64) casts were redundant — B_Number is
# already read as float64 via the dtype mapping above — so they are dropped.

# Full outer join on the composite key (datetime, B_Number); rows present in
# only one file get NaN in the other file's columns.
new_df = dd.merge(df, df1, how='outer', on=['datetime', 'B_Number'])

# Inspect the lazy frame's schema (no computation is triggered by info()).
new_df.info()
# Output:
# <class 'dask.dataframe.core.DataFrame'>
# Columns: 4 entries, datetime to Real_Length_y
# dtypes: Int64(2), datetime64[ns](1), float64(1)

# NOTE(review): dask's to_csv writes one file per partition — pass a
# globstring ("./test-*.csv") or single_file=True (dask >= 2.6) to control
# the output naming; "./test.csv" here is treated as a directory/template.
# NOTE(review): mode='a' combined with compression='gzip' appends a new gzip
# member on every run — confirm appending is actually intended.
new_df.to_csv("./test.csv", mode='a', chunksize=100000, compression='gzip')
[############                            ] | 30% Completed |  5min 44.8s



---------------------------------------------------------------------------

AttributeError                            Traceback (most recent call last)

<ipython-input-3-fe5dee9d0fbf> in <module>
----> 1 new_df.to_csv("./test.csv",mode='a', chunksize=100000, compression = 'gzip')


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\dataframe\core.py in to_csv(self, filename, **kwargs)
   1297         from .io import to_csv
   1298 
-> 1299         return to_csv(self, filename, **kwargs)
   1300 
   1301     def to_json(self, filename, *args, **kwargs):


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\dataframe\io\csv.py in to_csv(df, filename, name_function, compression, compute, scheduler, storage_options, header_first_partition_only, **kwargs)
    759 
    760     if compute:
--> 761         delayed(values).compute(scheduler=scheduler)
    762         return [f.path for f in files]
    763     else:


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\base.py in compute(self, **kwargs)
    173         dask.base.compute
    174         """
--> 175         (result,) = compute(self, traverse=False, **kwargs)
    176         return result
    177 


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\base.py in compute(*args, **kwargs)
    444     keys = [x.__dask_keys__() for x in collections]
    445     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446     results = schedule(dsk, keys, **kwargs)
    447     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    448 


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     80         get_id=_thread_get_id,
     81         pack_exception=pack_exception,
---> 82         **kwargs
     83     )
     84 


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    489                         _execute_task(task, data)  # Re-execute locally
    490                     else:
--> 491                         raise_exception(exc, tb)
    492                 res, worker_id = loads(res_info)
    493                 state["cache"][key] = res


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\compatibility.py in reraise(exc, tb)
    128         if exc.__traceback__ is not tb:
    129             raise exc.with_traceback(tb)
--> 130         raise exc
    131 
    132     import pickle as cPickle


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    231     try:
    232         task, data = loads(task_info)
--> 233         result = _execute_task(task, data)
    234         id = get_id()
    235         result = dumps((result, id))


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in <listcomp>(.0)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in <listcomp>(.0)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in <listcomp>(.0)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
    113     """
    114     if isinstance(arg, list):
--> 115         return [_execute_task(a, cache) for a in arg]
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in <listcomp>(.0)
    113     """
    114     if isinstance(arg, list):
--> 115         return [_execute_task(a, cache) for a in arg]
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\dataframe\shuffle.py in shuffle_group_3(df, col, npartitions, p)
    621     g = df.groupby(col)
    622     d = {i: g.get_group(i) for i in g.groups}
--> 623     p.append(d, fsync=True)
    624 
    625 


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\partd\encode.py in append(self, data, **kwargs)
     21 
     22     def append(self, data, **kwargs):
---> 23         data = valmap(self.encode, data)
     24         data = valmap(frame, data)
     25         self.partd.append(data, **kwargs)


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\toolz\dicttoolz.py in valmap(func, d, factory)
     81     """
     82     rv = factory()
---> 83     rv.update(zip(iterkeys(d), map(func, itervalues(d))))
     84     return rv
     85 


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\partd\pandas.py in serialize(df)
    156 
    157     for block in df._data.blocks:
--> 158         h, b = block_to_header_bytes(block)
    159         headers.append(h)
    160         bytes.append(b)


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\partd\pandas.py in block_to_header_bytes(block)
    126 
    127     header = (block.mgr_locs.as_array, values.dtype, values.shape, extension)
--> 128     bytes = pnp.compress(pnp.serialize(values), values.dtype)
    129     return header, bytes
    130 


F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\partd\numpy.py in serialize(x)
     99         return frame(pickle.dumps(l, protocol=pickle.HIGHEST_PROTOCOL))
    100     else:
--> 101         return x.tobytes()
    102 
    103 


AttributeError: 'IntegerArray' object has no attribute 'tobytes'

1 Ответ

0 голосов
/ 03 октября 2019

Я бы предложил вам посмотреть вопрос «MemoryError при объединении двух фреймов данных Pandas», а затем попробовать dask. Я также запускал этот код в Google Colab, и там он сработал.

...