I need to compare two large CSV files and write the result out to a CSV. I used pandas first, but it raised a memory warning, so I switched to Dask DataFrames to read and merge the files and then write the output to CSV. Now the job gets stuck at 15% and nothing happens. Here is my code, followed by the traceback I eventually get:
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

pbar = ProgressBar()
pbar.register()

# Real_Length uses pandas' nullable Int64 extension dtype
dtypes = {'B_Number': 'float64', 'Real_Length': 'Int64'}

# Combine the first two CSV columns into a single 'datetime' column
df = dd.read_csv("./docs/Turk_CDR.csv", parse_dates={'datetime': [0, 1]}, dtype=dtypes)
df1 = dd.read_csv("./docs/Test.csv", parse_dates={'datetime': [0, 1]}, dtype=dtypes)

df['B_Number'] = df['B_Number'].astype(np.float64)
df1['B_Number'] = df1['B_Number'].astype(np.float64)

new_df = dd.merge(df, df1, how='outer', on=['datetime', 'B_Number'])
new_df.info()
<class 'dask.dataframe.core.DataFrame'>
Columns: 4 entries, datetime to Real_Length_y
dtypes: Int64(2), datetime64[ns](1), float64(1)
new_df.to_csv("./test.csv",mode='a', chunksize=100000, compression = 'gzip')
[############ ] | 30% Completed | 5min 44.8s
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-3-fe5dee9d0fbf> in <module>
----> 1 new_df.to_csv("./test.csv",mode='a', chunksize=100000, compression = 'gzip')
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\dataframe\core.py in to_csv(self, filename, **kwargs)
1297 from .io import to_csv
1298
-> 1299 return to_csv(self, filename, **kwargs)
1300
1301 def to_json(self, filename, *args, **kwargs):
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\dataframe\io\csv.py in to_csv(df, filename, name_function, compression, compute, scheduler, storage_options, header_first_partition_only, **kwargs)
759
760 if compute:
--> 761 delayed(values).compute(scheduler=scheduler)
762 return [f.path for f in files]
763 else:
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\base.py in compute(self, **kwargs)
173 dask.base.compute
174 """
--> 175 (result,) = compute(self, traverse=False, **kwargs)
176 return result
177
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\base.py in compute(*args, **kwargs)
444 keys = [x.__dask_keys__() for x in collections]
445 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446 results = schedule(dsk, keys, **kwargs)
447 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
448
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
80 get_id=_thread_get_id,
81 pack_exception=pack_exception,
---> 82 **kwargs
83 )
84
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
489 _execute_task(task, data) # Re-execute locally
490 else:
--> 491 raise_exception(exc, tb)
492 res, worker_id = loads(res_info)
493 state["cache"][key] = res
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\compatibility.py in reraise(exc, tb)
128 if exc.__traceback__ is not tb:
129 raise exc.with_traceback(tb)
--> 130 raise exc
131
132 import pickle as cPickle
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
231 try:
232 task, data = loads(task_info)
--> 233 result = _execute_task(task, data)
234 id = get_id()
235 result = dumps((result, id))
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in <listcomp>(.0)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in <listcomp>(.0)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in <listcomp>(.0)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
113 """
114 if isinstance(arg, list):
--> 115 return [_execute_task(a, cache) for a in arg]
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in <listcomp>(.0)
113 """
114 if isinstance(arg, list):
--> 115 return [_execute_task(a, cache) for a in arg]
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\dask\dataframe\shuffle.py in shuffle_group_3(df, col, npartitions, p)
621 g = df.groupby(col)
622 d = {i: g.get_group(i) for i in g.groups}
--> 623 p.append(d, fsync=True)
624
625
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\partd\encode.py in append(self, data, **kwargs)
21
22 def append(self, data, **kwargs):
---> 23 data = valmap(self.encode, data)
24 data = valmap(frame, data)
25 self.partd.append(data, **kwargs)
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\toolz\dicttoolz.py in valmap(func, d, factory)
81 """
82 rv = factory()
---> 83 rv.update(zip(iterkeys(d), map(func, itervalues(d))))
84 return rv
85
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\partd\pandas.py in serialize(df)
156
157 for block in df._data.blocks:
--> 158 h, b = block_to_header_bytes(block)
159 headers.append(h)
160 bytes.append(b)
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\partd\pandas.py in block_to_header_bytes(block)
126
127 header = (block.mgr_locs.as_array, values.dtype, values.shape, extension)
--> 128 bytes = pnp.compress(pnp.serialize(values), values.dtype)
129 return header, bytes
130
F:\python\WPy64-3740\python-3.7.4.amd64\lib\site-packages\partd\numpy.py in serialize(x)
99 return frame(pickle.dumps(l, protocol=pickle.HIGHEST_PROTOCOL))
100 else:
--> 101 return x.tobytes()
102
103
AttributeError: 'IntegerArray' object has no attribute 'tobytes'
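
Reading the bottom of the traceback, the failure happens in partd's numpy serializer during the disk-based shuffle: serialize() calls x.tobytes(), which plain numpy arrays have but the pandas nullable IntegerArray (the 'Int64' dtype I gave Real_Length) does not. Here is a minimal sketch of the workaround I am considering, assuming the nullable dtype really is the culprit (the shuffle='tasks' keyword is what my installed Dask version documents for merges):

import dask.dataframe as dd

# Plain numpy float64 instead of the nullable 'Int64' extension dtype,
# so partd only ever sees arrays that support .tobytes()
dtypes = {'B_Number': 'float64', 'Real_Length': 'float64'}

df = dd.read_csv("./docs/Turk_CDR.csv", parse_dates={'datetime': [0, 1]}, dtype=dtypes)
df1 = dd.read_csv("./docs/Test.csv", parse_dates={'datetime': [0, 1]}, dtype=dtypes)

# Alternatively, keep 'Int64' at read time and cast just before the merge:
# df['Real_Length'] = df['Real_Length'].astype('float64')

new_df = dd.merge(
    df, df1,
    how='outer',
    on=['datetime', 'B_Number'],
    shuffle='tasks',  # task-based shuffle avoids the partd/disk serialization path
)

# One output file per partition; '*' is replaced by the partition number
new_df.to_csv("./test-*.csv", compression='gzip')

Casting to float64 gives up the nullable-integer semantics (missing values become NaN instead of <NA>), but either change should keep the shuffle away from the code path that raises the AttributeError. Would that be the right fix, or am I misreading the traceback?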