У меня есть модуль python, который использует Telnetlib для открытия соединения с демоном, и может отправлять команды и получать ответы на анализ.Это используется для целей тестирования.Однако иногда демон также отправляет асинхронные сообщения.Мое текущее решение состоит в том, чтобы иметь метод "wait_for_message", который запустит поток для прослушивания сокета telnet.Тем временем в основном потоке я посылаю определенную команду, которая, как я знаю, заставит демона отправить конкретное асинхронное сообщение.Затем я просто делаю thread.join () и жду его завершения.
import telnetlib
class Client():
def __init__(self, host, port):
self.connection = telnetlib.Telnet(host, port)
def get_info(self):
self.connection.write('getstate\r')
idx, _, _ = self.connection.expect(['good','bad','ugly'], timeout=1)
return idx
def wait_for_unexpected_message(self):
idx, _, _ = self.connection.expect(['error'])
Затем, когда я пишу тесты, я могу использовать модуль как часть системы автоматизации.
client = Client()
client.connect('192.168.0.45', 6000)
if client.get_info() != 0:
# do stuff
else:
# do other stuff
Это работает очень хорошо, пока я не хочу обрабатывать поступающие асинхронные сообщения. Я читал о новой библиотеке Python asyncio, но я не совсем понял, как сделать то, что мне нужно, или даже если я используюдело будет использовано asyncio.
Так что мой вопрос: есть ли лучший способ справиться с этим лучше с помощью asyncio?Мне нравится использовать telnetlib из-за ожидаемых функций.Использование простого сокета TCP не делает этого для меня.