Распределить рабочую нагрузку в очереди между несколькими разными работниками - PullRequest
1 голос
/ 05 февраля 2020

О

У меня есть класс DataRetriever, для которого требуется создать экземпляр с учетными данными API. У меня есть пять различных наборов учетных данных API, поэтому я хочу создать пять экземпляров DataRetriever. DataRetriever имеет только один метод c publi retrieve, который, как следует из названия, извлекает некоторые данные, используя subprocess на основе id, переданного методу.

  • данные учетные данные API не могут открывать более одного потока (с любым идентификатором) одновременно
  • a DataRetriever может иметь не более одного подключения к API, поэтому DataRetriever#retrieve(id) должен не вызываться для экземпляра DataRetriever, который все еще извлекает поток данных
  • , объем данных меняется, поэтому время до завершения подпроцесса может быть любым, от нескольких секунд до нескольких минут

Текущий подход

Я использую queue, как видно из примера фрагмента. Я заполняю очередь всеми id s потоков данных, которые необходимо извлечь.

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()

Вопрос

Я всегда могу go с шаблоном наблюдателя, но я Интересно, есть ли Python способ сделать это?

  • Как я могу убедиться, что worker из приведенного выше фрагмента кода распределяет рабочую нагрузку в очереди только на холостом ходу DataRetriever с при использовании все пять экземпляров DataRetriever без проблем?
  • Во время исследования я обнаружил, что ProcessPoolExecutor не может адаптировать примеры к моему сценарию. Может ли это быть решением?

1 Ответ

1 голос
/ 05 февраля 2020

Вы можете сделать следующее:

def worker(q_request, q_response, api_cred):
    dr = DataRetriever(api_cred)
    while True:
        stream_id = q_request.get() # that's blocking unless q.get(False)
        if stream_id == "stop":
            sys.exit(0)
        dr.retrieve(stream_id) # that can take some time (assume blocking)
        q_response.put(stream_id) # signal job has ended to parent process

api_cred = [cred1, cred2, cred3, cred4, cred5]
q_request, q_response = queue.Queue(), queue.Queue()

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(q_request, q_response, api_cred[i]))
    t.start()
    threads.append(t)

for item in source():
    q_request.put(item)
    print("Stream ID %s was successfully retrieved." %q_response.get())

Это предполагает, что dr.retrieve(stream_id) блокирует, или что у вас есть какой-то способ узнать, что подпроцесс, запущенный dr.retrieve(stream_id), не завершился, поэтому ваш работник будет блокироваться, пока это не будет сделано (иначе реализация DataRetriever должна измениться).

q.get() блокируется по умолчанию, поэтому ваши процессы worker будут ждать в соответствии с другими, пока объект не будет принят Это. Объект Queue() также является FIFO, поэтому вы можете быть уверены, что работа будет равномерно распределена между вашими worker процессами.

...