Как правильно установить рабочие порты в SSHCluster, распределенном по Dask? - PullRequest
2 голосов
/ 18 июня 2020

Я пытаюсь использовать Dask для распределения работы с машины (назовите ее A) по 4 серверам в центре обработки данных (назовите их B, C, D и E). A должен настроить SSHCluster, назначив планировщику жить на B, который затем должен порождать рабочих на B, C, D и E. Хитрость в том, что открыты только некоторые порты и поэтому должны быть указаны. Это легко сделать для планировщика, но я не могу заставить его работать для рабочих.

Если они не указаны, A успешно запускает планировщик на B. Затем планировщик считает, что он успешно запускает все рабочие на случайных портах, но при сборе результатов обнаруживают, что он может связываться только с рабочими на B. Пока это имеет смысл. Код для этого:

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                 })

Как только я пытаюсь установить порты для рабочих, он не запускает рабочие. Кажется, это происходит независимо от того, какой вклад я даю. Я попытался запустить по одному воркеру на каждом сервере, указав порт для использования как int:

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                     'port': 60000,
                                 })

Я попытался запустить несколько воркеров на каждом сервере, указав диапазон используемых портов:

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                     'port': '{}:{}'.format(
                                         60000, 60000 + procs_per_node - 1),
                                 })

Я попытался запустить несколько рабочих процессов на каждом сервере, указав полный диапазон доступных портов:

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                     'port': '60000:61000'
                                 })

Каждый раз, когда он возвращает четыре ошибки (от B, C, D и E) с сообщением «Исключение: рабочий процесс не запустился»

В итоге, это мои вопросы:

  • Как я могу назначить порты рабочим в Dask SSHCluster?
  • Как только это будет сделано, нужно ли мне сделать то же самое для процессов няни? Если да, то как?

Для справки, вот версии, которые я использую (возможно, не все актуальны): python 3.8.3, dask 2.18.1, dask-core 2.18 .1, раздача 2.18.0, торнадо 6.0.4, боке 2.01

1 Ответ

2 голосов
/ 27 июня 2020

Кажется, это нормально работает при использовании worker_port вместо port

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                     'worker_port': '60000:61000'
                                 })

https://github.com/dask/distributed/blob/93701f82c2cef46d4e68696bf48af0fc65ea9159/distributed/cli/dask_worker.py#L54

...