Как синхронизировать поток для обработки данных, в то время как другой извлекает данные из API? - PullRequest
1 голос
/ 06 февраля 2020

Я хочу получать данные из API в пакетном режиме, а затем обрабатывать эти данные. Пока процессор не используется во время загрузки текущего пакета, было бы неплохо, если бы другой поток мог обрабатывать данные из предыдущего потока.

В следующем примере используются потоки n для загрузки пакета и его обработки. , Я наблюдаю за тем, что все потоки n будут загружать свои партии одновременно, а затем обрабатывать данные одновременно. Не совсем то, что я хочу. Любые предложения?

import concurrent.futures
import threading


thread_local = threading.local()

def fetch_async(batch_size, threads=3):
    def fetch(offset):
        print(f'Starting download offset={offset}, id={threading.get_ident()}')
        res = download(offset, batch_size)
        return process(res)


    print(f'Fetching async: batch_size={batch_size}')
    offsets = list(range(0, total + batch_size, batch_size))

    with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
        res = executor.map(fetch, offsets)

    # materialize + flatten iterator
    res = list(res)
    return [y for x in res for y in x]

В журналах я вижу выражение Starting download печать в быстрой последовательности для каждого потока, затем пауза во время загрузки и обработки. Опять же, несколько утверждений снова в основном одновременно, et c. Загрузка части занимает 3-5 секунд. Я экспериментировал с 3, 4, 8 и 12 потоками. То же поведение и примерно столько же времени для завершения всего процесса (выполнение ~ 100 отдельных вызовов API).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...