PyModbus asyncio copier - невозможно перезапустить копию после повторного подключения - PullRequest
0 голосов
/ 16 июня 2020

Я изменил PymodBus Asyn c Пример клиента Asyncio и создал программу (см. Ниже) для копирования катушек и регистров из одного PL C в другой. Для каждого узла есть основной поток и поток (PL C). Модифицированный код работает неплохо. Но я не могу перезапустить копию после того, как мой клиент отключился / повторно подключился к серверу. Что мне нужно сделать? Спасибо!

   def run_with_already_running_loop(self):

    UTILS.LOGGER.info("Running Async client with asyncio loop already started")
    UTILS.LOGGER.info("------------------------------------------------------")

    def done(future):
        UTILS.LOGGER.info("Done !!!")

    def start_loop(loop):
        """
        Start Loop
        :param loop:
        :return:
        """
        asyncio.set_event_loop(loop)
        loop.run_forever()

    while True:
        for hostdef in self.M_HostList:
            if hostdef.host == None or hostdef.host.client.protocol == None:
                try:
                    if hostdef.host == None:
                        host = COPYHOST()
                        host.loop = asyncio.new_event_loop()
                        host.t = Thread(target=start_loop, args=[host.loop])
                        host.t.daemon = True
                        # Start the loop
                        host.t.start()
                    assert host.loop.is_running()
                    asyncio.set_event_loop(host.loop)
                    host.loop, host.client = ModbusClient(schedulers.ASYNC_IO, host=hostdef.IP, port=hostdef.port, loop=host.loop)
                    hostdef.host = host
                    host.future = asyncio.run_coroutine_threadsafe(self.start_async_test(hostdef, hostdef.host.client.protocol, hostdef.job, hostdef.time), loop=host.loop)
                    host.future.add_done_callback(done)
                    UTILS.LOGGER.info("Made host on {}".format(hostdef.key))
                except:
                    UTILS.LOGGER.info("Failed to make host on {}".format(hostdef.key))
                    pass
            try:
                self.manage_jobs(hostdef)
                # UTILS.LOGGER.info("@@@ {}".format(hostdef.key))
            except:
                pass    
        time.sleep(0.05)



async def start_async_test(self, hostdef, client, job, job_start_time):

    last_milli_time = 0

    while True:

        if client == None:
            await asyncio.sleep(1)
            continue

        current_milli_time = UTILS.UTILS.time_ms() 

        if job == None:
            if (current_milli_time-last_milli_time) > 1000:
                #UTILS.LOGGER.info("!!! {}".format(hostdef.key))
                last_milli_time = current_milli_time
        else:
            #UTILS.LOGGER.info("!!! {} {}".format(hostdef.key, job.state))
            pass



        if job != None and job.state == 3 and hostdef.oqueue.qsize() == 0:
            assert job_start_time != 0 
            job.state = 4
            #UTILS.LOGGER.info("FINISHING JOB: {}".format(job.SD.key))
            fjob = deepcopy(job)
            hostdef.oqueue.put(fjob)
            job = None
            job_start_time = 0
        if job == None and hostdef.iqueue.qsize() != 0:
            job = hostdef.iqueue.get()
            job.state = 1
            job_start_time = current_milli_time
            #UTILS.LOGGER.info("START JOB: {}".format(job.SD.key))


        if job != None and job.dir == 'D' and job.state == 1:
            # in case of destination we write

            job.state = 2

            if job.SD.type == '%M':
                rq = await client.write_coils(job.SD.start, job.buffer, unit=UNIT)
                job.state = 3
                if rq.function_code < 0x80:
                    job.Fault = False
                else:
                    job.Fault = True
                    assert False
                pass
            elif job.SD.type == '%MW':
                rq = await client.write_registers(job.SD.start, job.buffer, unit=UNIT)
                job.state = 3
                if rq.function_code < 0x80:
                    job.Fault = False
                else:
                    job.Fault = True
                    assert False
                pass
            else:
                assert False

        elif job != None and job.dir == 'S' and job.state == 1:
            # in case of source we read

            job.state = 2

            if job.SD.type == '%M':
                rr = await client.read_coils(job.SD.start, job.SD.size, unit=UNIT)
                job.state = 3
                if rr.function_code < 0x80:
                    job.Fault = False
                    job.buffer = rr.bits 
                else:
                    job.Fault = True
                    job.buffer = None
                    assert False
                pass
            elif job.SD.type == '%MW':
                rr = await client.read_holding_registers(job.SD.start, job.SD.size, unit=UNIT)
                job.state = 3
                if rr.function_code < 0x80:
                    job.Fault = False
                    job.buffer = rr.registers
                else:
                    job.Fault = True
                    job.buffer = None
                    assert False
            else:
                assert False

            await asyncio.sleep(0.01)
...