Продолжайте добавлять задачи в многопроцессорный пул для чтения пулом - PullRequest
0 голосов
/ 11 сентября 2018

Одна часть моего приложения производит много задач (по сути, словарь).Каждая задача (словарь) по сути представляет собой данные POST для стороннего API.

Поэтому я хотел многопроцессорное выполнение этих задач.

То, что я думал, было: Создание многопроцессорного пулакоторый считывает задачи из очереди и отправляет их в функцию (FN), которая затем вызывает API.

Проблема заключается в следующем: я хочу, чтобы я мог продолжать выдвигать задачи в очереди, которыепостоянно читаются в бассейне.Но этого не происходит.При обновлении очереди (то есть при переносе задач в очередь) функция (FN) не вызывается.Ниже приведен код:

from multiprocessing import Pool, Queue, cpu_count, Manager


class PoolThread:
    running = False

    def __init__(self, a):
        self.initialisepool()

    @staticmethod
    def poolfunction(arg):
        print(arg)
        print(arg.qsize())
        return "processed"
        #return arg * arg

    def callb(self, a):
        print(a)

    def errcall(self, b):
        print(b)

    def initialisepool(self):
        print(cpu_count())
        self.p = Pool(cpu_count())
        self.m = Manager()
        self.q = self.m.Queue()
        self.p.map_async(self.poolfunction, (self.q,), cpu_count(), self.callb, self.errcall)

    def readcache(self):
        if not self.running:
            self.running = True
            self.threadedhandling()

    def threadedhandling(self):
        tasks = self.gettasks()
        # Push to queue
        self.q.put(tasks)


    def gettasks(self):
        dict_r = {'randomval' : 2}
        return dict_r


pt = PoolThread()
pt.readcache()

В приведенном выше пул инициализируется.Однако при вызове readcache () функция пула не вызывается повторно.Если очередь заполняется данными до self.p.map_async(self.poolfunction, (self.q,), cpu_count(), self.callb, self.errcall), то вызывается только функция пула.

Мое требование заключается в том, что:

  • Я должен продолжать вставлять в очередь, и очередь автоматическиконтролируется для вызова функции пула.
    • как я могу отслеживать результат работы пула?Функция пула должна получать только один аргумент за раз из очереди, которая передается в API.Как только ответ получен, я хочу прочитать отдельный ответ и выполнить дальнейшую обработку в отдельной функции.Но насколько я читал о пуле, результат всех процессов, запущенных одновременно, возвращается после завершения каждого процесса.Это правда?Чтобы справиться с этим, я использую опцию «chunksize», так что даже если у меня в очереди 20 задач, но только 8 ядер, то одновременно обрабатываются 8 задач и возвращается их ответ.Тем не менее, можно ли продолжать получать ответ по завершении задач?
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...