Gremlin Python - ошибка «Сервер отключен - попробуйте подключиться заново» - PullRequest
1 голос
/ 03 августа 2020

У меня есть веб-приложение Flask, в котором я хочу поддерживать постоянное соединение с базой данных AWS Neptune. Это соединение устанавливается следующим образом:

from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection

neptune_endpt = 'db-instance-x.xxxxxxxxxx.xx-xxxxx-x.neptune.amazonaws.com'
remoteConn = DriverRemoteConnection(f'wss://{neptune_endpt}:8182/gremlin','g')
self.g = traversal().withRemote(remoteConn)

Проблема, с которой я столкнулся, заключается в том, что соединение автоматически разрывается, если оно не используется, и я не могу найти способ определить, разорвалось ли соединение (так что Я могу повторно подключиться, используя приведенный выше фрагмент кода).

Я видел аналогичную проблему: Сервер Gremlin с удаленным подключением закрыт - как подключиться автоматически? однако и у этого вопроса нет решения. На этот аналогичный вопрос тоже нет ответа.

Я пробовал следующие два решения (оба не работали):

  1. Я настраиваю свое веб-приложение за четырьмя воркерами Gunicorn с таймаутом 100 секунд, надеясь, что перезапуск воркера позаботится о тайм-аутах Gremlin.
  2. Я попытался перехватить исключения, чтобы определить, разорвалось ли соединение. Каждый раз, когда я использую self.g для обхода моего графика, я пытаюсь «обновить sh» соединение, под этим я подразумеваю следующее:
def _refresh_neptune(self):
    try:
        self.g = traversal().withRemote(self.conn)
    except:
        self.conn = DriverRemoteConnection(f'wss://{neptune_endpt}:8182/gremlin','g')
        self.g = traversal().withRemote(self.conn)

Здесь self.conn было инициализируется как:

self.conn = DriverRemoteConnection(f'wss://{neptune_endpt}:8182/gremlin','g')

Есть ли способ обойти эту ошибку подключения?

Спасибо

Обновление : Добавлено сообщение об ошибке ниже:

  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/process/traversal.py
", line 58, in toList
    return list(iter(self))
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/process/traversal.py
", line 48, in __next__
    self.traversal_strategies.apply_strategies(self)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/process/traversal.py
", line 573, in apply_strategies
    traversal_strategy.apply(traversal)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/remote_connec
tion.py", line 149, in apply
    remote_traversal = self.remote_connection.submit(traversal.bytecode)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/driver_remote
_connection.py", line 56, in submit
    results = result_set.all().result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/resultset.py"
, line 90, in cb
    f.result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/usr/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/connection.py
", line 83, in _receive
    status_code = self._protocol.data_received(data, self._results)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/protocol.py",
 line 81, in data_received
    'message': 'Server disconnected - please try to reconnect', 'attributes': {}})
gremlin_python.driver.protocol.GremlinServerError: 500: Server disconnected - please try to reconnect

1 Ответ

0 голосов
/ 25 августа 2020

Я не уверен, что это лучший способ решить эту проблему, но я также использую gremlin- ​​python и Neptune, и у меня была такая же проблема. Я работал над этим, реализуя транспорт, который вы можете предоставить для DriverRemoteConnection.

DriverRemoteConnection(
    url=endpoint,
    traversal_source=self._traversal_source,
    transport_factory=Transport
)

gremlin- ​​python возвращает соединения в пул при исключении, а исключение, возвращаемое при закрытии соединения, - это GremlinServerError, которое также является поднимается для других ошибок.

gremlin_python / driver / connection.py # L69 - gremlin_python / driver / protocol.py # L80

Пользовательский транспорт такой же, как и TornadoTransport из gremlin- ​​python, но методы чтения и записи расширены до:

  • Повторно открывать закрытые соединения, если клиент веб-сокета закрыт
  • Поднять StreamClosedError, если клиент веб-сокета возвращает None из read_message

Мертвые соединения, которые добавляются обратно в пул, могут быть повторно открыты, а затем вы можете обработать StreamClosedError, чтобы применить некоторые повторные попытки logi c. Я сделал это, переопределив методы submit и submitAsyn c в DriverRemoteConnection, но вы могли поймать и повторить попытку где угодно.

class Transport(AbstractBaseTransport):
    def __init__(self):
        self._ws = None
        self._loop = ioloop.IOLoop(make_current=False)
        self._url = None

        # Because the transport will try to reopen the underlying ws connection
        # track if the closed() method has been called to prevent the transport
        # from reopening.
        self._explicit_closed = True

    @property
    def closed(self):
        return not self._ws.protocol

    def connect(self, url, headers=None):
        self._forced_closed = False

        # Set the endpoint URL
        self._url = httpclient.HTTPRequest(url, headers=headers) if headers else url

        # Open the connection
        self._connect()

    def write(self, message):
        # Before writing, try to ensure that the connection is open.
        if self.closed:
            self._connect()

        self._loop.run_sync(lambda: self._ws.write_message(message, binary=True))

    def read(self):
        result = self._loop.run_sync(self._ws.read_message)

        # If the read call returns None, the stream has closed.
        if result is None:
            self._ws.close()  # Ensure we close the stream
            raise StreamClosedError()

        return result

    def close(self):
        self._ws.close()
        self._loop.close()
        self._explicit_closed = True

    def _connect(self):
        # If close() was called explicitly on the transport, don't allow
        # subsequent calls to write() to reopen the connection.
        if self._explicit_closed:
            raise TransportClosedError(
                "Transport has been closed and can not be reopened."
            )

        # Check if the ws is closed, if it is not, close it.
        if self._ws and not self.closed:
            self._ws.close()

        # Open the ws connection
        self._ws = self._loop.run_sync(
            lambda: websocket.websocket_connect(url=self._url)
        )


class TransportClosedError(Exception):
    pass

Это также будет работать с пулом соединений gremlin-pythons.

Если вам не нужен пул, альтернативным подходом является установка размера пула на 1 и реализация некоторой формы поддержания активности, как это обсуждается здесь. TINKERPOP-2352

Похоже, ping / keep-alive веб-сокета в gremlin- ​​python еще не реализован TINKERPOP-1886 .

...