Dask работник переходит в замороженное состояние и не вернется - PullRequest
0 голосов
/ 10 мая 2019

У меня есть работник (работники) Dask, который "застревает".Когда я смотрю на стек вызовов для рабочего, он выглядит так:

Worker: tcp://127.0.0.1:59180
Key: _forecast-ee805cbdce4f41ca491bc4dc194c9793
File "/Users/wcox/miniconda3/envs/ovf/lib/python2.7/threading.py", line 774, in __bootstrap self.__bootstrap_inner()
File "/Users/wcox/miniconda3/envs/ovf/lib/python2.7/threading.py", line 801, in __bootstrap_inner self.run()
File "/Users/wcox/miniconda3/envs/ovf/lib/python2.7/threading.py", line 754, in run self.__target(*self.__args, **self.__kwargs)
File "/Users/wcox/miniconda3/envs/ovf/lib/python2.7/site-packages/distributed/threadpoolexecutor.py", line 57, in _worker task.run()
File "/Users/wcox/miniconda3/envs/ovf/lib/python2.7/site-packages/distributed/_concurrent_futures_thread.py", line 64, in run result = self.fn(*self.args, **self.kwargs)
File "/Users/wcox/miniconda3/envs/ovf/lib/python2.7/site-packages/distributed/worker.py", line 2811, in apply_function result = function(*args, **kwargs)
File "/Users/wcox/Documents/ovforecast/src/python/ovforecast/forecast.py", line 173, in _forecast end_time, persist=False, testset=testset)
File "/Users/wcox/Documents/ovforecast/src/python/ovforecast/forecast.py", line 236, in _build_features log.debug('for:')
File "/Users/wcox/miniconda3/envs/ovf/lib/python2.7/logging/__init__.py", line 1162, in debug self._log(DEBUG, msg, args, **kwargs)
File "/Users/wcox/miniconda3/envs/ovf/lib/python2.7/logging/__init__.py", line 1293, in _log self.handle(record)
File "/Users/wcox/miniconda3/envs/ovf/lib/python2.7/logging/__init__.py", line 1303, in handle self.callHandlers(record)
File "/Users/wcox/miniconda3/envs/ovf/lib/python2.7/logging/__init__.py", line 1343, in callHandlers hdlr.handle(record)
File "/Users/wcox/miniconda3/envs/ovf/lib/python2.7/logging/__init__.py", line 764, in handle self.acquire()
File "/Users/wcox/miniconda3/envs/ovf/lib/python2.7/logging/__init__.py", line 715, in acquire self.lock.acquire()
File "/Users/wcox/miniconda3/envs/ovf/lib/python2.7/threading.py", line 174, in acquire rc = self.__block.acquire(blocking)

Похоже, что каким-то образом он попадает в тупиковое состояние, связанное с ведением журнала.Это периодически возникающая проблема, которая не возникает каждый раз, когда я выполняю свою работу.Остальные мои работники с удовольствием обрабатывают данные, но моя работа никогда не завершается из-за зависания конечного работника.

Мое приложение выглядит так:

from dask.distributed import Client, LocalCluster, as_completed

cluster = LocalCluster(processes=config.use_dask_local_processes,
                       n_workers=6,
                       threads_per_worker=1,
                       )
client = Client(cluster)
cluster.scale(config.dask_local_worker_instances)

work_futures = []

# For each group do work
for group in groups:
    fcast_futures.append(client.submit(_forecast, group))

# Wait till the work is done
for done_work in as_completed(fcast_futures, with_results=False):
    try:
        result = done_work.result()
    except Exception as error:
        log.exception(error)

Мой регистратор настроен наDEBUG и есть StreamHandler.Настройка выглядит следующим образом:

    logformat = logformat or default_logformat
    log = logging.getLogger()
    log.setLevel(logging.DEBUG)
    formatter = logging.Formatter(logformat)

    # Logging to the console
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    log.addHandler(handler)

Я использую dask == 1.2.0 на Python 2.7.

1 Ответ

0 голосов
/ 11 мая 2019

Это известная проблема с модулем регистрации в Python 2.

К сожалению, я не знаю хорошего обходного пути.

...