Streamz / Dask: сбор не ждет всех результатов буфера - PullRequest
4 голосов
/ 03 февраля 2020

Импорт:

from dask.distributed import Client
import streamz
import time

Имитированная рабочая нагрузка:

def increment(x):
    time.sleep(0.5)
    return x + 1

Предположим, я хотел бы обработать некоторую рабочую нагрузку на локальном клиенте Dask:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).gather().sink(print)

        for i in range(10):
            ps.emit(i)

Это работает, как и ожидалось, но sink(print), конечно, заставит ожидать каждого результата , поэтому поток не будет выполняться параллельно.

Однако, если я использую buffer() чтобы разрешить кэширование результатов, gather(), похоже, больше не собирает все результаты правильно, и интерпретатор завершает работу, прежде чем получить результаты. Этот подход:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).buffer(10).gather().sink(print)
                                     # ^
        for i in range(10):          # - allow parallel execution 
            ps.emit(i)               # - before gather()

... не выводит никаких результатов для меня. Интерпретатор Python просто завершается вскоре после запуска скрипта, а до того, как buffer() выдает свои результаты, таким образом, Ничего не печатается.

Однако, если основной процесс вынужден ждать некоторое время, результаты печатаются параллельно (поэтому они не ждут друг друга, а печатаются почти одновременно):

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).buffer(10).gather().sink(print)

        for i in range(10):
            ps.emit(i)

        time.sleep(10)  # <- force main process to wait while ps is working

Почему это так? Я подумал, что gather() должен ждать пакет из 10 результатов, поскольку buffer() должен кешировать ровно 10 результатов параллельно, прежде чем сбросить их до gather(). Почему gather() не блокируется в этом случае?

Есть ли хороший способ в противном случае проверить, содержит ли поток все еще обрабатываемые элементы , чтобы предотвратить преждевременный выход из основного процесса?

1 Ответ

1 голос
/ 07 февраля 2020
  1. «Почему это так?»: Потому что распределенный планировщик Dask (который выполняет функции отображения потоков и приемника) и ваш сценарий python выполняются в разных процессах. Когда контекст блока «with» заканчивается, ваш Dask Client закрывается, и выполнение останавливается, прежде чем элементы, отправленные в поток, смогут достичь функции приемника.

  2. "Есть хороший способ иначе проверить, содержит ли Поток все еще обрабатываемые элементы ": не то, что я знаю. Однако : если требуемое поведение (я просто угадываю здесь) - параллельная обработка множества элементов, тогда Streamz - это не то, что вам следует использовать, ванильного Dask должно быть достаточно.

...