Streamz с распределенным Dask - PullRequest
       8

Streamz с распределенным Dask

0 голосов
/ 02 октября 2018

На основе документации streamz можно использовать распределенный кластер dask следующим образом:

from distributed import Client
client = Client('tcp://localhost:8786')  # Connect to scheduler that has distributed workers

from streamz import Stream
source = Stream()
(source.scatter()       # scatter local elements to cluster, creating a DaskStream
       .map(increment)  # map a function remotely
       .buffer(5)       # allow five futures to stay on the cluster at any time
       .gather()        # bring results back to local process
       .sink(write))    # call write locally

for x in range(10):
    source.emit(x)

Концептуально не ясно, почему нам не нужнопередать распределенный dask client в качестве параметра для создания экземпляра Stream().В частности, как Stream() узнает, к какому планировщику подключаться?

Что бы вы сделали, если бы у вас было два планировщика, которые работают на несвязанных узлах, таких как:

from distributed import Client
client_1 = Client('tcp://1.2.3.4:8786')
client_2 = Client('tcp://10.20.30.40:8786')

Каксоздать два потока для client_1 и client_2 соответственно?

1 Ответ

0 голосов
/ 03 октября 2018

Основное правило в Dask: если определен распределенный клиент, используйте его для любых вычислений Dask.Если имеется более одного распределенного клиента, используйте последний созданный, который еще жив.

Streamz не позволяет явно выбирать, какой клиент использовать, когда вы .scatter(), он использует dask.distributed.default_client() для выбораодин.Возможно, вы захотите поднять проблему, разрешив использование ключевого слова client=.Рабочий процесс даже не соответствует контекстно-ориентированному подходу.На данный момент, если вы хотите, чтобы несколько потоков одновременно работали с данными в разных кластерах Dask, вам, вероятно, придется манипулировать состоянием dask.distributed.client._global_clients.

...