У меня есть код NSQ, использующий официальный клиент Python NSQ для обработки данных, обработка которых занимает некоторое время.
Для предотвращения отсоединений я использую message.touch()
во всем коде, где он зацикливается, чтобы предотвратитьотключение.
Иногда текущий элемент в очереди удаляется успешно, но затем следующий элемент кажется отключенным и автоматически ставится в очередь, даже если он был обработан полностью без ошибок.
Я прикрепилжурнал ошибок, который появляется сразу после того, как текущий элемент заканчивается и следующий элемент просто начинается с неудачного message.touch()
.
Я использую lookupd_poll_interval= 15
с синхронной очередью.
Код скелета:
import nsq
def handler( message ):
data = json.loads( message.body.decode( "utf-8" ) )[ "data" ]
for item in data:
message.touch()
process_this_item( item )
message.finish()
return True
r = nsq.Reader( message_handler= handler,
lookupd_http_addresses= [ 'http://127.0.0.1:4161' ],
topic= 'test_channel_1',
channel= 'test_channel_1',
lookupd_poll_interval= 15 )
nsq.run()
Пожалуйста, дайте мне знать, как избежать этой проблемы.
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 290, in _read_body
self.trigger(event.DATA, conn=self, data=data)
File "/usr/local/lib/python3.5/dist-packages/nsq/event.py", line 84, in trigger
ev(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 503, in _on_data
self.send(protocol.nop())
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 296, in send
self.stream.write(self.encoder.encode(data))
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 387, in write
self._check_closed()
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 925, in _check_closed
raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
2019-01-25 12:54:45:ERROR:nsq.conn uncaught exception in data event
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 290, in _read_body
self.trigger(event.DATA, conn=self, data=data)
File "/usr/local/lib/python3.5/dist-packages/nsq/event.py", line 84, in trigger
ev(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 503, in _on_data
self.send(protocol.nop())
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 296, in send
self.stream.write(self.encoder.encode(data))
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 387, in write
self._check_closed()
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 925, in _check_closed
raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
2019-01-25 12:54:45:ERROR:nsq.conn uncaught exception in data event
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 290, in _read_body
self.trigger(event.DATA, conn=self, data=data)
File "/usr/local/lib/python3.5/dist-packages/nsq/event.py", line 84, in trigger
ev(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 503, in _on_data
self.send(protocol.nop())
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 296, in send
self.stream.write(self.encoder.encode(data))
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 387, in write
self._check_closed()
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 925, in _check_closed
raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
2019-01-25 12:54:45:ERROR:nsq.conn uncaught exception in data event
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 290, in _read_body
self.trigger(event.DATA, conn=self, data=data)
File "/usr/local/lib/python3.5/dist-packages/nsq/event.py", line 84, in trigger
ev(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 503, in _on_data
self.send(protocol.nop())
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 296, in send
self.stream.write(self.encoder.encode(data))
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 387, in write
self._check_closed()
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 925, in _check_closed
raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
2019-01-25 12:54:45:ERROR:nsq.client [nsq-worker-1:4150:test_channel_1:test_channel_1] ERROR: ConnectionClosedError('Stream is closed',)
2019-01-25 12:54:45:WARNING:nsq.reader [nsq-worker-1:4150:test_channel_1:test_channel_1] connection closed
2019-01-25 12:54:45:ERROR:nsq.conn uncaught exception in data event
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 290, in _read_body
self.trigger(event.DATA, conn=self, data=data)
File "/usr/local/lib/python3.5/dist-packages/nsq/event.py", line 84, in trigger
ev(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 503, in _on_data
self.send(protocol.nop())
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 296, in send
self.stream.write(self.encoder.encode(data))
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 387, in write
self._check_closed()
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 925, in _check_closed
raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
2019-01-25 12:54:45:ERROR:nsq.conn uncaught exception in data event
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 290, in _read_body
self.trigger(event.DATA, conn=self, data=data)
File "/usr/local/lib/python3.5/dist-packages/nsq/event.py", line 84, in trigger
ev(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 503, in _on_data
self.send(protocol.nop())
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 296, in send
self.stream.write(self.encoder.encode(data))
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 387, in write
self._check_closed()
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 925, in _check_closed
raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
2019-01-25 12:54:45:ERROR:nsq.client [nsq-worker-0:4150:test_channel_1:test_channel_1] ERROR: SendError: failed to send RDY 1 (Stream is closed)
2019-01-25 12:54:45:INFO:root Succesfully connected to database [ system_config ].
2019-01-25 12:54:46:INFO:root Successfully retrieved config details for config_id [ 2 ].
2019-01-25 12:54:46:INFO:root Hash for config_id [ 2 ] successfully obtained: [ xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx ]
2019-01-25 12:54:46:ERROR:nsq.client [nsq-worker-0:4150:test_channel_1:test_channel_1] ERROR: SendError: failed to send TOUCH b'0ad5b8177dcee000' (Stream is closed)