Я пытаюсь оптимизировать расчет расстояния по большому файлу с помощью многопроцессорной обработки. Я разработал приведенный ниже код, но может ли кто-нибудь объяснить, почему он выдает ошибку [объект DataFrame не вызывается]?
Похоже, что-то связано с «картой» внутри parallelize_dataframe, возможно, вызванной как я разработал test_fun c, но не уверен, как решить. Заранее спасибо за любую помощь.
import multiprocessing as mp
nearest_calc3 = None
nearest_calc3 = postcodes.head(1000).copy() # Test top 1000
partitions = 5
cores = mp.cpu_count()
def parallelize_dataframe(data, func):
data_split = np.array_split(data, partitions)
pool = mp.Pool(cores)
data = pd.concat(pool.map(func, data_split)) # <-- Problem here?
pool.close()
pool.join()
return data
def nearest(inlat1, inlon1, inlat2, inlon2, store, postcode):
lat1 = np.radians(inlat1)
lat2 = np.radians(inlat2)
longdif = np.radians(inlon2 - inlon1)
r = 6371.1009 # gives d in kilometers
d = np.arccos(np.sin(lat1)*np.sin(lat2) + np.cos(lat1)*np.cos(lat2) * np.cos(longdif)) * r
near = pd.DataFrame({'store': store, 'postcode': postcode, 'distance': d})
near_min = near.loc[near['distance'].idxmin()]
x = str(near_min['store']) + '~' + str(near_min['postcode']) + '~' + str(near_min['distance'])
return x
def test_func(data, stores): # <-- Or maybe here?
data['appended'] = data['lat'].apply(nearest, args=(data['long'], stores['lat'], stores['long'], stores['index'], stores['pcds']))
data[['store','store_postcode','distance_km']] = data['appended'].str.split("~",expand=True)
return data
if __name__ == '__main__':
test = parallelize_dataframe(nearest_calc3, test_func(nearest_calc3, stores))
И полная ошибка:
---------------------------------------------------------------------------
RemoteTraceback Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
File "C:\Users\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py", line 121, in worker
result = (True, func(*args, **kwds))
File "C:\Users\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py", line 44, in mapstar
return list(map(*args))
TypeError: 'DataFrame' object is not callable
"""
The above exception was the direct cause of the following exception:
TypeError Traceback (most recent call last)
<ipython-input-21-501c0f76106a> in <module>
32 #x = test_func(nearest_calc3, stores)
33
---> 34 test = parallelize_dataframe(nearest_calc3, test_func(nearest_calc3, stores))
<ipython-input-21-501c0f76106a> in parallelize_dataframe(data, func)
9 data_split = np.array_split(data, partitions)
10 pool = mp.Pool(cores)
---> 11 data = pd.concat(pool.map(func, data_split))
12 pool.close()
13 pool.join()
~\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py in map(self, func, iterable, chunksize)
266 in a list that is returned.
267 '''
--> 268 return self._map_async(func, iterable, mapstar, chunksize).get()
269
270 def starmap(self, func, iterable, chunksize=None):
~\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py in get(self, timeout)
655 return self._value
656 else:
--> 657 raise self._value
658
659 def _set(self, i, obj):
TypeError: 'DataFrame' object is not callable