Джанго: Как установить постоянную связь с rabbitmq? - PullRequest
2 голосов
/ 14 марта 2019

Я ищу способ публиковать сообщения на сервере rabbitmq из моего приложения django. Это не для разгрузки задач, поэтому я не хочу использовать сельдерей. Цель состоит в том, чтобы опубликовать на бирже с помощью приложения django и получить из этой очереди дочернее (не-django) приложение в контейнере докера.

Все это кажется очень простым, однако я не могу публиковать на бирже, не устанавливая и не закрывая соединение каждый раз, даже без явного призыва к этому.

В попытке решить эту проблему я определил класс с вложенным одноэлементным классом, который поддерживает соединение с сервером rabbitmq с помощью Pika. Идея заключалась в том, что вложенный синглтон будет создан только один раз, объявив соединение в то время. Каждый раз, когда что-то должно быть опубликовано в очереди, синглтон обрабатывает это.

import logging
import pika
import os

logger = logging.getLogger('django')

class PikaChannelSingleton:
    class __Singleton:
        channel = pika.adapters.blocking_connection.BlockingChannel

        def __init__(self):
            self.initialize_connection()

        def initialize_connection(self):
            logger.info('Attempting to establish RabbitMQ connection')

            credentials = pika.PlainCredentials(rmq_username, rmq_password)
            parameters = pika.ConnectionParameters(rmq_host, rmq_port, rmq_vhost, credentials, heartbeat=0)
            connection = pika.BlockingConnection(parameters)

            con_chan = connection.channel()
            con_chan.exchange_declare(exchange='xchng', exchange_type='topic', durable=True)
            self.channel = con_chan

        def send(self, routing_key, message):
            if self.channel.is_closed:
                PikaChannelSingleton.instance.initialize_connection()

            self.channel.basic_publish(exchange='xchng', routing_key=routing_key,
                                                                body=message)
    instance = None

    def __init__(self, *args, **kwargs):
        if not PikaChannelSingleton.instance:
            logger.info('Creating channel singleton')
            PikaChannelSingleton.instance = PikaChannelSingleton.__Singleton()

    @staticmethod
    def send(routing_key, message):
        PikaChannelSingleton.instance.send(routing_key, message)

rmq_connection = PikaChannelSingleton()

Затем я импортирую rmq_connection, где это необходимо, в приложении django. Все работает в игрушечных приложениях и в python repl, но новое соединение устанавливается каждый раз, когда вызывается функция send в приложении django. Затем соединение немедленно закрывается с сообщением «клиент неожиданно закрыл TCP-соединение». Сообщение действительно публикуется на бирже правильно.

Так что я уверен, что с django что-то происходит, и как оно обрабатывает процессы и тому подобное. Все еще остается вопрос: как я могу публиковать многочисленные сообщения в очереди, не восстанавливая соединение каждый раз?

1 Ответ

0 голосов
/ 14 июня 2019

Если я правильно понимаю, соединения не могут поддерживаться такими же в однопоточном контексте.Поскольку ваше приложение Django продолжает выполняться, клиент amqp не отправляет на канал heartbeats , и соединение прерывается.

Вы можете использовать SelectConnection вместо BlockingConnection, вероятно, нелегко в контексте Django.

Хорошим компромиссом может быть просто сбор сообщений в вашем синглтоне, но только отправка их всех сразу с BlockingConnection в самом конце вашего Djangoзапрос.

...