Python3: NSQ отключается после успешной обработки сообщения - PullRequest
0 голосов
/ 25 января 2019

У меня есть код NSQ, использующий официальный клиент Python NSQ для обработки данных, обработка которых занимает некоторое время.

Для предотвращения отсоединений я использую message.touch() во всем коде, где он зацикливается, чтобы предотвратитьотключение.

Иногда текущий элемент в очереди удаляется успешно, но затем следующий элемент кажется отключенным и автоматически ставится в очередь, даже если он был обработан полностью без ошибок.

Я прикрепилжурнал ошибок, который появляется сразу после того, как текущий элемент заканчивается и следующий элемент просто начинается с неудачного message.touch().

Я использую lookupd_poll_interval= 15 с синхронной очередью.

Код скелета:

import nsq

def handler( message ):
    data = json.loads( message.body.decode( "utf-8" ) )[ "data" ]
    for item in data:
        message.touch()
        process_this_item( item )

    message.finish()
    return True

r = nsq.Reader( message_handler= handler,
                lookupd_http_addresses= [ 'http://127.0.0.1:4161' ],
                topic= 'test_channel_1',
                channel= 'test_channel_1',
                lookupd_poll_interval= 15 )
nsq.run()

Пожалуйста, дайте мне знать, как избежать этой проблемы.

Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 290, in _read_body
    self.trigger(event.DATA, conn=self, data=data)
File "/usr/local/lib/python3.5/dist-packages/nsq/event.py", line 84, in trigger
    ev(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 503, in _on_data
    self.send(protocol.nop())
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 296, in send
    self.stream.write(self.encoder.encode(data))
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 387, in write
    self._check_closed()
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 925, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
2019-01-25 12:54:45:ERROR:nsq.conn uncaught exception in data event
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 290, in _read_body
    self.trigger(event.DATA, conn=self, data=data)
File "/usr/local/lib/python3.5/dist-packages/nsq/event.py", line 84, in trigger
    ev(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 503, in _on_data
    self.send(protocol.nop())
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 296, in send
    self.stream.write(self.encoder.encode(data))
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 387, in write
    self._check_closed()
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 925, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
2019-01-25 12:54:45:ERROR:nsq.conn uncaught exception in data event
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 290, in _read_body
    self.trigger(event.DATA, conn=self, data=data)
File "/usr/local/lib/python3.5/dist-packages/nsq/event.py", line 84, in trigger
    ev(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 503, in _on_data
    self.send(protocol.nop())
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 296, in send
    self.stream.write(self.encoder.encode(data))
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 387, in write
    self._check_closed()
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 925, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
2019-01-25 12:54:45:ERROR:nsq.conn uncaught exception in data event
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 290, in _read_body
    self.trigger(event.DATA, conn=self, data=data)
File "/usr/local/lib/python3.5/dist-packages/nsq/event.py", line 84, in trigger
    ev(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 503, in _on_data
    self.send(protocol.nop())
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 296, in send
    self.stream.write(self.encoder.encode(data))
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 387, in write
    self._check_closed()
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 925, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
2019-01-25 12:54:45:ERROR:nsq.client [nsq-worker-1:4150:test_channel_1:test_channel_1] ERROR: ConnectionClosedError('Stream is closed',)
2019-01-25 12:54:45:WARNING:nsq.reader [nsq-worker-1:4150:test_channel_1:test_channel_1] connection closed
2019-01-25 12:54:45:ERROR:nsq.conn uncaught exception in data event
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 290, in _read_body
    self.trigger(event.DATA, conn=self, data=data)
File "/usr/local/lib/python3.5/dist-packages/nsq/event.py", line 84, in trigger
    ev(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 503, in _on_data
    self.send(protocol.nop())
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 296, in send
    self.stream.write(self.encoder.encode(data))
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 387, in write
    self._check_closed()
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 925, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
2019-01-25 12:54:45:ERROR:nsq.conn uncaught exception in data event
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 290, in _read_body
    self.trigger(event.DATA, conn=self, data=data)
File "/usr/local/lib/python3.5/dist-packages/nsq/event.py", line 84, in trigger
    ev(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 503, in _on_data
    self.send(protocol.nop())
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 296, in send
    self.stream.write(self.encoder.encode(data))
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 387, in write
    self._check_closed()
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 925, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
2019-01-25 12:54:45:ERROR:nsq.client [nsq-worker-0:4150:test_channel_1:test_channel_1] ERROR: SendError: failed to send RDY 1 (Stream is closed)
2019-01-25 12:54:45:INFO:root Succesfully connected to database [ system_config ].
2019-01-25 12:54:46:INFO:root Successfully retrieved config details for config_id [ 2 ].
2019-01-25 12:54:46:INFO:root Hash for config_id [ 2 ] successfully obtained: [ xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx ]
2019-01-25 12:54:46:ERROR:nsq.client [nsq-worker-0:4150:test_channel_1:test_channel_1] ERROR: SendError: failed to send TOUCH b'0ad5b8177dcee000' (Stream is closed)
...