Как справиться с проблемами соединения с kafka, используя библиотеку python kafka? - PullRequest
0 голосов
/ 25 октября 2019

У меня есть функция без сервера, которая пытается отправить некоторые данные в kafka. Иногда это работает, а иногда соединение просто обрывается, и данные теряются.

Причина этого в том, что библиотека для kafka не вызывает исключение, а вместо этого добавляет журналы ошибок. Поэтому я не могу добавить свой кусок кода в try:except.

. Вот ошибка, которую я часто получаю в своих журналах:

<BrokerConnection node_id=11 host=... port=9092>: Error receiving network data closing socket
Traceback (most recent call last):
File "/var/task/kafka/conn.py", line 745, in _recv
data = self._sock.recv(SOCK_CHUNK_BYTES)
ConnectionResetError: [Errno 104] Connection reset by peer

и функция _recv, упомянутая вышеимеет следующее определение:

Я все еще ищу решение, но добавляю код в try: кроме не работает.

def _recv(self):
        responses = []
        SOCK_CHUNK_BYTES = 4096
        while True:
            try:
                data = self._sock.recv(SOCK_CHUNK_BYTES)
                # We expect socket.recv to raise an exception if there is not
                # enough data to read the full bytes_to_read
                # but if the socket is disconnected, we will get empty data
                # without an exception raised
                if not data:
                    log.error('%s: socket disconnected', self)
                    self.close(error=Errors.ConnectionError('socket disconnected'))
                    break
                else:
                    responses.extend(self.receive_bytes(data))
                    if len(data) < SOCK_CHUNK_BYTES:
                        break
            except SSLWantReadError:
                break
            except ConnectionError as e:
                if six.PY2 and e.errno == errno.EWOULDBLOCK:
                    break
                log.exception('%s: Error receiving network data'
                              ' closing socket', self)
                self.close(error=Errors.ConnectionError(e))
                break
            except BlockingIOError:
                if six.PY3:
                    break
                raise
        return responses

Я ожидаю, когда соединение потеряно, исключениевызывается, и приложение прерывается, но вместо этого появляются только журналы ошибок, и приложение продолжает работу, даже если часть kafka выходит из строя.

...