У меня есть код, который создает потенциально бесконечный стек вызовов (упрощенно):
def listen(self, pipeline):
try:
for message in self.channel.consume(self.queue_name):
pipeline.process(message)
self.channel.basic_ack(delivery_tag=method_frame.delivery_tag)
except (pika.exceptions.StreamLostError,
pika.exceptions.ConnectionClosed,
pika.exceptions.ChannelClosed,
ConnectionResetError) as e:
logging.warning(f'Connection dropped for queue {self.queue_name}. Exception: {e}. Reconnecting...')
self._reconnect()
self.listen(pipeline)
Если возникнут какие-либо проблемы с сетью, он зарегистрирует ошибку, переподключится и будет двигаться дальше. Но это также добавит один дополнительный вызов в стек вызовов. Таким образом, моя трассировка стека при ошибке будет выглядеть так:
...
File "/usr/local/lib/python3.6/dist-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
self._connection._flush_output(lambda: self.is_closed, *waiters)
File "/usr/local/lib/python3.6/dist-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/msworker/queue.py", line 81, in listen
self.channel.basic_ack(delivery_tag=method_frame.delivery_tag)
File "/usr/local/lib/python3.6/dist-packages/pika/adapters/blocking_connection.py", line 2113, in basic_ack
self._flush_output()
File "/usr/local/lib/python3.6/dist-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
self._connection._flush_output(lambda: self.is_closed, *waiters)
File "/usr/local/lib/python3.6/dist-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/msworker/queue.py", line 81, in listen
self.channel.basic_ack(delivery_tag=method_frame.delivery_tag)
File "/usr/local/lib/python3.6/dist-packages/pika/adapters/blocking_connection.py", line 2113, in basic_ack
self._flush_output()
File "/usr/local/lib/python3.6/dist-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
self._connection._flush_output(lambda: self.is_closed, *waiters)
File "/usr/local/lib/python3.6/dist-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/pika/adapters/utils/io_services_utils.py", line 1097, in _on_socket_writable
self._produce()
File "/usr/local/lib/python3.6/dist-packages/pika/adapters/utils/io_services_utils.py", line 820, in _produce
self._tx_buffers[0])
File "/usr/local/lib/python3.6/dist-packages/pika/adapters/utils/io_services_utils.py", line 79, in retry_sigint_wrap
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/pika/adapters/utils/io_services_utils.py", line 861, in _sigint_safe_send
return sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
Как я могу перезапустить listen
функцию с нуля, без старых вызовов в стеке вызовов?
UPDATE
Чтобы избежать этой проблемы, правильно использовать вложенную функцию и перезапускать ее, но не саму себя:
def listen(self, pipeline):
try:
self._listen(self, pipeline)
except (pika.exceptions.StreamLostError,
pika.exceptions.ConnectionClosed,
pika.exceptions.ChannelClosed,
ConnectionResetError) as e:
logging.warning(f'Connection dropped for queue {self.queue_name}. Exception: {e}. Reconnecting...')
self._reconnect()
self._listen(self, pipeline)
def _listen(self, pipeline):
for message in self.channel.consume(self.queue_name):
pipeline.process(message)
Но все же есть ли способ перезапустить рекурсивную функцию с чистой стек вызовов?