У меня есть список файлов CSV. Я хочу выполнить набор операций с каждой из них, а затем создать контрдикет, и я хочу создать главный список, содержащий индивидуальные контрдикеты из всех файлов CSV. Я хочу распараллелить обработку каждого файла CSV и затем вернуть счетчик dict из каждого файла. Я нашел подобное решение здесь: Как я могу восстановить возвращаемое значение функции, переданной в multiprocessing.Process?
Я использовал решение, предложенное Дэвидом Калленом. Это решение отлично работает для строк, но когда я пытался вернуть встречный или нормальный диктат. Все CSV-файлы обрабатываются до send_end.send (результат), и он навсегда зависает при выполнении и затем выдает ошибку памяти Я запускаю это на сервере Linux с более чем достаточным объемом памяти для создания списка встречных сообщений.
Я использовал следующий код:
import multiprocessing
#get current working directory
cwd = os.getcwd()
#take a list of all files in cwd
files = os.listdir(cwd)
#defining the function that needs to be done on all csv files
def worker(f,send_end):
infile= open(f)
#read liens in csv file
lines = infile.readlines()
#split the lines by "," and store it in a list of lists
master_lst = [line.strip().split(“,”) for line in lines]
#extract the second field in each sublist
counter_lst = [ element[1] for element in master_lst]
print “Total elements in the list” + str(len(counter_lst))
#create a dictionary of count elements
a = Counter(counter_lst)
# return the counter dict
send_end.send(a)
def main():
jobs = []
pipe_list = []
for f in files:
if f.endswith('.csv'):
recv_end, send_end = multiprocessing.Pipe(duplex=False)
p = multiprocessing.Process(target=worker, args=(f, send_end))
jobs.append(p)
pipe_list.append(recv_end)
p.start()
for proc in jobs:
proc.join()
result_list = [x.recv() for x in pipe_list]
print len(result_list)
if __name__ == '__main__':
main()
Я получаю следующую ошибку:
Process Process-42:
Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in
_bootstrap
self.run()
File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in
worker
a = Counter(counter_lst)
File "/usr/lib64/python2.7/collections.py", line 444, in __init__
self.update(iterable, **kwds)
File "/usr/lib64/python2.7/collections.py", line 526, in update
self[elem] = self_get(elem, 0) + 1
MemoryError
Process Process-17:
Traceback (most recent call last):
Process Process-6:
Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in
_bootstrap
File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in
_bootstrap
Process Process-8:
Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in
_bootstrap
self.run()
self.run()
self.run()
File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in
worker
self._target(*self._args, **self._kwargs)
self._target(*self._args, **self._kwargs)
File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in
worker
File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in
worker
a = Counter(counter_lst_lst)
a = Counter(counter_lst_lst)
a = Counter(counter_lst_lst)
File "/usr/lib64/python2.7/collections.py", line 444, in __init__
File "/usr/lib64/python2.7/collections.py", line 444, in __init__
File "/usr/lib64/python2.7/collections.py", line 444, in __init__
self.update(iterable, **kwds)
File "/usr/lib64/python2.7/collections.py", line 526, in update
self[elem] = self_get(elem, 0) + 1
MemoryError
self.update(iterable, **kwds)
self.update(iterable, **kwds)
File "/usr/lib64/python2.7/collections.py", line 526, in update
File "/usr/lib64/python2.7/collections.py", line 526, in update
self[elem] = self_get(elem, 0) + 1
self[elem] = self_get(elem, 0) + 1
MemoryError
MemoryError
Process Process-10:
Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in
_bootstrap
self.run()
File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in
worker
a = Counter(counter_lst)
File "/usr/lib64/python2.7/collections.py", line 444, in __init__
self.update(iterable, **kwds)
File "/usr/lib64/python2.7/collections.py", line 526, in update
self[elem] = self_get(elem, 0) + 1
MemoryError
^Z
[18]+ Stopped collapse_multiprocessing_return.py
Теперь вместо «a» в send_end.send (a), если я заменю f, имя файла. Он печатает количество csv-файлов в каталоге (что и делает len (result_list) в этом случае). Но когда возвращается счетчик "a", он застревает навсегда, выдавая вышеуказанную ошибку.
Я бы хотел, чтобы код проходил счетчик, чтобы получить end без каких-либо ошибок / проблем. Есть ли работа вокруг? Может кто-нибудь предложить, пожалуйста, возможное решение?
p.s: Я новичок в многопроцессорном модуле, извините, если этот вопрос звучит наивно. Кроме того, я попробовал multiprocessing.Manager (), но получил похожую ошибку