Какие рекомендуемые настройки конфигурации dask-kubernetes для длительных задач? - PullRequest
0 голосов
/ 29 января 2020

Я использую что-то в соответствии с примером, приведенным в документации

import dask.bag
from dask_kubernetes import KubeCluster


cluster = KubeCluster.from_yaml('worker-spec.yml')
cluster.adapt(minimum=0, maximum=24, interval="20000ms")
dag = dask.bag.from_sequence(tasks).map(lambda x: make_task(x).execute())

with distributed.Client(dask_cluster) as client:
    results = dag.compute(scheduler=client)

cluster.close()

В моем случае, функция execute() выполняет много операций ввода-вывода и занимает примерно 5-10 минут. Я хочу настроить планировщик KubeCluster и dask таким образом, чтобы максимально увеличить шансы на успешное выполнение этих долгосрочных задач.

Мой вопрос состоит из двух частей. Во-первых, как мне переопределить настройку конфигурации distributed? Я хотел попробовать что-то вроде

dask.config.set({'scheduler.work-stealing': False})

, но я не знаю, как правильно установить это. В частности, я не знаю, должно ли это быть известно каждому работнику, или это то, что я могу сойти с рук, указав только в точке, где я создаю экземпляр KubeCluster.

Второй часть моего вопроса связана с рекомендациями для задач, которые будут долгосрочными (более нескольких минут). Я экспериментировал с этим, используя настройки по умолчанию. Иногда все идет хорошо, а иногда вызов compute() завершается неудачей со следующим исключением:

  <... omitting caller from the traceback ...>
  File "/usr/local/lib/python3.7/site-packages/dask/base.py", line 436, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 2587, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1885, in gather
    asynchronous=asynchronous,
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 767, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 345, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 329, in f
    result[0] = yield future
  File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1741, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('lambda-364defe33868bf6e4864da2933065a12', 3)", <Worker 'tcp://172.18.7.71:39029', name: 9, memory: 0, processing: 4>)

Я выполняю недавний коммит из основной ветки: dask-kubernetes@git+git://github.com/dask/dask-kubernetes.git@add93d56ba1ac2f7d00576bd3f2d1be0db3e1757.

Редактировать:

Я обновил свой фрагмент кода, чтобы показать, что я вызываю функцию adapt() с минимальным числом рабочих, установленным на 0. Я начал задаваться вопросом, получение 0 рабочих может привести к отключению планировщика, прежде чем он вернет результат compute().

...