Python - Экспорт групп dask в csv - PullRequest
0 голосов
/ 14 января 2020

Я пытаюсь использовать dask, чтобы разделить большой текстовый файл на несколько CSV-файлов на основе одного из значений столбцов. Я нашел ту же проблему здесь и попытался реализовать решение, но возникли некоторые проблемы. Я начинаю с Это . Мой фрейм данных называется "df", а столбец, который я хотел бы использовать, - "Магазины". Мой код выглядит следующим образом. Я использую именно метод, описанный в предыдущем посте.

def do_to_csv(df):
    df.to_csv(df.name, sep='\t', header=True, index=False)
    return df

df.groupby(['Stores']).apply(do_to_csv, meta=df._meta).size.compute()

Код генерирует следующую ошибку.

        ---------------------------------------------------------------------------
    ValueError                                Traceback (most recent call last)
    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/groupby.py in apply(self, func, *args, **kwargs)
        724             try:
    --> 725                 result = self._python_apply_general(f)
        726             except Exception:

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/groupby.py in _python_apply_general(self, f)
        744         return self._wrap_applied_output(
    --> 745             keys, values, not_indexed_same=mutated or self.mutated
        746         )

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/generic.py in _wrap_applied_output(self, keys, values, not_indexed_same)
        371         elif isinstance(v, DataFrame):
    --> 372             return self._concat_objects(keys, values, not_indexed_same=not_indexed_same)
        373         elif self.grouper.groupings is not None:

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/groupby.py in _concat_objects(self, keys, values, not_indexed_same)
        954                 else:
    --> 955                     result = result.reindex(ax, axis=self.axis)
        956 

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/util/_decorators.py in wrapper(*args, **kwargs)
        220         def wrapper(*args, **kwargs):
    --> 221             return func(*args, **kwargs)
        222 

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/frame.py in reindex(self, *args, **kwargs)
       3975         kwargs.pop("labels", None)
    -> 3976         return super().reindex(**kwargs)
       3977 

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/generic.py in reindex(self, *args, **kwargs)
       4513         return self._reindex_axes(
    -> 4514             axes, level, limit, tolerance, method, fill_value, copy
       4515         ).__finalize__(self)

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/frame.py in _reindex_axes(self, axes, level, limit, tolerance, method, fill_value, copy)
       3863             frame = frame._reindex_index(
    -> 3864                 index, method, copy, level, fill_value, limit, tolerance
       3865             )

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/frame.py in _reindex_index(self, new_index, method, copy, level, fill_value, limit, tolerance)
       3885             fill_value=fill_value,
    -> 3886             allow_dups=False,
       3887         )

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/generic.py in _reindex_with_indexers(self, reindexers, fill_value, copy, allow_dups)
       4576                 allow_dups=allow_dups,
    -> 4577                 copy=copy,
       4578             )

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/internals/managers.py in reindex_indexer(self, new_axis, indexer, axis, fill_value, allow_dups, copy)
       1250         if not allow_dups:
    -> 1251             self.axes[axis]._can_reindex(indexer)
       1252 

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/indexes/base.py in _can_reindex(self, indexer)
       3361         if not self.is_unique and len(indexer):
    -> 3362             raise ValueError("cannot reindex from a duplicate axis")
       3363 

    ValueError: cannot reindex from a duplicate axis

    During handling of the above exception, another exception occurred:

    ValueError                                Traceback (most recent call last)
     in 
          3     return df
          4 
    ----> 5 df.groupby(['FC']).apply(do_to_csv, meta=df._meta).size.compute()

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
        163         dask.base.compute
        164         """
    --> 165         (result,) = compute(self, traverse=False, **kwargs)
        166         return result
        167 

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
        434     keys = [x.__dask_keys__() for x in collections]
        435     postcomputes = [x.__dask_postcompute__() for x in collections]
    --> 436     results = schedule(dsk, keys, **kwargs)
        437     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
        438 

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
         78         get_id=_thread_get_id,
         79         pack_exception=pack_exception,
    ---> 80         **kwargs
         81     )
         82 

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
        484                         _execute_task(task, data)  # Re-execute locally
        485                     else:
    --> 486                         raise_exception(exc, tb)
        487                 res, worker_id = loads(res_info)
        488                 state["cache"][key] = res

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/local.py in reraise(exc, tb)
        314     if exc.__traceback__ is not tb:
        315         raise exc.with_traceback(tb)
    --> 316     raise exc
        317 
        318 

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
        220     try:
        221         task, data = loads(task_info)
    --> 222         result = _execute_task(task, data)
        223         id = get_id()
        224         result = dumps((result, id))

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
        116     elif istask(arg):
        117         func, args = arg[0], arg[1:]
    --> 118         args2 = [_execute_task(a, cache) for a in args]
        119         return func(*args2)
        120     elif not ishashable(arg):

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/core.py in (.0)
        116     elif istask(arg):
        117         func, args = arg[0], arg[1:]
    --> 118         args2 = [_execute_task(a, cache) for a in args]
        119         return func(*args2)
        120     elif not ishashable(arg):

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
        113     """
        114     if isinstance(arg, list):
    --> 115         return [_execute_task(a, cache) for a in arg]
        116     elif istask(arg):
        117         func, args = arg[0], arg[1:]

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/core.py in (.0)
        113     """
        114     if isinstance(arg, list):
    --> 115         return [_execute_task(a, cache) for a in arg]
        116     elif istask(arg):
        117         func, args = arg[0], arg[1:]

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
        117         func, args = arg[0], arg[1:]
        118         args2 = [_execute_task(a, cache) for a in args]
    --> 119         return func(*args2)
        120     elif not ishashable(arg):
        121         return arg

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/optimization.py in __call__(self, *args)
        974         if not len(args) == len(self.inkeys):
        975             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
    --> 976         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
        977 
        978     def __reduce__(self):

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/core.py in get(dsk, out, cache)
        147     for key in toposort(dsk):
        148         task = dsk[key]
    --> 149         result = _execute_task(task, cache)
        150         cache[key] = result
        151     result = _execute_task(out, cache)

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
        117         func, args = arg[0], arg[1:]
        118         args2 = [_execute_task(a, cache) for a in args]
    --> 119         return func(*args2)
        120     elif not ishashable(arg):
        121         return arg

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/utils.py in apply(func, args, kwargs)
         27 def apply(func, args, kwargs=None):
         28     if kwargs:
    ---> 29         return func(*args, **kwargs)
         30     else:
         31         return func(*args)

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/dataframe/core.py in apply_and_enforce(*args, **kwargs)
       4838     func = kwargs.pop("_func")
       4839     meta = kwargs.pop("_meta")
    -> 4840     df = func(*args, **kwargs)
       4841     if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
       4842         if not len(df):

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/dask/dataframe/groupby.py in _groupby_slice_apply(df, grouper, key, func, *args, **kwargs)
        170     if key:
        171         g = g[key]
    --> 172     return g.apply(func, *args, **kwargs)
        173 
        174 

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/groupby.py in apply(self, func, *args, **kwargs)
        735 
        736                 with _group_selection_context(self):
    --> 737                     return self._python_apply_general(f)
        738 
        739         return result

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/groupby.py in _python_apply_general(self, f)
        743 
        744         return self._wrap_applied_output(
    --> 745             keys, values, not_indexed_same=mutated or self.mutated
        746         )
        747 

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/generic.py in _wrap_applied_output(self, keys, values, not_indexed_same)
        370             return DataFrame()
        371         elif isinstance(v, DataFrame):
    --> 372             return self._concat_objects(keys, values, not_indexed_same=not_indexed_same)
        373         elif self.grouper.groupings is not None:
        374             if len(self.grouper.groupings) > 1:

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/groupby/groupby.py in _concat_objects(self, keys, values, not_indexed_same)
        953                     result = result.take(indexer, axis=self.axis)
        954                 else:
    --> 955                     result = result.reindex(ax, axis=self.axis)
        956 
        957         elif self.group_keys:

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/util/_decorators.py in wrapper(*args, **kwargs)
        219         @wraps(func)
        220         def wrapper(*args, **kwargs):
    --> 221             return func(*args, **kwargs)
        222 
        223         kind = inspect.Parameter.POSITIONAL_OR_KEYWORD

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/frame.py in reindex(self, *args, **kwargs)
       3974         kwargs.pop("axis", None)
       3975         kwargs.pop("labels", None)
    -> 3976         return super().reindex(**kwargs)
       3977 
       3978     def drop(

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/generic.py in reindex(self, *args, **kwargs)
       4512         # perform the reindex on the axes
       4513         return self._reindex_axes(
    -> 4514             axes, level, limit, tolerance, method, fill_value, copy
       4515         ).__finalize__(self)
       4516 

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/frame.py in _reindex_axes(self, axes, level, limit, tolerance, method, fill_value, copy)
       3862         if index is not None:
       3863             frame = frame._reindex_index(
    -> 3864                 index, method, copy, level, fill_value, limit, tolerance
       3865             )
       3866 

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/frame.py in _reindex_index(self, new_index, method, copy, level, fill_value, limit, tolerance)
       3884             copy=copy,
       3885             fill_value=fill_value,
    -> 3886             allow_dups=False,
       3887         )
       3888 

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/generic.py in _reindex_with_indexers(self, reindexers, fill_value, copy, allow_dups)
       4575                 fill_value=fill_value,
       4576                 allow_dups=allow_dups,
    -> 4577                 copy=copy,
       4578             )
       4579 

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/internals/managers.py in reindex_indexer(self, new_axis, indexer, axis, fill_value, allow_dups, copy)
       1249         # some axes don't allow reindexing with dups
       1250         if not allow_dups:
    -> 1251             self.axes[axis]._can_reindex(indexer)
       1252 
       1253         if axis >= self.ndim:

    ~/workspace/AljResearchPython/env/AljResearchPython-1.0/runtime/lib/python3.6/site-packages/pandas/core/indexes/base.py in _can_reindex(self, indexer)
       3360         # trying to reindex on an axis with duplicates
       3361         if not self.is_unique and len(indexer):
    -> 3362             raise ValueError("cannot reindex from a duplicate axis")
       3363 
       3364     def reindex(self, target, method=None, level=None, limit=None, tolerance=None):

    ValueError: cannot reindex from a duplicate axis

Любой совет приветствуется.

...