Я хочу получать данные из API в пакетном режиме, а затем обрабатывать эти данные. Пока процессор не используется во время загрузки текущего пакета, было бы неплохо, если бы другой поток мог обрабатывать данные из предыдущего потока.
В следующем примере используются потоки n
для загрузки пакета и его обработки. , Я наблюдаю за тем, что все потоки n
будут загружать свои партии одновременно, а затем обрабатывать данные одновременно. Не совсем то, что я хочу. Любые предложения?
import concurrent.futures
import threading
thread_local = threading.local()
def fetch_async(batch_size, threads=3):
def fetch(offset):
print(f'Starting download offset={offset}, id={threading.get_ident()}')
res = download(offset, batch_size)
return process(res)
print(f'Fetching async: batch_size={batch_size}')
offsets = list(range(0, total + batch_size, batch_size))
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
res = executor.map(fetch, offsets)
# materialize + flatten iterator
res = list(res)
return [y for x in res for y in x]
В журналах я вижу выражение Starting download
печать в быстрой последовательности для каждого потока, затем пауза во время загрузки и обработки. Опять же, несколько утверждений снова в основном одновременно, et c. Загрузка части занимает 3-5 секунд. Я экспериментировал с 3, 4, 8 и 12 потоками. То же поведение и примерно столько же времени для завершения всего процесса (выполнение ~ 100 отдельных вызовов API).