Dask: ValueError после get_dummies и слияния - PullRequest
0 голосов
/ 16 апреля 2020

Я получаю сообщение об ошибке, когда пытаюсь объединить сгенерированные макеты в кадр данных dask.
Ошибка возникает не каждый раз, хотя (возможно, один из 3-4 прогонов).
Кажется, что созданные фиктивные столбцы используйте значения столбцов без дублирования в качестве суффикса для меток столбцов столбца dummified .
Я полагаю, это связано с предоставлением большего количества метаинформации для работы, но я не удалось найти информацию о том, как это сделать.

Вот минимальный пример:

import pandas as pd
import dask.dataframe as dd
import numpy as np

pddf = pd.DataFrame({"A":["a","b","c","b","c"], "B":["yes","no","no","no","yes"],"C":[0,1,0,2,4]})
ddf = dd.from_pandas(pddf, chunksize=2)

def dummify(dataframe):
    discrete_columns = dataframe.select_dtypes("object").columns
    dataframe = dataframe.categorize()
    dummy_data = dd.reshape.get_dummies(dataframe[discrete_columns])
    dataframe = dataframe.drop(discrete_columns, axis="columns")
    dataframe = dataframe.merge(dummy_data, left_index=True, right_index=True)
    return dataframe

new_df = dummify(ddf)
new_df.compute()

Ошибка:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-15-40cb5a83cacc> in <module>
      8 
      9 new_df = dummify(ddf)
---> 10 new_df.compute()

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    164         dask.base.compute
    165         """
--> 166         (result,) = compute(self, traverse=False, **kwargs)
    167         return result
    168 

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    435     keys = [x.__dask_keys__() for x in collections]
    436     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437     results = schedule(dsk, keys, **kwargs)
    438     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    439 

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     82         get_id=_thread_get_id,
     83         pack_exception=pack_exception,
---> 84         **kwargs
     85     )
     86 

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    484                         _execute_task(task, data)  # Re-execute locally
    485                     else:
--> 486                         raise_exception(exc, tb)
    487                 res, worker_id = loads(res_info)
    488                 state["cache"][key] = res

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/local.py in reraise(exc, tb)
    314     if exc.__traceback__ is not tb:
    315         raise exc.with_traceback(tb)
--> 316     raise exc
    317 
    318 

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/core.py in <genexpr>(.0)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/optimization.py in __call__(self, *args)
    980         if not len(args) == len(self.inkeys):
    981             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 982         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    983 
    984     def __reduce__(self):

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/core.py in get(dsk, out, cache)
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/utils.py in apply(func, args, kwargs)
     28 def apply(func, args, kwargs=None):
     29     if kwargs:
---> 30         return func(*args, **kwargs)
     31     else:
     32         return func(*args)

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/dataframe/core.py in apply_and_enforce(*args, **kwargs)
   5080             return meta
   5081         if is_dataframe_like(df):
-> 5082             check_matching_columns(meta, df)
   5083             c = meta.columns
   5084         else:

~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/dataframe/utils.py in check_matching_columns(meta, actual)
    677             extra_info = "Order of columns does not match"
    678         raise ValueError(
--> 679             "The columns in the computed data do not match"
    680             " the columns in the provided metadata\n"
    681             f"{extra_info}"

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Extra:   ['A_no', 'A_yes', 'B_0', 'B_1']
  Missing: ['A_a', 'A_b', 'A_c', 'B_no', 'B_yes']


  Missing: ['A_a', 'A_b', 'A_c', 'B_no', 'B_yes']

Я был бы признателен, если бы кто-то мог помочь мне в этом.
Спасибо.

...