Пришло время для фундаментальной теоремы разработки программного обеспечения : хотя multiprocessing.Pool
не предоставляет отмену как функцию, мы можем добавить ее, прочитав Pool
из тщательно созданной итерации. Однако недостаточно иметь генератор, который yield
s получает значения из списка, но останавливается на каком-то сигнале, потому что Pool
охотно истощает любой данный ему генератор. Итак, нам нужна очень тщательно продуманная итерация.
Ленивый Pool
Универсальный инструмент, который нам нужен, - это способ построения задач для Pool
только тогда, когда работник становится доступным (или не более одной задачи впереди, в случае, если их создание занимает значительное время). Основная идея состоит в том, чтобы замедлить работу по сбору потоков для Pool
с использованием семафора, только когда задача завершена. (Мы знаем, что такая нить существует из наблюдаемого поведения imap_unordered
.)
import multiprocessing
from threading import Semaphore
size=multiprocessing.cpu_count() # or whatever Pool size to use
# How many workers are waiting for work? Add one to buffer one task.
work=Semaphore(size)
def feed0(it):
it=iter(it)
try:
while True:
# Don't ask the iterable until we have a customer, in case better
# instructions become available:
work.acquire()
yield next(it)
except StopIteration: pass
work.release()
def feed(p,f,it):
import sys,traceback
iu=p.imap_unordered(f,feed0(it))
while True:
try: x=next(iu)
except StopIteration: return
except Exception: traceback.print_exception(*sys.exc_info())
work.release()
yield x
try
в feed
не позволяет сбоям дочерних элементов нарушать счет семафора, но имейте в виду, что он не защищает от сбоев в родительском элементе.
Отменяемый итератор
Теперь, когда мы имеем контроль над вводом Pool
в режиме реального времени, все политики планирования просты. Например, вот что-то вроде itertools.chain
, но с возможностью асинхронно отбрасывать любые оставшиеся элементы из одной из входных последовательностей:
import collections,queue
class Cancel:
closed=False
cur=()
def __init__(self): self.data=queue.Queue() # of deques
def add(self,d):
d=collections.deque(d)
self.data.put(d)
return d
def __iter__(self):
while True:
try: yield self.cur.popleft()
except IndexError:
self.cur=self.data.get()
if self.cur is None: break
@staticmethod
def cancel(d): d.clear()
def close(self): self.data.put(None)
Это поточно-ориентированный (по крайней мере, в CPython), несмотря на отсутствие блокировки, потому что такие операции, как deque.clear
, являются атомарными по отношению к проверке Python (и мы не проверяем отдельно, является ли self.cur
пустым).
Использование
Создание одного из них выглядит как
pool=mp.Pool(size)
can=Cancel()
many=can.add(range(1000))
few=can.add(["some","words"])
can.close()
for x in feed(pool,assess_happiness,can):
if happy_with(x): can.cancel(many) # straight onto few, then out
где, конечно, add
s и close
сами могут быть в цикле.