Импорт:
from dask.distributed import Client
import streamz
import time
Имитированная рабочая нагрузка:
def increment(x):
time.sleep(0.5)
return x + 1
Предположим, я хотел бы обработать некоторую рабочую нагрузку на локальном клиенте Dask:
if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).gather().sink(print)
for i in range(10):
ps.emit(i)
Это работает, как и ожидалось, но sink(print)
, конечно, заставит ожидать каждого результата , поэтому поток не будет выполняться параллельно.
Однако, если я использую buffer()
чтобы разрешить кэширование результатов, gather()
, похоже, больше не собирает все результаты правильно, и интерпретатор завершает работу, прежде чем получить результаты. Этот подход:
if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).buffer(10).gather().sink(print)
# ^
for i in range(10): # - allow parallel execution
ps.emit(i) # - before gather()
... не выводит никаких результатов для меня. Интерпретатор Python просто завершается вскоре после запуска скрипта, а до того, как buffer()
выдает свои результаты, таким образом, Ничего не печатается.
Однако, если основной процесс вынужден ждать некоторое время, результаты печатаются параллельно (поэтому они не ждут друг друга, а печатаются почти одновременно):
if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).buffer(10).gather().sink(print)
for i in range(10):
ps.emit(i)
time.sleep(10) # <- force main process to wait while ps is working
Почему это так? Я подумал, что gather()
должен ждать пакет из 10 результатов, поскольку buffer()
должен кешировать ровно 10 результатов параллельно, прежде чем сбросить их до gather()
. Почему gather()
не блокируется в этом случае?
Есть ли хороший способ в противном случае проверить, содержит ли поток все еще обрабатываемые элементы , чтобы предотвратить преждевременный выход из основного процесса?