Python многопроцессорная очередь, а не пользовательские классы для сбора мусора - PullRequest
0 голосов
/ 29 мая 2020

У меня есть многопроцессорный конвейер типа производитель / потребитель для сотен миллионов элементов, который отлично работает (в очень упрощенной форме с некоторым псевдокодом) следующим образом:

from multiprocessing import Process, Manager

def process(batch):
  for thing in batch:
    result_things = []
    for a, b in some_func(thing): # a and b are reasonably short strings
      result_things.append(dict(a=a, b=b))
    yield result_things
  return

STOP_MSG = 'STOP!'
def wrapped_process(q_in, q_out):
  msg = q_in.get()
  while msg != STOP_MSG:
    for result_things in process(msg):
      q_out.put(result_things)
    msg = q_in.get()
  q_out.put(STOP_MSG)
  return

def main():
  num_workers = 20
  mgr = Manager()
  q_worker = mgr.Queue()
  q_master = mgr.Queue()
  for batch in source_of_data:
    q_master.put(batch)
  agents = []
  for i in range(num_workers):
    p = Process(
      target=wrapped_process,
      kwargs=dict(
        q_in=q_master, q_out=q_worker))
    agents.append(p)
  for p in agents:
    p.start()

  stop_msg_count = 0
  while stop_msg_count < num_workers:
    msg = q_worker.get()
    if msg == STOP_MSG:
      stop_msg_count += 1
    else:
      result_things = msg
      add_to_db(result_things)

Вышеупомянутое отлично работает, никогда не превышая Всего 10 ГБ в соответствии с обработчиком заданий на наших серверах.

Я решил сделать несколько OOP и создал простой класс на месте словаря, например:

class Result:
  def __init__(self, a, b):
    self.a = a
    self.b = b

def process(batch):
  for thing in batch:
    result_things = []
    for a, b in some_func(thing): 
      result_things.append(Result(a=a, b=b)) # instead of a dict, I now use the Result class
    yield result_things
  return

Это привело к тому, что мои рабочие места были убиты из-за чрезмерного использования памяти, и даже после того, как я запросил 100 ГБ, эти задания будут d ie.

Мне потребовалось время, чтобы понять, что на самом деле это был новый класс. создавая проблемы с памятью, так как я никогда не думал, что такое безобидное изменение может вызвать проблемы с памятью.

И я подтвердил, что новый класс был проблемой, потому что следующее изменение исправило его (вместо возврата к словарю):


def wrapped_process(q_in, q_out):
  msg = q_in.get()
  while msg != STOP_MSG:
    for result_things in process(msg):
      q_out.put(result_things)
      for result in result_things:
        del result     # explicit deallocation of the simple objects
    msg = q_in.get()
  q_out.put(STOP_MSG)
  return

Почему python не выполняет сборку мусора, когда, по крайней мере, согласно { ссылка }, это sh мог бы, даже с очередями.

И есть ли лучший, более стандартный способ сделать это, чем ограничиваться встроенными структурами или ручным управлением памятью?

Я использую Python 3.6.2, если важно.

...