Многопроцессорная Gremlin "OSError: [Errno 9] Плохой дескриптор файла" - PullRequest
0 голосов
/ 01 мая 2020

Я пытаюсь вычислить элемент для каждой вершины в моем графе, используя gremlinpython. Это слишком медленно, чтобы последовательно перебирать каждую вершину. Несмотря на то, что пакетная обработка может помочь обеспечить ускорение, сначала я решил попробовать парализовать запрос.

В целом: 1. получить полный набор вершин, 2. разбить их на num_cores = x, 3. перебрать каждая вложенная вершина установлена ​​параллельно.

Но я получаю сообщение об ошибке «OSError: [Errno 9] Bad file descriptor». Приведенный ниже код является моей последней попыткой решить эту проблему.

import multiprocessing
from gremlin_python.structure.graph import Graph
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.traversal import lt


def create_traversal_object():
    graph = Graph()
    g = graph.traversal().withRemote(DriverRemoteConnection('ws://localhost:8182/gremlin', 'g'))
    return g

g = create_traversal_object()

num_cores = 1
vertex_lsts = np.array_split(g.V().limit(30).id().toList(), num_cores)


class FeatureClass():

    def __init__(self, g, vertex_list):
        self.g = g
        self.vertex_list = vertex_list

    def orchestrator(self):
        for vertex_id in self.vertex_list:
            self.compute_number_of_names(float(vertex_id))

    def get_names(self, vertex_id):
        return self.g.V(vertex_id).inE().values('benef_nm').dedup().toList()


class Simulation(multiprocessing.Process):
    def __init__(self, id, worker, *args, **kwargs):
        # must call this before anything else
        multiprocessing.Process.__init__(self)
        self.id = id
        self.worker = worker
        self.args = args
        self.kwargs = kwargs
        sys.stdout.write('[%d] created\n' % (self.id))

    def run(self):
        sys.stdout.write('[%d] running ...  process id: %s\n' % (self.id, os.getpid()))
        self.worker.orchestrator()
        sys.stdout.write('[%d] completed\n' % (self.id))

list_of_objects = [FeatureClass(create_traversal_object(), vertex_lst) for vertex_lst in vertex_lsts]
list_of_sim = [Simulation(id=k, worker=obj) for k, obj in enumerate(list_of_objects)]  

for sim in list_of_sim:
    sim.start()

Вот полная трассировка стека, похоже, это проблема с tornado, который использует gremlinpython.

Process Simulation-1:
Traceback (most recent call last):
  File "/Users/greatora/anaconda3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "<ipython-input-4-b3177477fabe>", line 42, in run
    self.worker.orchestrator()
  File "<ipython-input-4-b3177477fabe>", line 23, in orchestrator
    self.compute_number_of_names(float(vertex_id))
  File "<ipython-input-4-b3177477fabe>", line 26, in compute_number_of_names
    print(self.g.V(vertex_id).inE().values('benef_nm').dedup().count().next())
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/process/traversal.py", line 88, in next
    return self.__next__()
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/process/traversal.py", line 47, in __next__
    self.traversal_strategies.apply_strategies(self)
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/process/traversal.py", line 512, in apply_strategies
    traversal_strategy.apply(traversal)
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/driver/remote_connection.py", line 148, in apply
    remote_traversal = self.remote_connection.submit(traversal.bytecode)
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/driver/driver_remote_connection.py", line 53, in submit
    result_set = self._client.submit(bytecode)
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/driver/client.py", line 108, in submit
    return self.submitAsync(message, bindings=bindings).result()
  File "/Users/greatora/anaconda3/lib/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/Users/greatora/anaconda3/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/driver/connection.py", line 63, in cb
    f.result()
  File "/Users/greatora/anaconda3/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/Users/greatora/anaconda3/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/Users/greatora/anaconda3/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/driver/protocol.py", line 74, in write
    self._transport.write(message)
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/driver/tornado/transport.py", line 37, in write
    lambda: self._ws.write_message(message, binary=True))
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/tornado/ioloop.py", line 453, in run_sync
    self.start()
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/tornado/ioloop.py", line 863, in start
    event_pairs = self._impl.poll(poll_timeout)
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/tornado/platform/kqueue.py", line 66, in poll
    kevents = self._kqueue.control(None, 1000, timeout)
OSError: [Errno 9] Bad file descriptor

Я использую Pythton3.7, gremlinpython == 3.4.6, MacOS.

1 Ответ

0 голосов
/ 04 мая 2020

Я все еще не совсем уверен, в чем проблема, но это работает.

import multiprocessing
from multiprocessing import Pool
import itertools

def graph_function(vertex_id_list):

    graph = Graph()
    g = graph.traversal().withRemote(DriverRemoteConnection('ws://localhost:8182/gremlin', 'g'))

    res = []
    for vertex_id in vertex_id_list:
        res.append(g.V(str(vertex_id)).inE().values('benef_nm').dedup().toList())
    return res



num_cores = 4
vertex_lst = g.V().limit(30).id().toList()
vertex_lsts = np.array_split(vertex_lst, num_cores)
with Pool(processes=num_cores) as pool:
    results = pool.map(graph_function, vertex_lsts)
    results = [*itertools.chain.from_iterable(results)]
...