У меня есть многопроцессорный конвейер типа производитель / потребитель для сотен миллионов элементов, который отлично работает (в очень упрощенной форме с некоторым псевдокодом) следующим образом:
from multiprocessing import Process, Manager
def process(batch):
for thing in batch:
result_things = []
for a, b in some_func(thing): # a and b are reasonably short strings
result_things.append(dict(a=a, b=b))
yield result_things
return
STOP_MSG = 'STOP!'
def wrapped_process(q_in, q_out):
msg = q_in.get()
while msg != STOP_MSG:
for result_things in process(msg):
q_out.put(result_things)
msg = q_in.get()
q_out.put(STOP_MSG)
return
def main():
num_workers = 20
mgr = Manager()
q_worker = mgr.Queue()
q_master = mgr.Queue()
for batch in source_of_data:
q_master.put(batch)
agents = []
for i in range(num_workers):
p = Process(
target=wrapped_process,
kwargs=dict(
q_in=q_master, q_out=q_worker))
agents.append(p)
for p in agents:
p.start()
stop_msg_count = 0
while stop_msg_count < num_workers:
msg = q_worker.get()
if msg == STOP_MSG:
stop_msg_count += 1
else:
result_things = msg
add_to_db(result_things)
Вышеупомянутое отлично работает, никогда не превышая Всего 10 ГБ в соответствии с обработчиком заданий на наших серверах.
Я решил сделать несколько OOP и создал простой класс на месте словаря, например:
class Result:
def __init__(self, a, b):
self.a = a
self.b = b
def process(batch):
for thing in batch:
result_things = []
for a, b in some_func(thing):
result_things.append(Result(a=a, b=b)) # instead of a dict, I now use the Result class
yield result_things
return
Это привело к тому, что мои рабочие места были убиты из-за чрезмерного использования памяти, и даже после того, как я запросил 100 ГБ, эти задания будут d ie.
Мне потребовалось время, чтобы понять, что на самом деле это был новый класс. создавая проблемы с памятью, так как я никогда не думал, что такое безобидное изменение может вызвать проблемы с памятью.
И я подтвердил, что новый класс был проблемой, потому что следующее изменение исправило его (вместо возврата к словарю):
def wrapped_process(q_in, q_out):
msg = q_in.get()
while msg != STOP_MSG:
for result_things in process(msg):
q_out.put(result_things)
for result in result_things:
del result # explicit deallocation of the simple objects
msg = q_in.get()
q_out.put(STOP_MSG)
return
Почему python не выполняет сборку мусора, когда, по крайней мере, согласно { ссылка }, это sh мог бы, даже с очередями.
И есть ли лучший, более стандартный способ сделать это, чем ограничиваться встроенными структурами или ручным управлением памятью?
Я использую Python 3.6.2, если важно.