Идея такова:
У меня kombu.mixins.ConsumerMixin
объект работает в потоке. Для разных тем мне нужны разные потребители, каждый из которых выполняет разные задачи по сельдерею. Итак, я хочу передать задачу сельдерея в поток, который будет служить обратным вызовом потребителя.
Класс TopicConsumer:
class TopicConsumer(ConsumerMixin):
def __init__(self, connection, exchange_name, binding_keys):
super().__init__()
self.connection = connection
self.exchange = Exchange(exchange_name, type='topic')
self.q_bindings = [binding(self.exchange, routing_key=k) for k in binding_keys]
self.q_name = f"{exchange_name}:{'|'.join(str(key) for key in binding_keys)}"
self.queue = Queue(name=self.q_name,
exchange=self.exchange,
bindings=self.q_bindings)
def get_consumers(self, Consumer, channel):
consumers = list()
consumer = Consumer(queues=self.queue,
accept=['json'],
callbacks=[self._handle_message])
consumers.append(consumer)
return consumers
def _handle_message(self, body, message):
try:
message.ack()
except MessageStateError:
# message has already been processed, rejected or requeued
pass
else:
self.process_message(message.payload)
def process_message(self, data):
'''Must be overwritten by subclass.'''
raise NotImplementedError('Subclass responsibility.')
MessageConsumerThread:
import threading
from types import MethodType
from .TopicConsumer import TopicConsumer
class MessageConsumerThread(threading.Thread):
def __init__(self, connection, exchange_name, binding_keys, msg_handler=None):
super().__init__()
self.connection = connection
self.exchange_name = exchange_name
self.binding_keys = binding_keys
if msg_handler:
self.register_message_handler(msg_handler)
def register_message_handler(self, msg_handler):
TopicConsumer.process_message = MethodType(msg_handler, TopicConsumer)
self._create_consumer()
def _create_consumer(self):
self.consumer = TopicConsumer(self.connection, self.exchange_name, self.binding_keys)
def run(self):
self.consumer.run()
def stop(self):
self.consumer.should_stop = True
В основном приложении есть celery_task
, вызываемый функцией.
def msg_handler(data):
celery_task.delay(data)
Я хочу зарегистрировать msg_handler
в качестве обратного вызова для обработки сообщений TopicConsumer.process_message()
, используя types.MethodType
(я также пытался setattr
. Тоже не работает.)
А пока я попробовал это так:
with Connection(**connection_parameters) as conn:
msg_consumer = MessageConsumerThread(conn,
exchange_name='messenger',
binding_keys=['key'])
msg_consumer.register_message_handler(msg_handler)
msg_consumer.start()
Если сделать так, как показано, я могу выполнить msg_handler
с
MessageConsumerThread
объект. Но он не выполняется изнутри объекта TopicConsumer
, где он мне нужен, потому что доступно тело сообщения, которое я хочу обработать. И теперь у меня есть идея о том, как это запустить.
У вас есть предложения? Заранее спасибо!