Я пытаюсь использовать dask, чтобы разделить большой текстовый файл на несколько CSV-файлов на основе одного из значений столбцов. Я нашел ту же проблему здесь и попытался реализовать решение, но возникли некоторые проблемы. Я начинаю с Это . Мой фрейм данных называется "df", а столбец, который я хотел бы использовать, - "Магазины". Мой код выглядит следующим образом. Я использую именно метод, описанный в предыдущем посте.
def do_to_csv(df):
df.to_csv(df.name, sep='\t', header=True, index=False)
return df
df.groupby(['Stores']).apply(do_to_csv, meta=df._meta).size.compute()
Код генерирует следующую ошибку.
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/groupby.py in apply(self, func, *args, **kwargs)
724 try:
--> 725 result = self._python_apply_general(f)
726 except Exception:
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/groupby.py in _python_apply_general(self, f)
744 return self._wrap_applied_output(
--> 745 keys, values, not_indexed_same=mutated or self.mutated
746 )
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/generic.py in _wrap_applied_output(self, keys, values, not_indexed_same)
371 elif isinstance(v, DataFrame):
--> 372 return self._concat_objects(keys, values, not_indexed_same=not_indexed_same)
373 elif self.grouper.groupings is not None:
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/groupby.py in _concat_objects(self, keys, values, not_indexed_same)
954 else:
--> 955 result = result.reindex(ax, axis=self.axis)
956
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/util/_decorators.py in wrapper(*args, **kwargs)
220 def wrapper(*args, **kwargs):
--> 221 return func(*args, **kwargs)
222
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/frame.py in reindex(self, *args, **kwargs)
3975 kwargs.pop("labels", None)
-> 3976 return super().reindex(**kwargs)
3977
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/generic.py in reindex(self, *args, **kwargs)
4513 return self._reindex_axes(
-> 4514 axes, level, limit, tolerance, method, fill_value, copy
4515 ).__finalize__(self)
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/frame.py in _reindex_axes(self, axes, level, limit, tolerance, method, fill_value, copy)
3863 frame = frame._reindex_index(
-> 3864 index, method, copy, level, fill_value, limit, tolerance
3865 )
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/frame.py in _reindex_index(self, new_index, method, copy, level, fill_value, limit, tolerance)
3885 fill_value=fill_value,
-> 3886 allow_dups=False,
3887 )
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/generic.py in _reindex_with_indexers(self, reindexers, fill_value, copy, allow_dups)
4576 allow_dups=allow_dups,
-> 4577 copy=copy,
4578 )
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/internals/managers.py in reindex_indexer(self, new_axis, indexer, axis, fill_value, allow_dups, copy)
1250 if not allow_dups:
-> 1251 self.axes[axis]._can_reindex(indexer)
1252
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/indexes/base.py in _can_reindex(self, indexer)
3361 if not self.is_unique and len(indexer):
-> 3362 raise ValueError("cannot reindex from a duplicate axis")
3363
ValueError: cannot reindex from a duplicate axis
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
in
3 return df
4
----> 5 df.groupby(['FC']).apply(do_to_csv, meta=df._meta).size.compute()
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
163 dask.base.compute
164 """
--> 165 (result,) = compute(self, traverse=False, **kwargs)
166 return result
167
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
434 keys = [x.__dask_keys__() for x in collections]
435 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436 results = schedule(dsk, keys, **kwargs)
437 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
438
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
78 get_id=_thread_get_id,
79 pack_exception=pack_exception,
---> 80 **kwargs
81 )
82
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
484 _execute_task(task, data) # Re-execute locally
485 else:
--> 486 raise_exception(exc, tb)
487 res, worker_id = loads(res_info)
488 state["cache"][key] = res
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/local.py in reraise(exc, tb)
314 if exc.__traceback__ is not tb:
315 raise exc.with_traceback(tb)
--> 316 raise exc
317
318
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
220 try:
221 task, data = loads(task_info)
--> 222 result = _execute_task(task, data)
223 id = get_id()
224 result = dumps((result, id))
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/core.py in (.0)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
113 """
114 if isinstance(arg, list):
--> 115 return [_execute_task(a, cache) for a in arg]
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/core.py in (.0)
113 """
114 if isinstance(arg, list):
--> 115 return [_execute_task(a, cache) for a in arg]
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/optimization.py in __call__(self, *args)
974 if not len(args) == len(self.inkeys):
975 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 976 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
977
978 def __reduce__(self):
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/core.py in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/utils.py in apply(func, args, kwargs)
27 def apply(func, args, kwargs=None):
28 if kwargs:
---> 29 return func(*args, **kwargs)
30 else:
31 return func(*args)
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/dataframe/core.py in apply_and_enforce(*args, **kwargs)
4838 func = kwargs.pop("_func")
4839 meta = kwargs.pop("_meta")
-> 4840 df = func(*args, **kwargs)
4841 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
4842 if not len(df):
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/dataframe/groupby.py in _groupby_slice_apply(df, grouper, key, func, *args, **kwargs)
170 if key:
171 g = g[key]
--> 172 return g.apply(func, *args, **kwargs)
173
174
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/groupby.py in apply(self, func, *args, **kwargs)
735
736 with _group_selection_context(self):
--> 737 return self._python_apply_general(f)
738
739 return result
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/groupby.py in _python_apply_general(self, f)
743
744 return self._wrap_applied_output(
--> 745 keys, values, not_indexed_same=mutated or self.mutated
746 )
747
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/generic.py in _wrap_applied_output(self, keys, values, not_indexed_same)
370 return DataFrame()
371 elif isinstance(v, DataFrame):
--> 372 return self._concat_objects(keys, values, not_indexed_same=not_indexed_same)
373 elif self.grouper.groupings is not None:
374 if len(self.grouper.groupings) > 1:
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/groupby.py in _concat_objects(self, keys, values, not_indexed_same)
953 result = result.take(indexer, axis=self.axis)
954 else:
--> 955 result = result.reindex(ax, axis=self.axis)
956
957 elif self.group_keys:
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/util/_decorators.py in wrapper(*args, **kwargs)
219 @wraps(func)
220 def wrapper(*args, **kwargs):
--> 221 return func(*args, **kwargs)
222
223 kind = inspect.Parameter.POSITIONAL_OR_KEYWORD
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/frame.py in reindex(self, *args, **kwargs)
3974 kwargs.pop("axis", None)
3975 kwargs.pop("labels", None)
-> 3976 return super().reindex(**kwargs)
3977
3978 def drop(
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/generic.py in reindex(self, *args, **kwargs)
4512 # perform the reindex on the axes
4513 return self._reindex_axes(
-> 4514 axes, level, limit, tolerance, method, fill_value, copy
4515 ).__finalize__(self)
4516
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/frame.py in _reindex_axes(self, axes, level, limit, tolerance, method, fill_value, copy)
3862 if index is not None:
3863 frame = frame._reindex_index(
-> 3864 index, method, copy, level, fill_value, limit, tolerance
3865 )
3866
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/frame.py in _reindex_index(self, new_index, method, copy, level, fill_value, limit, tolerance)
3884 copy=copy,
3885 fill_value=fill_value,
-> 3886 allow_dups=False,
3887 )
3888
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/generic.py in _reindex_with_indexers(self, reindexers, fill_value, copy, allow_dups)
4575 fill_value=fill_value,
4576 allow_dups=allow_dups,
-> 4577 copy=copy,
4578 )
4579
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/internals/managers.py in reindex_indexer(self, new_axis, indexer, axis, fill_value, allow_dups, copy)
1249 # some axes don't allow reindexing with dups
1250 if not allow_dups:
-> 1251 self.axes[axis]._can_reindex(indexer)
1252
1253 if axis >= self.ndim:
~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/indexes/base.py in _can_reindex(self, indexer)
3360 # trying to reindex on an axis with duplicates
3361 if not self.is_unique and len(indexer):
-> 3362 raise ValueError("cannot reindex from a duplicate axis")
3363
3364 def reindex(self, target, method=None, level=None, limit=None, tolerance=None):
ValueError: cannot reindex from a duplicate axis
Любой совет приветствуется.