RabbitMQ Pika и веб-сокет Django каналов - PullRequest
1 голос
/ 07 мая 2020

Я использую Django Каналы и RabbitMQ pika, впервые. Я пытаюсь использовать из очереди RabbitMQ. Я использую Django Channels AsyncConsumer для групповой отправки всем, подключенным к веб-сокету.

User type 1: Может создать задачу

User type 2: Может принять задачу.

Пример использования: когда user type 1 создает задачу, она публикуется в rabbitmq. Когда он потребляется из очереди, он должен быть отправлен в группу во внешний интерфейс. И когда user type 2 принимает задачу, другие экземпляры user type 2 не могут принять то же самое, и мы снова потребляем из очереди и отправляем следующую задачу в очереди всем.

Я создал соединение в другой поток, используя sync_to_async. Я добавляю его в список в памяти из функции обратного вызова. И всякий раз, когда кто-то соглашается, я просто выталкиваю его из списка и подтверждаю очередь.

class AcceptTaskConsumer(AsyncConsumer):
    body = [] #IN MEMORY LIST 
    delivery = {} #To store ack delivery_tag 


    async def websocket_connect(self, event):
        print("AcceptTaskConsumer connected", event)
        AcceptTaskConsumer.get_task() #STARTS Queue listener in new thread
        self.room_group_name = "user_type_2"
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.send({
            "type": "websocket.accept"
        })

    async def websocket_receive(self, event):
        if event["text"] == "Hi": #If connecting first time
            if AcceptTaskConsumer.body:
                await self.channel_layer.group_send(
                    self.room_group_name,
                    {
                        "type": "message",
                        "text": AcceptTaskConsumer.body[0]["body"]
                    }
                )
            else:
                await self.channel_layer.group_send(
                    self.room_group_name,
                    {
                        "type": "message",
                        "text": "No New Tasks"
                    }
                )

        else: #When someone accepts a task-> ack and send next task in queue
            print(json.loads(event["text"])["id"])
            AcceptTaskConsumer.channel.basic_ack(delivery_tag=AcceptTaskConsumer.delivery[json.loads(event["text"])["id"]])
            AcceptTaskConsumer.delivery.pop(json.loads(event["text"])["id"])
            AcceptTaskConsumer.body.pop(0)
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    "type": "message",
                    "text": "No New Tasks"
                }
            )

            if AcceptTaskConsumer.body:
                await self.channel_layer.group_send(
                    self.room_group_name,
                    {
                        "type": "message",
                        "text": AcceptTaskConsumer.body[0]["body"]
                    }
                )

    async def message(self, event):
        await self.send({
            "type": "websocket.send",
            "text": event["text"]
        })

    @classmethod
    @sync_to_async
    def get_task(cls): #pika consumer
        cls.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        cls.channel = cls.connection.channel()

        cls.channel.queue_declare(queue='task_', arguments={"x-max-priority": 3})

        cls.channel.basic_consume(
            queue='task_', on_message_callback=AcceptTaskConsumer.callback, auto_ack=False)
        cls.channel.start_consuming()

    @classmethod
    def callback(cls, ch, method, properties, body):
        task_obj = {"body": json.dumps(body.decode("utf-8")),
                    "delivery_tag": method.delivery_tag}
        AcceptTaskConsumer.body.append(task_obj)
        AcceptTaskConsumer.delivery[json.loads(json.loads(task_obj["body"]))["id"]] = method.delivery_tag
        cls.channel.stop_consuming()

    async def websocket_disconnect(self, event):
        print(event)
        await self.send({
            "type": "websocket.close"
        })

        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

Я почти уверен, что это неправильный способ сделать это, потому что он работает не так, как ожидалось

Я часто сталкиваюсь с ошибками, например.

  • 39 из 169 каналов превышают пропускную способность при групповой доставке
  • pika.exceptions.StreamLostError: Потеряно соединение с потоком: BrokenPipeError (32, 'Broken pipe ')

Я также пробовал запустить прослушиватель очереди, например этот ответ . Ничего не работает. У кого-нибудь из опытных есть мысли по этому поводу? Есть ли лучший способ решить эту проблему?

1 Ответ

2 голосов
/ 08 мая 2020

вы должны переместить логи c cosumering rabitMQ из потребителя веб-сокета.

Просто имейте django команду , которая запускает Rabbit Consumer, этот потребитель может принимать сообщения от RabbitMQ, а затем используйте send_group, чтобы отправить их по группам в каналы.

если ваша команда django вам нужно будет вызвать send_group, см. https://channels.readthedocs.io/en/latest/topics/channel_layers.html#using -outside-of-consumer

from channels.layers import get_channel_layer

channel_layer = get_channel_layer()

async_to_sync(
    channel_layer.group_send
)(
    "user_type_2",
    {"type": "message", "msg": 123}
)

Затем в потребителе веб-сокета вы должны подписаться на группы, которые пользователь хочет / имеет разрешение на получение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...