Многопроцессорность с прогрессом - PullRequest
0 голосов
/ 09 марта 2012

Я никогда раньше не работал с multiprocessing, так что терпите меня, если я задаю простой вопрос.

Этот ответ предоставил очень хороший класс обработки, который я адаптировал кмои потребности, и это работает очень хорошо.Я пытаюсь реализовать базовый индикатор выполнения, который я тестирую, используя операторы print, но он вообще не работает (без вывода).

Мой текущий код такой:

class ParsingMaster(object):
  def __init__(self, parser, input_file, output_file):
    self.parser = parser

    self.num_processes = cpu_count()
    self.input_file = input_file
    self.output_file = output_file

    self.input_queue = Queue()
    self.output_queue = Queue()

    self.input_size = 0

    self.input_process = Process(target=self.parse_input)
    self.output_process = Process(target=self.write_output)
    self.processes = [Process(target=self.process_row) for row in range(self.num_processes)]

    self.input_process.start()
    self.output_process.start()

    for process in self.processes:
      process.start()

    self.input_process.join()

    for process in self.processes:
      process.join()

    self.output_process.join()

  def parse_input(self):
    for index, row in enumerate(self.input_file):
      self.input_queue.put([index, row])
      self.input_size = self.input_queue.qsize()

    for i in range(self.num_processes):
      self.input_queue.put('STOP')

  def process_row(self):
    for index, row in iter(self.input_queue.get, 'STOP'):
      self.output_queue.put([index, row[0], self.parser.parse(row[1])])

    self.output_queue.put('STOP')

  def write_output(self):
    current = 0
    buffer = {}

    for works in range(self.num_processes):
      for index, id, row in iter(self.output_queue.get, 'STOP'):
        if index != current:
          buffer[index] = [id] + row
        else:
          self.output_file.writerow([id] + row)
          current += 1

          while current in buffer:
            self.output_file.writerow(buffer[current])
            del buffer[current]
            current += 1

            if self.input_size:
              print float(current * 100) / float(self.input_size)

После некоторого тестирования я обнаружил несколько странных вещей:

  • self.input_size правильно обновлен в parse_input().
  • parse_input() заканчивается, пока write_output() все еще работает.
  • write_output() всегда сообщает, что self.input_size = 0.

Может кто-нибудь сказать мне, где я здесь не так?Любая помощь полезна, поэтому спасибо заранее.

1 Ответ

2 голосов
/ 09 марта 2012

self.input_size - локальная переменная процесса, каждый процесс будет иметь свою собственную копию. Согласно многопроцессорной документации , вам нужно поместить данные в контейнеры, такие как Value и Array, чтобы сделать их общими.

...