I have set up my RabbitMQ consumer as follows:
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
import pika
import datetime
import logging
import json
from logging import StreamHandler
from time import sleep
from random import randint
from pika import SelectConnection
logging.basicConfig(handlers=[StreamHandler()],
                    level=logging.INFO,
                    format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)


class QueueConsumer(object):
    """The consumer class to manage connections to the AMQP server/queue"""

    def __init__(self, queue, logger, parameters, thread_id=0):
        self.channel = None
        self.connection = None
        self.queue_name = queue
        self.logger = logger
        self.consumer_id = 'Thread: %d' % (thread_id,)
        self.parameters = pika.ConnectionParameters(**parameters)

    def _on_queue_declared(self, frame):
        self.logger.debug('{} ... declaring queue'.format(self.consumer_id))
        self.channel.basic_qos(prefetch_count=1)
        try:
            self.channel.basic_consume(self.handle_delivery,
                                       queue=self.queue_name,
                                       no_ack=True)
            self.logger.info("{} Declared queue...".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} crashing:--> {}'.format(self.consumer_id, str(e)))

    def _on_channel_open(self, channel):
        self.channel = channel
        try:
            self.channel.queue_declare(queue=self.queue_name,
                                       exclusive=False,
                                       durable=True,
                                       auto_delete=False,
                                       callback=self._on_queue_declared)
            self.logger.info("{} Opened Channel....".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))

    def _on_connected(self, connection):
        connection.channel(self._on_channel_open)

    def consume(self):
        try:
            self.connection = SelectConnection(self.parameters,
                                               self._on_connected)
            self.connection.ioloop.start()
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))
            self.connection.close()
            self.connection.ioloop.start()

    def decode(self, body):
        try:
            _body = body.decode('utf-8')
        except AttributeError:
            _body = body
        return _body

    def handle_delivery(self, channel, method, header, body):
        try:
            start_time = datetime.datetime.now()
            _logger.info("Received...")
            _logger.info("Content: %s" % body)
            req = json.loads(self.decode(body))
            # Do something
            sleep(randint(10, 100))
            time_taken = datetime.datetime.now() - start_time
            _logger.info("[{}] Time Taken: {}.{}".format(
                req.get("to_num"), time_taken.seconds, time_taken.microseconds))
        except Exception as err:
            _logger.exception(err)


if __name__ == "__main__":
    workers = 3
    pika_parameters = OrderedDict([('host', '127.0.0.1'), ('port', 5672), ('virtual_host', '/')])
    try:
        pool = ThreadPoolExecutor(max_workers=workers)
        start = 1
        for thread_id in range(start, (workers + start)):
            pool.submit(QueueConsumer('test_queue', _logger, pika_parameters, thread_id).consume)
    except Exception as err:
        _logger.exception(err)
I also have a queue publisher, as shown below:
import uuid
import pika
import logging
import json
from logging import StreamHandler
from pika import SelectConnection
logging.basicConfig(handlers=[StreamHandler()],
                    level=logging.DEBUG,
                    format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)


class QueuePublisherClient(object):

    def __init__(self, queue, request):
        self.queue = queue
        self.response = None
        self.channel = None
        self.request = request
        self.corrId = str(uuid.uuid4())
        self.callBackQueue = None
        self.connection = None
        parameters = pika.ConnectionParameters(host="0.0.0.0")
        self.connection = SelectConnection(
            parameters, self.on_response_connected
        )
        self.connection.ioloop.start()

    def on_response(self, ch, method, props, body):
        if self.corrId == props.correlation_id:
            self.response = body
            self.connection.close()
            self.connection.ioloop.start()

    def on_response_connected(self, connection):
        _logger.info("Connected...\t(%s)" % self.queue)
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_connected(self, connection):
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        # _logger.info("Channel Opened...\t(%s)" % self.queue)
        self.channel = channel
        self.channel.queue_declare(queue=self.queue,
                                   durable=True,
                                   exclusive=False,
                                   auto_delete=False,
                                   callback=self.on_queue_declared)

    def on_queue_declared(self, frame):
        self.channel.basic_publish(exchange="",
                                   routing_key=self.queue,
                                   properties=pika.BasicProperties(),
                                   body=str(self.request))
        self.connection.close()
        _logger.info("Message Published...\t(%s)" % self.queue)


if __name__ == "__main__":
    data = {
        'text': 'This is a sample text',
        'to_num': '+2547xxxxxxxx'
    }
    count = 10000
    for index in range(count):
        data['index'] = index
        QueuePublisherClient("test_queue", json.dumps(data))
When I publish 10000 messages to the queue while the consumer is not running, rabbitmqctl list_queues shows that test_queue holds 10000 messages. As soon as I start the consumer and run rabbitmqctl list_queues again, the queue shows 0 messages, even though the consumer is still working through them. The problem is that if I stop the consumer after a few seconds and then restart it, I cannot recover my messages. How can I avoid this?

This is only a simulation of the real situation, where the consumer process is restarted by monit and I suffer message loss.
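For reference, this is roughly how I also check the queue depth from Python instead of rabbitmqctl (a minimal sketch, assuming the same local broker at 127.0.0.1 and the same pika installation as the consumer above):

import pika

# Minimal sketch: a passive queue_declare does not create or modify the
# queue, it only asks the broker for its current state.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()

# The DeclareOk frame carries the number of messages ready in the queue,
# which should match what rabbitmqctl list_queues reports.
frame = channel.queue_declare(queue='test_queue', passive=True)
print("test_queue holds %d messages" % frame.method.message_count)

connection.close()

This check reports 10000 before the consumer starts and drops to 0 almost immediately after it starts, which matches the rabbitmqctl output described above.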