У меня есть функция без сервера, которая пытается отправить некоторые данные в kafka. Иногда это работает, а иногда соединение просто обрывается, и данные теряются.
Причина этого в том, что библиотека для kafka не вызывает исключение, а вместо этого добавляет журналы ошибок. Поэтому я не могу добавить свой кусок кода в try:except
.
. Вот ошибка, которую я часто получаю в своих журналах:
<BrokerConnection node_id=11 host=... port=9092>: Error receiving network data closing socket
Traceback (most recent call last):
File "/var/task/kafka/conn.py", line 745, in _recv
data = self._sock.recv(SOCK_CHUNK_BYTES)
ConnectionResetError: [Errno 104] Connection reset by peer
и функция _recv
, упомянутая вышеимеет следующее определение:
Я все еще ищу решение, но добавляю код в try: кроме не работает.
def _recv(self):
responses = []
SOCK_CHUNK_BYTES = 4096
while True:
try:
data = self._sock.recv(SOCK_CHUNK_BYTES)
# We expect socket.recv to raise an exception if there is not
# enough data to read the full bytes_to_read
# but if the socket is disconnected, we will get empty data
# without an exception raised
if not data:
log.error('%s: socket disconnected', self)
self.close(error=Errors.ConnectionError('socket disconnected'))
break
else:
responses.extend(self.receive_bytes(data))
if len(data) < SOCK_CHUNK_BYTES:
break
except SSLWantReadError:
break
except ConnectionError as e:
if six.PY2 and e.errno == errno.EWOULDBLOCK:
break
log.exception('%s: Error receiving network data'
' closing socket', self)
self.close(error=Errors.ConnectionError(e))
break
except BlockingIOError:
if six.PY3:
break
raise
return responses
Я ожидаю, когда соединение потеряно, исключениевызывается, и приложение прерывается, но вместо этого появляются только журналы ошибок, и приложение продолжает работу, даже если часть kafka выходит из строя.