Pandas Ошибка типа маркировки индекса в многопроцессорном коде - PullRequest
0 голосов
/ 20 февраля 2020

Я боролся за то, чтобы заставить работать приведенный ниже многопроцессорный код (чтобы добавить ближайшие магазины в файл клиента с помощью координат).

Я полагаю, что это проблема pandas, которая вызывает проблему, потенциально что-то связанное с передачей dataframe в функцию parallelize_dataframe (), где он разбивается на различные массивы numpy (это только предположение). Как ни странно, когда я запускаю полный файл почтовых индексов (а не тестовый файл клиента), он не обрабатывает sh (работал 15 минут, пока я его не остановил), однако, поскольку почтовые индексы имеют длину 2,6 м, я не понимаю Не знаю, достигло ли оно точки, когда оно достигнет sh, или я представляю проблему при создании тестовых файлов.

Это длительный процесс, который использует большую часть моего процессора, поэтому я хочу доказать, что он работает с тестовыми файлами, прежде чем позволять ему долго работать с полным файлом.

Либо Кстати, он постоянно выдает ошибку типа маркировки индекса (в конце сообщения).

Любая помощь с этим приветствуется.

import multiprocess as mp #pip install multiprocess
import pandas as pd
import numpy as np 
import functools

postcodes = pd.read_csv('national_postcode_stats_file.csv')

customers = postcodes.sample(n = 10000, random_state=1) # customers test file
stores = postcodes.sample(n = 100, random_state=1) # store test file
stores.reset_index(inplace=True)

cores = mp.cpu_count() # 8 CPUs
partitions = cores

def parallelize_dataframe(data, func):
    data_split = np.array_split(data, partitions)
    pool = mp.Pool(cores)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

def dist_func(stores, data):

    # Reimport libraries (parellel processes completed in fresh interpretter each time)

    import pandas as pd
    import numpy as np 

    def nearest(inlat1, inlon1, inlat2, inlon2, store, postcode):
        lat1 = np.radians(inlat1)
        lat2 = np.radians(inlat2)
        longdif = np.radians(inlon2 - inlon1)
        r = 6371.1009 # gives d in kilometers
        d = np.arccos(np.sin(lat1)*np.sin(lat2) + np.cos(lat1)*np.cos(lat2) * np.cos(longdif)) * r
        near = pd.DataFrame({'store': store, 'postcode': postcode, 'distance': d})
        near_min = near.loc[near['distance'].idxmin()]
        x = str(near_min['store']) + '~' + str(near_min['postcode']) + '~' + str(near_min['distance'])
        return x

    data['appended'] = data['lat'].apply(nearest, args=(data['long'], stores['lat'], stores['long'], stores['index'], stores['pcds']))
    data[['store','store_postcode','distance_km']] = data['appended'].str.split("~",expand=True)
    return data

dist_func_with_stores = functools.partial(dist_func, stores) # Needed to pass stores to parrellize_dataframe

dist = parallelize_dataframe(customers, dist_func_with_stores)

И полная ошибка:

---------------------------------------------------------------------------
RemoteTraceback                           Traceback (most recent call last)
RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\multiprocess\pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\multiprocess\pool.py", line 44, in mapstar
    return list(map(*args))
  File "<ipython-input-34-7a1b788055e2>", line 41, in dist_func
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\series.py", line 3591, in apply
    mapped = lib.map_infer(values, f, convert=convert_dtype)
  File "pandas/_libs/lib.pyx", line 2217, in pandas._libs.lib.map_infer
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\series.py", line 3578, in f
    return func(x, *args, **kwds)
  File "<ipython-input-34-7a1b788055e2>", line 37, in nearest
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\indexing.py", line 1500, in __getitem__
    return self._getitem_axis(maybe_callable, axis=axis)
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\indexing.py", line 1912, in _getitem_axis
    self._validate_key(key, axis)
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\indexing.py", line 1799, in _validate_key
    self._convert_scalar_indexer(key, axis)
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\indexing.py", line 262, in _convert_scalar_indexer
    return ax._convert_scalar_indexer(key, kind=self.name)
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\indexes\numeric.py", line 211, in _convert_scalar_indexer
    ._convert_scalar_indexer(key, kind=kind))
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\indexes\base.py", line 2877, in _convert_scalar_indexer
    return self._invalid_indexer('label', key)
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\indexes\base.py", line 3067, in _invalid_indexer
    kind=type(key)))
TypeError: cannot do label indexing on <class 'pandas.core.indexes.numeric.Int64Index'> with these indexers [nan] of <class 'float'>
"""

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
<ipython-input-34-7a1b788055e2> in <module>
     45 dist_func_with_stores = functools.partial(dist_func, stores) # Needed to pass stores to parrellise_dataframe
     46 
---> 47 dist = parallelize_dataframe(customers, dist_func_with_stores)

<ipython-input-34-7a1b788055e2> in parallelize_dataframe(data, func)
     16     data_split = np.array_split(data, partitions)
     17     pool = mp.Pool(cores)
---> 18     data = pd.concat(pool.map(func, data_split))
     19     pool.close()
     20     pool.join()

~\AppData\Local\Continuum\anaconda3\lib\site-packages\multiprocess\pool.py in map(self, func, iterable, chunksize)
    266         in a list that is returned.
    267         '''
--> 268         return self._map_async(func, iterable, mapstar, chunksize).get()
    269 
    270     def starmap(self, func, iterable, chunksize=None):

~\AppData\Local\Continuum\anaconda3\lib\site-packages\multiprocess\pool.py in get(self, timeout)
    655             return self._value
    656         else:
--> 657             raise self._value
    658 
    659     def _set(self, i, obj):

TypeError: cannot do label indexing on <class 'pandas.core.indexes.numeric.Int64Index'> with these indexers [nan] of <class 'float'>

1 Ответ

0 голосов
/ 21 февраля 2020

Решил - изменил применение к лямбда-функции - теперь работает нормально.

data['appended'] = data.apply(lambda row: nearest(row['lat'], row['long'], stores['lat'], stores['long'], stores['index'], stores['pcds']),axis=1)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...