Вам необходимо добавить достаточно информации к каждому сообщению, чтобы получатель мог определить, как его интерпретировать. Ваши требования звучат очень похоже на AMP , поэтому вы можете использовать вместо этого AMP или использовать ту же структуру, что и AMP, для идентификации ваших сообщений. В частности:
- В запросах указывайте определенный ключ - например, AMP использует «_ask» для идентификации запросов. Это также дает им уникальное значение, которое дополнительно идентифицирует этот запрос на время жизни соединения.
- В ответах введите другой ключ - например, AMP использует для этого «_answer». Значение совпадает со значением из ключа «_ask» в запросе, для которого предназначен ответ.
Используя такой подход, вам просто нужно посмотреть, есть ли ключ «_ask» или «_answer», чтобы определить, получили ли вы новый запрос или ответ на предыдущий запрос.
По отдельной теме ваш класс asyncGatewayCalls
не должен основываться на потоках. У него нет очевидной причины использовать потоки, и при этом он также неправильно использует Twisted API, что приведет к неопределенному поведению. Большинство Twisted API можно использовать только в потоке, в котором вы назвали reactor.run
. Единственное исключение - reactor.callFromThread
, которое можно использовать для отправки сообщения в поток реактора из любого другого потока. asyncGatewayCalls
пытается выполнить запись в транспорт, однако это приведет к повреждению буфера, произвольным задержкам в отправляемых данных или, возможно, к худшим последствиям. Вместо этого вы можете написать asyncGatewayCalls
так:
from twisted.internet.task import LoopingCall
class asyncGatewayCalls(object):
def __init__(self, rpcfactory):
self.rpcfactory = rpcfactory
self.remoteMacList = [...]
def run():
self._call = LoopingCall(self._pokeMicro)
return self._call.start(10)
def _pokeMicro(self):
while True:
mac = self.remoteMacList[...]
if mac in self.rpcfactory.references:
proto = ...
dataToSend = ...
proto.transport.write(dataToSend)
break
factory = ...
r = asyncGatewayCalls(factory)
r.run()
reactor.listenTCP(7080, factory)
reactor.run()
Это дает вам однопоточное решение, которое должно вести себя так же, как вы предполагали для исходного класса asyncGatewayCalls
. Вместо того, чтобы спать в цикле в потоке, чтобы запланировать вызовы, он использует API планирования реактора (через класс LoopingCall более высокого уровня, который планирует вещи, которые будут вызываться повторно), чтобы гарантировать, что _pokeMicro
вызывается каждый раз десять секунд.