методы экземпляра в функции во время многопроцессорной обработки (python) - PullRequest
1 голос
/ 20 апреля 2020

Я пытался запустить многопроцессорную обработку с большим набором данных.

, когда я запускаю сценарий, указанный ниже, для l oop, общее время выполнения составляет 1,5 сек c.

def get_vars(accessCode, user_profile, wt, meals, instance_method='get_wt_adherence'):
   '''

   Examples
   --------
   >> n_cpus = multiprocessing.cpu_count()
   >> get_wt_adherence = partial(get_vars, user_profile, wt, meals, 
                                 instance_method='get_wt_adherence')
   >> pool = multiprocessing.Pool(n_cpus-5)
   >> result = pool.map(get_wt_adherence, accessCodes)
   >> concated_result = pd.concat(result)


   Version
   -------
   # 2020.03.26 Updated
       : Class name edited. 'NOOM' -> 'DATA_GEN'

   '''

   #
   COL_WEEK = ['{}week'.format(i) for i in range(1, 17)]


   data_gen = DATA_GEN(accessCode, user_profile, wt, meals)

   if instance_method == 'get_wt_adherence':
       func = data_gen.get_wt_adherence
   elif instance_method == 'get_meal_adherence':
       func = data_gen.get_meal_adherence
   elif instance_method == 'get_color_food':
       func = data_gen.get_color_food
   elif instance_method == 'get_daily_cal':
       func = data_gen.get_daily_cal

   row = pd.DataFrame([func(weeks) for weeks in range(1, 17)]).T
   row.columns = COL_WEEK
   row['accessCode'] = accessCode
   return row


from noom.handler import DATA_GEN
from functools import partial
import multiprocessing


# start_time = time.time()

get_wt = partial(get_vars, user_profile=user_profile, wt=wt_logs, meals=meals, instance_method='get_wt_adherence')

for i in range(10):
    get_wt(accessCodes[i])

однако, когда я пытался запустить этот скрипт, используя многопроцессорную обработку, скрипт не отвечал. Даже, accessCodes - это список, содержащий 100 элементов.

Я подозреваю, что функция get_wt использует частичный модуль.

n_cpus = multiprocessing.cpu_count()
pool = multiprocessing.Pool(n_cpus-15)
result_wt = pool.map(get_wt, accessCodes)         ; print('wt adherence finished')
pool.close()

Как решить эту проблему? ошибка ниже

---------------------------------------------------------------------------
error                                     Traceback (most recent call last)
<ipython-input-22-73ddf2e21bbd> in <module>
      2 n_cpus = multiprocessing.cpu_count()
      3 pool = multiprocessing.Pool(n_cpus-15)
----> 4 result_wt = pool.map(get_wt_adherence, accessCodes[1:10])         ; print('wt adherence finished')
      5 pool.close()
      6 time.time() - start_time

/usr/lib/python3.6/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    264         in a list that is returned.
    265         '''
--> 266         return self._map_async(func, iterable, mapstar, chunksize).get()
    267 
    268     def starmap(self, func, iterable, chunksize=None):

/usr/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
    642             return self._value
    643         else:
--> 644             raise self._value
    645 
    646     def _set(self, i, obj):

/usr/lib/python3.6/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
    422                         break
    423                     try:
--> 424                         put(task)
    425                     except Exception as e:
    426                         job, idx = task[:2]

/usr/lib/python3.6/multiprocessing/connection.py in send(self, obj)
    204         self._check_closed()
    205         self._check_writable()
--> 206         self._send_bytes(_ForkingPickler.dumps(obj))
    207 
    208     def recv_bytes(self, maxlength=None):

/usr/lib/python3.6/multiprocessing/connection.py in _send_bytes(self, buf)
    391         n = len(buf)
    392         # For wire compatibility with 3.2 and lower
--> 393         header = struct.pack("!i", n)
    394         if n > 16384:
    395             # The payload is large so Nagle's algorithm won't be triggered

error: 'i' format requires -2147483648 <= number <= 2147483647
...