RabbitMQ и RPC в питоне - PullRequest
       31

RabbitMQ и RPC в питоне

0 голосов
/ 30 октября 2018

Я использую AMQP RPC (более или менее так, как это описано в документации RabbitMQ) для аутентификации моих служб в центральной службе аутентификации

Проблема, с которой я сталкиваюсь, состоит в том, что каждый из этих запросов аутентификации создает свою собственную исключительную очередь. Поэтому, если я получаю несколько сотен запросов, у меня внезапно появляются несколько сотен очередей, загромождающих мой интерфейс управления RabbitMQ. Поэтому я решил, что, поскольку каждый работник WSGI обрабатывает только один запрос за раз, я мог бы генерировать имена очередей, используя PID и идентификатор потока, и, таким образом, иметь только одну очередь на каждого работника / поток. Код аутентификации для приложения Django / DRF, которое у меня есть, выглядит примерно так:

class AMQPAuthentication(authentication.BaseAuthentication):
    channel = None
    connection = None

    def __init__(self):
        if not AMQPAuthentication.connection or not AMQPAuthentication.connection.is_open:
            if self.connection is not None:
                logger.error('AMQP connection closed. Reconnecting.')
            self._reconnect()
        self.connection = AMQPAuthentication.connection
        self.channel = self.connection.channel()
        queue_name = "WSGI-PID-{}-thread-{}".format(os.getpid(), threading.get_ident())
        try:
            self.queue = self.channel.queue_declare(queue=queue_name, exclusive=True, auto_delete=True, durable=False)
            self.callback = self.queue.method.queue
            self.channel.basic_consume(self._on_response, no_ack=True, queue=self.callback)
        except pika.exceptions.ConnectionClosed:
            self._reconnect()
            self.connection = AMQPAuthentication.connection
            self.channel = self.connection.channel()
            self.queue = self.channel.queue_declare(queue=queue_name, exclusive=True, auto_delete=True, durable=False)
            self.callback = self.queue.method.queue
            self.channel.basic_consume(self._on_response, no_ack=True, queue=self.callback)

    def _reconnect(self):
        creds = pika.credentials.PlainCredentials(username=settings.AMQP_USER, password=settings.AMQP_PASS)
        AMQPAuthentication.connection = pika.BlockingConnection(pika.ConnectionParameters(host=settings.AMQP_HOST, credentials=creds))
        AMQPAuthentication.channel = AMQPAuthentication.connection.channel()
        logger.info('connected to AMQP broker')

    def _on_response(self, ch, method, props, body):
        if self.msg_id == props.correlation_id:
            self.response = body
            self.channel.close()

    def authenticate(self, request):
        if request.META.get('HTTP_AUTHORIZATION'):
            header = request.META.get('HTTP_AUTHORIZATION')
            self.response = None
            self.msg_id = str(uuid.uuid4())
            props = pika.BasicProperties(reply_to=self.callback, correlation_id=self.msg_id)
            self.channel.basic_publish(exchange='', routing_key=settings.AUTH_QUEUE_NAME, properties=props, body=str(header))

            def authentication_timeout():
                raise exceptions.AuthenticationFailed("Authentication timed out.")
            # add timeout for response from the auth service hard-code 15 second timeout for now
            tid = self.connection.add_timeout(15, authentication_timeout)
            while self.response is None:
                self.connection.process_data_events()
            # once we have a response, cancel the timeout
            self.connection.remove_timeout(tid)
            try:
                data = json.loads(self.response)
                if 'id' in data:
                    user = User.objects.get(pk=int(data['id']))
                else:
                    raise exceptions.AuthenticationFailed(json.dumps({"error": "Authentication failed"}))
            except Exception as e:
                raise exceptions.AuthenticationFailed(e)
            return (user, api_key)

Но по какой-то причине всякий раз, когда я делаю это, первые запросы к одному и тому же работнику аутентифицируются просто отлично, но каждый последующий запрос истекает. Я попытался сделать очереди не AD и не эксклюзивными, я попытался добавить подтверждение сообщения, версия кода выше закрывает канал каждый раз, когда аутентификация завершается, но все равно не работает. Что я делаю неправильно? Кажется, я должен иметь возможность снова и снова связываться с одной и той же очередью и получать от нее сообщения аутентификации, но по какой-то причине только первый запрос получает сообщение обратного вызова.

...