Я пытаюсь использовать Dask для распределения работы с машины (назовите ее A) по 4 серверам в центре обработки данных (назовите их B, C, D и E). A должен настроить SSHCluster, назначив планировщику жить на B, который затем должен порождать рабочих на B, C, D и E. Хитрость в том, что открыты только некоторые порты и поэтому должны быть указаны. Это легко сделать для планировщика, но я не могу заставить его работать для рабочих.
Если они не указаны, A успешно запускает планировщик на B. Затем планировщик считает, что он успешно запускает все рабочие на случайных портах, но при сборе результатов обнаруживают, что он может связываться только с рабочими на B. Пока это имеет смысл. Код для этого:
cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
worker_options={
'nprocs': procs_per_node,
'nthreads': 1,
})
Как только я пытаюсь установить порты для рабочих, он не запускает рабочие. Кажется, это происходит независимо от того, какой вклад я даю. Я попытался запустить по одному воркеру на каждом сервере, указав порт для использования как int:
cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
worker_options={
'nprocs': procs_per_node,
'nthreads': 1,
'port': 60000,
})
Я попытался запустить несколько воркеров на каждом сервере, указав диапазон используемых портов:
cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
worker_options={
'nprocs': procs_per_node,
'nthreads': 1,
'port': '{}:{}'.format(
60000, 60000 + procs_per_node - 1),
})
Я попытался запустить несколько рабочих процессов на каждом сервере, указав полный диапазон доступных портов:
cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
worker_options={
'nprocs': procs_per_node,
'nthreads': 1,
'port': '60000:61000'
})
Каждый раз, когда он возвращает четыре ошибки (от B, C, D и E) с сообщением «Исключение: рабочий процесс не запустился»
В итоге, это мои вопросы:
- Как я могу назначить порты рабочим в Dask SSHCluster?
- Как только это будет сделано, нужно ли мне сделать то же самое для процессов няни? Если да, то как?
Для справки, вот версии, которые я использую (возможно, не все актуальны): python 3.8.3, dask 2.18.1, dask-core 2.18 .1, раздача 2.18.0, торнадо 6.0.4, боке 2.01