Как преобразовать координатные столбцы в столбец Point с Shapely и Dask? - PullRequest
0 голосов
/ 27 июня 2019

У меня следующая проблема.Мои данные - это огромный фрейм данных, похожий на этот (это заголовок фрейма данных)

import pandas
import dask.dataframe as dd

data = dd.read_csv(data_path)
data.persist()

print(data.head())


    Gitter_ID_100m  x_mp_100m   y_mp_100m   Einwohner
0   100mN26840E43341    4334150     2684050     -1
1   100mN26840E43342    4334250     2684050     -1
2   100mN26840E43343    4334350     2684050     -1
3   100mN26840E43344    4334450     2684050     -1
4   100mN26840E43345    4334550     2684050     -1

Я использую Dask для его обработки.Теперь я хочу создать новый столбец, в котором «x_mp_100m» и «y_mp_100m» конвертируются в Shapely Point.Для одной строки это выглядело бы так:

from shapely.geometry import Point

test_df = data.head(1)
test_df = test_df.assign(geom=lambda k: Point(k.x_mp_100m,k.y_mp_100m))
print(test_df)


    Gitter_ID_100m  x_mp_100m   y_mp_100m   Einwohner   geom
0   100mN26840E43341    4334150     2684050     -1  POINT (4334150 2684050)

Я уже пробовал следующий код с Dask:

data_out = data.map_partitions(lambda df: df.assign(geom= lambda k: Point(k.x_mp_100m,k.y_mp_100m)), meta=pd.DataFrame)

При этом я получаю следующую ошибку:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-17-b8de11d9b9b3> in <module>
----> 1 data_out.compute()

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\base.py in compute(*args, **kwargs)
    395     keys = [x.__dask_keys__() for x in collections]
    396     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 397     results = schedule(dsk, keys, **kwargs)
    398     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    399 

~\AppData\Local\Continuum\anaconda3\lib\site-packages\distributed\client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2319             try:
   2320                 results = self.gather(packed, asynchronous=asynchronous,
-> 2321                                       direct=direct)
   2322             finally:
   2323                 for f in futures.values():

~\AppData\Local\Continuum\anaconda3\lib\site-packages\distributed\client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1653             return self.sync(self._gather, futures, errors=errors,
   1654                              direct=direct, local_worker=local_worker,
-> 1655                              asynchronous=asynchronous)
   1656 
   1657     @gen.coroutine

~\AppData\Local\Continuum\anaconda3\lib\site-packages\distributed\client.py in sync(self, func, *args, **kwargs)
    671             return future
    672         else:
--> 673             return sync(self.loop, func, *args, **kwargs)
    674 
    675     def __repr__(self):

~\AppData\Local\Continuum\anaconda3\lib\site-packages\distributed\utils.py in sync(loop, func, *args, **kwargs)
    275             e.wait(10)
    276     if error[0]:
--> 277         six.reraise(*error[0])
    278     else:
    279         return result[0]

~\AppData\Local\Continuum\anaconda3\lib\site-packages\six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~\AppData\Local\Continuum\anaconda3\lib\site-packages\distributed\utils.py in f()
    260             if timeout is not None:
    261                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262             result[0] = yield future
    263         except Exception as exc:
    264             error[0] = sys.exc_info()

~\AppData\Local\Continuum\anaconda3\lib\site-packages\tornado\gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~\AppData\Local\Continuum\anaconda3\lib\site-packages\tornado\gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

~\AppData\Local\Continuum\anaconda3\lib\site-packages\distributed\client.py in _gather(self, futures, errors, direct, local_worker)
   1498                             six.reraise(type(exception),
   1499                                         exception,
-> 1500                                         traceback)
   1501                     if errors == 'skip':
   1502                         bad_keys.add(key)

~\AppData\Local\Continuum\anaconda3\lib\site-packages\six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\dataframe\core.py in apply_and_enforce()
   3682 
   3683     Ensures the output has the same columns, even if empty."""
-> 3684     df = func(*args, **kwargs)
   3685     if isinstance(df, (pd.DataFrame, pd.Series, pd.Index)):
   3686         if len(df) == 0:

<ipython-input-16-d5710cb00158> in <lambda>()
----> 1 data_out = data.map_partitions(lambda df: df.assign(geom= lambda k: Point(k.x_mp_100m,k.y_mp_100m)), meta=pd.DataFrame)

~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\frame.py in assign()
   3549         if PY36:
   3550             for k, v in kwargs.items():
-> 3551                 data[k] = com.apply_if_callable(v, data)
   3552         else:
   3553             # <= 3.5: do all calculations first...

~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\common.py in apply_if_callable()
    327 
    328     if callable(maybe_callable):
--> 329         return maybe_callable(obj, **kwargs)
    330 
    331     return maybe_callable

<ipython-input-16-d5710cb00158> in <lambda>()
----> 1 data_out = data.map_partitions(lambda df: df.assign(geom= lambda k: Point(k.x_mp_100m,k.y_mp_100m)), meta=pd.DataFrame)

~\AppData\Local\Continuum\anaconda3\lib\site-packages\shapely\geometry\point.py in __init__()
     47         BaseGeometry.__init__(self)
     48         if len(args) > 0:
---> 49             self._set_coords(*args)
     50 
     51     # Coordinate getters and setters

~\AppData\Local\Continuum\anaconda3\lib\site-packages\shapely\geometry\point.py in _set_coords()
    130             self._geom, self._ndim = geos_point_from_py(args[0])
    131         else:
--> 132             self._geom, self._ndim = geos_point_from_py(tuple(args))
    133 
    134     coords = property(BaseGeometry._get_coords, _set_coords)

~\AppData\Local\Continuum\anaconda3\lib\site-packages\shapely\geometry\point.py in geos_point_from_py()
    207         coords = ob
    208     n = len(coords)
--> 209     dx = c_double(coords[0])
    210     dy = c_double(coords[1])
    211     dz = None

~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\series.py in wrapper()
     91             return converter(self.iloc[0])
     92         raise TypeError("cannot convert the series to "
---> 93                         "{0}".format(str(converter)))
     94 
     95     wrapper.__name__ = "__{name}__".format(name=converter.__name__)

TypeError: cannot convert the series to <class 'float'>

Так что я думаю, что я использую функцию pandas.assign () неправильно, или должна быть более подходящая функция, я просто не могу обернуть ее вокруг.Вы знаете лучший способ справиться с этим?

Я также нашел этот способ:

data_out = data.map_partitions(lambda df: df.apply(lambda row: Point(row['x_mp_100m'],row['y_mp_100m']), axis=1))

Но это самый эффективный способ?

1 Ответ

0 голосов
/ 02 июля 2019

То, что вы делаете, кажется нормальным.Я хотел бы найти функцию, которая хорошо работает в одной строке, а затем использовать метод apply или функцию, которая хорошо работает в одном кадре данных Pandas, а затем использовать метод map_partitions.

В случае ошибки, которую вы получаете, я сначала проверю, работает ли ваша функция на фрейме данных pandas.

...