У меня работает система очередей с двумя потоками (polling
и fill_polling_queue
). Очередь имеет фиксированную максимальную длину, которую я не могу превышать. Первый поток создает некоторые URL-адреса, и потребитель должен выполнить итерацию по очереди и проверить, насколько этот ULR дает определенную обратную связь, прежде чем я удаляю его из очереди.
Моя проблема в том, что я не нахожу способ проверить статус для определенного ULR без использования метода .get()
внутри моей функции polling
. Это проблема c, потому что, когда я удаляю его из очереди и возвращаю (если статус не успешен), может случиться так, что fill_polling_queue
уже занял место, а очередь заполнена и, следовательно, заблокирована.
Вот минимальный пример, иллюстрирующий мою проблему:
import numpy as np
import threading, queue
import time
polling_queue = queue.Queue(maxsize=3)
def fill_polling_queue():
for url in [f"someurl_{i}" for i in range(10)]:
while True:
if not polling_queue.full():
polling_queue.put(url)
print("add a item to the queue")
break
else:
time.sleep(0.5) # wait 500ms
continue
def polling():
while True:
print(polling_queue.queue)
# iterate over the urls and check if
# some external condition is given
# here I modeled that condition by a
# random number
url = polling_queue.get()
sucess = bool(np.random.randint(2))
time.sleep(.2)
if sucess:
pass
else:
polling_queue.put(url)
threading.Thread(target=fill_polling_queue, daemon=True).start()
threading.Thread(target=polling, daemon=True).start()
# block until all tasks are done
polling_queue.join()
print("finished")
другая идея, которую я пробовал, заключалась в повторении очереди без вызова .get()
, но затем я не смог найти способ удалить это элемент из очереди, когда статус был успешным.
Есть ли более изощренный способ подойти к этому?