Многопроцессорное добавление дополнительных задач в середине выполнения - проблема с часовыми - PullRequest
0 голосов
/ 11 марта 2020

Итак, у меня есть Python multiprocessing.Queue, в который я добавляю элементы в начале прогона X. Поскольку qsize () не работает должным образом, я добавил Y Sentinels (queue.put(None)) для Y процессов, которые будут потреблять из очереди. Это хорошо работало в одной из моих предыдущих реализаций.
Теперь, в настоящее время, я хочу иметь возможность добавлять новые записи в очередь, что означает, что очередь будет расти во время выполнения. Хотя добавление часовых было довольно элегантно, в этом сценарии оно больше не работает.
Поэтому мой вопрос: есть ли у вас какие-либо другие решения для проверки, пуста ли очередь, кроме часовых? Или есть способ реально добавить в начало очереди?

См. Ниже мою текущую реализацию для добавления часовых и запуска процессов.

def processes_handler(self,target_worker_function,worker_count):
    processes = [Process(target=target_worker_function, args=(self.queue,)) for _ in range(worker_count)]
    #adding sentinel record since queue can be signaled as empty when its really not
    for _ in range(worker_count):   self.queue.put(None)
    for process in processes:   process.start()
    for process in processes:   process.join()

def worker_function(self, queue):
    while True:
        record = queue.get()
        if record is None: break
        else: do_something(record)


Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...