Я ищу способ публиковать сообщения на сервере rabbitmq из моего приложения django. Это не для разгрузки задач, поэтому я не хочу использовать сельдерей. Цель состоит в том, чтобы опубликовать на бирже с помощью приложения django и получить из этой очереди дочернее (не-django) приложение в контейнере докера.
Все это кажется очень простым, однако я не могу публиковать на бирже, не устанавливая и не закрывая соединение каждый раз, даже без явного призыва к этому.
В попытке решить эту проблему я определил класс с вложенным одноэлементным классом, который поддерживает соединение с сервером rabbitmq с помощью Pika. Идея заключалась в том, что вложенный синглтон будет создан только один раз, объявив соединение в то время. Каждый раз, когда что-то должно быть опубликовано в очереди, синглтон обрабатывает это.
import logging
import pika
import os
logger = logging.getLogger('django')
class PikaChannelSingleton:
class __Singleton:
channel = pika.adapters.blocking_connection.BlockingChannel
def __init__(self):
self.initialize_connection()
def initialize_connection(self):
logger.info('Attempting to establish RabbitMQ connection')
credentials = pika.PlainCredentials(rmq_username, rmq_password)
parameters = pika.ConnectionParameters(rmq_host, rmq_port, rmq_vhost, credentials, heartbeat=0)
connection = pika.BlockingConnection(parameters)
con_chan = connection.channel()
con_chan.exchange_declare(exchange='xchng', exchange_type='topic', durable=True)
self.channel = con_chan
def send(self, routing_key, message):
if self.channel.is_closed:
PikaChannelSingleton.instance.initialize_connection()
self.channel.basic_publish(exchange='xchng', routing_key=routing_key,
body=message)
instance = None
def __init__(self, *args, **kwargs):
if not PikaChannelSingleton.instance:
logger.info('Creating channel singleton')
PikaChannelSingleton.instance = PikaChannelSingleton.__Singleton()
@staticmethod
def send(routing_key, message):
PikaChannelSingleton.instance.send(routing_key, message)
rmq_connection = PikaChannelSingleton()
Затем я импортирую rmq_connection, где это необходимо, в приложении django. Все работает в игрушечных приложениях и в python repl, но новое соединение устанавливается каждый раз, когда вызывается функция send в приложении django. Затем соединение немедленно закрывается с сообщением «клиент неожиданно закрыл TCP-соединение». Сообщение действительно публикуется на бирже правильно.
Так что я уверен, что с django что-то происходит, и как оно обрабатывает процессы и тому подобное. Все еще остается вопрос: как я могу публиковать многочисленные сообщения в очереди, не восстанавливая соединение каждый раз?