Я изменил PyModbus Asyn c Пример клиента Asyncio и создал программу (см. Ниже) для копирования катушек и регистров с одного PL C на другой. Для каждого узла существует основной поток и поток (PL C). Модифицированный код работает неплохо. Но оказалось, что код работает значительно медленнее, если я настроил две пары узлов источника / назначения вместо одной пары и скопировал такое же количество пакетов MODBUS.
Копирование 140 кадров из N1 в N2 намного быстрее (с использованием трех потоков: основного, N1, N2), чем копирование 70 кадров из N1 в N2 и еще 70 кадров из N3 в N4 (с использованием пяти основных потоков, N1 , N2, N3, N4)
Я ожидал, что конфигурация с двумя парами работает быстрее. Что мне следует изменить или почему мои ожидания неверны? Спасибо!
def run_with_already_running_loop(self):
UTILS.LOGGER.info("Running Async client with asyncio loop already started")
UTILS.LOGGER.info("------------------------------------------------------")
def done(future):
UTILS.LOGGER.info("Done !!!")
def start_loop(loop):
"""
Start Loop
:param loop:
:return:
"""
asyncio.set_event_loop(loop)
loop.run_forever()
while True:
for hostdef in self.M_HostList:
if hostdef.host == None or hostdef.host.client.protocol == None:
try:
if hostdef.host == None:
host = COPYHOST()
host.loop = asyncio.new_event_loop()
host.t = Thread(target=start_loop, args=[host.loop])
host.t.daemon = True
# Start the loop
host.t.start()
assert host.loop.is_running()
asyncio.set_event_loop(host.loop)
host.loop, host.client = ModbusClient(schedulers.ASYNC_IO, host=hostdef.IP, port=hostdef.port, loop=host.loop)
hostdef.host = host
host.future = asyncio.run_coroutine_threadsafe(self.start_async_test(hostdef, hostdef.host.client.protocol, hostdef.job, hostdef.time), loop=host.loop)
host.future.add_done_callback(done)
UTILS.LOGGER.info("Made host on {}".format(hostdef.key))
except:
UTILS.LOGGER.info("Failed to make host on {}".format(hostdef.key))
pass
try:
self.manage_jobs(hostdef)
# UTILS.LOGGER.info("@@@ {}".format(hostdef.key))
except:
pass
time.sleep(0.05)
async def start_async_test(self, hostdef, client, job, job_start_time):
last_milli_time = 0
while True:
if client == None:
await asyncio.sleep(1)
continue
current_milli_time = UTILS.UTILS.time_ms()
if job == None:
if (current_milli_time-last_milli_time) > 1000:
#UTILS.LOGGER.info("!!! {}".format(hostdef.key))
last_milli_time = current_milli_time
else:
#UTILS.LOGGER.info("!!! {} {}".format(hostdef.key, job.state))
pass
if job != None and job.state == 3 and hostdef.oqueue.qsize() == 0:
assert job_start_time != 0
job.state = 4
#UTILS.LOGGER.info("FINISHING JOB: {}".format(job.SD.key))
fjob = deepcopy(job)
hostdef.oqueue.put(fjob)
job = None
job_start_time = 0
if job == None and hostdef.iqueue.qsize() != 0:
job = hostdef.iqueue.get()
job.state = 1
job_start_time = current_milli_time
#UTILS.LOGGER.info("START JOB: {}".format(job.SD.key))
if job != None and job.dir == 'D' and job.state == 1:
# in case of destination we write
job.state = 2
if job.SD.type == '%M':
rq = await client.write_coils(job.SD.start, job.buffer, unit=UNIT)
job.state = 3
if rq.function_code < 0x80:
job.Fault = False
else:
job.Fault = True
assert False
pass
elif job.SD.type == '%MW':
rq = await client.write_registers(job.SD.start, job.buffer, unit=UNIT)
job.state = 3
if rq.function_code < 0x80:
job.Fault = False
else:
job.Fault = True
assert False
pass
else:
assert False
elif job != None and job.dir == 'S' and job.state == 1:
# in case of source we read
job.state = 2
if job.SD.type == '%M':
rr = await client.read_coils(job.SD.start, job.SD.size, unit=UNIT)
job.state = 3
if rr.function_code < 0x80:
job.Fault = False
job.buffer = rr.bits
else:
job.Fault = True
job.buffer = None
assert False
pass
elif job.SD.type == '%MW':
rr = await client.read_holding_registers(job.SD.start, job.SD.size, unit=UNIT)
job.state = 3
if rr.function_code < 0x80:
job.Fault = False
job.buffer = rr.registers
else:
job.Fault = True
job.buffer = None
assert False
else:
assert False
await asyncio.sleep(0.01)