Вкл. возможность тайм-аута, если сервер недоступен"
Тайм-аут возможен, но это не позволит использовать аппаратно REQ/REP
два-шаговый танец, чтобы выжить, тем не менее, чтобы продолжить в надлежащем порядке, если одна сторона тайм-аута в противном случае обязательный шаг в d присваивается F inite S tate A автоматическая схема (dFSA не может использовать односторонние ярлыки, это двусторонний dFSA).
Гипотеза:
Еслисервер не работает, клиент сразу проходит через первый вызов socket.poll
(так как zmq
просто буферизует сообщение внутри). Он блокируется на 1 секунду при втором вызове socket.poll
и правильно пропускает блок recv_json
. Затем он зависает при вызове context.term()
.
Проверка:
Давайте рассмотрим код пошагово
def Test( SetImmediate = False ):
##################################################################################
import zmq, json, time; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "import-s: DONE... VER: " ), zmq.zmq_version() )
##################################################################################
ip = 'localhost'; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "ip SET..." ) )
port = 5555; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "port SET..." ) )
addr = "tcp://{0}:{1}".format( ip, port ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "addr SET..." ) )
message = { 'value': 10 }; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "message SET..." ) )
##################################################################################
context = zmq.Context(); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context INSTANTIATED..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
pass; aReqSock = context.socket( zmq.REQ ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket INSTANTIATED..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
##################################################################################################################################################################################################################################
pass; rc = aReqSock.getsockopt( zmq.LINGER ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
pass; aReqSock.setsockopt( zmq.LINGER, 0 ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.LINGER ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) ) # do not let LINGER block on closing sockets with waiting msgs
pass; rc = aReqSock.getsockopt( zmq.LINGER ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
##################################################################################################################################################################################################################################
pass; rc = aReqSock.getsockopt( zmq.IMMEDIATE ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
if SetImmediate:
aReqSock.setsockopt( zmq.IMMEDIATE, 1 ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.IMMEDIATE ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) ) # do not enqueue msgs for incoplete connections
pass; rc = aReqSock.getsockopt( zmq.IMMEDIATE ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
##################################################################################################################################################################################################################################
pass; aReqSock.connect( addr ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.connect() DONE..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
##################################################################################
pass; rc = aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLOUT ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
if 0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT ) != 0:# .poll() BLOCKS ~ 1s +NEVER gets a .POLLOUT for an empty TxQueue, does it?
pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... == " ), rc )
pass; aReqSock.send_json( message, flags = zmq.NOBLOCK ) # .send()-s dispatches message the REP-side may .recv() at some later time
pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".send_json( zmq.NOBLOCK ): DONE..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
pass; rc = aReqSock.poll( timeout = 1000, flags = zmq.POLLIN ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLIN ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
if 0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLIN ) != 0:# .poll() BLOCKS < 1s = depends on REP-side response latency ( turn-around-time )
pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... == " ), rc )
response = aReqSock.recv_json() # .recv() BLOCKS until ... if ever ...
print( response ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".recv_json() COMPLETED" ), "|", zmq.strerror( zmq.zmq_errno() ) )
pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "if-ed code-block COMPLETED" ) )
##################################################################################
rc = aReqSock.disconnect( addr ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.disconnect() RETURNED CODE ~ " ), rc, "|", zmq.strerror( zmq.zmq_errno() ) )
rc = aReqSock.close( linger = 0 ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.close() RETURNED CODE ~ " ), rc, "|", zmq.strerror( zmq.zmq_errno() ) )
rc = context.term(); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context.term() RETURNED CODE ~ " ), rc, "|", zmq.strerror( zmq.zmq_errno() ) )
##################################################################################
Это приводит к чему-то следующему:
>>> Test( SetImmediate = False )
____947107.0356056700_ACK: import-s: DONE... VER: 4.2.5
____947107.0356727780_ACK: ip SET...
____947107.0356969039_ACK: port SET...
____947107.0357236000_ACK: addr SET...
____947107.0357460320_ACK: message SET...
____947107.0358552620_ACK: Context INSTANTIATED... | Success
____947107.0362445670_ACK: Socket INSTANTIATED... | Success
____947107.0363074190_ACK: Socket.getsockopt( zmq.LINGER ) GOT... -1 | Success
____947107.0363573120_ACK: Socket.setsockopt( zmq.LINGER ) SET... | Invalid argument
____947107.0364004780_ACK: Socket.getsockopt( zmq.LINGER ) GOT... 0 | Success
____947107.0364456220_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0364890840_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0365797410_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947107.0366972820_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947107.0367464600_ACK: rc was NON-ZERO... == 2
____947107.0368948240_ACK: .send_json( zmq.NOBLOCK ): DONE... | Resource temporarily unavailable
____947108.0381633660_ACK: rc = .poll( 1000 [ms], zmq.POLLIN ) SET... | Resource temporarily unavailable
____947108.0382736750_ACK: if-ed code-block COMPLETED
____947108.0383544239_ACK: Socket.disconnect() RETURNED CODE ~ None | Resource temporarily unavailable
____947108.0384234400_ACK: Socket.close() RETURNED CODE ~ None | Invalid argument
____947108.0386644470_ACK: Context.term() RETURNED CODE ~ None | Success
и
>>> Test( SetImmediate = True )
____947119.1267617550_ACK: import-s: DONE... VER: 4.2.5
____947119.1268189061_ACK: ip SET...
____947119.1268382660_ACK: port SET...
____947119.1268587380_ACK: addr SET...
____947119.1268772170_ACK: message SET...
____947119.1269678050_ACK: Context INSTANTIATED... | Success
____947119.1271884360_ACK: Socket INSTANTIATED... | Success
____947119.1272257260_ACK: Socket.getsockopt( zmq.LINGER ) GOT... -1 | Success
____947119.1272587100_ACK: Socket.setsockopt( zmq.LINGER ) SET... | Invalid argument
____947119.1272875509_ACK: Socket.getsockopt( zmq.LINGER ) GOT... 0 | Success
____947119.1273175071_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947119.1273461781_ACK: Socket.setsockopt( zmq.IMMEDIATE ) SET... | Invalid argument
____947119.1273732870_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 1 | Success
____947119.1274376540_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947120.1287043930_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947120.1287937190_ACK: if-ed code-block COMPLETED
____947120.1288697980_ACK: Socket.disconnect() RETURNED CODE ~ None | Resource temporarily unavailable
____947120.1289412400_ACK: Socket.close() RETURNED CODE ~ None | Invalid argument
____947120.1291404651_ACK: Context.term() RETURNED CODE ~ None | Success
Что доказывает, что гипотеза не верна: с * 1051 нет проблемcontext.term()
, но со способом, как .connect( aTransportClass_Target )
против цели, которой нет, обрабатывается внутренне.
К моему удивлению, в тестируемой версии (v4.2.5).poll( zmq.POLLOUT )
отчеты содержат 2
элементов в направлении .POLLOUT
, уже присутствующих в сообщаемом пользователем состоянии TxQueue, без единого явного .send()
(поскольку .poll()
был запущен сразу после .connect()
).
Мне кажется, что это является некоторым несовместимостью с предыдущими версиями (как если бы он пытался сообщить о .connect()
-связанном "протоколе / идентичности""-телеметрия вместо того, чтобы сообщать только о сообщениях на уровне приложения пользователя).
В то время как я могу ошибаться, пытаясьвыясните какое-либо обоснование того, почему принципиально пустая очередь будет пытаться сообщить о сообщении, которое уже было в его .POLLOUT
-направлении, я надеюсь, что оно достаточно доказано, проблема не имеет ничего общего с .LINGER == 0
/ .term()
- экземпляр Context()
- экземпляр.
КЭД