Мне нужно программно создать удаленных работников и использовать их для выполнения задачи, а затем закрыть их.
Пример кода, приведенный в документации, прекрасно работает для написанного:
import asyncio
from distributed import Worker, Scheduler, Client
from distributed.scheduler import WorkerState
s = "x.x.x.x:8786" # remote IP, not local, started from command line.
async def f():
async with Worker(s) as w1, Worker(s) as w2:
async with Client(s, asynchronous=True) as client:
future = client.submit(lambda x: x + 1, 10)
result = await future
print(result)
asyncio.get_event_loop().run_until_complete(f())
Допустим, у меня есть n
разных машин, кроме dask scheduler - ip1, ip2, ..ipn
. Теперь я сталкиваюсь с двумя проблемами:
- После подключения к удаленному планировщику я хочу создать рабочих на нескольких машинах. Допустим,
ip1, ip2, ip3
. Попытка использовать оба параметра host
и contact_address
при создании Worker
. Рабочие запускаются в самом локальном планировщике, но не на нужных машинах. Как запустить удаленно рабочих на нужных машинах, подключающихся к одному и тому же планировщику? - Мне нужно
client
, созданный в функции async
, для использования во множественных вызовах submit
, map
во времени. Также у меня есть много пользовательских python функций. Итак, как мне программно создавать рабочих на разных машинах, создавать client
и использовать его во времени вне функции asyn c. Я попытался следовать, безуспешно.
s_address = "x.x.x.x:8786" # remote scheduler IP
async def f():
async with Worker(s_address) as w1, Worker(s_address) as w2:
async with Client(s_address, asynchronous=True) as client:
return client
client_to_use = f() # expecting client object which I can use and...
# ...when everything finishes, hoping context manager kills the workers.
# This clearly doesn't work
asyncio.get_event_loop().run_until_complete(f()) # not sure if this is valid anymore
# What I need to do
custom_module.call_some_fn_to_use_dask_client(client_to_use) # Does not work as well!! ```