Как выполнить задачу сельдерея как обратный вызов RabbitMQ Consumer, работающий внутри потока? - PullRequest
0 голосов
/ 05 июня 2019

Идея такова: У меня kombu.mixins.ConsumerMixin объект работает в потоке. Для разных тем мне нужны разные потребители, каждый из которых выполняет разные задачи по сельдерею. Итак, я хочу передать задачу сельдерея в поток, который будет служить обратным вызовом потребителя.

Класс TopicConsumer:

class TopicConsumer(ConsumerMixin):
    def __init__(self, connection, exchange_name, binding_keys):
        super().__init__()
        self.connection = connection
        self.exchange = Exchange(exchange_name, type='topic')

        self.q_bindings = [binding(self.exchange, routing_key=k) for k in binding_keys]
        self.q_name = f"{exchange_name}:{'|'.join(str(key) for key in binding_keys)}"
        self.queue = Queue(name=self.q_name,
                           exchange=self.exchange,
                           bindings=self.q_bindings)

    def get_consumers(self, Consumer, channel):
        consumers = list()
        consumer = Consumer(queues=self.queue,
                            accept=['json'],
                            callbacks=[self._handle_message])
        consumers.append(consumer)
        return consumers

    def _handle_message(self, body, message):
        try: 
            message.ack()
        except MessageStateError:
            # message has already been processed, rejected or requeued
            pass
        else:
            self.process_message(message.payload)

    def process_message(self, data):
        '''Must be overwritten by subclass.'''
        raise NotImplementedError('Subclass responsibility.')


MessageConsumerThread:

import threading
from types import MethodType

from .TopicConsumer import TopicConsumer


class MessageConsumerThread(threading.Thread):
    def __init__(self, connection, exchange_name, binding_keys, msg_handler=None):
        super().__init__()
        self.connection = connection
        self.exchange_name = exchange_name
        self.binding_keys = binding_keys

        if msg_handler:
            self.register_message_handler(msg_handler)

    def register_message_handler(self, msg_handler):
        TopicConsumer.process_message = MethodType(msg_handler, TopicConsumer)
        self._create_consumer()

    def _create_consumer(self):
        self.consumer = TopicConsumer(self.connection, self.exchange_name, self.binding_keys)

    def run(self):
        self.consumer.run()

    def stop(self):
        self.consumer.should_stop = True

В основном приложении есть celery_task, вызываемый функцией.

def msg_handler(data):
    celery_task.delay(data)

Я хочу зарегистрировать msg_handler в качестве обратного вызова для обработки сообщений TopicConsumer.process_message(), используя types.MethodType (я также пытался setattr. Тоже не работает.)

А пока я попробовал это так:

with Connection(**connection_parameters) as conn:
    msg_consumer = MessageConsumerThread(conn,
                                         exchange_name='messenger',
                                         binding_keys=['key'])
    msg_consumer.register_message_handler(msg_handler)
    msg_consumer.start()

Если сделать так, как показано, я могу выполнить msg_handler с MessageConsumerThread объект. Но он не выполняется изнутри объекта TopicConsumer, где он мне нужен, потому что доступно тело сообщения, которое я хочу обработать. И теперь у меня есть идея о том, как это запустить.

У вас есть предложения? Заранее спасибо!

...