На основе документации streamz можно использовать распределенный кластер dask следующим образом:
from distributed import Client
client = Client('tcp://localhost:8786') # Connect to scheduler that has distributed workers
from streamz import Stream
source = Stream()
(source.scatter() # scatter local elements to cluster, creating a DaskStream
.map(increment) # map a function remotely
.buffer(5) # allow five futures to stay on the cluster at any time
.gather() # bring results back to local process
.sink(write)) # call write locally
for x in range(10):
source.emit(x)
Концептуально не ясно, почему нам не нужнопередать распределенный dask client
в качестве параметра для создания экземпляра Stream()
.В частности, как Stream()
узнает, к какому планировщику подключаться?
Что бы вы сделали, если бы у вас было два планировщика, которые работают на несвязанных узлах, таких как:
from distributed import Client
client_1 = Client('tcp://1.2.3.4:8786')
client_2 = Client('tcp://10.20.30.40:8786')
Каксоздать два потока для client_1
и client_2
соответственно?