Вызовите метод asyn c из syn c callback в Python - PullRequest
0 голосов
/ 31 марта 2020

Следующий скрипт python использует температуру чтения из тега Ruuvi. В синхронном обратном вызове Ruuvi мы хотим вызвать метод asyn c (send_message_to_output). Следующий код во второй раз вызывается исключение

RuntimeError: Событие l oop закрывается

Как я могу заставить handle_data работать несколько раз?

import asyncio
from azure.iot.device.aio import IoTHubModuleClient
from ruuvitag_sensor.ruuvi import RuuviTagSensor

async def main():
    device_client = IoTHubModuleClient.create_from_edge_environment()
    await device_client.connect()

    def handle_data(found_data):
        asyncio.get_event_loop().run_until_complete(device_client.send_message_to_output("some data", "ruuvi"))

    while True:
        RuuviTagSensor.get_datas(handle_data)
        time.sleep(5)

    await device_client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

1 Ответ

1 голос
/ 01 апреля 2020

Согласно вашему исключению, l oop почему-то закрыто. Я думаю, что это из-за функции run_until_complete в handle_data, которая вызывает реакцию, которая закрывает l oop.

. Поэтому я бы предложил попробовать следующее:

import asyncio
from azure.iot.device.aio import IoTHubModuleClient
from ruuvitag_sensor.ruuvi import RuuviTagSensor

async def main(main_loop):
    tasks = list()
    device_client = IoTHubModuleClient.create_from_edge_environment()
    await device_client.connect()

    def handle_data(found_data):
        nonlocal main_loop
        nonlocal tasks
        tasks.append(main_loop.create_task(device_client.send_message_to_output("some data", "ruuvi")))

    while True:
        RuuviTagSensor.get_datas(handle_data)
        # We need to wait async in order to let the tasks run
        await asyncio.sleep(5)

    # This is just an insurance that all the tasks (messages to output) completed
    await asyncio.wait(tasks, timeout=5)
    await device_client.disconnect()

if __name__ == "__main__":
    # Creating and closing the loop here
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop)
    loop.close()

Альтернативным ( более сложным ) решением может быть использование функции, которая читает из очереди и вызывает функцию send_message_to_output:

import asyncio
from azure.iot.device.aio import IoTHubModuleClient
from ruuvitag_sensor.ruuvi import RuuviTagSensor

async def main(main_loop):
    q = asyncio.Queue()
    stopping = asyncio.Event()

    device_client = IoTHubModuleClient.create_from_edge_environment()
    await device_client.connect()

    async def send_msg():
        nonlocal q
        nonlocal stopping
        nonlocal device_client
        while not stopping.is_set():
            msg, sender = await q.get()
            if msg is None and sender is None:
                break
            await device_client.send_message_to_output(msg, sender)

    def handle_data(found_data):
        nonlocal q
        nonlocal stopping
        if stopping.is_set():
            return
        q.put_nowait(("some data", "ruuvi"))

    while True:
        RuuviTagSensor.get_datas(handle_data)
        await asyncio.sleep(5)

    send_msg_task = main_loop.create_task(send_msg())

    await q.put((None, None))
    await stopping.set()
    await send_msg_task
    await device_client.disconnect()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop)
    loop.close()

Идея заключалась в том, чтобы отделить handle_data от send_msg. Таким образом, мне удается сделать send_msg асинхронной c функцией, которую теперь не нужно создавать loop или Task

...