Как многопоточность AsyncConsumer с каналами Django - PullRequest
0 голосов
/ 12 сентября 2018

Я работаю с Django Channels в течение недели, и кое-что вызывает у меня параллелизм runworker.

Например, у меня есть этот клиент MQTT, который публикуется в каналах при получении сообщения, базовый.

async def treat_message(msg):
    channel_layer = get_channel_layer()
    payload = json.loads(msg.payload, encoding="utf-8")

    await channel_layer.send("mqtt", {
        "type": "value.change",
        "message": payload
    })

Это хорошо. Я могу отправить, сколько я хочу, он будет отправлен в очередь Redis. На канал mqtt.

Затем я запускаю работника, который перенаправляет сообщения в очередь на mqtt с помощью:

python manage.py runworker mqtt
2018-09-12 16:33:42,232 - INFO - runworker - Running worker for channels ['mqtt']

Вот тут и начинается проблема. Вот содержимое AsyncConsumer, читающего данные:

class MQTTConsumer(AsyncConsumer):
    async def value_change(self, event):
        await asyncio.sleep(5)
        print("I received changes : {}".format(event["message"]))

Я усыпил сон, чтобы смоделировать бизнес задачи. И вот куда я иду: асинхронный потребитель не многопоточный! Когда я отправляю два сообщения на канал, потребителю требуется 10 секунд для обработки второго сообщения, а не 5, если оно было многопоточным. Как показано ниже.

2018-09-12 16:45:25,271 - INFO - runworker - Running worker for channels ['mqtt']
2018-09-12 16:45:32,559 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:37,561 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:42,563 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:47,565 - INFO - mqtt - I received changes : {'oui': 'non'}

Любая информация по этому вопросу будет очень полезна, заранее спасибо!

РЕДАКТИРОВАТЬ : Единственный способ управлять им, который я нашел, - это создать исполнителя, который будет содержать рабочих для выполнения асинхронной работы. Но я не уверен в его эффективности для развертывания

def handle_mqtt(event):
    time.sleep(3)
    logger.info("I received changes : {}".format(event["message"]))


class MQTTConsumer(AsyncConsumer):
    def __init__(self, scope):
        super().__init__(scope)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    async def value_change(self, event):
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(self.executor, handle_mqtt, event)

1 Ответ

0 голосов
/ 05 июля 2019

Это в настоящее время дизайн

Да, это предполагаемый дизайн, так как это самый безопасный способ (он предотвращает условия гонки, если вы не знаете об этом). Если вы счастливы запускать сообщения параллельно, просто раскручивайте свои собственные сопрограммы всякий раз, когда они вам нужны (используя asyncio.create_task), убедитесь, что вы их очищаете и ждете их при завершении работы. Слишком много накладных расходов, поэтому мы надеемся, что в будущем мы отправим для этого потребителю режим согласия, но пока все, что мы поставляем, это безопасный вариант.

https://github.com/django/channels/issues/1203

...