Я работаю с Django Channels в течение недели, и кое-что вызывает у меня параллелизм runworker
.
Например, у меня есть этот клиент MQTT, который публикуется в каналах при получении сообщения, базовый.
async def treat_message(msg):
channel_layer = get_channel_layer()
payload = json.loads(msg.payload, encoding="utf-8")
await channel_layer.send("mqtt", {
"type": "value.change",
"message": payload
})
Это хорошо. Я могу отправить, сколько я хочу, он будет отправлен в очередь Redis. На канал mqtt
.
Затем я запускаю работника, который перенаправляет сообщения в очередь на mqtt
с помощью:
python manage.py runworker mqtt
2018-09-12 16:33:42,232 - INFO - runworker - Running worker for channels ['mqtt']
Вот тут и начинается проблема. Вот содержимое AsyncConsumer, читающего данные:
class MQTTConsumer(AsyncConsumer):
async def value_change(self, event):
await asyncio.sleep(5)
print("I received changes : {}".format(event["message"]))
Я усыпил сон, чтобы смоделировать бизнес задачи. И вот куда я иду: асинхронный потребитель не многопоточный! Когда я отправляю два сообщения на канал, потребителю требуется 10 секунд для обработки второго сообщения, а не 5, если оно было многопоточным. Как показано ниже.
2018-09-12 16:45:25,271 - INFO - runworker - Running worker for channels ['mqtt']
2018-09-12 16:45:32,559 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:37,561 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:42,563 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:47,565 - INFO - mqtt - I received changes : {'oui': 'non'}
Любая информация по этому вопросу будет очень полезна, заранее спасибо!
РЕДАКТИРОВАТЬ : Единственный способ управлять им, который я нашел, - это создать исполнителя, который будет содержать рабочих для выполнения асинхронной работы. Но я не уверен в его эффективности для развертывания
def handle_mqtt(event):
time.sleep(3)
logger.info("I received changes : {}".format(event["message"]))
class MQTTConsumer(AsyncConsumer):
def __init__(self, scope):
super().__init__(scope)
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
async def value_change(self, event):
loop = asyncio.get_event_loop()
future = loop.run_in_executor(self.executor, handle_mqtt, event)