Как вернуть счетчик словаря из функции, переданной в многопроцессорную обработку? - PullRequest
0 голосов
/ 04 мая 2018

У меня есть список файлов CSV. Я хочу выполнить набор операций с каждой из них, а затем создать контрдикет, и я хочу создать главный список, содержащий индивидуальные контрдикеты из всех файлов CSV. Я хочу распараллелить обработку каждого файла CSV и затем вернуть счетчик dict из каждого файла. Я нашел подобное решение здесь: Как я могу восстановить возвращаемое значение функции, переданной в multiprocessing.Process?

Я использовал решение, предложенное Дэвидом Калленом. Это решение отлично работает для строк, но когда я пытался вернуть встречный или нормальный диктат. Все CSV-файлы обрабатываются до send_end.send (результат), и он навсегда зависает при выполнении и затем выдает ошибку памяти Я запускаю это на сервере Linux с более чем достаточным объемом памяти для создания списка встречных сообщений.

Я использовал следующий код:

import multiprocessing

#get current working directory
cwd = os.getcwd()

#take a list of all files in cwd
files = os.listdir(cwd)

#defining the function that needs to be done on all csv files
def worker(f,send_end):
    infile= open(f) 
    #read liens in csv file
    lines = infile.readlines()
    #split the lines by "," and store it in a list of lists
    master_lst = [line.strip().split(“,”) for line in lines]
    #extract the second field in each sublist 
    counter_lst = [ element[1] for element in master_lst]
    print “Total elements in the list” + str(len(counter_lst))
    #create a dictionary of count elements
    a = Counter(counter_lst)
    # return the counter dict
    send_end.send(a)

def main():
    jobs = []
    pipe_list = []
    for f in files:
        if f.endswith('.csv'):
           recv_end, send_end = multiprocessing.Pipe(duplex=False)
           p = multiprocessing.Process(target=worker, args=(f, send_end))
           jobs.append(p)
           pipe_list.append(recv_end)
           p.start()

    for proc in jobs:
       proc.join()
    result_list = [x.recv() for x in pipe_list]
    print len(result_list)

if __name__ == '__main__':
     main()

Я получаю следующую ошибку:

Process Process-42:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in 
  _bootstrap
  self.run()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
  self._target(*self._args, **self._kwargs)
  File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in 
  worker
  a = Counter(counter_lst)
  File "/usr/lib64/python2.7/collections.py", line 444, in __init__
  self.update(iterable, **kwds)
  File "/usr/lib64/python2.7/collections.py", line 526, in update
  self[elem] = self_get(elem, 0) + 1
 MemoryError
 Process Process-17:
 Traceback (most recent call last):
 Process Process-6:
 Traceback (most recent call last):
 File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in 
 _bootstrap
 File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in 
 _bootstrap
 Process Process-8:
 Traceback (most recent call last):
 File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in 
 _bootstrap
 self.run()
 self.run()
 self.run()
 File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
 File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
 self._target(*self._args, **self._kwargs)
 File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
 File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in 
 worker
 self._target(*self._args, **self._kwargs)
 self._target(*self._args, **self._kwargs)
 File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in 
 worker
 File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in 
 worker
 a = Counter(counter_lst_lst)
 a = Counter(counter_lst_lst)
 a = Counter(counter_lst_lst)
 File "/usr/lib64/python2.7/collections.py", line 444, in __init__
 File "/usr/lib64/python2.7/collections.py", line 444, in __init__
 File "/usr/lib64/python2.7/collections.py", line 444, in __init__
 self.update(iterable, **kwds)
 File "/usr/lib64/python2.7/collections.py", line 526, in update
 self[elem] = self_get(elem, 0) + 1
 MemoryError
 self.update(iterable, **kwds)
 self.update(iterable, **kwds)
 File "/usr/lib64/python2.7/collections.py", line 526, in update
 File "/usr/lib64/python2.7/collections.py", line 526, in update
 self[elem] = self_get(elem, 0) + 1
 self[elem] = self_get(elem, 0) + 1
 MemoryError
 MemoryError
 Process Process-10:
 Traceback (most recent call last):
 File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in 
 _bootstrap
 self.run()
 File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
 self._target(*self._args, **self._kwargs)
 File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in 
 worker
 a = Counter(counter_lst)
 File "/usr/lib64/python2.7/collections.py", line 444, in __init__
 self.update(iterable, **kwds)
 File "/usr/lib64/python2.7/collections.py", line 526, in update
 self[elem] = self_get(elem, 0) + 1
 MemoryError
 ^Z
 [18]+  Stopped                 collapse_multiprocessing_return.py

Теперь вместо «a» в send_end.send (a), если я заменю f, имя файла. Он печатает количество csv-файлов в каталоге (что и делает len (result_list) в этом случае). Но когда возвращается счетчик "a", он застревает навсегда, выдавая вышеуказанную ошибку.

Я бы хотел, чтобы код проходил счетчик, чтобы получить end без каких-либо ошибок / проблем. Есть ли работа вокруг? Может кто-нибудь предложить, пожалуйста, возможное решение?

p.s: Я новичок в многопроцессорном модуле, извините, если этот вопрос звучит наивно. Кроме того, я попробовал multiprocessing.Manager (), но получил похожую ошибку

1 Ответ

0 голосов
/ 04 мая 2018

В вашей трассировке упоминается Process Process-42:, поэтому создается не менее 42 процессов. Вы создаете процесс для каждого файла CSV, который бесполезен и, вероятно, вызывает ошибку памяти.

Ваша проблема может быть решена намного проще, используя multiprocessing.Pool.map. Функцию worker также можно значительно сократить:

def worker(f):
    with open(f) as infile:
        return Counter(line.strip().split(",")[1]
                       for line in infile)

def main():
    pool = multiprocessing.Pool()
    result_list = pool.map(worker, [f for f in files if f.endswith('.csv')])

Передача аргументов в пул не означает, что он создаст столько процессов, сколько у вас процессорных ядер. Использование большего количества может повысить или не увеличить производительность.

...