Dask DataFrame: read_csv не учитывает заданные dtypes - PullRequest
0 голосов
/ 13 марта 2020

При чтении большого файла с помощью dask.dataframe, в котором некоторые столбцы имеют смешанный тип данных, возникает ошибка. Уточню: «22» и «32» — это фактические имена столбцов в заголовке файла, поэтому в `dtype` они указаны как строки.

df = dd.read_csv('s3://myfile.csv'
                 , encoding = 'latin', sample=250000000, dtype={'22': str,'32': str})

df = df.compute()

Возвращает:

ValueError: Metadata mismatch found in `from_delayed`.

Partition type: `pandas.core.frame.DataFrame`
+--------------------+---------+----------+
| Column             | Found   | Expected |
+--------------------+---------+----------+
| 22                 | int64   | object   |
| 32                 | float64 | object   |
+--------------------+---------+----------+

Не понимаю, почему read_csv не интерпретирует эти столбцы как строки. Даже если по выборке (`sample`) столбец выглядит целочисленным или вещественным, явно заданный `dtype` должен заставить читать его как строку во всех разделах (partitions).

Спасибо!

Полный текст ошибки:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-65-2e7a5a158ac4> in <module>()
----> 1 df = df.compute()

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    164         dask.base.compute
    165         
--> 166         (result,) = compute(self, traverse=False, **kwargs)
    167         return result
    168 

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    435     keys = [x.__dask_keys__() for x in collections]
    436     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437     results = schedule(dsk, keys, **kwargs)
    438     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    439 

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2593                     should_rejoin = False
   2594             try:
-> 2595                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2596             finally:
   2597                 for f in futures.values():

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1891                 direct=direct,
   1892                 local_worker=local_worker,
-> 1893                 asynchronous=asynchronous,
   1894             )
   1895 

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    778         else:
    779             return sync(
--> 780                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    781             )
    782 

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    346     if error[0]:
    347         typ, exc, tb = error[0]
--> 348         raise exc.with_traceback(tb)
    349     else:
    350         return result[0]

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/utils.py in f()
    330             if callback_timeout is not None:
    331                 future = asyncio.wait_for(future, callback_timeout)
--> 332             result[0] = yield future
    333         except Exception as exc:
    334             error[0] = sys.exc_info()

~/anaconda3/envs/python3/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1097 
   1098                     try:
-> 1099                         value = future.result()
   1100                     except Exception:
   1101                         self.had_exception = True

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1750                             exc = CancelledError(key)
   1751                         else:
-> 1752                             raise exception.with_traceback(traceback)
   1753                         raise exc
   1754                     if errors == "skip":

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/dataframe/utils.py in check_meta()
    663     raise ValueError(
    664         "Metadata mismatch found%s.\n\n"
--> 665         "%s" % ((" in `%s`" % funcname if funcname else ""), errmsg)
    666     )
    667 

ValueError: Metadata mismatch found in `from_delayed`.

Partition type: `pandas.core.frame.DataFrame`
+--------------------+---------+----------+
| Column             | Found   | Expected |
+--------------------+---------+----------+
| 22                 | int64   | object   |
| 32                 | float64 | object   |
+--------------------+---------+----------+
...