Я выполняю несколько параллельных задач в многоузловом распределенном кластере Dask. Однако, как только задачи завершены, рабочие по-прежнему занимают большую память, и кластер скоро заполняется.
Я пробовал client.restart()
после каждой задачи и client.cancel(df)
, первая убивает рабочих и отправляет CancelledError
для других запущенных задач, которые являются проблемными и вторыми, это не очень помогло, потому что мы используем много пользовательских объектов и функций внутри функций dask map
. Добавление del
для известных переменных и gc.collect()
также мало чем поможет.
Я уверен, что большая часть памяти удерживается из-за пользовательских функций python и объектов, вызываемых с помощью client.map(..)
.
Мои вопросы:
- Есть ли способ из командной строки или другой способ, который похож на
trigger worker restart if no tasks are running right now
. - Если нет, каковы возможные решения к этой проблеме? Мне будет невозможно избежать пользовательских объектов и чистых python функций внутри задач Dask.