Карта Dask bag with OSError: [Errno 24] Слишком много открытых файлов - PullRequest
2 голосов
/ 09 мая 2019

Я пытаюсь распараллелить поэлементное вычисление с пользовательской функцией в пакете Dask на моей локальной машине, используя мои 8 ядер. В частности, список представляет собой список узлов графа G, созданного NetworkX (я для краткости оставил этот код выключенным, но я всегда могу его включить). Я хотел бы использовать эту сумку для запуска вычисления графа NetworkX для каждого узла в сумке, который первоначально получен в виде списка.

Список был создан с использованием:

from dask.distributed import Client
import dask.bag as db
import networkx as nx

client = Client()

def my_func(x, G):
    ego_graph = nx.ego_graph(G, x, radius=2)
    ego_graph_ls = nx.to_edgelist(ego_graph)
    return ego_graph_ls

starting_list = ['node1', 'node2', 'node3', 'node4']
my_bag = db.from_sequence(starting_list)
result = my_bag.map(my_func, G).compute()  # G is a nx.Graph()

Другими словами, для каждого узла в my_bag я хотел бы параллельно рассчитать список ребер для этого узла в общем графе.

Однако, когда я запускаю этот код, я получаю следующую ошибку (усеченную для пробела, но я могу предоставить всю вещь, если это необходимо):

2019-05-09 11:00:58,566 - base_events.py - default_exception_handler - ERROR - Exception in callback BaseAsyncIOLoop._handle_events(64, 1)
handle: <Handle BaseAsyncIOLoop._handle_events(64, 1)>
Traceback (most recent call last):
  File "/Users/cj2001/.pyenv/versions/3.6.5/lib/python3.6/asyncio/events.py", line 145, in _run
    self._callback(*self._args)
  File "/Users/cj2001/.pyenv/versions/3.6.5/lib/python3.6/site-packages/tornado/platform/asyncio.py", line 122, in _handle_events
    handler_func(fileobj, events)
  File "/Users/cj2001/.pyenv/versions/3.6.5/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/Users/cj2001/.pyenv/versions/3.6.5/lib/python3.6/site-packages/tornado/netutil.py", line 249, in accept_handler
    connection, address = sock.accept()
  File "/Users/cj2001/.pyenv/versions/3.6.5/lib/python3.6/socket.py", line 205, in accept
    fd, addr = self._accept()
OSError: [Errno 24] Too many open files

При чтении документации API здесь предполагается, что это возможно. Я также читал посты типа , этот , в котором, возможно, Dask Delayed является предпочтительным способом распараллеливания цикла for (который может легко стать при использовании starting_list вместо этого).

Так в чем же причина этого OSError и это можно исправить в моем коде, или мне нужно попробовать другой подход?

1 Ответ

0 голосов
/ 09 мая 2019

Я перезапустил это выше, заменив последнюю строку на:

result = my_bag.map(my_func, G).compute(scheduler='processes')

и он работал без ошибок. Это было очень медленно, хотя поток задач на странице состояния не показывает распараллеливания.

...