Ошибка в pandas многопроцессорном коде - датафрейм не вызывается - PullRequest
0 голосов
/ 19 февраля 2020

Я пытаюсь оптимизировать расчет расстояния по большому файлу с помощью многопроцессорной обработки. Я разработал приведенный ниже код, но может ли кто-нибудь объяснить, почему он выдает ошибку [объект DataFrame не вызывается]?

Похоже, что-то связано с «картой» внутри parallelize_dataframe, возможно, вызванной как я разработал test_fun c, но не уверен, как решить. Заранее спасибо за любую помощь.

import multiprocessing as mp

nearest_calc3 = None
nearest_calc3 = postcodes.head(1000).copy() # Test top 1000

partitions = 5
cores = mp.cpu_count()

def parallelize_dataframe(data, func):
    data_split = np.array_split(data, partitions)
    pool = mp.Pool(cores)
    data = pd.concat(pool.map(func, data_split)) # <-- Problem here?
    pool.close()
    pool.join()
    return data

def nearest(inlat1, inlon1, inlat2, inlon2, store, postcode):
    lat1 = np.radians(inlat1)
    lat2 = np.radians(inlat2)
    longdif = np.radians(inlon2 - inlon1)
    r = 6371.1009 # gives d in kilometers
    d = np.arccos(np.sin(lat1)*np.sin(lat2) + np.cos(lat1)*np.cos(lat2) * np.cos(longdif)) * r
    near = pd.DataFrame({'store': store, 'postcode': postcode, 'distance': d})
    near_min = near.loc[near['distance'].idxmin()]
    x = str(near_min['store']) + '~' + str(near_min['postcode']) + '~' + str(near_min['distance'])
    return x

def test_func(data, stores): # <-- Or maybe here?
    data['appended'] = data['lat'].apply(nearest, args=(data['long'], stores['lat'], stores['long'], stores['index'], stores['pcds']))
    data[['store','store_postcode','distance_km']] = data['appended'].str.split("~",expand=True)
    return data

if __name__ == '__main__':
    test = parallelize_dataframe(nearest_calc3, test_func(nearest_calc3, stores))

И полная ошибка:

---------------------------------------------------------------------------
RemoteTraceback                           Traceback (most recent call last)
RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py", line 44, in mapstar
    return list(map(*args))
TypeError: 'DataFrame' object is not callable
"""

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
<ipython-input-21-501c0f76106a> in <module>
     32 #x = test_func(nearest_calc3, stores)
     33 
---> 34 test = parallelize_dataframe(nearest_calc3, test_func(nearest_calc3, stores))

<ipython-input-21-501c0f76106a> in parallelize_dataframe(data, func)
      9     data_split = np.array_split(data, partitions)
     10     pool = mp.Pool(cores)
---> 11     data = pd.concat(pool.map(func, data_split))
     12     pool.close()
     13     pool.join()

~\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py in map(self, func, iterable, chunksize)
    266         in a list that is returned.
    267         '''
--> 268         return self._map_async(func, iterable, mapstar, chunksize).get()
    269 
    270     def starmap(self, func, iterable, chunksize=None):

~\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py in get(self, timeout)
    655             return self._value
    656         else:
--> 657             raise self._value
    658 
    659     def _set(self, i, obj):

TypeError: 'DataFrame' object is not callable

1 Ответ

2 голосов
/ 19 февраля 2020

Проблема в последней строке:

test = parallelize_dataframe(nearest_calc3, test_func(nearest_calc3, stores))

test_func(...) вернет кадр данных, и вы передадите его в parallelize_dataframe. Но эта функция ожидает вызова.

Вы хотите что-то вроде этого:

test = parallelize_dataframe(nearest_calc3, test_func)

Поскольку вы хотите всегда передавать stores в test_func в дополнение к nearest_calc3, вы для этого можно использовать partial:

test_func_with_stores = functools.partial(test_func, stores)

test_func_with_stores - это вызываемый объект, который принимает один параметр. К сожалению, partial заполняет параметры слева направо, поэтому вам придется изменить test_func, чтобы stores был первым параметром.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...