Я получаю сообщение об ошибке, когда пытаюсь объединить сгенерированные макеты в кадр данных dask.
Ошибка возникает не каждый раз, хотя (возможно, один из 3-4 прогонов).
Кажется, что созданные фиктивные столбцы используйте значения столбцов без дублирования в качестве суффикса для меток столбцов столбца dummified .
Я полагаю, это связано с предоставлением большего количества метаинформации для работы, но я не удалось найти информацию о том, как это сделать.
Вот минимальный пример:
import pandas as pd
import dask.dataframe as dd
import numpy as np
pddf = pd.DataFrame({"A":["a","b","c","b","c"], "B":["yes","no","no","no","yes"],"C":[0,1,0,2,4]})
ddf = dd.from_pandas(pddf, chunksize=2)
def dummify(dataframe):
discrete_columns = dataframe.select_dtypes("object").columns
dataframe = dataframe.categorize()
dummy_data = dd.reshape.get_dummies(dataframe[discrete_columns])
dataframe = dataframe.drop(discrete_columns, axis="columns")
dataframe = dataframe.merge(dummy_data, left_index=True, right_index=True)
return dataframe
new_df = dummify(ddf)
new_df.compute()
Ошибка:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-15-40cb5a83cacc> in <module>
8
9 new_df = dummify(ddf)
---> 10 new_df.compute()
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
164 dask.base.compute
165 """
--> 166 (result,) = compute(self, traverse=False, **kwargs)
167 return result
168
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
435 keys = [x.__dask_keys__() for x in collections]
436 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437 results = schedule(dsk, keys, **kwargs)
438 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
439
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
82 get_id=_thread_get_id,
83 pack_exception=pack_exception,
---> 84 **kwargs
85 )
86
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
484 _execute_task(task, data) # Re-execute locally
485 else:
--> 486 raise_exception(exc, tb)
487 res, worker_id = loads(res_info)
488 state["cache"][key] = res
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/local.py in reraise(exc, tb)
314 if exc.__traceback__ is not tb:
315 raise exc.with_traceback(tb)
--> 316 raise exc
317
318
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
220 try:
221 task, data = loads(task_info)
--> 222 result = _execute_task(task, data)
223 id = get_id()
224 result = dumps((result, id))
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/core.py in <genexpr>(.0)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/optimization.py in __call__(self, *args)
980 if not len(args) == len(self.inkeys):
981 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 982 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
983
984 def __reduce__(self):
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/core.py in get(dsk, out, cache)
149 for key in toposort(dsk):
150 task = dsk[key]
--> 151 result = _execute_task(task, cache)
152 cache[key] = result
153 result = _execute_task(out, cache)
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/utils.py in apply(func, args, kwargs)
28 def apply(func, args, kwargs=None):
29 if kwargs:
---> 30 return func(*args, **kwargs)
31 else:
32 return func(*args)
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/dataframe/core.py in apply_and_enforce(*args, **kwargs)
5080 return meta
5081 if is_dataframe_like(df):
-> 5082 check_matching_columns(meta, df)
5083 c = meta.columns
5084 else:
~/.pyenv/versions/3.7.6/lib/python3.7/site-packages/dask/dataframe/utils.py in check_matching_columns(meta, actual)
677 extra_info = "Order of columns does not match"
678 raise ValueError(
--> 679 "The columns in the computed data do not match"
680 " the columns in the provided metadata\n"
681 f"{extra_info}"
ValueError: The columns in the computed data do not match the columns in the provided metadata
Extra: ['A_no', 'A_yes', 'B_0', 'B_1']
Missing: ['A_a', 'A_b', 'A_c', 'B_no', 'B_yes']
Missing: ['A_a', 'A_b', 'A_c', 'B_no', 'B_yes']
Я был бы признателен, если бы кто-то мог помочь мне в этом.
Спасибо.