Не удалось поймать исключение KeyboardInterrupt после запуска dask.distributed Client / LocalClient - PullRequest
0 голосов
/ 11 октября 2018

Я пытаюсь использовать Ctrl + C, чтобы изящно остановить мой работающий код, включая локальный клиент dask.distrubted.Код ниже является примером моей настройки.Когда я использую Ctrl + C, метод stop () вызывается правильно, однако клиент dask, похоже, неправильно завершает / печатает трассировку до того, как даже достигнет метода self.dask.close ().

from dask.distributed import Client

class SomeService(object):
    def __init__(self):
        self.dask = None

    def run(self):
        # Setup local dask client
        self.dask = Client()

        while True:
            # Do something blocking

    def stop(self):
        # Close dask client
        print('Closing Dask Client...')
        self.dask.close()
        print('Dask Cient closed.')
        # Stop other stuff
        # ...
        print('SomeService has been stopped.')


if __name__ == '__main__':
    service = SomeService()
    try:
        service.run()
    except KeyboardInterrupt:
        print('Keyboard interrupt received.')
        service.stop()

Это вывод, который я получаю:

^CTraceback (most recent call last):
File "<string>", line 1, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/forkserver.py", line 170, in main
rfds = [key.fileobj for (key, events) in selector.select()]
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/selectors.py", line 560, in select
kev_list = self._kqueue.control(None, max_ev, timeout)
KeyboardInterrupt
Keyboard interrupt received.
Closing Dask Client...
Dask Client closed.
SomeService has been stopped.

Я старался изо всех сил, чтобы Google / Stack переполнил эту проблему, и единственное решение, которое я нашел, это предложения использовать пакет сигналов, чтобы вызвать обратный вызовSIGINT.Это решение, однако, не работает, если класс SomeService должен быть запущен в отдельном потоке, поскольку вы можете принудительно вызывать обратный вызов сигналов для сигналов в основном потоке.

Любые предложения приветствуются.Разве это не правильный способ хранения и управления Dask Client / LocalClient?

Дополнительная информация:

Python 3.5.1
dask==0.19.2
distributed==1.23.2
tornado==5.0.2

1 Ответ

0 голосов
/ 29 октября 2018

Используйте обработчик сигнала, который устанавливает флаг.Когда он доставляется, он вызывает исключение IOError (с элементом .errno, установленным на errno.EINTR):

import signal

done = False
def done_handler(signum, frame):
    done = True

if __name__ == '__main__':
    signal.signal(signal.SIGINT, done_handler)

    service = SomeService()

    while not done:
        try:
            service.run()
        except IOError:
            break

    service.stop()

В Python сигналы доставляются, а обработчики выполняются всегда в основном потоке, поэтому, если не перехвачено, исключение IOError должно распространяться до service.run().(В вашем примере KeyboardInterrupt делает.) Когда это исключение перехватывается, служба останавливается.

...