Python multiprocess.Pool.map не может обрабатывать большие массивы. - PullRequest
0 голосов
/ 24 апреля 2018

Это код, который я использую для паррелялизации функции apply в строках объекта pandas.DataFrame:

from multiprocessing import cpu_count, Pool
from functools import partial

def parallel_applymap_df(df: DataFrame, func, num_cores=cpu_count(),**kargs):

partitions = np.linspace(0, len(df), num_cores + 1, dtype=np.int64)
df_split = [df.iloc[partitions[i]:partitions[i + 1]] for i in range(num_cores)]
pool = Pool(num_cores)
series = pd.concat(pool.map(partial(apply_wrapper, func=func, **kargs), df_split))
pool.close()
pool.join()

return series

Он работает с подвыборками из 200 000 строк, но когда я пробую на полную200 000 000 примеров, я получаю следующее сообщение об ошибке:

~/anaconda3/lib/python3.6/site-packages/multiprocess/connection.py in _send_bytes(self, buf)
394         n = len(buf)
395         # For wire compatibility with 3.2 and lower
—> 396         header = struct.pack("!i", n)
397         if n > 16384:
398             # The payload is large so Nagle's algorithm won't be triggered

error: 'i' format requires -2147483648 <= number <= 2147483647

Генерируется строкой:

series = pd.concat(pool.map(partial(apply_wrapper, func=func, **kargs), df_split))

Это очень странно, потому что немного другая версия, которую я использую для распараллеливания операций, которыене векторизованы в пандах (например, Series.dt.time), работает с одинаковым количеством строк.Это версия для экзаменов работает:

def parallel_map_df(df: DataFrame, func, num_cores=cpu_count()):

partitions = np.linspace(0, len(df), num_cores + 1, dtype=np.int64)
df_split = [df.iloc[partitions[i]:partitions[i + 1]] for i in range(num_cores)]
pool = Pool(num_cores)
df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()

return df
...