Сбой Dask из-за ошибки Tornado «слишком много открытых файлов»
0 голосов
/ 15 мая 2019

Я использую ноутбук Jupyter, запущенный из Anaconda. При попытке инициализировать распределённую среду Dask выдаётся следующая ошибка пакета Tornado:

tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
    result_list.append(f.result())
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 208, in _start_worker
    yield w._start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 157, in _start
    response = yield self.instantiate()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 226, in instantiate
    self.process.start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 326, in wrapper
    yielded = next(result)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 351, in start
    self.init_result_q = init_q = mp_context.Queue()
  File "/anaconda3/lib/python3.7/multiprocessing/context.py", line 102, in Queue
    return Queue(maxsize, ctx=self.get_context())
  File "/anaconda3/lib/python3.7/multiprocessing/queues.py", line 41, in __init__
    self._reader, self._writer = connection.Pipe(duplex=False)
  File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 517, in Pipe
    fd1, fd2 = os.pipe()
OSError: [Errno 24] Too many open files
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
    result_list.append(f.result())
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 208, in _start_worker
    yield w._start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 326, in wrapper
    yielded = next(result)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 143, in _start
    listen_args=self.listen_args)
  File "/anaconda3/lib/python3.7/site-packages/distributed/core.py", line 272, in listen
    self.listener.start()
  File "/anaconda3/lib/python3.7/site-packages/distributed/comm/tcp.py", line 396, in start
    backlog=backlog)
  File "/anaconda3/lib/python3.7/site-packages/tornado/netutil.py", line 134, in bind_sockets
    sock = socket.socket(af, socktype, proto)
  File "/anaconda3/lib/python3.7/socket.py", line 151, in __init__
    _socket.socket.__init__(self, family, type, proto, fileno)
OSError: [Errno 24] Too many open files
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<timed exec> in <module>

/anaconda3/lib/python3.7/site-packages/distributed/client.py in __init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, **kwargs)
    634             ext(self)
    635 
--> 636         self.start(timeout=timeout)
    637 
    638         from distributed.recreate_exceptions import ReplayExceptionClient

/anaconda3/lib/python3.7/site-packages/distributed/client.py in start(self, **kwargs)
    757             self._started = self._start(**kwargs)
    758         else:
--> 759             sync(self.loop, self._start, **kwargs)
    760 
    761     def __await__(self):

/anaconda3/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    275             e.wait(10)
    276     if error[0]:
--> 277         six.reraise(*error[0])
    278     else:
    279         return result[0]

/anaconda3/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/anaconda3/lib/python3.7/site-packages/distributed/utils.py in f()
    260             if timeout is not None:
    261                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262             result[0] = yield future
    263         except Exception as exc:
    264             error[0] = sys.exc_info()

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

/anaconda3/lib/python3.7/site-packages/distributed/client.py in _start(self, timeout, **kwargs)
    820                 self.cluster = LocalCluster(loop=self.loop, asynchronous=True,
    821                                             **self._startup_kwargs)
--> 822                 yield self.cluster
    823             except (OSError, socket.error) as e:
    824                 if e.errno != errno.EADDRINUSE:

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/anaconda3/lib/python3.7/asyncio/tasks.py in _wrap_awaitable(awaitable)
    601     that will later be wrapped in a Task by ensure_future().
    602     """
--> 603     return (yield from awaitable.__await__())
    604 
    605 

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py in _start(self, ip, n_workers)
    189         self.scheduler.start(scheduler_address)
    190 
--> 191         yield [self._start_worker(**self.worker_kwargs) for i in range(n_workers)]
    192 
    193         self.status = 'running'

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in callback(f)
    881             for f in children:
    882                 try:
--> 883                     result_list.append(f.result())
    884                 except Exception as e:
    885                     if future.done():

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py in _start_worker(self, death_timeout, **kwargs)
    206               death_timeout=death_timeout,
    207               silence_logs=self.silence_logs, **kwargs)
--> 208         yield w._start()
    209 
    210         self.workers.append(w)

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

/anaconda3/lib/python3.7/site-packages/distributed/nanny.py in _start(self, addr_or_port)
    155 
    156         logger.info('        Start Nanny at: %r', self.address)
--> 157         response = yield self.instantiate()
    158         if response == 'running':
    159             assert self.worker_address

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

/anaconda3/lib/python3.7/site-packages/distributed/nanny.py in instantiate(self, comm)
    224                 result = yield gen.with_timeout(
    225                         timedelta(seconds=self.death_timeout),
--> 226                         self.process.start()
    227                 )
    228             except gen.TimeoutError:

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in wrapper(*args, **kwargs)
    324                 try:
    325                     orig_stack_contexts = stack_context._state.contexts
--> 326                     yielded = next(result)
    327                     if stack_context._state.contexts is not orig_stack_contexts:
    328                         yielded = _create_future()

/anaconda3/lib/python3.7/site-packages/distributed/nanny.py in start(self)
    350 
    351         self.init_result_q = init_q = mp_context.Queue()
--> 352         self.child_stop_q = mp_context.Queue()
    353         uid = uuid.uuid4().hex
    354 

/anaconda3/lib/python3.7/multiprocessing/context.py in Queue(self, maxsize)
    100         '''Returns a queue object'''
    101         from .queues import Queue
--> 102         return Queue(maxsize, ctx=self.get_context())
    103 
    104     def JoinableQueue(self, maxsize=0):

/anaconda3/lib/python3.7/multiprocessing/queues.py in __init__(self, maxsize, ctx)
     39             from .synchronize import SEM_VALUE_MAX as maxsize
     40         self._maxsize = maxsize
---> 41         self._reader, self._writer = connection.Pipe(duplex=False)
     42         self._rlock = ctx.Lock()
     43         self._opid = os.getpid()

/anaconda3/lib/python3.7/multiprocessing/connection.py in Pipe(duplex)
    515             c2 = Connection(s2.detach())
    516         else:
--> 517             fd1, fd2 = os.pipe()
    518             c1 = Connection(fd1, writable=False)
    519             c2 = Connection(fd2, readable=False)

OSError: [Errno 24] Too many open files

Похоже, проблема связана с Tornado, как указано [здесь](https://github.com/dask/distributed/issues/1941). Моя версия Anaconda содержит Tornado 5.1.1 с Python 3.7.3 и Dask 1.25.1.

Это код, который запускается:

%%time
# IPython cell magic: measures the wall/CPU time of the whole cell.
import pandas as pd
import dask.dataframe as dd
import dask.distributed as dist

# Client() with no arguments starts a LocalCluster (a scheduler plus
# worker processes on this machine); this is the call that fails with
# OSError: [Errno 24] Too many open files in the traceback above.
client = dist.Client()

Несколько недель назад я успешно запускал небольшие распределённые примеры Dask, и я могу успешно запустить Dask, не создавая клиента. Если проблема в Tornado, есть ли обходной путь?

На случай, если кому-то интересно узнать об ограничениях на число открытых файлов процесса в macOS — см. скриншот ниже.

Ответы [ 2 ]

1 голос
/ 21 мая 2019

Вчера я столкнулся с той же проблемой на основной ветке (20.05.2019) и нашёл следующее: https://github.com/dask/distributed/issues/733. Для себя я просто посмотрел базовый скрипт dask-scheduler и скопировал его в PyCharm:

    # Launch the Dask scheduler directly via the same entry point the
    # `dask-scheduler` console script uses, bypassing the CLI wrapper.
    # (The original snippet had broken indentation and would raise a
    # SyntaxError as pasted.)
    from distributed.cli.dask_scheduler import go

    if __name__ == '__main__':
        go()

Запускается, работает стабильно, и я подключил к нему работника из командной строки.

0 голосов
/ 16 мая 2019

Я нашёл обходной путь. В macOS, похоже, изменение лимитов на число открытых файлов через командную строку действует только для процессов, запущенных из того же терминала. Кроме того, после перезагрузки лимиты сбрасываются к исходным значениям (на моём компьютере по умолчанию было 256). Чтобы установить лимиты постоянно, нужно создать файл «limit.maxfiles.plist» в /Library/LaunchDaemons и перезагрузиться. Я узнал об этом отсюда. Это обходит ошибку «слишком много открытых файлов», но, вероятно, лишь откладывает проблему с Tornado.

...