Я боролся за то, чтобы заставить работать приведенный ниже многопроцессорный код (чтобы добавить ближайшие магазины в файл клиента с помощью координат).
Я полагаю, что это проблема pandas, которая вызывает проблему, потенциально что-то связанное с передачей dataframe в функцию parallelize_dataframe (), где он разбивается на различные массивы numpy (это только предположение). Как ни странно, когда я запускаю полный файл почтовых индексов (а не тестовый файл клиента), он не обрабатывает sh (работал 15 минут, пока я его не остановил), однако, поскольку почтовые индексы имеют длину 2,6 м, я не понимаю Не знаю, достигло ли оно точки, когда оно достигнет sh, или я представляю проблему при создании тестовых файлов.
Это длительный процесс, который использует большую часть моего процессора, поэтому я хочу доказать, что он работает с тестовыми файлами, прежде чем позволять ему долго работать с полным файлом.
Либо Кстати, он постоянно выдает ошибку типа маркировки индекса (в конце сообщения).
Любая помощь с этим приветствуется.
import multiprocess as mp #pip install multiprocess
import pandas as pd
import numpy as np
import functools
postcodes = pd.read_csv('national_postcode_stats_file.csv')
customers = postcodes.sample(n = 10000, random_state=1) # customers test file
stores = postcodes.sample(n = 100, random_state=1) # store test file
stores.reset_index(inplace=True)
cores = mp.cpu_count() # 8 CPUs
partitions = cores
def parallelize_dataframe(data, func):
data_split = np.array_split(data, partitions)
pool = mp.Pool(cores)
data = pd.concat(pool.map(func, data_split))
pool.close()
pool.join()
return data
def dist_func(stores, data):
# Reimport libraries (parellel processes completed in fresh interpretter each time)
import pandas as pd
import numpy as np
def nearest(inlat1, inlon1, inlat2, inlon2, store, postcode):
lat1 = np.radians(inlat1)
lat2 = np.radians(inlat2)
longdif = np.radians(inlon2 - inlon1)
r = 6371.1009 # gives d in kilometers
d = np.arccos(np.sin(lat1)*np.sin(lat2) + np.cos(lat1)*np.cos(lat2) * np.cos(longdif)) * r
near = pd.DataFrame({'store': store, 'postcode': postcode, 'distance': d})
near_min = near.loc[near['distance'].idxmin()]
x = str(near_min['store']) + '~' + str(near_min['postcode']) + '~' + str(near_min['distance'])
return x
data['appended'] = data['lat'].apply(nearest, args=(data['long'], stores['lat'], stores['long'], stores['index'], stores['pcds']))
data[['store','store_postcode','distance_km']] = data['appended'].str.split("~",expand=True)
return data
dist_func_with_stores = functools.partial(dist_func, stores) # Needed to pass stores to parrellize_dataframe
dist = parallelize_dataframe(customers, dist_func_with_stores)
И полная ошибка:
---------------------------------------------------------------------------
RemoteTraceback Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\multiprocess\pool.py", line 121, in worker
result = (True, func(*args, **kwds))
File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\multiprocess\pool.py", line 44, in mapstar
return list(map(*args))
File "<ipython-input-34-7a1b788055e2>", line 41, in dist_func
File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\series.py", line 3591, in apply
mapped = lib.map_infer(values, f, convert=convert_dtype)
File "pandas/_libs/lib.pyx", line 2217, in pandas._libs.lib.map_infer
File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\series.py", line 3578, in f
return func(x, *args, **kwds)
File "<ipython-input-34-7a1b788055e2>", line 37, in nearest
File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\indexing.py", line 1500, in __getitem__
return self._getitem_axis(maybe_callable, axis=axis)
File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\indexing.py", line 1912, in _getitem_axis
self._validate_key(key, axis)
File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\indexing.py", line 1799, in _validate_key
self._convert_scalar_indexer(key, axis)
File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\indexing.py", line 262, in _convert_scalar_indexer
return ax._convert_scalar_indexer(key, kind=self.name)
File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\indexes\numeric.py", line 211, in _convert_scalar_indexer
._convert_scalar_indexer(key, kind=kind))
File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\indexes\base.py", line 2877, in _convert_scalar_indexer
return self._invalid_indexer('label', key)
File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\indexes\base.py", line 3067, in _invalid_indexer
kind=type(key)))
TypeError: cannot do label indexing on <class 'pandas.core.indexes.numeric.Int64Index'> with these indexers [nan] of <class 'float'>
"""
The above exception was the direct cause of the following exception:
TypeError Traceback (most recent call last)
<ipython-input-34-7a1b788055e2> in <module>
45 dist_func_with_stores = functools.partial(dist_func, stores) # Needed to pass stores to parrellise_dataframe
46
---> 47 dist = parallelize_dataframe(customers, dist_func_with_stores)
<ipython-input-34-7a1b788055e2> in parallelize_dataframe(data, func)
16 data_split = np.array_split(data, partitions)
17 pool = mp.Pool(cores)
---> 18 data = pd.concat(pool.map(func, data_split))
19 pool.close()
20 pool.join()
~\AppData\Local\Continuum\anaconda3\lib\site-packages\multiprocess\pool.py in map(self, func, iterable, chunksize)
266 in a list that is returned.
267 '''
--> 268 return self._map_async(func, iterable, mapstar, chunksize).get()
269
270 def starmap(self, func, iterable, chunksize=None):
~\AppData\Local\Continuum\anaconda3\lib\site-packages\multiprocess\pool.py in get(self, timeout)
655 return self._value
656 else:
--> 657 raise self._value
658
659 def _set(self, i, obj):
TypeError: cannot do label indexing on <class 'pandas.core.indexes.numeric.Int64Index'> with these indexers [nan] of <class 'float'>