многопроцессорная обработка - отмена оставшихся заданий в пуле без уничтожения пула - PullRequest
0 голосов
/ 01 сентября 2018

Я использую map_async для создания пула из 4 рабочих. И предоставив ему список файлов изображений для обработки [Set 1].
Иногда мне нужно отменить промежуточную обработку, чтобы вместо этого я мог обработать другой набор файлов [Набор 2].

Так, например, я дал map_async 1000 файлов для обработки. А затем хотите отменить обработку оставшихся заданий после обработки около 200 файлов.
Кроме того, я хочу сделать это отмена без разрушения / прекращения пула. Возможно ли это?

Я не хочу завершать пул , потому что воссоздание пула - медленный процесс в Windows (потому что он использует «spawn» вместо «fork»). И мне нужно использовать этот же пул для обработки другого набора файлов изображений [Набор 2] ..

# Putting job_set1 through processing. It may consist of 1000 images
cpu = multiprocessing.cpu_count()
pool = Pool(processes=cpu)
result = pool.map_async(job_set1, thumb_ts_list, chunksize=chunksize)

Теперь между ними мне нужно отменить обработку этого набора 1. И перейти к другому набору (ожидание завершения обработки всех 1000 изображений не вариант, но я могу дождаться завершения обработки текущего изображения). )

<Somehow cancel processing of job_set1>
result = pool.map_async(job_set2, thumb_ts_list, chunksize=chunksize)

Ответы [ 2 ]

0 голосов
/ 04 сентября 2018

Пришло время для фундаментальной теоремы разработки программного обеспечения : хотя multiprocessing.Pool не предоставляет отмену как функцию, мы можем добавить ее, прочитав Pool из тщательно созданной итерации. Однако недостаточно иметь генератор, который yield s получает значения из списка, но останавливается на каком-то сигнале, потому что Pool охотно истощает любой данный ему генератор. Итак, нам нужна очень тщательно продуманная итерация.

Ленивый Pool

Универсальный инструмент, который нам нужен, - это способ построения задач для Pool только тогда, когда работник становится доступным (или не более одной задачи впереди, в случае, если их создание занимает значительное время). Основная идея состоит в том, чтобы замедлить работу по сбору потоков для Pool с использованием семафора, только когда задача завершена. (Мы знаем, что такая нить существует из наблюдаемого поведения imap_unordered.)

import multiprocessing
from threading import Semaphore

size=multiprocessing.cpu_count()  # or whatever Pool size to use

# How many workers are waiting for work?  Add one to buffer one task.
work=Semaphore(size)

def feed0(it):
  it=iter(it)
  try:
    while True:
      # Don't ask the iterable until we have a customer, in case better
      # instructions become available:
      work.acquire()
      yield next(it)
  except StopIteration: pass
  work.release()
def feed(p,f,it):
  import sys,traceback
  iu=p.imap_unordered(f,feed0(it))
  while True:
    try: x=next(iu)
    except StopIteration: return
    except Exception: traceback.print_exception(*sys.exc_info())
    work.release()
    yield x

try в feed не позволяет сбоям дочерних элементов нарушать счет семафора, но имейте в виду, что он не защищает от сбоев в родительском элементе.

Отменяемый итератор

Теперь, когда мы имеем контроль над вводом Pool в режиме реального времени, все политики планирования просты. Например, вот что-то вроде itertools.chain, но с возможностью асинхронно отбрасывать любые оставшиеся элементы из одной из входных последовательностей:

import collections,queue

class Cancel:
  closed=False
  cur=()
  def __init__(self): self.data=queue.Queue() # of deques
  def add(self,d):
    d=collections.deque(d)
    self.data.put(d)
    return d
  def __iter__(self):
    while True:
      try: yield self.cur.popleft()
      except IndexError:
        self.cur=self.data.get()
        if self.cur is None: break
  @staticmethod
  def cancel(d): d.clear()
  def close(self): self.data.put(None)

Это поточно-ориентированный (по крайней мере, в CPython), несмотря на отсутствие блокировки, потому что такие операции, как deque.clear, являются атомарными по отношению к проверке Python (и мы не проверяем отдельно, является ли self.cur пустым).

Использование

Создание одного из них выглядит как

pool=mp.Pool(size)
can=Cancel()
many=can.add(range(1000))
few=can.add(["some","words"])
can.close()
for x in feed(pool,assess_happiness,can):
  if happy_with(x): can.cancel(many)  # straight onto few, then out

где, конечно, add s и close сами могут быть в цикле.

0 голосов
/ 02 сентября 2018

Модуль multiprocessing, похоже, не имеет понятия отмены. Вы можете использовать оболочку concurrent.futures.ProcessPoolExecutor и отменить ожидающие фьючерсы, когда у вас будет достаточно результатов.

Вот пример, который выбирает 10 JPEG-файлов из набора путей и отменяет ожидающие фьючерсы, оставляя пул процессов пригодным для использования впоследствии:

import concurrent.futures


def interesting_path(path):
    """Gives path if is a JPEG else ``None``."""
    with open(path, 'rb') as f:
        if f.read(3) == b'\xff\xd8\xff':
            return path
        return None


def find_interesting(paths, count=10):
     """Yields count from paths which are 'interesting' by multiprocess task."""
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = {pool.submit(interesting_path, p) for p in paths}
        print ('Started {}'.format(len(futures)))
        for future in concurrent.futures.as_completed(futures):
            res = future.result()
            futures.remove(future)
            if res is not None:
                yield res
                count -= 1
                if count == 0:
                    break
        cancelled = 0
        for future in futures:
            cancelled += future.cancel()
        print ('Cancelled {}'.format(cancelled))
        concurrent.futures.wait(futures)
        # Can still use pool here for more processing as needed

Обратите внимание, что выбор того, как разбить работу на фьючерсы, все еще сложен, больший набор требует больше накладных расходов, но также может означать меньше напрасной работы. Это также может быть легко адаптировано к асинхронному синтаксису Python 3.6.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...