Я использую AMQP RPC (более или менее так, как это описано в документации RabbitMQ) для аутентификации моих служб в центральной службе аутентификации
Проблема, с которой я сталкиваюсь, состоит в том, что каждый из этих запросов аутентификации создает свою собственную исключительную очередь. Поэтому, если я получаю несколько сотен запросов, у меня внезапно появляются несколько сотен очередей, загромождающих мой интерфейс управления RabbitMQ. Поэтому я решил, что, поскольку каждый работник WSGI обрабатывает только один запрос за раз, я мог бы генерировать имена очередей, используя PID и идентификатор потока, и, таким образом, иметь только одну очередь на каждого работника / поток. Код аутентификации для приложения Django / DRF, которое у меня есть, выглядит примерно так:
class AMQPAuthentication(authentication.BaseAuthentication):
channel = None
connection = None
def __init__(self):
if not AMQPAuthentication.connection or not AMQPAuthentication.connection.is_open:
if self.connection is not None:
logger.error('AMQP connection closed. Reconnecting.')
self._reconnect()
self.connection = AMQPAuthentication.connection
self.channel = self.connection.channel()
queue_name = "WSGI-PID-{}-thread-{}".format(os.getpid(), threading.get_ident())
try:
self.queue = self.channel.queue_declare(queue=queue_name, exclusive=True, auto_delete=True, durable=False)
self.callback = self.queue.method.queue
self.channel.basic_consume(self._on_response, no_ack=True, queue=self.callback)
except pika.exceptions.ConnectionClosed:
self._reconnect()
self.connection = AMQPAuthentication.connection
self.channel = self.connection.channel()
self.queue = self.channel.queue_declare(queue=queue_name, exclusive=True, auto_delete=True, durable=False)
self.callback = self.queue.method.queue
self.channel.basic_consume(self._on_response, no_ack=True, queue=self.callback)
def _reconnect(self):
creds = pika.credentials.PlainCredentials(username=settings.AMQP_USER, password=settings.AMQP_PASS)
AMQPAuthentication.connection = pika.BlockingConnection(pika.ConnectionParameters(host=settings.AMQP_HOST, credentials=creds))
AMQPAuthentication.channel = AMQPAuthentication.connection.channel()
logger.info('connected to AMQP broker')
def _on_response(self, ch, method, props, body):
if self.msg_id == props.correlation_id:
self.response = body
self.channel.close()
def authenticate(self, request):
if request.META.get('HTTP_AUTHORIZATION'):
header = request.META.get('HTTP_AUTHORIZATION')
self.response = None
self.msg_id = str(uuid.uuid4())
props = pika.BasicProperties(reply_to=self.callback, correlation_id=self.msg_id)
self.channel.basic_publish(exchange='', routing_key=settings.AUTH_QUEUE_NAME, properties=props, body=str(header))
def authentication_timeout():
raise exceptions.AuthenticationFailed("Authentication timed out.")
# add timeout for response from the auth service hard-code 15 second timeout for now
tid = self.connection.add_timeout(15, authentication_timeout)
while self.response is None:
self.connection.process_data_events()
# once we have a response, cancel the timeout
self.connection.remove_timeout(tid)
try:
data = json.loads(self.response)
if 'id' in data:
user = User.objects.get(pk=int(data['id']))
else:
raise exceptions.AuthenticationFailed(json.dumps({"error": "Authentication failed"}))
except Exception as e:
raise exceptions.AuthenticationFailed(e)
return (user, api_key)
Но по какой-то причине всякий раз, когда я делаю это, первые запросы к одному и тому же работнику аутентифицируются просто отлично, но каждый последующий запрос истекает. Я попытался сделать очереди не AD и не эксклюзивными, я попытался добавить подтверждение сообщения, версия кода выше закрывает канал каждый раз, когда аутентификация завершается, но все равно не работает. Что я делаю неправильно? Кажется, я должен иметь возможность снова и снова связываться с одной и той же очередью и получать от нее сообщения аутентификации, но по какой-то причине только первый запрос получает сообщение обратного вызова.