Как создать несколько заданий asyncio и получить возвращаемые значения - PullRequest
0 голосов
/ 05 марта 2020

Я начинаю с asyncio, и я изо всех сил жду, когда множественные возвращаемые значения будут использоваться различными функциями.

Я хочу сделать что-то вроде потока, но использую asyncio, так как я использую asyn c библиотеки из Azure.

Мне нужно запустить в своем классе Thread некоторую функцию, и я хочу запустить их одновременно, и ждать их результатов и посмотреть, хорошие ли результаты (я в django тесте) ,

Вот мой текущий код:

class DeviceThread(threading.Thread):
    def __init__(self, test_case, device_id, module_id, cert_prim, cert_sec):
        threading.Thread.__init__(self)
        self.test_case = test_case
        self.device_id = device_id
        self.module_id = module_id
        self.cert_prim = cert_prim
        self.cert_sec = cert_sec


    async def get_twin(self, device):
        try:
            # 0 - Get the twin for the device
            twin = await device.get_twin()
            info(twin)
            return twin['desired']['test'] == "test1device"
        except Exception as e:
            info(format_exc())
            return False


    async def receive_message_cloud(self, device):
        try:
            # 1- Waiting for the first message from the server (Blocking call)
            message = await device.receive_message_cloud()
            return message.data.decode() == "testmessage1"
        except Exception as e:
            info(format_exc())
            return False


    async def send_message_cloud(self, device):
        try:
            # 2 -Sending a message to the backend
            await device.send_message_cloud("testmessage1")
            return True
        except Exception as e:
            info(format_exc())
            return False


    async def receive_direct_call_and_respond(self, device):
        try:
            # 3 - Receiving a direct call method from the backend
            method_request = await device.receive_method_request()
            # 4 - Sending result of direct call
            payload = {"test": "testok"}
            status = 200
            await device.send_method_response(method_request, status, payload)
            return (method_request.name == "testmethod1") and (method_request.payload == "testpayload1")
        except Exception as e:
            info(format_exc())
            return False


    async def update_twin_properties(self, device):
        try:
            # 5 - Updating twin properties
            new_properties = {'test', 'test2device'}
            device.patch_twin_reported_properties(new_properties)
            return True
        except Exception as e:
            info(format_exc())
            return False


    async def perform(self, device):
        # Creating the tasks that will execute in parrallel
        twin_get_res = asyncio.create_task(self.get_twin(device))
        rec_mess_cloud = asyncio.create_task(self.receive_message_cloud(device))
        send_mess_cloud = asyncio.create_task(self.send_message_cloud(device))
        rec_dir_call = asyncio.create_task(self.receive_direct_call_and_respond(device))
        up_twin_prop = asyncio.create_task(self.update_twin_properties(device))
        # Verify the execution of the routine when done
        self.test_case.assertTrue(await twin_get_res)
        self.test_case.assertTrue(await rec_mess_cloud)
        self.test_case.assertTrue(await send_mess_cloud)
        self.test_case.assertTrue(await rec_dir_call)
        self.test_case.assertTrue(await up_twin_prop)


    def run(self):
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            # Getting writing the cert content to disk
            open("cert.pem", "w").write(self.cert_prim)
            # Creating a device
            device = IOTHubDevice()
            device.authenticate_device(self.device_id, "cert.pem")
            # Removing cert previously created
            remove("cert.pem")
            asyncio.run(self.perform(device))
        except Exception as e:
            info(format_exc())
            self.test_case.assertFalse(True)

Как вы уже видели, я в потоке и хочу проверить некоторую функцию Azure IOTHub.

Но я получаю эту ошибку:

RuntimeError: Задача привязана к будущему с другим l oop

Итак, мой вопрос: как я могу запустить все эти задачи в л oop и получите свои индивидуальные результаты?

Спасибо за ответ!

...