Вы можете сделать следующее:
def worker(q_request, q_response, api_cred):
dr = DataRetriever(api_cred)
while True:
stream_id = q_request.get() # that's blocking unless q.get(False)
if stream_id == "stop":
sys.exit(0)
dr.retrieve(stream_id) # that can take some time (assume blocking)
q_response.put(stream_id) # signal job has ended to parent process
api_cred = [cred1, cred2, cred3, cred4, cred5]
q_request, q_response = queue.Queue(), queue.Queue()
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(q_request, q_response, api_cred[i]))
t.start()
threads.append(t)
for item in source():
q_request.put(item)
print("Stream ID %s was successfully retrieved." %q_response.get())
Это предполагает, что dr.retrieve(stream_id)
блокирует, или что у вас есть какой-то способ узнать, что подпроцесс, запущенный dr.retrieve(stream_id)
, не завершился, поэтому ваш работник будет блокироваться, пока это не будет сделано (иначе реализация DataRetriever
должна измениться).
q.get()
блокируется по умолчанию, поэтому ваши процессы worker
будут ждать в соответствии с другими, пока объект не будет принят Это. Объект Queue()
также является FIFO, поэтому вы можете быть уверены, что работа будет равномерно распределена между вашими worker
процессами.