Одна часть моего приложения производит много задач (по сути, словарь).Каждая задача (словарь) по сути представляет собой данные POST для стороннего API.
Поэтому я хотел многопроцессорное выполнение этих задач.
То, что я думал, было: Создание многопроцессорного пулакоторый считывает задачи из очереди и отправляет их в функцию (FN), которая затем вызывает API.
Проблема заключается в следующем: я хочу, чтобы я мог продолжать выдвигать задачи в очереди, которыепостоянно читаются в бассейне.Но этого не происходит.При обновлении очереди (то есть при переносе задач в очередь) функция (FN) не вызывается.Ниже приведен код:
from multiprocessing import Pool, Queue, cpu_count, Manager
class PoolThread:
running = False
def __init__(self, a):
self.initialisepool()
@staticmethod
def poolfunction(arg):
print(arg)
print(arg.qsize())
return "processed"
#return arg * arg
def callb(self, a):
print(a)
def errcall(self, b):
print(b)
def initialisepool(self):
print(cpu_count())
self.p = Pool(cpu_count())
self.m = Manager()
self.q = self.m.Queue()
self.p.map_async(self.poolfunction, (self.q,), cpu_count(), self.callb, self.errcall)
def readcache(self):
if not self.running:
self.running = True
self.threadedhandling()
def threadedhandling(self):
tasks = self.gettasks()
# Push to queue
self.q.put(tasks)
def gettasks(self):
dict_r = {'randomval' : 2}
return dict_r
pt = PoolThread()
pt.readcache()
В приведенном выше пул инициализируется.Однако при вызове readcache () функция пула не вызывается повторно.Если очередь заполняется данными до self.p.map_async(self.poolfunction, (self.q,), cpu_count(), self.callb, self.errcall)
, то вызывается только функция пула.
Мое требование заключается в том, что:
- Я должен продолжать вставлять в очередь, и очередь автоматическиконтролируется для вызова функции пула.
- как я могу отслеживать результат работы пула?Функция пула должна получать только один аргумент за раз из очереди, которая передается в API.Как только ответ получен, я хочу прочитать отдельный ответ и выполнить дальнейшую обработку в отдельной функции.Но насколько я читал о пуле, результат всех процессов, запущенных одновременно, возвращается после завершения каждого процесса.Это правда?Чтобы справиться с этим, я использую опцию «chunksize», так что даже если у меня в очереди 20 задач, но только 8 ядер, то одновременно обрабатываются 8 задач и возвращается их ответ.Тем не менее, можно ли продолжать получать ответ по завершении задач?