Итак, у меня есть Python multiprocessing.Queue, в который я добавляю элементы в начале прогона X. Поскольку qsize () не работает должным образом, я добавил Y Sentinels (queue.put(None)
) для Y процессов, которые будут потреблять из очереди. Это хорошо работало в одной из моих предыдущих реализаций.
Теперь, в настоящее время, я хочу иметь возможность добавлять новые записи в очередь, что означает, что очередь будет расти во время выполнения. Хотя добавление часовых было довольно элегантно, в этом сценарии оно больше не работает.
Поэтому мой вопрос: есть ли у вас какие-либо другие решения для проверки, пуста ли очередь, кроме часовых? Или есть способ реально добавить в начало очереди?
См. Ниже мою текущую реализацию для добавления часовых и запуска процессов.
def processes_handler(self,target_worker_function,worker_count):
processes = [Process(target=target_worker_function, args=(self.queue,)) for _ in range(worker_count)]
#adding sentinel record since queue can be signaled as empty when its really not
for _ in range(worker_count): self.queue.put(None)
for process in processes: process.start()
for process in processes: process.join()
def worker_function(self, queue):
while True:
record = queue.get()
if record is None: break
else: do_something(record)